Mercurial > repos > iuc > filter_tabular
comparison filters.py @ 15:90f657745fea draft default tip
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/query_tabular commit dd35055c76d86fe98985b5825c1751efb8208242
| author | iuc |
|---|---|
| date | Thu, 27 Jun 2024 17:23:47 +0000 |
| parents | 4d5aae46f850 |
| children |
comparison
equal
deleted
inserted
replaced
| 14:557ec8d7087d | 15:90f657745fea |
|---|---|
| 9 | 9 |
| 10 class LineFilter(object): | 10 class LineFilter(object): |
| 11 def __init__(self, source, filter_dict): | 11 def __init__(self, source, filter_dict): |
| 12 self.source = source | 12 self.source = source |
| 13 self.filter_dict = filter_dict | 13 self.filter_dict = filter_dict |
| 14 self.func = lambda i, l: l.rstrip('\r\n') if l else None | 14 self.func = lambda i, line: line.rstrip('\r\n') if line else None |
| 15 self.src_lines = [] | 15 self.src_lines = [] |
| 16 self.src_line_cnt = 0 | 16 self.src_line_cnt = 0 |
| 17 | 17 |
| 18 def xint(x): | 18 def xint(x): |
| 19 if isinstance(x, int): | 19 if isinstance(x, int): |
| 26 if not filter_dict: | 26 if not filter_dict: |
| 27 return | 27 return |
| 28 if filter_dict['filter'] == 'regex': | 28 if filter_dict['filter'] == 'regex': |
| 29 rgx = re.compile(filter_dict['pattern']) | 29 rgx = re.compile(filter_dict['pattern']) |
| 30 if filter_dict['action'] == 'exclude_match': | 30 if filter_dict['action'] == 'exclude_match': |
| 31 self.func = lambda i, l: l if not rgx.match(l) else None | 31 self.func = lambda i, line: line if not rgx.match(line) else None |
| 32 elif filter_dict['action'] == 'include_match': | 32 elif filter_dict['action'] == 'include_match': |
| 33 self.func = lambda i, l: l if rgx.match(l) else None | 33 self.func = lambda i, line: line if rgx.match(line) else None |
| 34 elif filter_dict['action'] == 'exclude_find': | 34 elif filter_dict['action'] == 'exclude_find': |
| 35 self.func = lambda i, l: l if not rgx.search(l) else None | 35 self.func = lambda i, line: line if not rgx.search(line) else None |
| 36 elif filter_dict['action'] == 'include_find': | 36 elif filter_dict['action'] == 'include_find': |
| 37 self.func = lambda i, l: l if rgx.search(l) else None | 37 self.func = lambda i, line: line if rgx.search(line) else None |
| 38 elif filter_dict['filter'] == 'select_columns': | 38 elif filter_dict['filter'] == 'select_columns': |
| 39 cols = [int(c) - 1 for c in filter_dict['columns']] | 39 cols = [int(c) - 1 for c in filter_dict['columns']] |
| 40 self.func = lambda i, l: self.select_columns(l, cols) | 40 self.func = lambda i, line: self.select_columns(line, cols) |
| 41 elif filter_dict['filter'] == 'select_column_slices': | 41 elif filter_dict['filter'] == 'select_column_slices': |
| 42 cols = [x if isinstance(x, int) else [y if y is not None else None for y in [xint(k) for k in x.split(':')]] for x in [xint(c) for c in filter_dict['columns']]] | 42 cols = [x if isinstance(x, int) else [y if y is not None else None for y in [xint(k) for k in x.split(':')]] for x in [xint(c) for c in filter_dict['columns']]] |
| 43 if all([isinstance(x, int) for x in cols]): | 43 if all([isinstance(x, int) for x in cols]): |
| 44 self.func = lambda i, l: self.select_columns(l, cols) | 44 self.func = lambda i, line: self.select_columns(line, cols) |
| 45 else: | 45 else: |
| 46 cols = [slice(x[0], x[1], x[2] if len(x) > 2 else None) if isinstance(x, list) else x for x in cols] | 46 cols = [slice(x[0], x[1], x[2] if len(x) > 2 else None) if isinstance(x, list) else x for x in cols] |
| 47 self.func = lambda i, l: self.select_slices(l, cols) | 47 self.func = lambda i, line: self.select_slices(line, cols) |
| 48 elif filter_dict['filter'] == 'replace': | 48 elif filter_dict['filter'] == 'replace': |
| 49 p = filter_dict['pattern'] | 49 p = filter_dict['pattern'] |
| 50 r = filter_dict['replace'] | 50 r = filter_dict['replace'] |
| 51 c = int(filter_dict['column']) - 1 | 51 c = int(filter_dict['column']) - 1 |
| 52 if 'add' not in filter_dict\ | 52 if 'add' not in filter_dict\ |
| 53 or filter_dict['add'] not in ['prepend', | 53 or filter_dict['add'] not in ['prepend', |
| 54 'append', | 54 'append', |
| 55 'before', | 55 'before', |
| 56 'after']: | 56 'after']: |
| 57 self.func = lambda i, l: '\t'.join( | 57 self.func = lambda i, line: '\t'.join( |
| 58 [x if j != c else re.sub(p, r, x) | 58 [x if j != c else re.sub(p, r, x) |
| 59 for j, x in enumerate(l.split('\t'))]) | 59 for j, x in enumerate(line.split('\t'))]) |
| 60 else: | 60 else: |
| 61 a = 0 if filter_dict['add'] == 'prepend'\ | 61 a = 0 if filter_dict['add'] == 'prepend'\ |
| 62 else min(0, c - 1) if filter_dict['add'] == 'before'\ | 62 else min(0, c - 1) if filter_dict['add'] == 'before'\ |
| 63 else c + 1 if filter_dict['add'] == 'after'\ | 63 else c + 1 if filter_dict['add'] == 'after'\ |
| 64 else None | 64 else None |
| 65 self.func = lambda i, l: self.replace_add(l, p, r, c, a) | 65 self.func = lambda i, line: self.replace_add(line, p, r, c, a) |
| 66 elif filter_dict['filter'] == 'prepend_line_num': | 66 elif filter_dict['filter'] == 'prepend_line_num': |
| 67 self.func = lambda i, l: '%d\t%s' % (i, l) | 67 self.func = lambda i, line: '%d\t%s' % (i, line) |
| 68 elif filter_dict['filter'] == 'append_line_num': | 68 elif filter_dict['filter'] == 'append_line_num': |
| 69 self.func = lambda i, l: '%s\t%d' % (l.rstrip('\r\n'), i) | 69 self.func = lambda i, line: '%s\t%d' % (line.rstrip('\r\n'), i) |
| 70 elif filter_dict['filter'] == 'prepend_text': | 70 elif filter_dict['filter'] == 'prepend_text': |
| 71 s = filter_dict['column_text'] | 71 s = filter_dict['column_text'] |
| 72 self.func = lambda i, l: '%s\t%s' % (s, l) | 72 self.func = lambda i, line: '%s\t%s' % (s, line) |
| 73 elif filter_dict['filter'] == 'append_text': | 73 elif filter_dict['filter'] == 'append_text': |
| 74 s = filter_dict['column_text'] | 74 s = filter_dict['column_text'] |
| 75 self.func = lambda i, l: '%s\t%s' % (l.rstrip('\r\n'), s) | 75 self.func = lambda i, line: '%s\t%s' % (line.rstrip('\r\n'), s) |
| 76 elif filter_dict['filter'] == 'skip': | 76 elif filter_dict['filter'] == 'skip': |
| 77 cnt = filter_dict['count'] | 77 cnt = filter_dict['count'] |
| 78 self.func = lambda i, l: l if i > cnt else None | 78 self.func = lambda i, line: line if i > cnt else None |
| 79 elif filter_dict['filter'] == 'normalize': | 79 elif filter_dict['filter'] == 'normalize': |
| 80 cols = [int(c) - 1 for c in filter_dict['columns']] | 80 cols = [int(c) - 1 for c in filter_dict['columns']] |
| 81 sep = filter_dict['separator'] | 81 sep = filter_dict['separator'] |
| 82 self.func = lambda i, l: self.normalize(l, cols, sep) | 82 self.func = lambda i, line: self.normalize(line, cols, sep) |
| 83 | 83 |
| 84 def __iter__(self): | 84 def __iter__(self): |
| 85 return self | 85 return self |
| 86 | 86 |
| 87 def __next__(self): | 87 def __next__(self): |
