comparison: query_tabular.py @ 20:ab27c4bd14b9 (draft)
description: Uploaded
author:      jjohnson
date:        Fri, 14 Jul 2017 11:39:27 -0400
parents:     b9f797bf4f38
children:    bed5018e7ae3
--- query_tabular.py	19:9d9ab2c69014
+++ query_tabular.py	20:ab27c4bd14b9
@@ -1,426 +1,139 @@
 #!/usr/bin/env python
-"""
-"""
+
+from __future__ import print_function
+
+import json
+import optparse
+import os.path
 import sys
-import re
-import os.path
-import json
-import sqlite3 as sqlite
-import optparse
-from optparse import OptionParser
+
+from load_db import create_table
+
+from query_db import describe_tables, get_connection, run_query
+
 
12 """ 15 """
13 TODO:
14 - could read column names from comment lines, but issues with legal names
15 - could add some transformations on tabular columns,
16 filter - skip_regex
17 e.g. a regex to format date/time strings
18 format: {
19 c2 : re.sub('pat', 'sub', c2)
20 c3 : len(c3)
21 }
22 def format(colname,val, expr):
23 normalize input list columns
24 iterate over list values creating one row per iteration
25 option for input line_num column
26 create associated table
27 fk, name, value # e.g. PSM table with list of proteins containing peptide
28 fk, name, value[, value] # if multiple columns similarly indexed, e.g. vcf
29 - column_defs dict of columns to create from tabular input
30 column_defs : { 'name1' : 'expr', 'name2' : 'expr'}
31 - allow multiple queries and outputs
32 repeat min - max with up to max conditional outputs
33
34 - add a --json input for table definitions (or yaml)
 JSON config:
 { tables : [
     { file_path : '/home/galaxy/dataset_101.dat',
             table_name : 't1',
-            column_names : ['c1', 'c2', 'c3'],
+            column_names : ['c1','c2','c3'],
             pkey_autoincr : 'id'
             comment_lines : 1
             unique: ['c1'],
-            index: ['c2','c3']
+            index: ['c2', 'c3']
     },
     { file_path : '/home/galaxy/dataset_102.dat',
             table_name : 'gff',
             column_names : ['seqname',,'date','start','end']
             comment_lines : 1
             load_named_columns : True
-            filters : [{'filter': 'regex', 'pattern': '#peptide', 'action': 'exclude_match'},
-                       {'filter': 'replace', 'column': 3, 'replace': 'gi[|]', 'pattern': ''}]
+            filters : [{'filter': 'regex', 'pattern': '#peptide',
+                        'action': 'exclude_match'},
+                       {'filter': 'replace', 'column': 3,
+                        'replace': 'gi[|]', 'pattern': ''}]
     },
     { file_path : '/home/galaxy/dataset_103.dat',
             table_name : 'test',
             column_names : ['c1', 'c2', 'c3']
     }
 ]
 }
 """
 
 
-class LineFilter( object ):
-    def __init__(self,source,filter_dict):
-        self.source = source
-        self.filter_dict = filter_dict
-        # print >> sys.stderr, 'LineFilter %s' % filter_dict if filter_dict else 'NONE'
-        self.func = lambda i,l: l.rstrip('\r\n') if l else None
-        self.src_lines = []
-        self.src_line_cnt = 0
-        if not filter_dict:
-            return
-        if filter_dict['filter'] == 'regex':
-            rgx = re.compile(filter_dict['pattern'])
-            if filter_dict['action'] == 'exclude_match':
-                self.func = lambda i,l: l if not rgx.match(l) else None
-            elif filter_dict['action'] == 'include_match':
-                self.func = lambda i,l: l if rgx.match(l) else None
-            elif filter_dict['action'] == 'exclude_find':
-                self.func = lambda i,l: l if not rgx.search(l) else None
-            elif filter_dict['action'] == 'include_find':
-                self.func = lambda i,l: l if rgx.search(l) else None
-        elif filter_dict['filter'] == 'select_columns':
-            cols = [int(c) - 1 for c in filter_dict['columns']]
-            self.func = lambda i,l: self.select_columns(l,cols)
-        elif filter_dict['filter'] == 'replace':
-            p = filter_dict['pattern']
-            r = filter_dict['replace']
-            c = int(filter_dict['column']) - 1
-            self.func = lambda i,l: '\t'.join([x if i != c else re.sub(p,r,x) for i,x in enumerate(l.split('\t'))])
-        elif filter_dict['filter'] == 'prepend_line_num':
-            self.func = lambda i,l: '%d\t%s' % (i,l)
-        elif filter_dict['filter'] == 'append_line_num':
-            self.func = lambda i,l: '%s\t%d' % (l.rstrip('\r\n'),i)
-        elif filter_dict['filter'] == 'prepend_text':
-            s = filter_dict['column_text']
-            self.func = lambda i,l: '%s\t%s' % (s,l)
-        elif filter_dict['filter'] == 'append_text':
-            s = filter_dict['column_text']
-            self.func = lambda i,l: '%s\t%s' % (l.rstrip('\r\n'),s)
-        elif filter_dict['filter'] == 'skip':
-            cnt = filter_dict['count']
-            self.func = lambda i,l: l if i > cnt else None
-        elif filter_dict['filter'] == 'normalize':
-            cols = [int(c) - 1 for c in filter_dict['columns']]
-            sep = filter_dict['separator']
-            self.func = lambda i,l: self.normalize(l,cols,sep)
-    def __iter__(self):
-        return self
-    def select_columns(self,line,cols):
-        fields = line.split('\t')
-        return '\t'.join([fields[x] for x in cols])
-    def normalize(self,line,split_cols,sep):
-        lines = []
-        fields = line.rstrip('\r\n').split('\t')
-        split_fields = dict()
-        cnt = 0
-        for c in split_cols:
-            if c < len(fields):
-                split_fields[c] = fields[c].split(sep)
-                cnt = max(cnt, len(split_fields[c]))
-        if cnt == 0:
-            lines.append('\t'.join(fields))
-        else:
-            for n in range(0, cnt):
-                flds = [x if c not in split_cols else split_fields[c][n] if n < len(split_fields[c]) else '' for (c, x) in enumerate(fields)]
-                lines.append('\t'.join(flds))
-        return lines
-    def get_lines(self):
-        for i,next_line in enumerate(self.source):
-            self.src_line_cnt += 1
-            line = self.func(self.src_line_cnt,next_line)
-            # print >> sys.stderr, 'LineFilter %s: %d %s' % (str(self.filter_dict),self.src_line_cnt,line)
-            if line:
-                if isinstance(line,list):
-                    self.src_lines.extend(line)
-                else:
-                    self.src_lines.append(line)
-                return
-    def next(self):
-        if not self.src_lines:
-            self.get_lines()
-        if self.src_lines:
-            return self.src_lines.pop(0)
-        raise StopIteration
-
-
-class TabularReader:
-    """
-    Tabular file iterator. Returns a list
-    """
-    def __init__(self, file_path, skip=0, comment_char=None, col_idx=None, filters=None):
-        self.skip = skip
-        self.comment_char = comment_char
-        self.col_idx = col_idx
-        self.filters = filters
-        self.tsv_file = open(file_path)
-        if skip and skip > 0:
-            for i in range(skip):
-                if not self.tsv_file.readline():
-                    break
-        source = LineFilter(self.tsv_file,None)
-        if comment_char:
-            source = LineFilter(source,{"filter": "regex", "pattern": comment_char, "action": "exclude_match"})
-        if filters:
-            for f in filters:
-                source = LineFilter(source,f)
-        self.source = source
-    def __iter__(self):
-        return self
-    def next(self):
-        ''' Iteration '''
-        for i,line in enumerate(self.source):
-            fields = line.rstrip('\r\n').split('\t')
-            if self.col_idx:
-                fields = [fields[i] for i in self.col_idx]
-            return fields
-        raise StopIteration
-
-
-def getValueType(val):
-    if val or 0. == val:
-        try:
-            int(val)
-            return 'INTEGER'
-        except:
-            try:
-                float(val)
-                return 'REAL'
-            except:
-                return 'TEXT'
-    return None
-
-
-def get_column_def(file_path, table_name, skip=0, comment_char='#',
-                   column_names=None, max_lines=100,load_named_columns=False,filters=None):
-    col_pref = ['TEXT', 'REAL', 'INTEGER', None]
-    col_types = []
-    col_idx = None
-    data_lines = 0
-    try:
-        tr = TabularReader(file_path,skip=skip, comment_char=comment_char, col_idx=None, filters=filters)
-        for linenum, fields in enumerate(tr):
-            if linenum > max_lines:
-                break
-            try:
-                while len(col_types) < len(fields):
-                    col_types.append(None)
-                for i, val in enumerate(fields):
-                    colType = getValueType(val)
-                    if col_pref.index(colType) < col_pref.index(col_types[i]):
-                        col_types[i] = colType
-            except Exception, e:
-                print >> sys.stderr, 'Failed at line: %d err: %s' % (linenum, e)
-    except Exception, e:
-        print >> sys.stderr, 'Failed: %s' % (e)
-    for i,col_type in enumerate(col_types):
-        if not col_type:
-            col_types[i] = 'TEXT'
-    if column_names:
-        col_names = []
-        if load_named_columns:
-            col_idx = []
-            for i, cname in enumerate([cn.strip() for cn in column_names.split(',')]):
-                if cname != '':
-                    col_idx.append(i)
-                    col_names.append(cname)
-            col_types = [col_types[i] for i in col_idx]
-        else:
-            col_names = ['c%d' % i for i in range(1, len(col_types) + 1)]
-            for i, cname in enumerate([cn.strip() for cn in column_names.split(',')]):
-                if cname and i < len(col_names):
-                    col_names[i] = cname
-    else:
-        col_names = ['c%d' % i for i in range(1, len(col_types) + 1)]
-    col_def = []
-    for i, col_name in enumerate(col_names):
-        col_def.append('%s %s' % (col_names[i], col_types[i]))
-    return col_names, col_types, col_def, col_idx
-
-
-def create_table(conn, file_path, table_name, skip=0, comment_char='#', pkey_autoincr=None, column_names=None,load_named_columns=False,filters=None,unique_indexes=[],indexes=[]):
-
-    col_names, col_types, col_def, col_idx = get_column_def(file_path, table_name, skip=skip, comment_char=comment_char,
-                                                            column_names=column_names,load_named_columns=load_named_columns,filters=filters)
-    col_func = [float if t == 'REAL' else int if t == 'INTEGER' else str for t in col_types]
-    table_def = 'CREATE TABLE %s (\n    %s%s\n);' % (
-        table_name,
-        '%s INTEGER PRIMARY KEY AUTOINCREMENT,' % pkey_autoincr if pkey_autoincr else '',
-        ', \n    '.join(col_def))
-    # print >> sys.stdout, table_def
-    insert_stmt = 'INSERT INTO %s(%s) VALUES(%s)' % (table_name, ','.join(col_names), ','.join(["?" for x in col_names]))
-    # print >> sys.stdout, insert_stmt
-    data_lines = 0
-    try:
-        c = conn.cursor()
-        c.execute(table_def)
-        conn.commit()
-        c.close()
-        for i,index in enumerate(unique_indexes):
-            index_name='idx_uniq_%s_%d' % (table_name,i)
-            index_columns = index.split(',')
-            create_index(conn, table_name, index_name, index_columns,unique=True)
-        for i,index in enumerate(indexes):
-            index_name='idx_%s_%d' % (table_name,i)
-            index_columns = index.split(',')
-            create_index(conn, table_name, index_name, index_columns)
-        c = conn.cursor()
-        tr = TabularReader(file_path,skip=skip, comment_char=comment_char, col_idx=col_idx, filters=filters)
-        for linenum, fields in enumerate(tr):
-            data_lines += 1
-            try:
-                vals = [col_func[i](x) if x else None for i, x in enumerate(fields)]
-                c.execute(insert_stmt, vals)
-            except Exception, e:
-                print >> sys.stderr, 'Failed at line: %d err: %s' % (linenum, e)
-        conn.commit()
-        c.close()
-    except Exception, e:
-        print >> sys.stderr, 'Failed: %s' % (e)
-        exit(1)
-
-
-def create_index(conn, table_name, index_name, index_columns, unique=False):
-    index_def = "CREATE %s INDEX %s on %s(%s)" % ('UNIQUE' if unique else '', index_name, table_name, ','.join(index_columns))
-    c = conn.cursor()
-    c.execute(index_def)
-    conn.commit()
-    c.close()
-
-
-def regex_match(expr, item):
-    return re.match(expr, item) is not None
-
-
-def regex_search(expr, item):
-    return re.search(expr, item) is not None
-
-
-def regex_sub(expr, replace, item):
-    return re.sub(expr, replace, item)
-
-
-def get_connection(sqlitedb_path, addfunctions=False):
-    conn = sqlite.connect(sqlitedb_path)
-    if addfunctions:
-        conn.create_function("re_match", 2, regex_match)
-        conn.create_function("re_search", 2, regex_search)
-        conn.create_function("re_sub", 3, regex_sub)
-    return conn
-
-
 def __main__():
     # Parse Command Line
     parser = optparse.OptionParser()
-    parser.add_option('-s', '--sqlitedb', dest='sqlitedb', default=None, help='The SQLite Database')
-    parser.add_option('-t', '--table', dest='tables', action="append", default=[], help='Tabular file: file_path[=table_name[:column_name, ...]')
-    parser.add_option('-j', '--jsonfile', dest='jsonfile', default=None, help='Tabular file: file_path[=table_name[:column_name, ...]')
-    parser.add_option('-q', '--query', dest='query', default=None, help='SQL query')
-    parser.add_option('-Q', '--query_file', dest='query_file', default=None, help='SQL query file')
-    parser.add_option('-n', '--no_header', dest='no_header', action='store_true', default=False, help='Include a column headers line')
-    parser.add_option('-o', '--output', dest='output', default=None, help='Output file for query results')
+    parser.add_option('-s', '--sqlitedb', dest='sqlitedb', default=None,
+                      help='The SQLite Database')
+    parser.add_option('-j', '--jsonfile', dest='jsonfile', default=None,
+                      help='JSON dict of table specifications')
+    parser.add_option('-q', '--query', dest='query', default=None,
+                      help='SQL query')
+    parser.add_option('-Q', '--query_file', dest='query_file', default=None,
+                      help='SQL query file')
+    parser.add_option('-n', '--no_header', dest='no_header', default=False,
+                      action='store_true',
+                      help='Include a column headers line')
+    parser.add_option('-o', '--output', dest='output', default=None,
+                      help='Output file for query results')
     (options, args) = parser.parse_args()
 
-    def run_query(query,outputFile):
-        conn = get_connection(options.sqlitedb, addfunctions=True)
-        cur = conn.cursor()
-        results = cur.execute(query)
-        if not options.no_header:
-            outputFile.write("#%s\n" % '\t'.join([str(col[0]) for col in cur.description]))
-            # yield [col[0] for col in cur.description]
-        for i, row in enumerate(results):
-            # yield [val for val in row]
-            outputFile.write("%s\n" % '\t'.join([str(val) if val is not None else '' for val in row]))
-
-    # open sqlite connection
-    conn = get_connection(options.sqlitedb)
     # determine output destination
     if options.output is not None:
         try:
             outputPath = os.path.abspath(options.output)
             outputFile = open(outputPath, 'w')
-        except Exception, e:
-            print >> sys.stderr, "failed: %s" % e
+        except Exception as e:
+            print("failed: %s" % e, file=sys.stderr)
             exit(3)
     else:
         outputFile = sys.stdout
 
-    # get table defs
-    if options.tables:
-        for ti, table in enumerate(options.tables):
-            table_name = 't%d' % (ti + 1)
-            column_names = None
-            fields = table.split('=')
-            path = fields[0]
-            if len(fields) > 1:
-                names = fields[1].split(':')
-                table_name = names[0] if names[0] else table_name
-                if len(names) > 1:
-                    column_names = names[1]
-            # print >> sys.stdout, '%s %s' % (table_name, path)
-            create_table(conn, path, table_name, column_names=column_names)
+    def _create_table(ti, table):
+        path = table['file_path']
+        table_name =\
+            table['table_name'] if 'table_name' in table else 't%d' % (ti + 1)
+        comment_lines =\
+            table['comment_lines'] if 'comment_lines' in table else 0
+        comment_char =\
+            table['comment_char'] if 'comment_char' in table else None
+        column_names =\
+            table['column_names'] if 'column_names' in table else None
+        if column_names:
+            load_named_columns =\
+                table['load_named_columns']\
+                if 'load_named_columns' in table else False
+        else:
+            load_named_columns = False
+        unique_indexes = table['unique'] if 'unique' in table else []
+        indexes = table['index'] if 'index' in table else []
+        filters = table['filters'] if 'filters' in table else None
+        pkey_autoincr = \
+            table['pkey_autoincr'] if 'pkey_autoincr' in table else None
+        create_table(get_connection(options.sqlitedb), path, table_name,
+                     pkey_autoincr=pkey_autoincr,
+                     column_names=column_names,
+                     skip=comment_lines,
+                     comment_char=comment_char,
+                     load_named_columns=load_named_columns,
+                     filters=filters,
+                     unique_indexes=unique_indexes,
+                     indexes=indexes)
+
     if options.jsonfile:
         try:
             fh = open(options.jsonfile)
             tdef = json.load(fh)
             if 'tables' in tdef:
                 for ti, table in enumerate(tdef['tables']):
-                    path = table['file_path']
-                    table_name = table['table_name'] if 'table_name' in table else 't%d' % (ti + 1)
-                    comment_lines = table['comment_lines'] if 'comment_lines' in table else 0
-                    comment_char = table['comment_char'] if 'comment_char' in table else None
-                    column_names = table['column_names'] if 'column_names' in table else None
-                    if column_names:
-                        load_named_columns = table['load_named_columns'] if 'load_named_columns' in table else False
-                    else:
-                        load_named_columns = False
-                    unique_indexes = table['unique'] if 'unique' in table else []
-                    indexes = table['index'] if 'index' in table else []
-                    filters = table['filters'] if 'filters' in table else None
-                    pkey_autoincr = table['pkey_autoincr'] if 'pkey_autoincr' in table else None
-                    create_table(conn, path, table_name, pkey_autoincr=pkey_autoincr, column_names=column_names,
-                                 skip=comment_lines, comment_char=comment_char, load_named_columns=load_named_columns,
-                                 filters=filters,unique_indexes=unique_indexes, indexes=indexes)
-        except Exception, exc:
-            print >> sys.stderr, "Error: %s" % exc
-    conn.close()
+                    _create_table(ti, table)
+        except Exception as exc:
+            print("Error: %s" % exc, file=sys.stderr)
 
     query = None
     if (options.query_file is not None):
         with open(options.query_file, 'r') as fh:
             query = ''
             for line in fh:
                 query += line
     elif (options.query is not None):
         query = options.query
 
     if (query is None):
-        tables_query = \
-            "SELECT name, sql FROM sqlite_master WHERE type='table' ORDER BY name"
         try:
-            conn = get_connection(options.sqlitedb)
-            c = conn.cursor()
-            rslt = c.execute(tables_query).fetchall()
-            for table, sql in rslt:
-                print >> sys.stderr, "Table %s:" % table
-                try:
-                    col_query = 'SELECT * FROM %s LIMIT 0' % table
-                    cur = conn.cursor().execute(col_query)
-                    cols = [col[0] for col in cur.description]
-                    print >> sys.stderr, " Columns: %s" % cols
-                except Exception, exc:
-                    print >> sys.stderr, "Error: %s" % exc
-        except Exception, exc:
-            print >> sys.stderr, "Error: %s" % exc
-        exit(0)
-    # if not sqlite.is_read_only_query(query):
-    #     print >> sys.stderr, "Error: Must be a read only query"
-    #     exit(2)
-    try:
-        run_query(query,outputFile)
-    except Exception, exc:
-        print >> sys.stderr, "Error: %s" % exc
-        exit(1)
+            describe_tables(get_connection(options.sqlitedb), outputFile)
+        except Exception as exc:
+            print("Error: %s" % exc, file=sys.stderr)
+    else:
+        try:
+            run_query(get_connection(options.sqlitedb), query, outputFile,
+                      no_header=options.no_header)
+        except Exception as exc:
+            print("Error: %s" % exc, file=sys.stderr)
+            exit(1)
+
 
 if __name__ == "__main__":
     __main__()
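
A minimal sketch of driving the revised script end to end, using only the options visible in this revision (-s/--sqlitedb, -j/--jsonfile, -q/--query, -o/--output). The file names, table name, and query are hypothetical; the table specification mirrors the JSON config example in the module docstring.

    # Sketch: assumes query_tabular.py (with load_db.py and query_db.py)
    # is in the current directory; 'features.tsv' is a hypothetical input.
    import json
    import subprocess

    spec = {
        'tables': [
            {'file_path': 'features.tsv',
             'table_name': 't1',
             'column_names': ['c1', 'c2', 'c3'],
             'comment_lines': 1,         # skip one header line
             'unique': ['c1'],           # unique index on c1
             'index': ['c2', 'c3']}      # one index per list entry
        ]
    }
    with open('tables.json', 'w') as fh:
        json.dump(spec, fh)

    # Load the table into a SQLite database, then query it.
    subprocess.check_call([
        'python', 'query_tabular.py',
        '-s', 'test.sqlite',
        '-j', 'tables.json',
        '-q', 'SELECT c1, c2 FROM t1 ORDER BY c2',
        '-o', 'results.tsv',
    ])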
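
The filter dicts in the JSON spec correspond to the LineFilter branches deleted above (the class presumably now lives in load_db.py). A standalone sketch of the two filters from the docstring's 'gff' example, restated outside the class; note that LineFilter reads 'pattern' as the regex and 'replace' as the substitution text, and that 'column' is 1-based.

    import re

    def exclude_match(pattern, line):
        # 'regex' + 'exclude_match': drop lines whose start matches.
        return None if re.match(pattern, line) else line

    def replace_in_column(pattern, replace, column, line):
        # 'replace': re.sub applied to one 1-based column of a TSV line.
        c = int(column) - 1
        fields = line.split('\t')
        fields[c] = re.sub(pattern, replace, fields[c])
        return '\t'.join(fields)

    print(exclude_match('#peptide', '#peptide header'))      # -> None
    print(replace_in_column('gi[|]', '', 3,
                            'pep1\tsample\tgi|12345\t0.9'))
    # -> 'pep1\tsample\t12345\t0.9'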
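
After this revision the loading and querying primitives are imported from load_db and query_db rather than defined inline. A sketch of calling them directly, using only the signatures visible at the call sites in this diff; the argument values are hypothetical, and load_db.create_table is assumed to keep the pre-refactor conventions (comma-separated column-name strings, and one composite index per comma-joined entry).

    import sys

    from load_db import create_table
    from query_db import describe_tables, get_connection, run_query

    # Mirrors the _create_table() call in __main__() above.
    create_table(get_connection('test.sqlite'), 'features.tsv', 't1',
                 pkey_autoincr=None,
                 column_names='c1,c2,c3',
                 skip=1,
                 comment_char='#',
                 load_named_columns=False,
                 filters=None,
                 unique_indexes=['c1'],
                 indexes=['c2,c3'])

    # With no query given, __main__() describes the schema instead:
    describe_tables(get_connection('test.sqlite'), sys.stdout)

    # Run a query, writing tab-separated rows (with a '#' header line).
    run_query(get_connection('test.sqlite'),
              'SELECT c1, c2 FROM t1 ORDER BY c2',
              sys.stdout, no_header=False)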