comparison query_tabular.py @ 20:ab27c4bd14b9 draft
Uploaded

author:   jjohnson
date:     Fri, 14 Jul 2017 11:39:27 -0400
parents:  b9f797bf4f38
children: bed5018e7ae3
19:9d9ab2c69014 (before) | 20:ab27c4bd14b9 (after)
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 """ | 2 |
3 """ | 3 from __future__ import print_function |
4 | |
5 import json | |
6 import optparse | |
7 import os.path | |
4 import sys | 8 import sys |
5 import re | 9 |
6 import os.path | 10 from load_db import create_table |
7 import json | 11 |
8 import sqlite3 as sqlite | 12 from query_db import describe_tables, get_connection, run_query |
9 import optparse | 13 |
10 from optparse import OptionParser | |
11 | 14 |
12 """ | 15 """ |
13 TODO: | |
14 - could read column names from comment lines, but issues with legal names | |
15 - could add some transformations on tabular columns, | |
16 filter - skip_regex | |
17 e.g. a regex to format date/time strings | |
18 format: { | |
19 c2 : re.sub('pat', 'sub', c2) | |
20 c3 : len(c3) | |
21 } | |
22 def format(colname,val, expr): | |
23 normalize input list columns | |
24 iterate over list values creating one row per iteration | |
25 option for input line_num column | |
26 create associated table | |
27 fk, name, value # e.g. PSM table with list of proteins containing peptide | |
28 fk, name, value[, value] # if multiple columns similarly indexed, e.g. vcf | |
29 - column_defs dict of columns to create from tabular input | |
30 column_defs : { 'name1' : 'expr', 'name2' : 'expr'} | |
31 - allow multiple queries and outputs | |
32 repeat min - max with up to max conditional outputs | |
33 | |
34 - add a --json input for table definitions (or yaml) | |
35 JSON config: | 16 JSON config: |
36 { tables : [ | 17 { tables : [ |
37 { file_path : '/home/galaxy/dataset_101.dat', | 18 { file_path : '/home/galaxy/dataset_101.dat', |
38 table_name : 't1', | 19 table_name : 't1', |
39 column_names : ['c1', 'c2', 'c3'], | 20 column_names : ['c1','c2','c3'], |
40 pkey_autoincr : 'id' | 21 pkey_autoincr : 'id' |
41 comment_lines : 1 | 22 comment_lines : 1 |
42 unique: ['c1'], | 23 unique: ['c1'], |
43 index: ['c2','c3'] | 24 index: ['c2', 'c3'] |
44 }, | 25 }, |
45 { file_path : '/home/galaxy/dataset_102.dat', | 26 { file_path : '/home/galaxy/dataset_102.dat', |
46 table_name : 'gff', | 27 table_name : 'gff', |
47 column_names : ['seqname',,'date','start','end'] | 28 column_names : ['seqname',,'date','start','end'] |
48 comment_lines : 1 | 29 comment_lines : 1 |
49 load_named_columns : True | 30 load_named_columns : True |
50 filters : [{'filter': 'regex', 'pattern': '#peptide', 'action': 'exclude_match'}, | 31 filters : [{'filter': 'regex', 'pattern': '#peptide', |
51 {'filter': 'replace', 'column': 3, 'replace': 'gi[|]', 'pattern': ''}] | 32 'action': 'exclude_match'}, |
33 {'filter': 'replace', 'column': 3, | |
34 'replace': 'gi[|]', 'pattern': ''}] | |
52 }, | 35 }, |
53 { file_path : '/home/galaxy/dataset_103.dat', | 36 { file_path : '/home/galaxy/dataset_103.dat', |
54 table_name : 'test', | 37 table_name : 'test', |
55 column_names : ['c1', 'c2', 'c3'] | 38 column_names : ['c1', 'c2', 'c3'] |
56 } | 39 } |
57 ] | 40 ] |
58 } | 41 } |
59 """ | 42 """ |
60 | 43 |
61 | 44 |
62 class LineFilter( object ): | |
63 def __init__(self,source,filter_dict): | |
64 self.source = source | |
65 self.filter_dict = filter_dict | |
66 # print >> sys.stderr, 'LineFilter %s' % filter_dict if filter_dict else 'NONE' | |
67 self.func = lambda i,l: l.rstrip('\r\n') if l else None | |
68 self.src_lines = [] | |
69 self.src_line_cnt = 0 | |
70 if not filter_dict: | |
71 return | |
72 if filter_dict['filter'] == 'regex': | |
73 rgx = re.compile(filter_dict['pattern']) | |
74 if filter_dict['action'] == 'exclude_match': | |
75 self.func = lambda i,l: l if not rgx.match(l) else None | |
76 elif filter_dict['action'] == 'include_match': | |
77 self.func = lambda i,l: l if rgx.match(l) else None | |
78 elif filter_dict['action'] == 'exclude_find': | |
79 self.func = lambda i,l: l if not rgx.search(l) else None | |
80 elif filter_dict['action'] == 'include_find': | |
81 self.func = lambda i,l: l if rgx.search(l) else None | |
82 elif filter_dict['filter'] == 'select_columns': | |
83 cols = [int(c) - 1 for c in filter_dict['columns']] | |
84 self.func = lambda i,l: self.select_columns(l,cols) | |
85 elif filter_dict['filter'] == 'replace': | |
86 p = filter_dict['pattern'] | |
87 r = filter_dict['replace'] | |
88 c = int(filter_dict['column']) - 1 | |
89 self.func = lambda i,l: '\t'.join([x if i != c else re.sub(p,r,x) for i,x in enumerate(l.split('\t'))]) | |
90 elif filter_dict['filter'] == 'prepend_line_num': | |
91 self.func = lambda i,l: '%d\t%s' % (i,l) | |
92 elif filter_dict['filter'] == 'append_line_num': | |
93 self.func = lambda i,l: '%s\t%d' % (l.rstrip('\r\n'),i) | |
94 elif filter_dict['filter'] == 'prepend_text': | |
95 s = filter_dict['column_text'] | |
96 self.func = lambda i,l: '%s\t%s' % (s,l) | |
97 elif filter_dict['filter'] == 'append_text': | |
98 s = filter_dict['column_text'] | |
99 self.func = lambda i,l: '%s\t%s' % (l.rstrip('\r\n'),s) | |
100 elif filter_dict['filter'] == 'skip': | |
101 cnt = filter_dict['count'] | |
102 self.func = lambda i,l: l if i > cnt else None | |
103 elif filter_dict['filter'] == 'normalize': | |
104 cols = [int(c) - 1 for c in filter_dict['columns']] | |
105 sep = filter_dict['separator'] | |
106 self.func = lambda i,l: self.normalize(l,cols,sep) | |
107 def __iter__(self): | |
108 return self | |
109 def select_columns(self,line,cols): | |
110 fields = line.split('\t') | |
111 return '\t'.join([fields[x] for x in cols]) | |
112 def normalize(self,line,split_cols,sep): | |
113 lines = [] | |
114 fields = line.rstrip('\r\n').split('\t') | |
115 split_fields = dict() | |
116 cnt = 0 | |
117 for c in split_cols: | |
118 if c < len(fields): | |
119 split_fields[c] = fields[c].split(sep) | |
120 cnt = max(cnt, len(split_fields[c])) | |
121 if cnt == 0: | |
122 lines.append('\t'.join(fields)) | |
123 else: | |
124 for n in range(0, cnt): | |
125 flds = [x if c not in split_cols else split_fields[c][n] if n < len(split_fields[c]) else '' for (c, x) in enumerate(fields)] | |
126 lines.append('\t'.join(flds)) | |
127 return lines | |
128 def get_lines(self): | |
129 for i,next_line in enumerate(self.source): | |
130 self.src_line_cnt += 1 | |
131 line = self.func(self.src_line_cnt,next_line) | |
132 # print >> sys.stderr, 'LineFilter %s: %d %s' % (str(self.filter_dict),self.src_line_cnt,line) | |
133 if line: | |
134 if isinstance(line,list): | |
135 self.src_lines.extend(line) | |
136 else: | |
137 self.src_lines.append(line) | |
138 return | |
139 def next(self): | |
140 if not self.src_lines: | |
141 self.get_lines() | |
142 if self.src_lines: | |
143 return self.src_lines.pop(0) | |
144 raise StopIteration | |
145 | |
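
LineFilter turns each filter dict into a line-level transformation and chains them through iteration. As a standalone, hedged sketch that restates two of the transformations above rather than importing the class, the 'replace' and 'normalize' filters behave roughly like this:

```python
# Sketch of two LineFilter transformations on a single tab-separated line.
import re

line = "pep1\tgi|12345\tprotA,protB"

# 'replace' filter: {'filter': 'replace', 'column': 2,
#                    'pattern': 'gi[|]', 'replace': ''}
c, p, r = 2 - 1, r"gi[|]", ""
replaced = "\t".join(
    x if i != c else re.sub(p, r, x) for i, x in enumerate(line.split("\t")))
print(replaced)  # pep1    12345   protA,protB

# 'normalize' filter: {'filter': 'normalize', 'columns': [3], 'separator': ','}
# splits column 3 on ',' and emits one output line per value
fields = replaced.split("\t")
col = 3 - 1
for val in fields[col].split(","):
    print("\t".join(fields[:col] + [val] + fields[col + 1:]))
# pep1    12345   protA
# pep1    12345   protB
```
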
146 | |
147 class TabularReader: | |
148 """ | |
149 Tabular file iterator. Returns a list | |
150 """ | |
151 def __init__(self, file_path, skip=0, comment_char=None, col_idx=None, filters=None): | |
152 self.skip = skip | |
153 self.comment_char = comment_char | |
154 self.col_idx = col_idx | |
155 self.filters = filters | |
156 self.tsv_file = open(file_path) | |
157 if skip and skip > 0: | |
158 for i in range(skip): | |
159 if not self.tsv_file.readline(): | |
160 break | |
161 source = LineFilter(self.tsv_file,None) | |
162 if comment_char: | |
163 source = LineFilter(source,{"filter": "regex", "pattern": comment_char, "action": "exclude_match"}) | |
164 if filters: | |
165 for f in filters: | |
166 source = LineFilter(source,f) | |
167 self.source = source | |
168 def __iter__(self): | |
169 return self | |
170 def next(self): | |
171 ''' Iteration ''' | |
172 for i,line in enumerate(self.source): | |
173 fields = line.rstrip('\r\n').split('\t') | |
174 if self.col_idx: | |
175 fields = [fields[i] for i in self.col_idx] | |
176 return fields | |
177 raise StopIteration | |
178 | |
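
TabularReader skips leading lines, drops comment lines, splits each remaining line on tabs, and optionally projects a subset of columns via col_idx. A self-contained sketch of that behavior using only the stdlib (not importing the class, whose next() protocol is Python 2 style):

```python
# What TabularReader yields for a file with one comment line and col_idx set.
import io

tsv = io.StringIO("#c1\tc2\tc3\n1\talpha\t2.5\n2\tbeta\t3.5\n")
col_idx = [0, 2]                 # like col_idx above: keep columns 1 and 3
for line in tsv:
    if line.startswith("#"):     # comment_char handling
        continue
    fields = line.rstrip("\r\n").split("\t")
    print([fields[i] for i in col_idx])
# ['1', '2.5']
# ['2', '3.5']
```
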
179 | |
180 def getValueType(val): | |
181 if val or 0. == val: | |
182 try: | |
183 int(val) | |
184 return 'INTEGER' | |
185 except: | |
186 try: | |
187 float(val) | |
188 return 'REAL' | |
189 except: | |
190 return 'TEXT' | |
191 return None | |
192 | |
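
getValueType classifies a single value; get_column_def below combines those per column using the preference order TEXT over REAL over INTEGER, so a single non-numeric value demotes the whole column to TEXT. A hedged restatement:

```python
# Sketch of the column-typing rule: the most general type seen wins.
def get_value_type(val):
    if val or val == 0.0:
        try:
            int(val)
            return 'INTEGER'
        except ValueError:
            try:
                float(val)
                return 'REAL'
            except ValueError:
                return 'TEXT'
    return None

col_pref = ['TEXT', 'REAL', 'INTEGER', None]   # lower index = more general
col_type = None
for val in ['42', '3.14', 'n/a']:
    t = get_value_type(val)
    if col_pref.index(t) < col_pref.index(col_type):
        col_type = t
print(col_type)  # TEXT: one non-numeric value demotes the whole column
```
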
193 | |
194 def get_column_def(file_path, table_name, skip=0, comment_char='#', | |
195 column_names=None, max_lines=100,load_named_columns=False,filters=None): | |
196 col_pref = ['TEXT', 'REAL', 'INTEGER', None] | |
197 col_types = [] | |
198 col_idx = None | |
199 data_lines = 0 | |
200 try: | |
201 tr = TabularReader(file_path,skip=skip, comment_char=comment_char, col_idx=None, filters=filters) | |
202 for linenum, fields in enumerate(tr): | |
203 if linenum > max_lines: | |
204 break | |
205 try: | |
206 while len(col_types) < len(fields): | |
207 col_types.append(None) | |
208 for i, val in enumerate(fields): | |
209 colType = getValueType(val) | |
210 if col_pref.index(colType) < col_pref.index(col_types[i]): | |
211 col_types[i] = colType | |
212 except Exception, e: | |
213 print >> sys.stderr, 'Failed at line: %d err: %s' % (linenum, e) | |
214 except Exception, e: | |
215 print >> sys.stderr, 'Failed: %s' % (e) | |
216 for i,col_type in enumerate(col_types): | |
217 if not col_type: | |
218 col_types[i] = 'TEXT' | |
219 if column_names: | |
220 col_names = [] | |
221 if load_named_columns: | |
222 col_idx = [] | |
223 for i, cname in enumerate([cn.strip() for cn in column_names.split(',')]): | |
224 if cname != '': | |
225 col_idx.append(i) | |
226 col_names.append(cname) | |
227 col_types = [col_types[i] for i in col_idx] | |
228 else: | |
229 col_names = ['c%d' % i for i in range(1, len(col_types) + 1)] | |
230 for i, cname in enumerate([cn.strip() for cn in column_names.split(',')]): | |
231 if cname and i < len(col_names): | |
232 col_names[i] = cname | |
233 else: | |
234 col_names = ['c%d' % i for i in range(1, len(col_types) + 1)] | |
235 col_def = [] | |
236 for i, col_name in enumerate(col_names): | |
237 col_def.append('%s %s' % (col_names[i], col_types[i])) | |
238 return col_names, col_types, col_def, col_idx | |
239 | |
240 | |
241 def create_table(conn, file_path, table_name, skip=0, comment_char='#', pkey_autoincr=None, column_names=None,load_named_columns=False,filters=None,unique_indexes=[],indexes=[]): | |
242 | |
243 col_names, col_types, col_def, col_idx = get_column_def(file_path, table_name, skip=skip, comment_char=comment_char, | |
244 column_names=column_names,load_named_columns=load_named_columns,filters=filters) | |
245 col_func = [float if t == 'REAL' else int if t == 'INTEGER' else str for t in col_types] | |
246 table_def = 'CREATE TABLE %s (\n %s%s\n);' % ( | |
247 table_name, | |
248 '%s INTEGER PRIMARY KEY AUTOINCREMENT,' % pkey_autoincr if pkey_autoincr else '', | |
249 ', \n '.join(col_def)) | |
250 # print >> sys.stdout, table_def | |
251 insert_stmt = 'INSERT INTO %s(%s) VALUES(%s)' % (table_name, ','.join(col_names), ','.join(["?" for x in col_names])) | |
252 # print >> sys.stdout, insert_stmt | |
253 data_lines = 0 | |
254 try: | |
255 c = conn.cursor() | |
256 c.execute(table_def) | |
257 conn.commit() | |
258 c.close() | |
259 for i,index in enumerate(unique_indexes): | |
260 index_name='idx_uniq_%s_%d' % (table_name,i) | |
261 index_columns = index.split(',') | |
262 create_index(conn, table_name, index_name, index_columns,unique=True) | |
263 for i,index in enumerate(indexes): | |
264 index_name='idx_%s_%d' % (table_name,i) | |
265 index_columns = index.split(',') | |
266 create_index(conn, table_name, index_name, index_columns) | |
267 c = conn.cursor() | |
268 tr = TabularReader(file_path,skip=skip, comment_char=comment_char, col_idx=col_idx, filters=filters) | |
269 for linenum, fields in enumerate(tr): | |
270 data_lines += 1 | |
271 try: | |
272 vals = [col_func[i](x) if x else None for i, x in enumerate(fields)] | |
273 c.execute(insert_stmt, vals) | |
274 except Exception, e: | |
275 print >> sys.stderr, 'Failed at line: %d err: %s' % (linenum, e) | |
276 conn.commit() | |
277 c.close() | |
278 except Exception, e: | |
279 print >> sys.stderr, 'Failed: %s' % (e) | |
280 exit(1) | |
281 | |
282 | |
283 def create_index(conn, table_name, index_name, index_columns, unique=False): | |
284 index_def = "CREATE %s INDEX %s on %s(%s)" % ('UNIQUE' if unique else '', index_name, table_name, ','.join(index_columns)) | |
285 c = conn.cursor() | |
286 c.execute(index_def) | |
287 conn.commit() | |
288 c.close() | |
289 | |
290 | |
291 def regex_match(expr, item): | |
292 return re.match(expr, item) is not None | |
293 | |
294 | |
295 def regex_search(expr, item): | |
296 return re.search(expr, item) is not None | |
297 | |
298 | |
299 def regex_sub(expr, replace, item): | |
300 return re.sub(expr, replace, item) | |
301 | |
302 | |
303 def get_connection(sqlitedb_path, addfunctions=False): | |
304 conn = sqlite.connect(sqlitedb_path) | |
305 if addfunctions: | |
306 conn.create_function("re_match", 2, regex_match) | |
307 conn.create_function("re_search", 2, regex_search) | |
308 conn.create_function("re_sub", 3, regex_sub) | |
309 return conn | |
310 | |
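
get_connection optionally registers re_match, re_search, and re_sub as SQL functions, which is what lets queries filter and rewrite columns with regular expressions. A self-contained sketch against an in-memory database (the table and data are made up for illustration):

```python
# Using the regex SQL functions registered by get_connection above.
import re
import sqlite3

conn = sqlite3.connect(':memory:')
conn.create_function('re_match', 2, lambda e, s: re.match(e, s) is not None)
conn.create_function('re_search', 2, lambda e, s: re.search(e, s) is not None)
conn.create_function('re_sub', 3, lambda e, r, s: re.sub(e, r, s))

conn.execute('CREATE TABLE t1 (c1 TEXT)')
conn.executemany('INSERT INTO t1 VALUES (?)',
                 [('gi|123',), ('sp|Q9',), ('abc',)])
for row in conn.execute(
        "SELECT re_sub('^[a-z]+[|]', '', c1) FROM t1 WHERE re_search('[|]', c1)"):
    print(row)
# ('123',)
# ('Q9',)
```
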
311 | |
312 def __main__(): | 45 def __main__(): |
313 # Parse Command Line | 46 # Parse Command Line |
314 parser = optparse.OptionParser() | 47 parser = optparse.OptionParser() |
315 parser.add_option('-s', '--sqlitedb', dest='sqlitedb', default=None, help='The SQLite Database') | 48 parser.add_option('-s', '--sqlitedb', dest='sqlitedb', default=None, |
316 parser.add_option('-t', '--table', dest='tables', action="append", default=[], help='Tabular file: file_path[=table_name[:column_name, ...]') | 49 help='The SQLite Database') |
317 parser.add_option('-j', '--jsonfile', dest='jsonfile', default=None, help='Tabular file: file_path[=table_name[:column_name, ...]') | 50 parser.add_option('-j', '--jsonfile', dest='jsonfile', default=None, |
318 parser.add_option('-q', '--query', dest='query', default=None, help='SQL query') | 51 help='JSON dict of table specifications') |
319 parser.add_option('-Q', '--query_file', dest='query_file', default=None, help='SQL query file') | 52 parser.add_option('-q', '--query', dest='query', default=None, |
320 parser.add_option('-n', '--no_header', dest='no_header', action='store_true', default=False, help='Include a column headers line') | 53 help='SQL query') |
321 parser.add_option('-o', '--output', dest='output', default=None, help='Output file for query results') | 54 parser.add_option('-Q', '--query_file', dest='query_file', default=None, |
55 help='SQL query file') | |
56 parser.add_option('-n', '--no_header', dest='no_header', default=False, | |
57 action='store_true', | |
58 help='Include a column headers line') | |
59 parser.add_option('-o', '--output', dest='output', default=None, | |
60 help='Output file for query results') | |
322 (options, args) = parser.parse_args() | 61 (options, args) = parser.parse_args() |
323 | 62 |
324 def run_query(query,outputFile): | |
325 conn = get_connection(options.sqlitedb, addfunctions=True) | |
326 cur = conn.cursor() | |
327 results = cur.execute(query) | |
328 if not options.no_header: | |
329 outputFile.write("#%s\n" % '\t'.join([str(col[0]) for col in cur.description])) | |
330 # yield [col[0] for col in cur.description] | |
331 for i, row in enumerate(results): | |
332 # yield [val for val in row] | |
333 outputFile.write("%s\n" % '\t'.join([str(val) if val is not None else '' for val in row])) | |
334 | |
335 # open sqlite connection | |
336 conn = get_connection(options.sqlitedb) | |
337 # determine output destination | 63 # determine output destination |
338 if options.output is not None: | 64 if options.output is not None: |
339 try: | 65 try: |
340 outputPath = os.path.abspath(options.output) | 66 outputPath = os.path.abspath(options.output) |
341 outputFile = open(outputPath, 'w') | 67 outputFile = open(outputPath, 'w') |
342 except Exception, e: | 68 except Exception as e: |
343 print >> sys.stderr, "failed: %s" % e | 69 print("failed: %s" % e, file=sys.stderr) |
344 exit(3) | 70 exit(3) |
345 else: | 71 else: |
346 outputFile = sys.stdout | 72 outputFile = sys.stdout |
347 | 73 |
348 # get table defs | 74 def _create_table(ti, table): |
349 if options.tables: | 75 path = table['file_path'] |
350 for ti, table in enumerate(options.tables): | 76 table_name =\ |
351 table_name = 't%d' % (ti + 1) | 77 table['table_name'] if 'table_name' in table else 't%d' % (ti + 1) |
352 column_names = None | 78 comment_lines =\ |
353 fields = table.split('=') | 79 table['comment_lines'] if 'comment_lines' in table else 0 |
354 path = fields[0] | 80 comment_char =\ |
355 if len(fields) > 1: | 81 table['comment_char'] if 'comment_char' in table else None |
356 names = fields[1].split(':') | 82 column_names =\ |
357 table_name = names[0] if names[0] else table_name | 83 table['column_names'] if 'column_names' in table else None |
358 if len(names) > 1: | 84 if column_names: |
359 column_names = names[1] | 85 load_named_columns =\ |
360 # print >> sys.stdout, '%s %s' % (table_name, path) | 86 table['load_named_columns']\ |
361 create_table(conn, path, table_name, column_names=column_names) | 87 if 'load_named_columns' in table else False |
88 else: | |
89 load_named_columns = False | |
90 unique_indexes = table['unique'] if 'unique' in table else [] | |
91 indexes = table['index'] if 'index' in table else [] | |
92 filters = table['filters'] if 'filters' in table else None | |
93 pkey_autoincr = \ | |
94 table['pkey_autoincr'] if 'pkey_autoincr' in table else None | |
95 create_table(get_connection(options.sqlitedb), path, table_name, | |
96 pkey_autoincr=pkey_autoincr, | |
97 column_names=column_names, | |
98 skip=comment_lines, | |
99 comment_char=comment_char, | |
100 load_named_columns=load_named_columns, | |
101 filters=filters, | |
102 unique_indexes=unique_indexes, | |
103 indexes=indexes) | |
104 | |
362 if options.jsonfile: | 105 if options.jsonfile: |
363 try: | 106 try: |
364 fh = open(options.jsonfile) | 107 fh = open(options.jsonfile) |
365 tdef = json.load(fh) | 108 tdef = json.load(fh) |
366 if 'tables' in tdef: | 109 if 'tables' in tdef: |
367 for ti, table in enumerate(tdef['tables']): | 110 for ti, table in enumerate(tdef['tables']): |
368 path = table['file_path'] | 111 _create_table(ti, table) |
369 table_name = table['table_name'] if 'table_name' in table else 't%d' % (ti + 1) | 112 except Exception as exc: |
370 comment_lines = table['comment_lines'] if 'comment_lines' in table else 0 | 113 print("Error: %s" % exc, file=sys.stderr) |
371 comment_char = table['comment_char'] if 'comment_char' in table else None | |
372 column_names = table['column_names'] if 'column_names' in table else None | |
373 if column_names: | |
374 load_named_columns = table['load_named_columns'] if 'load_named_columns' in table else False | |
375 else: | |
376 load_named_columns = False | |
377 unique_indexes = table['unique'] if 'unique' in table else [] | |
378 indexes = table['index'] if 'index' in table else [] | |
379 filters = table['filters'] if 'filters' in table else None | |
380 pkey_autoincr = table['pkey_autoincr'] if 'pkey_autoincr' in table else None | |
381 create_table(conn, path, table_name, pkey_autoincr=pkey_autoincr, column_names=column_names, | |
382 skip=comment_lines, comment_char=comment_char, load_named_columns=load_named_columns, | |
383 filters=filters,unique_indexes=unique_indexes, indexes=indexes) | |
384 except Exception, exc: | |
385 print >> sys.stderr, "Error: %s" % exc | |
386 conn.close() | |
387 | 114 |
388 query = None | 115 query = None |
389 if (options.query_file is not None): | 116 if (options.query_file is not None): |
390 with open(options.query_file, 'r') as fh: | 117 with open(options.query_file, 'r') as fh: |
391 query = '' | 118 query = '' |
392 for line in fh: | 119 for line in fh: |
393 query += line | 120 query += line |
394 elif (options.query is not None): | 121 elif (options.query is not None): |
395 query = options.query | 122 query = options.query |
396 | 123 |
397 if (query is None): | 124 if (query is None): |
398 tables_query = \ | |
399 "SELECT name, sql FROM sqlite_master WHERE type='table' ORDER BY name" | |
400 try: | 125 try: |
401 conn = get_connection(options.sqlitedb) | 126 describe_tables(get_connection(options.sqlitedb), outputFile) |
402 c = conn.cursor() | 127 except Exception as exc: |
403 rslt = c.execute(tables_query).fetchall() | 128 print("Error: %s" % exc, file=sys.stderr) |
404 for table, sql in rslt: | 129 else: |
405 print >> sys.stderr, "Table %s:" % table | 130 try: |
406 try: | 131 run_query(get_connection(options.sqlitedb), query, outputFile, |
407 col_query = 'SELECT * FROM %s LIMIT 0' % table | 132 no_header=options.no_header) |
408 cur = conn.cursor().execute(col_query) | 133 except Exception as exc: |
409 cols = [col[0] for col in cur.description] | 134 print("Error: %s" % exc, file=sys.stderr) |
410 print >> sys.stderr, " Columns: %s" % cols | 135 exit(1) |
411 except Exception, exc: | 136 |
412 print >> sys.stderr, "Error: %s" % exc | |
413 except Exception, exc: | |
414 print >> sys.stderr, "Error: %s" % exc | |
415 exit(0) | |
416 # if not sqlite.is_read_only_query(query): | |
417 # print >> sys.stderr, "Error: Must be a read only query" | |
418 # exit(2) | |
419 try: | |
420 run_query(query,outputFile) | |
421 except Exception, exc: | |
422 print >> sys.stderr, "Error: %s" % exc | |
423 exit(1) | |
424 | 137 |
425 if __name__ == "__main__": | 138 if __name__ == "__main__": |
426 __main__() | 139 __main__() |
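
For reference, a hedged end-to-end invocation using the options defined in __main__; all file names and the query are hypothetical, and the new revision additionally expects the load_db and query_db modules alongside the script:

```python
# Hypothetical end-to-end run; flags match the optparse options above.
import subprocess

subprocess.check_call([
    "python", "query_tabular.py",
    "-j", "tables.json",                        # table specs (see JSON example)
    "-s", "out.sqlite",                         # SQLite database to build
    "-q", "SELECT c1, c2 FROM t1 ORDER BY c1",  # SQL query to run
    "-o", "results.tsv",                        # tab-separated results
])
```
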