comparison query_tabular.py @ 6:03842a4f71c6 draft

Uploaded
author jjohnson
date Fri, 17 Feb 2017 15:20:24 -0500
parents 19ae309ec53c
children 72c32037fa1e
comparison
equal deleted inserted replaced
5:19ae309ec53c 6:03842a4f71c6
11 11
12 """ 12 """
13 TODO: 13 TODO:
14 - could read column names from comment lines, but issues with legal names 14 - could read column names from comment lines, but issues with legal names
15 - could add some transformations on tabular columns, 15 - could add some transformations on tabular columns,
16 filter - skip_regex
16 e.g. a regex to format date/time strings 17 e.g. a regex to format date/time strings
17 format: { 18 format: {
18 c2 : re.sub('pat', 'sub', c2) 19 c2 : re.sub('pat', 'sub', c2)
19 c3 : len(c3) 20 c3 : len(c3)
20 } 21 }
21 def format(colname,val, expr): 22 def format(colname,val, expr):
23 normalize input list columns
24 iterate over list values creating one row per iteration
25 option for input line_num column
26 create associated table
27 fk, name, value # e.g. PSM table with list of proteins containing peptide
28 fk, name, value[, value] # if multiple columns similarly indexed, e.g. vcf
22 - column_defs dict of columns to create from tabular input 29 - column_defs dict of columns to create from tabular input
23 column_defs : { 'name1' : 'expr', 'name2' : 'expr'} 30 column_defs : { 'name1' : 'expr', 'name2' : 'expr'}
24 - allow multiple queries and outputs 31 - allow multiple queries and outputs
32 repeat min - max with up to max conditional outputs
33
25 - add a --json input for table definitions (or yaml) 34 - add a --json input for table definitions (or yaml)
26 JSON config: 35 JSON config:
27 { tables : [ 36 { tables : [
28 { file_path : '/home/galaxy/dataset_101.dat', 37 { file_path : '/home/galaxy/dataset_101.dat',
29 table_name : 't1', 38 table_name : 't1',
33 unique: ['c1'], 42 unique: ['c1'],
34 index: ['c2','c3'] 43 index: ['c2','c3']
35 }, 44 },
36 { file_path : '/home/galaxy/dataset_102.dat', 45 { file_path : '/home/galaxy/dataset_102.dat',
37 table_name : 'gff', 46 table_name : 'gff',
38 column_names : ['seqname',,,'start','end'] 47 column_names : ['seqname',,'date','start','end']
39 comment_lines : 1 48 comment_lines : 1
40 load_named_columns : True 49 load_named_columns : True
50 filters : [{'filter': 'regex', 'pattern': '#peptide', 'action': 'exclude_match'},
51 {'filter': 'replace', 'column': 3, 'replace': 'gi[|]', 'pattern': ''}]
41 }, 52 },
42 { file_path : '/home/galaxy/dataset_103.dat', 53 { file_path : '/home/galaxy/dataset_103.dat',
43 table_name : 'test', 54 table_name : 'test',
44 column_names : ['c1', 'c2', 'c3'] 55 column_names : ['c1', 'c2', 'c3']
45 } 56 }
46 ] 57 ]
47 } 58 }
48 """ 59 """
49 60
50 tables_query = \ 61
class LineFilter( object ):
    """
    Iterator wrapper that transforms or drops lines read from *source*.

    source      -- any line iterator (an open file or another LineFilter)
    filter_dict -- None for the default behavior (strip line endings, drop
                   empty lines), or a dict describing one filter:
                     {'filter': 'regex', 'pattern': ..., 'action':
                      'exclude_match'|'include_match'|'exclude_find'|'include_find'}
                     {'filter': 'replace', 'column': N, 'pattern': regex,
                      'replace': replacement}   # 1-based column number
    """
    def __init__(self, source, filter_dict):
        self.source = source
        self.filter_dict = filter_dict
        # Parenthesized conditional: previously the precedence of % made the
        # whole message collapse to the bare string 'NONE' when no filter was
        # given; now it logs 'LineFilter NONE'.
        sys.stderr.write('LineFilter %s\n' % (filter_dict if filter_dict else 'NONE'))
        # Default transform: strip line endings, drop falsy (empty/None) lines.
        self.func = lambda l: l.rstrip('\r\n') if l else None
        if not filter_dict:
            return
        if filter_dict['filter'] == 'regex':
            rgx = re.compile(filter_dict['pattern'])
            if filter_dict['action'] == 'exclude_match':
                self.func = lambda l: l if not rgx.match(l) else None
            elif filter_dict['action'] == 'include_match':
                self.func = lambda l: l if rgx.match(l) else None
            elif filter_dict['action'] == 'exclude_find':
                self.func = lambda l: l if not rgx.search(l) else None
            elif filter_dict['action'] == 'include_find':
                self.func = lambda l: l if rgx.search(l) else None
        elif filter_dict['filter'] == 'replace':
            p = filter_dict['pattern']
            r = filter_dict['replace']
            c = int(filter_dict['column']) - 1
            # Apply re.sub only to the selected tab-delimited column.
            self.func = lambda l: '\t'.join(
                [x if i != c else re.sub(p, r, x) for i, x in enumerate(l.split('\t'))])

    def __iter__(self):
        return self

    def next(self):
        # Pull lines from the source until one survives the filter; lines for
        # which func returns a falsy value are skipped.
        for next_line in self.source:
            line = self.func(next_line)
            if line:
                return line
        raise StopIteration

    # Python 3 iterator protocol alias (backward-compatible addition).
    __next__ = next
93
94
class TabularReader:
    """
    Tabular file iterator. Yields each data line as a list of fields.

    file_path    -- path of the tab-separated file to read
    skip         -- number of leading lines to discard before reading data
    comment_char -- if set, lines starting with this pattern are excluded
    col_idx      -- optional list of 0-based column indexes to project rows to
    filters      -- optional list of LineFilter filter_dict specs, applied in order
    """
    def __init__(self, file_path, skip=0, comment_char=None, col_idx=None, filters=None):
        self.skip = skip
        self.comment_char = comment_char
        self.col_idx = col_idx
        self.filters = filters
        self.tsv_file = open(file_path)
        if skip and skip > 0:
            # Fixed: consume exactly `skip` header lines; this was hard-coded
            # as range(5), silently skipping the wrong number of lines.
            for i in range(skip):
                if not self.tsv_file.readline():
                    break
        # Base filter strips line endings and drops empty lines; comment and
        # user filters are layered on top, each wrapping the previous source.
        source = LineFilter(self.tsv_file, None)
        if comment_char:
            source = LineFilter(source, {"filter": "regex", "pattern": comment_char, "action": "exclude_match"})
        if filters:
            for f in filters:
                source = LineFilter(source, f)
        self.source = source

    def __iter__(self):
        return self

    def next(self):
        ''' Iteration '''
        for line in self.source:
            fields = line.rstrip('\r\n').split('\t')
            if self.col_idx:
                fields = [fields[i] for i in self.col_idx]
            return fields
        raise StopIteration

    # Python 3 iterator protocol alias (backward-compatible addition).
    __next__ = next
52 126
53 127
54 def getValueType(val): 128 def getValueType(val):
55 if val or 0. == val: 129 if val or 0. == val:
56 try: 130 try:
64 return 'TEXT' 138 return 'TEXT'
65 return None 139 return None
66 140
67 141
68 def get_column_def(file_path, table_name, skip=0, comment_char='#', 142 def get_column_def(file_path, table_name, skip=0, comment_char='#',
69 column_names=None, max_lines=100,load_named_columns=False): 143 column_names=None, max_lines=100,load_named_columns=False,filters=None):
70 col_pref = ['TEXT', 'REAL', 'INTEGER', None] 144 col_pref = ['TEXT', 'REAL', 'INTEGER', None]
71 col_types = [] 145 col_types = []
72 col_idx = None 146 col_idx = None
73 data_lines = 0 147 data_lines = 0
74
75 try: 148 try:
76 with open(file_path, "r") as fh: 149 tr = TabularReader(file_path,skip=skip, comment_char=comment_char, col_idx=None, filters=filters)
77 for linenum, line in enumerate(fh): 150 for linenum, fields in enumerate(tr):
78 if linenum < skip: 151 if linenum > max_lines:
79 continue 152 break
80 if line.startswith(comment_char): 153 try:
81 continue 154 while len(col_types) < len(fields):
82 data_lines += 1 155 col_types.append(None)
83 try: 156 for i, val in enumerate(fields):
84 fields = line.split('\t') 157 colType = getValueType(val)
85 while len(col_types) < len(fields): 158 if col_pref.index(colType) < col_pref.index(col_types[i]):
86 col_types.append(None) 159 col_types[i] = colType
87 for i, val in enumerate(fields): 160 except Exception, e:
88 colType = getValueType(val) 161 print >> sys.stderr, 'Failed at line: %d err: %s' % (linenum, e)
89 if col_pref.index(colType) < col_pref.index(col_types[i]):
90 col_types[i] = colType
91 except Exception, e:
92 print >> sys.stderr, 'Failed at line: %d err: %s' % (linenum, e)
93 except Exception, e: 162 except Exception, e:
94 print >> sys.stderr, 'Failed: %s' % (e) 163 print >> sys.stderr, 'Failed: %s' % (e)
95 for i,col_type in enumerate(col_types): 164 for i,col_type in enumerate(col_types):
96 if not col_type: 165 if not col_type:
97 col_types[i] = 'TEXT' 166 col_types[i] = 'TEXT'
115 for i, col_name in enumerate(col_names): 184 for i, col_name in enumerate(col_names):
116 col_def.append('%s %s' % (col_names[i], col_types[i])) 185 col_def.append('%s %s' % (col_names[i], col_types[i]))
117 return col_names, col_types, col_def, col_idx 186 return col_names, col_types, col_def, col_idx
118 187
119 188
120 def create_table(conn, file_path, table_name, skip=0, comment_char='#', pkey_autoincr=None, column_names=None,load_named_columns=False,unique_indexes=[],indexes=[]): 189 def create_table(conn, file_path, table_name, skip=0, comment_char='#', pkey_autoincr=None, column_names=None,load_named_columns=False,filters=None,unique_indexes=[],indexes=[]):
121 col_names, col_types, col_def, col_idx = get_column_def(file_path, table_name, skip=skip, comment_char=comment_char, column_names=column_names,load_named_columns=load_named_columns) 190
191 col_names, col_types, col_def, col_idx = get_column_def(file_path, table_name, skip=skip, comment_char=comment_char,
192 column_names=column_names,load_named_columns=load_named_columns,filters=filters)
122 col_func = [float if t == 'REAL' else int if t == 'INTEGER' else str for t in col_types] 193 col_func = [float if t == 'REAL' else int if t == 'INTEGER' else str for t in col_types]
123 table_def = 'CREATE TABLE %s (\n %s%s\n);' % ( 194 table_def = 'CREATE TABLE %s (\n %s%s\n);' % (
124 table_name, 195 table_name,
125 '%s INTEGER PRIMARY KEY AUTOINCREMENT,' % pkey_autoincr if pkey_autoincr else '', 196 '%s INTEGER PRIMARY KEY AUTOINCREMENT,' % pkey_autoincr if pkey_autoincr else '',
126 ', \n '.join(col_def)) 197 ', \n '.join(col_def))
127 # print >> sys.stdout, table_def 198 # print >> sys.stdout, table_def
128 insert_stmt = 'INSERT INTO %s(%s) VALUES(%s)' % (table_name, ','.join(col_names), ','.join(["?" for x in col_names])) 199 insert_stmt = 'INSERT INTO %s(%s) VALUES(%s)' % (table_name, ','.join(col_names), ','.join(["?" for x in col_names]))
129 # print >> sys.stdout, insert_stmt 200 # print >> sys.stdout, insert_stmt
140 for i,index in enumerate(indexes): 211 for i,index in enumerate(indexes):
141 index_name='idx_%s_%d' % (table_name,i) 212 index_name='idx_%s_%d' % (table_name,i)
142 index_columns = index.split(',') 213 index_columns = index.split(',')
143 create_index(conn, table_name, index_name, index_columns) 214 create_index(conn, table_name, index_name, index_columns)
144 c = conn.cursor() 215 c = conn.cursor()
145 with open(file_path, "r") as fh: 216 tr = TabularReader(file_path,skip=skip, comment_char=comment_char, col_idx=col_idx, filters=filters)
146 for linenum, line in enumerate(fh): 217 for linenum, fields in enumerate(tr):
147 if linenum < skip or line.startswith(comment_char): 218 data_lines += 1
148 continue 219 try:
149 data_lines += 1 220 if col_idx:
150 try: 221 fields = [fields[i] for i in col_idx]
151 fields = line.rstrip('\r\n').split('\t') 222 vals = [col_func[i](x) if x else None for i, x in enumerate(fields)]
152 if col_idx: 223 c.execute(insert_stmt, vals)
153 fields = [fields[i] for i in col_idx] 224 except Exception, e:
154 vals = [col_func[i](x) if x else None for i, x in enumerate(fields)] 225 print >> sys.stderr, 'Failed at line: %d err: %s' % (linenum, e)
155 c.execute(insert_stmt, vals)
156 except Exception, e:
157 print >> sys.stderr, 'Failed at line: %d err: %s' % (linenum, e)
158 conn.commit() 226 conn.commit()
159 c.close() 227 c.close()
160 except Exception, e: 228 except Exception, e:
161 print >> sys.stderr, 'Failed: %s' % (e) 229 print >> sys.stderr, 'Failed: %s' % (e)
162 exit(1) 230 exit(1)
231
163 232
def create_index(conn, table_name, index_name, index_columns, unique=False):
    """Create an index (UNIQUE when requested) on the given table columns.

    conn          -- open sqlite3 connection
    table_name    -- table to index
    index_name    -- name for the new index
    index_columns -- iterable of column names to include in the index
    unique        -- when True, create a UNIQUE index
    """
    uniqueness = 'UNIQUE' if unique else ''
    columns_sql = ','.join(index_columns)
    statement = "CREATE %s INDEX %s on %s(%s)" % (uniqueness, index_name, table_name, columns_sql)
    cursor = conn.cursor()
    cursor.execute(statement)
    conn.commit()
    cursor.close()
239
170 240
def regex_match(expr, item):
    """Return True when *item* matches regex *expr* anchored at the start."""
    return bool(re.match(expr, item))
173 243
174 244
235 if 'tables' in tdef: 305 if 'tables' in tdef:
236 for ti, table in enumerate(tdef['tables']): 306 for ti, table in enumerate(tdef['tables']):
237 path = table['file_path'] 307 path = table['file_path']
238 table_name = table['table_name'] if 'table_name' in table else 't%d' % (ti + 1) 308 table_name = table['table_name'] if 'table_name' in table else 't%d' % (ti + 1)
239 comment_lines = table['comment_lines'] if 'comment_lines' in table else 0 309 comment_lines = table['comment_lines'] if 'comment_lines' in table else 0
310 comment_char = table['comment_char'] if 'comment_char' in table else None
240 column_names = table['column_names'] if 'column_names' in table else None 311 column_names = table['column_names'] if 'column_names' in table else None
241 if column_names: 312 if column_names:
242 load_named_columns = table['load_named_columns'] if 'load_named_columns' in table else False 313 load_named_columns = table['load_named_columns'] if 'load_named_columns' in table else False
243 else: 314 else:
244 load_named_columns = False 315 load_named_columns = False
245 unique_indexes = table['unique'] if 'unique' in table else [] 316 unique_indexes = table['unique'] if 'unique' in table else []
246 indexes = table['index'] if 'index' in table else [] 317 indexes = table['index'] if 'index' in table else []
318 filters = table['filters'] if 'filters' in table else None
247 pkey_autoincr = table['pkey_autoincr'] if 'pkey_autoincr' in table else None 319 pkey_autoincr = table['pkey_autoincr'] if 'pkey_autoincr' in table else None
248 create_table(conn, path, table_name, pkey_autoincr=pkey_autoincr, column_names=column_names, 320 create_table(conn, path, table_name, pkey_autoincr=pkey_autoincr, column_names=column_names,
249 skip=comment_lines, load_named_columns=load_named_columns, 321 skip=comment_lines, comment_char=comment_char, load_named_columns=load_named_columns,
250 unique_indexes=unique_indexes, indexes=indexes) 322 filters=filters,unique_indexes=unique_indexes, indexes=indexes)
251 except Exception, exc: 323 except Exception, exc:
252 print >> sys.stderr, "Error: %s" % exc 324 print >> sys.stderr, "Error: %s" % exc
253 conn.close() 325 conn.close()
254 326
255 query = None 327 query = None
260 query += line 332 query += line
261 elif (options.query is not None): 333 elif (options.query is not None):
262 query = options.query 334 query = options.query
263 335
264 if (query is None): 336 if (query is None):
337 tables_query = \
338 "SELECT name, sql FROM sqlite_master WHERE type='table' ORDER BY name"
265 try: 339 try:
266 conn = get_connection(options.sqlitedb) 340 conn = get_connection(options.sqlitedb)
267 c = conn.cursor() 341 c = conn.cursor()
268 rslt = c.execute(tables_query).fetchall() 342 rslt = c.execute(tables_query).fetchall()
269 for table, sql in rslt: 343 for table, sql in rslt: