Mercurial > repos > jjohnson > query_tabular
comparison query_tabular.py @ 6:03842a4f71c6 draft
Uploaded
author | jjohnson |
---|---|
date | Fri, 17 Feb 2017 15:20:24 -0500 |
parents | 19ae309ec53c |
children | 72c32037fa1e |
comparison
equal
deleted
inserted
replaced
5:19ae309ec53c | 6:03842a4f71c6 |
---|---|
11 | 11 |
12 """ | 12 """ |
13 TODO: | 13 TODO: |
14 - could read column names from comment lines, but issues with legal names | 14 - could read column names from comment lines, but issues with legal names |
15 - could add some transformations on tabular columns, | 15 - could add some transformations on tabular columns, |
16 filter - skip_regex | |
16 e.g. a regex to format date/time strings | 17 e.g. a regex to format date/time strings |
17 format: { | 18 format: { |
18 c2 : re.sub('pat', 'sub', c2) | 19 c2 : re.sub('pat', 'sub', c2) |
19 c3 : len(c3) | 20 c3 : len(c3) |
20 } | 21 } |
21 def format(colname,val, expr): | 22 def format(colname,val, expr): |
23 normalize input list columns | |
24 iterate over list values creating one row per iteration | |
25 option for input line_num column | |
26 create associated table | |
27 fk, name, value # e.g. PSM table with list of proteins containing peptide | |
28 fk, name, value[, value] # if multiple columns similarly indexed, e.g. vcf | |
22 - column_defs dict of columns to create from tabular input | 29 - column_defs dict of columns to create from tabular input |
23 column_defs : { 'name1' : 'expr', 'name2' : 'expr'} | 30 column_defs : { 'name1' : 'expr', 'name2' : 'expr'} |
24 - allow multiple queries and outputs | 31 - allow multiple queries and outputs |
32 repeat min - max with up to max conditional outputs | |
33 | |
25 - add a --json input for table definitions (or yaml) | 34 - add a --json input for table definitions (or yaml) |
26 JSON config: | 35 JSON config: |
27 { tables : [ | 36 { tables : [ |
28 { file_path : '/home/galaxy/dataset_101.dat', | 37 { file_path : '/home/galaxy/dataset_101.dat', |
29 table_name : 't1', | 38 table_name : 't1', |
33 unique: ['c1'], | 42 unique: ['c1'], |
34 index: ['c2','c3'] | 43 index: ['c2','c3'] |
35 }, | 44 }, |
36 { file_path : '/home/galaxy/dataset_102.dat', | 45 { file_path : '/home/galaxy/dataset_102.dat', |
37 table_name : 'gff', | 46 table_name : 'gff', |
38 column_names : ['seqname',,,'start','end'] | 47 column_names : ['seqname',,'date','start','end'] |
39 comment_lines : 1 | 48 comment_lines : 1 |
40 load_named_columns : True | 49 load_named_columns : True |
50 filters : [{'filter': 'regex', 'pattern': '#peptide', 'action': 'exclude_match'}, | |
51 {'filter': 'replace', 'column': 3, 'pattern': 'gi[|]', 'replace': ''}] | |
41 }, | 52 }, |
42 { file_path : '/home/galaxy/dataset_103.dat', | 53 { file_path : '/home/galaxy/dataset_103.dat', |
43 table_name : 'test', | 54 table_name : 'test', |
44 column_names : ['c1', 'c2', 'c3'] | 55 column_names : ['c1', 'c2', 'c3'] |
45 } | 56 } |
46 ] | 57 ] |
47 } | 58 } |
48 """ | 59 """ |
49 | 60 |
50 tables_query = \ | 61 |
51 "SELECT name, sql FROM sqlite_master WHERE type='table' ORDER BY name" | 62 class LineFilter( object ): |
63 def __init__(self,source,filter_dict): | |
64 self.source = source | |
65 self.filter_dict = filter_dict | |
66 print >> sys.stderr, 'LineFilter %s' % filter_dict if filter_dict else 'NONE' | |
67 self.func = lambda l: l.rstrip('\r\n') if l else None | |
68 if not filter_dict: | |
69 return | |
70 if filter_dict['filter'] == 'regex': | |
71 rgx = re.compile(filter_dict['pattern']) | |
72 if filter_dict['action'] == 'exclude_match': | |
73 self.func = lambda l: l if not rgx.match(l) else None | |
74 elif filter_dict['action'] == 'include_match': | |
75 self.func = lambda l: l if rgx.match(l) else None | |
76 elif filter_dict['action'] == 'exclude_find': | |
77 self.func = lambda l: l if not rgx.search(l) else None | |
78 elif filter_dict['action'] == 'include_find': | |
79 self.func = lambda l: l if rgx.search(l) else None | |
80 elif filter_dict['filter'] == 'replace': | |
81 p = filter_dict['pattern'] | |
82 r = filter_dict['replace'] | |
83 c = int(filter_dict['column']) - 1 | |
84 self.func = lambda l: '\t'.join([x if i != c else re.sub(p,r,x) for i,x in enumerate(l.split('\t'))]) | |
85 def __iter__(self): | |
86 return self | |
87 def next(self): | |
88 for i,next_line in enumerate(self.source): | |
89 line = self.func(next_line) | |
90 if line: | |
91 return line | |
92 raise StopIteration | |
93 | |
94 | |
95 class TabularReader: | |
96 """ | |
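97 Tabular file iterator. Each iteration returns one line as a list of its tab-separated fields (only the col_idx columns when col_idx is given) | |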
98 """ | |
99 def __init__(self, file_path, skip=0, comment_char=None, col_idx=None, filters=None): | |
100 self.skip = skip | |
101 self.comment_char = comment_char | |
102 self.col_idx = col_idx | |
103 self.filters = filters | |
104 self.tsv_file = open(file_path) | |
105 if skip and skip > 0: | |
106 for i in range(5): | |
107 if not self.tsv_file.readline(): | |
108 break | |
109 source = LineFilter(self.tsv_file,None) | |
110 if comment_char: | |
111 source = LineFilter(source,{"filter": "regex", "pattern": comment_char, "action": "exclude_match"}) | |
112 if filters: | |
113 for f in filters: | |
114 source = LineFilter(source,f) | |
115 self.source = source | |
116 def __iter__(self): | |
117 return self | |
118 def next(self): | |
119 ''' Iteration ''' | |
120 for i,line in enumerate(self.source): | |
121 fields = line.rstrip('\r\n').split('\t') | |
122 if self.col_idx: | |
123 fields = [fields[i] for i in self.col_idx] | |
124 return fields | |
125 raise StopIteration | |
52 | 126 |
53 | 127 |
54 def getValueType(val): | 128 def getValueType(val): |
55 if val or 0. == val: | 129 if val or 0. == val: |
56 try: | 130 try: |
64 return 'TEXT' | 138 return 'TEXT' |
65 return None | 139 return None |
66 | 140 |
67 | 141 |
68 def get_column_def(file_path, table_name, skip=0, comment_char='#', | 142 def get_column_def(file_path, table_name, skip=0, comment_char='#', |
69 column_names=None, max_lines=100,load_named_columns=False): | 143 column_names=None, max_lines=100,load_named_columns=False,filters=None): |
70 col_pref = ['TEXT', 'REAL', 'INTEGER', None] | 144 col_pref = ['TEXT', 'REAL', 'INTEGER', None] |
71 col_types = [] | 145 col_types = [] |
72 col_idx = None | 146 col_idx = None |
73 data_lines = 0 | 147 data_lines = 0 |
74 | |
75 try: | 148 try: |
76 with open(file_path, "r") as fh: | 149 tr = TabularReader(file_path,skip=skip, comment_char=comment_char, col_idx=None, filters=filters) |
77 for linenum, line in enumerate(fh): | 150 for linenum, fields in enumerate(tr): |
78 if linenum < skip: | 151 if linenum > max_lines: |
79 continue | 152 break |
80 if line.startswith(comment_char): | 153 try: |
81 continue | 154 while len(col_types) < len(fields): |
82 data_lines += 1 | 155 col_types.append(None) |
83 try: | 156 for i, val in enumerate(fields): |
84 fields = line.split('\t') | 157 colType = getValueType(val) |
85 while len(col_types) < len(fields): | 158 if col_pref.index(colType) < col_pref.index(col_types[i]): |
86 col_types.append(None) | 159 col_types[i] = colType |
87 for i, val in enumerate(fields): | 160 except Exception, e: |
88 colType = getValueType(val) | 161 print >> sys.stderr, 'Failed at line: %d err: %s' % (linenum, e) |
89 if col_pref.index(colType) < col_pref.index(col_types[i]): | |
90 col_types[i] = colType | |
91 except Exception, e: | |
92 print >> sys.stderr, 'Failed at line: %d err: %s' % (linenum, e) | |
93 except Exception, e: | 162 except Exception, e: |
94 print >> sys.stderr, 'Failed: %s' % (e) | 163 print >> sys.stderr, 'Failed: %s' % (e) |
95 for i,col_type in enumerate(col_types): | 164 for i,col_type in enumerate(col_types): |
96 if not col_type: | 165 if not col_type: |
97 col_types[i] = 'TEXT' | 166 col_types[i] = 'TEXT' |
115 for i, col_name in enumerate(col_names): | 184 for i, col_name in enumerate(col_names): |
116 col_def.append('%s %s' % (col_names[i], col_types[i])) | 185 col_def.append('%s %s' % (col_names[i], col_types[i])) |
117 return col_names, col_types, col_def, col_idx | 186 return col_names, col_types, col_def, col_idx |
118 | 187 |
119 | 188 |
120 def create_table(conn, file_path, table_name, skip=0, comment_char='#', pkey_autoincr=None, column_names=None,load_named_columns=False,unique_indexes=[],indexes=[]): | 189 def create_table(conn, file_path, table_name, skip=0, comment_char='#', pkey_autoincr=None, column_names=None,load_named_columns=False,filters=None,unique_indexes=[],indexes=[]): |
121 col_names, col_types, col_def, col_idx = get_column_def(file_path, table_name, skip=skip, comment_char=comment_char, column_names=column_names,load_named_columns=load_named_columns) | 190 |
191 col_names, col_types, col_def, col_idx = get_column_def(file_path, table_name, skip=skip, comment_char=comment_char, | |
192 column_names=column_names,load_named_columns=load_named_columns,filters=filters) | |
122 col_func = [float if t == 'REAL' else int if t == 'INTEGER' else str for t in col_types] | 193 col_func = [float if t == 'REAL' else int if t == 'INTEGER' else str for t in col_types] |
123 table_def = 'CREATE TABLE %s (\n %s%s\n);' % ( | 194 table_def = 'CREATE TABLE %s (\n %s%s\n);' % ( |
124 table_name, | 195 table_name, |
125 '%s INTEGER PRIMARY KEY AUTOINCREMENT,' % pkey_autoincr if pkey_autoincr else '', | 196 '%s INTEGER PRIMARY KEY AUTOINCREMENT,' % pkey_autoincr if pkey_autoincr else '', |
126 ', \n '.join(col_def)) | 197 ', \n '.join(col_def)) |
127 # print >> sys.stdout, table_def | 198 # print >> sys.stdout, table_def |
128 insert_stmt = 'INSERT INTO %s(%s) VALUES(%s)' % (table_name, ','.join(col_names), ','.join(["?" for x in col_names])) | 199 insert_stmt = 'INSERT INTO %s(%s) VALUES(%s)' % (table_name, ','.join(col_names), ','.join(["?" for x in col_names])) |
129 # print >> sys.stdout, insert_stmt | 200 # print >> sys.stdout, insert_stmt |
140 for i,index in enumerate(indexes): | 211 for i,index in enumerate(indexes): |
141 index_name='idx_%s_%d' % (table_name,i) | 212 index_name='idx_%s_%d' % (table_name,i) |
142 index_columns = index.split(',') | 213 index_columns = index.split(',') |
143 create_index(conn, table_name, index_name, index_columns) | 214 create_index(conn, table_name, index_name, index_columns) |
144 c = conn.cursor() | 215 c = conn.cursor() |
145 with open(file_path, "r") as fh: | 216 tr = TabularReader(file_path,skip=skip, comment_char=comment_char, col_idx=col_idx, filters=filters) |
146 for linenum, line in enumerate(fh): | 217 for linenum, fields in enumerate(tr): |
147 if linenum < skip or line.startswith(comment_char): | 218 data_lines += 1 |
148 continue | 219 try: |
149 data_lines += 1 | 220 if col_idx: |
150 try: | 221 fields = [fields[i] for i in col_idx] |
151 fields = line.rstrip('\r\n').split('\t') | 222 vals = [col_func[i](x) if x else None for i, x in enumerate(fields)] |
152 if col_idx: | 223 c.execute(insert_stmt, vals) |
153 fields = [fields[i] for i in col_idx] | 224 except Exception, e: |
154 vals = [col_func[i](x) if x else None for i, x in enumerate(fields)] | 225 print >> sys.stderr, 'Failed at line: %d err: %s' % (linenum, e) |
155 c.execute(insert_stmt, vals) | |
156 except Exception, e: | |
157 print >> sys.stderr, 'Failed at line: %d err: %s' % (linenum, e) | |
158 conn.commit() | 226 conn.commit() |
159 c.close() | 227 c.close() |
160 except Exception, e: | 228 except Exception, e: |
161 print >> sys.stderr, 'Failed: %s' % (e) | 229 print >> sys.stderr, 'Failed: %s' % (e) |
162 exit(1) | 230 exit(1) |
231 | |
163 | 232 |
164 def create_index(conn, table_name, index_name, index_columns, unique=False): | 233 def create_index(conn, table_name, index_name, index_columns, unique=False): |
165 index_def = "CREATE %s INDEX %s on %s(%s)" % ('UNIQUE' if unique else '', index_name, table_name, ','.join(index_columns)) | 234 index_def = "CREATE %s INDEX %s on %s(%s)" % ('UNIQUE' if unique else '', index_name, table_name, ','.join(index_columns)) |
166 c = conn.cursor() | 235 c = conn.cursor() |
167 c.execute(index_def) | 236 c.execute(index_def) |
168 conn.commit() | 237 conn.commit() |
169 c.close() | 238 c.close() |
239 | |
170 | 240 |
171 def regex_match(expr, item): | 241 def regex_match(expr, item): |
172 return re.match(expr, item) is not None | 242 return re.match(expr, item) is not None |
173 | 243 |
174 | 244 |
235 if 'tables' in tdef: | 305 if 'tables' in tdef: |
236 for ti, table in enumerate(tdef['tables']): | 306 for ti, table in enumerate(tdef['tables']): |
237 path = table['file_path'] | 307 path = table['file_path'] |
238 table_name = table['table_name'] if 'table_name' in table else 't%d' % (ti + 1) | 308 table_name = table['table_name'] if 'table_name' in table else 't%d' % (ti + 1) |
239 comment_lines = table['comment_lines'] if 'comment_lines' in table else 0 | 309 comment_lines = table['comment_lines'] if 'comment_lines' in table else 0 |
310 comment_char = table['comment_char'] if 'comment_char' in table else None | |
240 column_names = table['column_names'] if 'column_names' in table else None | 311 column_names = table['column_names'] if 'column_names' in table else None |
241 if column_names: | 312 if column_names: |
242 load_named_columns = table['load_named_columns'] if 'load_named_columns' in table else False | 313 load_named_columns = table['load_named_columns'] if 'load_named_columns' in table else False |
243 else: | 314 else: |
244 load_named_columns = False | 315 load_named_columns = False |
245 unique_indexes = table['unique'] if 'unique' in table else [] | 316 unique_indexes = table['unique'] if 'unique' in table else [] |
246 indexes = table['index'] if 'index' in table else [] | 317 indexes = table['index'] if 'index' in table else [] |
318 filters = table['filters'] if 'filters' in table else None | |
247 pkey_autoincr = table['pkey_autoincr'] if 'pkey_autoincr' in table else None | 319 pkey_autoincr = table['pkey_autoincr'] if 'pkey_autoincr' in table else None |
248 create_table(conn, path, table_name, pkey_autoincr=pkey_autoincr, column_names=column_names, | 320 create_table(conn, path, table_name, pkey_autoincr=pkey_autoincr, column_names=column_names, |
249 skip=comment_lines, load_named_columns=load_named_columns, | 321 skip=comment_lines, comment_char=comment_char, load_named_columns=load_named_columns, |
250 unique_indexes=unique_indexes, indexes=indexes) | 322 filters=filters,unique_indexes=unique_indexes, indexes=indexes) |
251 except Exception, exc: | 323 except Exception, exc: |
252 print >> sys.stderr, "Error: %s" % exc | 324 print >> sys.stderr, "Error: %s" % exc |
253 conn.close() | 325 conn.close() |
254 | 326 |
255 query = None | 327 query = None |
260 query += line | 332 query += line |
261 elif (options.query is not None): | 333 elif (options.query is not None): |
262 query = options.query | 334 query = options.query |
263 | 335 |
264 if (query is None): | 336 if (query is None): |
337 tables_query = \ | |
338 "SELECT name, sql FROM sqlite_master WHERE type='table' ORDER BY name" | |
265 try: | 339 try: |
266 conn = get_connection(options.sqlitedb) | 340 conn = get_connection(options.sqlitedb) |
267 c = conn.cursor() | 341 c = conn.cursor() |
268 rslt = c.execute(tables_query).fetchall() | 342 rslt = c.execute(tables_query).fetchall() |
269 for table, sql in rslt: | 343 for table, sql in rslt: |