Mercurial > repos > iuc > sqlite_to_tabular
Comparison of filters.py @ 15:c4d18aa4ec4a (draft, default, tip)
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/query_tabular commit dd35055c76d86fe98985b5825c1751efb8208242
author | iuc |
---|---|
date | Thu, 27 Jun 2024 17:23:20 +0000 |
parents | c29d2f80a066 |
children |
Comparison legend: equal | deleted | inserted | replaced
14:f768f6183386 | 15:c4d18aa4ec4a |
---|---|
9 | 9 |
10 class LineFilter(object): | 10 class LineFilter(object): |
11 def __init__(self, source, filter_dict): | 11 def __init__(self, source, filter_dict): |
12 self.source = source | 12 self.source = source |
13 self.filter_dict = filter_dict | 13 self.filter_dict = filter_dict |
14 self.func = lambda i, l: l.rstrip('\r\n') if l else None | 14 self.func = lambda i, line: line.rstrip('\r\n') if line else None |
15 self.src_lines = [] | 15 self.src_lines = [] |
16 self.src_line_cnt = 0 | 16 self.src_line_cnt = 0 |
17 | 17 |
18 def xint(x): | 18 def xint(x): |
19 if isinstance(x, int): | 19 if isinstance(x, int): |
26 if not filter_dict: | 26 if not filter_dict: |
27 return | 27 return |
28 if filter_dict['filter'] == 'regex': | 28 if filter_dict['filter'] == 'regex': |
29 rgx = re.compile(filter_dict['pattern']) | 29 rgx = re.compile(filter_dict['pattern']) |
30 if filter_dict['action'] == 'exclude_match': | 30 if filter_dict['action'] == 'exclude_match': |
31 self.func = lambda i, l: l if not rgx.match(l) else None | 31 self.func = lambda i, line: line if not rgx.match(line) else None |
32 elif filter_dict['action'] == 'include_match': | 32 elif filter_dict['action'] == 'include_match': |
33 self.func = lambda i, l: l if rgx.match(l) else None | 33 self.func = lambda i, line: line if rgx.match(line) else None |
34 elif filter_dict['action'] == 'exclude_find': | 34 elif filter_dict['action'] == 'exclude_find': |
35 self.func = lambda i, l: l if not rgx.search(l) else None | 35 self.func = lambda i, line: line if not rgx.search(line) else None |
36 elif filter_dict['action'] == 'include_find': | 36 elif filter_dict['action'] == 'include_find': |
37 self.func = lambda i, l: l if rgx.search(l) else None | 37 self.func = lambda i, line: line if rgx.search(line) else None |
38 elif filter_dict['filter'] == 'select_columns': | 38 elif filter_dict['filter'] == 'select_columns': |
39 cols = [int(c) - 1 for c in filter_dict['columns']] | 39 cols = [int(c) - 1 for c in filter_dict['columns']] |
40 self.func = lambda i, l: self.select_columns(l, cols) | 40 self.func = lambda i, line: self.select_columns(line, cols) |
41 elif filter_dict['filter'] == 'select_column_slices': | 41 elif filter_dict['filter'] == 'select_column_slices': |
42 cols = [x if isinstance(x, int) else [y if y is not None else None for y in [xint(k) for k in x.split(':')]] for x in [xint(c) for c in filter_dict['columns']]] | 42 cols = [x if isinstance(x, int) else [y if y is not None else None for y in [xint(k) for k in x.split(':')]] for x in [xint(c) for c in filter_dict['columns']]] |
43 if all([isinstance(x, int) for x in cols]): | 43 if all([isinstance(x, int) for x in cols]): |
44 self.func = lambda i, l: self.select_columns(l, cols) | 44 self.func = lambda i, line: self.select_columns(line, cols) |
45 else: | 45 else: |
46 cols = [slice(x[0], x[1], x[2] if len(x) > 2 else None) if isinstance(x, list) else x for x in cols] | 46 cols = [slice(x[0], x[1], x[2] if len(x) > 2 else None) if isinstance(x, list) else x for x in cols] |
47 self.func = lambda i, l: self.select_slices(l, cols) | 47 self.func = lambda i, line: self.select_slices(line, cols) |
48 elif filter_dict['filter'] == 'replace': | 48 elif filter_dict['filter'] == 'replace': |
49 p = filter_dict['pattern'] | 49 p = filter_dict['pattern'] |
50 r = filter_dict['replace'] | 50 r = filter_dict['replace'] |
51 c = int(filter_dict['column']) - 1 | 51 c = int(filter_dict['column']) - 1 |
52 if 'add' not in filter_dict\ | 52 if 'add' not in filter_dict\ |
53 or filter_dict['add'] not in ['prepend', | 53 or filter_dict['add'] not in ['prepend', |
54 'append', | 54 'append', |
55 'before', | 55 'before', |
56 'after']: | 56 'after']: |
57 self.func = lambda i, l: '\t'.join( | 57 self.func = lambda i, line: '\t'.join( |
58 [x if j != c else re.sub(p, r, x) | 58 [x if j != c else re.sub(p, r, x) |
59 for j, x in enumerate(l.split('\t'))]) | 59 for j, x in enumerate(line.split('\t'))]) |
60 else: | 60 else: |
61 a = 0 if filter_dict['add'] == 'prepend'\ | 61 a = 0 if filter_dict['add'] == 'prepend'\ |
62 else min(0, c - 1) if filter_dict['add'] == 'before'\ | 62 else min(0, c - 1) if filter_dict['add'] == 'before'\ |
63 else c + 1 if filter_dict['add'] == 'after'\ | 63 else c + 1 if filter_dict['add'] == 'after'\ |
64 else None | 64 else None |
65 self.func = lambda i, l: self.replace_add(l, p, r, c, a) | 65 self.func = lambda i, line: self.replace_add(line, p, r, c, a) |
66 elif filter_dict['filter'] == 'prepend_line_num': | 66 elif filter_dict['filter'] == 'prepend_line_num': |
67 self.func = lambda i, l: '%d\t%s' % (i, l) | 67 self.func = lambda i, line: '%d\t%s' % (i, line) |
68 elif filter_dict['filter'] == 'append_line_num': | 68 elif filter_dict['filter'] == 'append_line_num': |
69 self.func = lambda i, l: '%s\t%d' % (l.rstrip('\r\n'), i) | 69 self.func = lambda i, line: '%s\t%d' % (line.rstrip('\r\n'), i) |
70 elif filter_dict['filter'] == 'prepend_text': | 70 elif filter_dict['filter'] == 'prepend_text': |
71 s = filter_dict['column_text'] | 71 s = filter_dict['column_text'] |
72 self.func = lambda i, l: '%s\t%s' % (s, l) | 72 self.func = lambda i, line: '%s\t%s' % (s, line) |
73 elif filter_dict['filter'] == 'append_text': | 73 elif filter_dict['filter'] == 'append_text': |
74 s = filter_dict['column_text'] | 74 s = filter_dict['column_text'] |
75 self.func = lambda i, l: '%s\t%s' % (l.rstrip('\r\n'), s) | 75 self.func = lambda i, line: '%s\t%s' % (line.rstrip('\r\n'), s) |
76 elif filter_dict['filter'] == 'skip': | 76 elif filter_dict['filter'] == 'skip': |
77 cnt = filter_dict['count'] | 77 cnt = filter_dict['count'] |
78 self.func = lambda i, l: l if i > cnt else None | 78 self.func = lambda i, line: line if i > cnt else None |
79 elif filter_dict['filter'] == 'normalize': | 79 elif filter_dict['filter'] == 'normalize': |
80 cols = [int(c) - 1 for c in filter_dict['columns']] | 80 cols = [int(c) - 1 for c in filter_dict['columns']] |
81 sep = filter_dict['separator'] | 81 sep = filter_dict['separator'] |
82 self.func = lambda i, l: self.normalize(l, cols, sep) | 82 self.func = lambda i, line: self.normalize(line, cols, sep) |
83 | 83 |
84 def __iter__(self): | 84 def __iter__(self): |
85 return self | 85 return self |
86 | 86 |
87 def __next__(self): | 87 def __next__(self): |