Mercurial > repos > iuc > filter_tabular
comparison query_db.py @ 0:6fbd9d25ceef draft
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/query_tabular commit 74915fc9cee746bbce1c4b507e13231259de177d
author | iuc |
---|---|
date | Tue, 18 Jul 2017 09:06:47 -0400 |
parents | |
children | cd2a99849f8b |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:6fbd9d25ceef |
---|---|
1 #!/usr/bin/env python | |
2 | |
3 from __future__ import print_function | |
4 | |
5 import re | |
6 import sqlite3 as sqlite | |
7 import sys | |
8 | |
9 | |
10 TABLE_QUERY = \ | |
11 """ | |
12 SELECT name, sql | |
13 FROM sqlite_master | |
14 WHERE type='table' | |
15 ORDER BY name | |
16 """ | |
17 | |
18 | |
19 def regex_match(expr, item): | |
20 return re.match(expr, item) is not None | |
21 | |
22 | |
23 def regex_search(expr, item): | |
24 return re.search(expr, item) is not None | |
25 | |
26 | |
27 def regex_sub(expr, replace, item): | |
28 return re.sub(expr, replace, item) | |
29 | |
30 | |
31 def get_connection(sqlitedb_path, addfunctions=True): | |
32 conn = sqlite.connect(sqlitedb_path) | |
33 if addfunctions: | |
34 conn.create_function("re_match", 2, regex_match) | |
35 conn.create_function("re_search", 2, regex_search) | |
36 conn.create_function("re_sub", 3, regex_sub) | |
37 return conn | |
38 | |
39 | |
40 def describe_tables(conn, outputFile): | |
41 try: | |
42 c = conn.cursor() | |
43 tables_query = TABLE_QUERY | |
44 rslt = c.execute(tables_query).fetchall() | |
45 for table, sql in rslt: | |
46 print("Table %s:" % table, file=outputFile) | |
47 try: | |
48 col_query = 'SELECT * FROM %s LIMIT 0' % table | |
49 cur = conn.cursor().execute(col_query) | |
50 cols = [col[0] for col in cur.description] | |
51 print(" Columns: %s" % cols, file=outputFile) | |
52 except Exception as exc: | |
53 print("Warning: %s" % exc, file=sys.stderr) | |
54 except Exception as e: | |
55 exit('Error: %s' % (e)) | |
56 exit(0) | |
57 | |
58 | |
59 def run_query(conn, query, outputFile, no_header=False): | |
60 cur = conn.cursor() | |
61 results = cur.execute(query) | |
62 if not no_header: | |
63 outputFile.write("#%s\n" % '\t'.join( | |
64 [str(col[0]) for col in cur.description])) | |
65 for i, row in enumerate(results): | |
66 outputFile.write("%s\n" % '\t'.join( | |
67 [str(val) if val is not None else '' for val in row])) |