Mercurial > repos > jjohnson > query_tabular
comparison query_tabular.py @ 6:03842a4f71c6 draft
Uploaded
| author | jjohnson |
|---|---|
| date | Fri, 17 Feb 2017 15:20:24 -0500 |
| parents | 19ae309ec53c |
| children | 72c32037fa1e |
comparison legend: equal | deleted | inserted | replaced
| 5:19ae309ec53c | 6:03842a4f71c6 |
|---|---|
| 11 | 11 |
| 12 """ | 12 """ |
| 13 TODO: | 13 TODO: |
| 14 - could read column names from comment lines, but issues with legal names | 14 - could read column names from comment lines, but issues with legal names |
| 15 - could add some transformations on tabular columns, | 15 - could add some transformations on tabular columns, |
| 16 filter - skip_regex | |
| 16 e.g. a regex to format date/time strings | 17 e.g. a regex to format date/time strings |
| 17 format: { | 18 format: { |
| 18 c2 : re.sub('pat', 'sub', c2) | 19 c2 : re.sub('pat', 'sub', c2) |
| 19 c3 : len(c3) | 20 c3 : len(c3) |
| 20 } | 21 } |
| 21 def format(colname,val, expr): | 22 def format(colname,val, expr): |
| 23 normalize input list columns | |
| 24 iterate over list values creating one row per iteration | |
| 25 option for input line_num column | |
| 26 create associated table | |
| 27 fk, name, value # e.g. PSM table with list of proteins containing peptide | |
| 28 fk, name, value[, value] # if multiple columns similarly indexed, e.g. vcf | |
| 22 - column_defs dict of columns to create from tabular input | 29 - column_defs dict of columns to create from tabular input |
| 23 column_defs : { 'name1' : 'expr', 'name2' : 'expr'} | 30 column_defs : { 'name1' : 'expr', 'name2' : 'expr'} |
| 24 - allow multiple queries and outputs | 31 - allow multiple queries and outputs |
| 32 repeat min - max with up to max conditional outputs | |
| 33 | |
| 25 - add a --json input for table definitions (or yaml) | 34 - add a --json input for table definitions (or yaml) |
| 26 JSON config: | 35 JSON config: |
| 27 { tables : [ | 36 { tables : [ |
| 28 { file_path : '/home/galaxy/dataset_101.dat', | 37 { file_path : '/home/galaxy/dataset_101.dat', |
| 29 table_name : 't1', | 38 table_name : 't1', |
| 33 unique: ['c1'], | 42 unique: ['c1'], |
| 34 index: ['c2','c3'] | 43 index: ['c2','c3'] |
| 35 }, | 44 }, |
| 36 { file_path : '/home/galaxy/dataset_102.dat', | 45 { file_path : '/home/galaxy/dataset_102.dat', |
| 37 table_name : 'gff', | 46 table_name : 'gff', |
| 38 column_names : ['seqname',,,'start','end'] | 47 column_names : ['seqname',,'date','start','end'] |
| 39 comment_lines : 1 | 48 comment_lines : 1 |
| 40 load_named_columns : True | 49 load_named_columns : True |
| 50 filters : [{'filter': 'regex', 'pattern': '#peptide', 'action': 'exclude_match'}, | |
| 51 {'filter': 'replace', 'column': 3, 'replace': 'gi[|]', 'pattern': ''}] | |
| 41 }, | 52 }, |
| 42 { file_path : '/home/galaxy/dataset_103.dat', | 53 { file_path : '/home/galaxy/dataset_103.dat', |
| 43 table_name : 'test', | 54 table_name : 'test', |
| 44 column_names : ['c1', 'c2', 'c3'] | 55 column_names : ['c1', 'c2', 'c3'] |
| 45 } | 56 } |
| 46 ] | 57 ] |
| 47 } | 58 } |
| 48 """ | 59 """ |
| 49 | 60 |
| 50 tables_query = \ | 61 |
| 51 "SELECT name, sql FROM sqlite_master WHERE type='table' ORDER BY name" | 62 class LineFilter( object ): |
| 63 def __init__(self,source,filter_dict): | |
| 64 self.source = source | |
| 65 self.filter_dict = filter_dict | |
| 66 print >> sys.stderr, 'LineFilter %s' % filter_dict if filter_dict else 'NONE' | |
| 67 self.func = lambda l: l.rstrip('\r\n') if l else None | |
| 68 if not filter_dict: | |
| 69 return | |
| 70 if filter_dict['filter'] == 'regex': | |
| 71 rgx = re.compile(filter_dict['pattern']) | |
| 72 if filter_dict['action'] == 'exclude_match': | |
| 73 self.func = lambda l: l if not rgx.match(l) else None | |
| 74 elif filter_dict['action'] == 'include_match': | |
| 75 self.func = lambda l: l if rgx.match(l) else None | |
| 76 elif filter_dict['action'] == 'exclude_find': | |
| 77 self.func = lambda l: l if not rgx.search(l) else None | |
| 78 elif filter_dict['action'] == 'include_find': | |
| 79 self.func = lambda l: l if rgx.search(l) else None | |
| 80 elif filter_dict['filter'] == 'replace': | |
| 81 p = filter_dict['pattern'] | |
| 82 r = filter_dict['replace'] | |
| 83 c = int(filter_dict['column']) - 1 | |
| 84 self.func = lambda l: '\t'.join([x if i != c else re.sub(p,r,x) for i,x in enumerate(l.split('\t'))]) | |
| 85 def __iter__(self): | |
| 86 return self | |
| 87 def next(self): | |
| 88 for i,next_line in enumerate(self.source): | |
| 89 line = self.func(next_line) | |
| 90 if line: | |
| 91 return line | |
| 92 raise StopIteration | |
| 93 | |
| 94 | |
| 95 class TabularReader: | |
| 96 """ | |
| 97 Tabular file iterator. Returns a list | |
| 98 """ | |
| 99 def __init__(self, file_path, skip=0, comment_char=None, col_idx=None, filters=None): | |
| 100 self.skip = skip | |
| 101 self.comment_char = comment_char | |
| 102 self.col_idx = col_idx | |
| 103 self.filters = filters | |
| 104 self.tsv_file = open(file_path) | |
| 105 if skip and skip > 0: | |
| 106 for i in range(5): | |
| 107 if not self.tsv_file.readline(): | |
| 108 break | |
| 109 source = LineFilter(self.tsv_file,None) | |
| 110 if comment_char: | |
| 111 source = LineFilter(source,{"filter": "regex", "pattern": comment_char, "action": "exclude_match"}) | |
| 112 if filters: | |
| 113 for f in filters: | |
| 114 source = LineFilter(source,f) | |
| 115 self.source = source | |
| 116 def __iter__(self): | |
| 117 return self | |
| 118 def next(self): | |
| 119 ''' Iteration ''' | |
| 120 for i,line in enumerate(self.source): | |
| 121 fields = line.rstrip('\r\n').split('\t') | |
| 122 if self.col_idx: | |
| 123 fields = [fields[i] for i in self.col_idx] | |
| 124 return fields | |
| 125 raise StopIteration | |
| 52 | 126 |
| 53 | 127 |
| 54 def getValueType(val): | 128 def getValueType(val): |
| 55 if val or 0. == val: | 129 if val or 0. == val: |
| 56 try: | 130 try: |
| 64 return 'TEXT' | 138 return 'TEXT' |
| 65 return None | 139 return None |
| 66 | 140 |
| 67 | 141 |
| 68 def get_column_def(file_path, table_name, skip=0, comment_char='#', | 142 def get_column_def(file_path, table_name, skip=0, comment_char='#', |
| 69 column_names=None, max_lines=100,load_named_columns=False): | 143 column_names=None, max_lines=100,load_named_columns=False,filters=None): |
| 70 col_pref = ['TEXT', 'REAL', 'INTEGER', None] | 144 col_pref = ['TEXT', 'REAL', 'INTEGER', None] |
| 71 col_types = [] | 145 col_types = [] |
| 72 col_idx = None | 146 col_idx = None |
| 73 data_lines = 0 | 147 data_lines = 0 |
| 74 | |
| 75 try: | 148 try: |
| 76 with open(file_path, "r") as fh: | 149 tr = TabularReader(file_path,skip=skip, comment_char=comment_char, col_idx=None, filters=filters) |
| 77 for linenum, line in enumerate(fh): | 150 for linenum, fields in enumerate(tr): |
| 78 if linenum < skip: | 151 if linenum > max_lines: |
| 79 continue | 152 break |
| 80 if line.startswith(comment_char): | 153 try: |
| 81 continue | 154 while len(col_types) < len(fields): |
| 82 data_lines += 1 | 155 col_types.append(None) |
| 83 try: | 156 for i, val in enumerate(fields): |
| 84 fields = line.split('\t') | 157 colType = getValueType(val) |
| 85 while len(col_types) < len(fields): | 158 if col_pref.index(colType) < col_pref.index(col_types[i]): |
| 86 col_types.append(None) | 159 col_types[i] = colType |
| 87 for i, val in enumerate(fields): | 160 except Exception, e: |
| 88 colType = getValueType(val) | 161 print >> sys.stderr, 'Failed at line: %d err: %s' % (linenum, e) |
| 89 if col_pref.index(colType) < col_pref.index(col_types[i]): | |
| 90 col_types[i] = colType | |
| 91 except Exception, e: | |
| 92 print >> sys.stderr, 'Failed at line: %d err: %s' % (linenum, e) | |
| 93 except Exception, e: | 162 except Exception, e: |
| 94 print >> sys.stderr, 'Failed: %s' % (e) | 163 print >> sys.stderr, 'Failed: %s' % (e) |
| 95 for i,col_type in enumerate(col_types): | 164 for i,col_type in enumerate(col_types): |
| 96 if not col_type: | 165 if not col_type: |
| 97 col_types[i] = 'TEXT' | 166 col_types[i] = 'TEXT' |
| 115 for i, col_name in enumerate(col_names): | 184 for i, col_name in enumerate(col_names): |
| 116 col_def.append('%s %s' % (col_names[i], col_types[i])) | 185 col_def.append('%s %s' % (col_names[i], col_types[i])) |
| 117 return col_names, col_types, col_def, col_idx | 186 return col_names, col_types, col_def, col_idx |
| 118 | 187 |
| 119 | 188 |
| 120 def create_table(conn, file_path, table_name, skip=0, comment_char='#', pkey_autoincr=None, column_names=None,load_named_columns=False,unique_indexes=[],indexes=[]): | 189 def create_table(conn, file_path, table_name, skip=0, comment_char='#', pkey_autoincr=None, column_names=None,load_named_columns=False,filters=None,unique_indexes=[],indexes=[]): |
| 121 col_names, col_types, col_def, col_idx = get_column_def(file_path, table_name, skip=skip, comment_char=comment_char, column_names=column_names,load_named_columns=load_named_columns) | 190 |
| 191 col_names, col_types, col_def, col_idx = get_column_def(file_path, table_name, skip=skip, comment_char=comment_char, | |
| 192 column_names=column_names,load_named_columns=load_named_columns,filters=filters) | |
| 122 col_func = [float if t == 'REAL' else int if t == 'INTEGER' else str for t in col_types] | 193 col_func = [float if t == 'REAL' else int if t == 'INTEGER' else str for t in col_types] |
| 123 table_def = 'CREATE TABLE %s (\n %s%s\n);' % ( | 194 table_def = 'CREATE TABLE %s (\n %s%s\n);' % ( |
| 124 table_name, | 195 table_name, |
| 125 '%s INTEGER PRIMARY KEY AUTOINCREMENT,' % pkey_autoincr if pkey_autoincr else '', | 196 '%s INTEGER PRIMARY KEY AUTOINCREMENT,' % pkey_autoincr if pkey_autoincr else '', |
| 126 ', \n '.join(col_def)) | 197 ', \n '.join(col_def)) |
| 127 # print >> sys.stdout, table_def | 198 # print >> sys.stdout, table_def |
| 128 insert_stmt = 'INSERT INTO %s(%s) VALUES(%s)' % (table_name, ','.join(col_names), ','.join(["?" for x in col_names])) | 199 insert_stmt = 'INSERT INTO %s(%s) VALUES(%s)' % (table_name, ','.join(col_names), ','.join(["?" for x in col_names])) |
| 129 # print >> sys.stdout, insert_stmt | 200 # print >> sys.stdout, insert_stmt |
| 140 for i,index in enumerate(indexes): | 211 for i,index in enumerate(indexes): |
| 141 index_name='idx_%s_%d' % (table_name,i) | 212 index_name='idx_%s_%d' % (table_name,i) |
| 142 index_columns = index.split(',') | 213 index_columns = index.split(',') |
| 143 create_index(conn, table_name, index_name, index_columns) | 214 create_index(conn, table_name, index_name, index_columns) |
| 144 c = conn.cursor() | 215 c = conn.cursor() |
| 145 with open(file_path, "r") as fh: | 216 tr = TabularReader(file_path,skip=skip, comment_char=comment_char, col_idx=col_idx, filters=filters) |
| 146 for linenum, line in enumerate(fh): | 217 for linenum, fields in enumerate(tr): |
| 147 if linenum < skip or line.startswith(comment_char): | 218 data_lines += 1 |
| 148 continue | 219 try: |
| 149 data_lines += 1 | 220 if col_idx: |
| 150 try: | 221 fields = [fields[i] for i in col_idx] |
| 151 fields = line.rstrip('\r\n').split('\t') | 222 vals = [col_func[i](x) if x else None for i, x in enumerate(fields)] |
| 152 if col_idx: | 223 c.execute(insert_stmt, vals) |
| 153 fields = [fields[i] for i in col_idx] | 224 except Exception, e: |
| 154 vals = [col_func[i](x) if x else None for i, x in enumerate(fields)] | 225 print >> sys.stderr, 'Failed at line: %d err: %s' % (linenum, e) |
| 155 c.execute(insert_stmt, vals) | |
| 156 except Exception, e: | |
| 157 print >> sys.stderr, 'Failed at line: %d err: %s' % (linenum, e) | |
| 158 conn.commit() | 226 conn.commit() |
| 159 c.close() | 227 c.close() |
| 160 except Exception, e: | 228 except Exception, e: |
| 161 print >> sys.stderr, 'Failed: %s' % (e) | 229 print >> sys.stderr, 'Failed: %s' % (e) |
| 162 exit(1) | 230 exit(1) |
| 231 | |
| 163 | 232 |
| 164 def create_index(conn, table_name, index_name, index_columns, unique=False): | 233 def create_index(conn, table_name, index_name, index_columns, unique=False): |
| 165 index_def = "CREATE %s INDEX %s on %s(%s)" % ('UNIQUE' if unique else '', index_name, table_name, ','.join(index_columns)) | 234 index_def = "CREATE %s INDEX %s on %s(%s)" % ('UNIQUE' if unique else '', index_name, table_name, ','.join(index_columns)) |
| 166 c = conn.cursor() | 235 c = conn.cursor() |
| 167 c.execute(index_def) | 236 c.execute(index_def) |
| 168 conn.commit() | 237 conn.commit() |
| 169 c.close() | 238 c.close() |
| 239 | |
| 170 | 240 |
| 171 def regex_match(expr, item): | 241 def regex_match(expr, item): |
| 172 return re.match(expr, item) is not None | 242 return re.match(expr, item) is not None |
| 173 | 243 |
| 174 | 244 |
| 235 if 'tables' in tdef: | 305 if 'tables' in tdef: |
| 236 for ti, table in enumerate(tdef['tables']): | 306 for ti, table in enumerate(tdef['tables']): |
| 237 path = table['file_path'] | 307 path = table['file_path'] |
| 238 table_name = table['table_name'] if 'table_name' in table else 't%d' % (ti + 1) | 308 table_name = table['table_name'] if 'table_name' in table else 't%d' % (ti + 1) |
| 239 comment_lines = table['comment_lines'] if 'comment_lines' in table else 0 | 309 comment_lines = table['comment_lines'] if 'comment_lines' in table else 0 |
| 310 comment_char = table['comment_char'] if 'comment_char' in table else None | |
| 240 column_names = table['column_names'] if 'column_names' in table else None | 311 column_names = table['column_names'] if 'column_names' in table else None |
| 241 if column_names: | 312 if column_names: |
| 242 load_named_columns = table['load_named_columns'] if 'load_named_columns' in table else False | 313 load_named_columns = table['load_named_columns'] if 'load_named_columns' in table else False |
| 243 else: | 314 else: |
| 244 load_named_columns = False | 315 load_named_columns = False |
| 245 unique_indexes = table['unique'] if 'unique' in table else [] | 316 unique_indexes = table['unique'] if 'unique' in table else [] |
| 246 indexes = table['index'] if 'index' in table else [] | 317 indexes = table['index'] if 'index' in table else [] |
| 318 filters = table['filters'] if 'filters' in table else None | |
| 247 pkey_autoincr = table['pkey_autoincr'] if 'pkey_autoincr' in table else None | 319 pkey_autoincr = table['pkey_autoincr'] if 'pkey_autoincr' in table else None |
| 248 create_table(conn, path, table_name, pkey_autoincr=pkey_autoincr, column_names=column_names, | 320 create_table(conn, path, table_name, pkey_autoincr=pkey_autoincr, column_names=column_names, |
| 249 skip=comment_lines, load_named_columns=load_named_columns, | 321 skip=comment_lines, comment_char=comment_char, load_named_columns=load_named_columns, |
| 250 unique_indexes=unique_indexes, indexes=indexes) | 322 filters=filters,unique_indexes=unique_indexes, indexes=indexes) |
| 251 except Exception, exc: | 323 except Exception, exc: |
| 252 print >> sys.stderr, "Error: %s" % exc | 324 print >> sys.stderr, "Error: %s" % exc |
| 253 conn.close() | 325 conn.close() |
| 254 | 326 |
| 255 query = None | 327 query = None |
| 260 query += line | 332 query += line |
| 261 elif (options.query is not None): | 333 elif (options.query is not None): |
| 262 query = options.query | 334 query = options.query |
| 263 | 335 |
| 264 if (query is None): | 336 if (query is None): |
| 337 tables_query = \ | |
| 338 "SELECT name, sql FROM sqlite_master WHERE type='table' ORDER BY name" | |
| 265 try: | 339 try: |
| 266 conn = get_connection(options.sqlitedb) | 340 conn = get_connection(options.sqlitedb) |
| 267 c = conn.cursor() | 341 c = conn.cursor() |
| 268 rslt = c.execute(tables_query).fetchall() | 342 rslt = c.execute(tables_query).fetchall() |
| 269 for table, sql in rslt: | 343 for table, sql in rslt: |
