Mercurial > repos > galaxyp > filter_by_fasta_ids
comparison filter_by_fasta_ids.py @ 2:1bd985f14938 draft
planemo upload for repository https://github.com/galaxyproteomics/tools-galaxyp/tree/master/tools/filter_by_fasta_ids commit 2bc87e917c91a3b7a43996a0f3752b8992c0c749
author | galaxyp |
---|---|
date | Sat, 28 Apr 2018 03:49:28 -0400 |
parents | 8d15aebf55fd |
children | 3c623e81be77 |
comparison
equal
deleted
inserted
replaced
1:8d15aebf55fd | 2:1bd985f14938 |
---|---|
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 """ A script to build specific fasta databases """ | 2 """ A script to build specific fasta databases """ |
3 from __future__ import print_function | 3 from __future__ import print_function |
4 import optparse | 4 |
5 import argparse | |
6 import re | |
7 import sys | |
5 | 8 |
6 | 9 |
7 # ===================================== Iterator =============================== | 10 class Sequence(object): |
8 class Sequence: | 11 def __init__(self, header, sequence_parts): |
9 ''' Holds protein sequence information ''' | 12 self.header = header |
10 def __init__(self): | 13 self.sequence_parts = sequence_parts |
11 self.header = "" | 14 self._sequence = None |
12 self.sequence_parts = [] | |
13 | 15 |
14 def get_sequence(self): | 16 @property |
15 return "".join([line.rstrip().replace('\n', '').replace('\r', '') for line in self.sequence_parts]) | 17 def sequence(self): |
18 if self._sequence is None: | |
19 self._sequence = ''.join(self.sequence_parts) | |
20 return self._sequence | |
21 | |
22 def print(self, fh=sys.stdout): | |
23 print(self.header, file=fh) | |
24 for line in self.sequence_parts: | |
25 print(line, file=fh) | |
16 | 26 |
17 | 27 |
18 class FASTAReader: | 28 def FASTAReader_gen(fasta_filename): |
19 """ | 29 with open(fasta_filename) as fasta_file: |
20 FASTA db iterator. Returns a single FASTA sequence object. | 30 line = fasta_file.readline() |
21 """ | 31 while True: |
22 def __init__(self, fasta_name): | 32 if not line: |
23 self.fasta_file = open(fasta_name) | 33 return |
24 self.next_line = self.fasta_file.readline() | 34 assert line.startswith('>'), "FASTA headers must start with >" |
25 | 35 header = line.rstrip() |
26 def __iter__(self): | 36 sequence_parts = [] |
27 return self | 37 line = fasta_file.readline() |
28 | 38 while line and line[0] != '>': |
29 def __next__(self): | 39 sequence_parts.append(line.rstrip()) |
30 ''' Iteration ''' | 40 line = fasta_file.readline() |
31 next_line = self.next_line | 41 yield Sequence(header, sequence_parts) |
32 if not next_line: | |
33 raise StopIteration | |
34 | |
35 seq = Sequence() | |
36 seq.header = next_line.rstrip().replace('\n', '').replace('\r', '') | |
37 | |
38 next_line = self.fasta_file.readline() | |
39 while next_line and next_line[0] != '>': | |
40 seq.sequence_parts.append(next_line) | |
41 next_line = self.fasta_file.readline() | |
42 self.next_line = next_line | |
43 return seq | |
44 | |
45 # Python 2/3 compat | |
46 next = __next__ | |
47 | 42 |
48 | 43 |
49 def target_match(target, search_entry): | 44 def target_match(targets, header): |
50 ''' Matches ''' | 45 ''' Matches ''' |
51 search_entry = search_entry.upper() | 46 # Remove '>' and initial spaces from the header |
52 for atarget in target: | 47 header = header[1:].lstrip().upper() |
53 if search_entry.find(atarget) > -1: | 48 # Search for an exact match among the targets |
54 return atarget | 49 if header in targets: |
50 return header | |
51 # Try to find an exact match for the first "word" in the header | |
52 header = header.split()[0] | |
53 if header in targets: | |
54 return header | |
55 return None | 55 return None |
56 | 56 |
57 | 57 |
58 def main(): | 58 def main(): |
59 ''' the main function''' | 59 ''' the main function''' |
60 | 60 |
61 parser = optparse.OptionParser() | 61 parser = argparse.ArgumentParser() |
62 parser.add_option('--dedup', dest='dedup', action='store_true', default=False, help='Whether to remove duplicate sequences') | 62 parser.add_argument('-i', required=True, help='Path to input FASTA file') |
63 (options, args) = parser.parse_args() | 63 parser.add_argument('-o', required=True, help='Path to output FASTA file') |
64 parser.add_argument('-d', help='Path to discarded entries file') | |
65 header_criteria = parser.add_mutually_exclusive_group() | |
66 header_criteria.add_argument('--id_list', help='Path to the ID list file') | |
67 header_criteria.add_argument('--header_regexp', help='Regular expression pattern the header should match') | |
68 sequence_criteria = parser.add_mutually_exclusive_group() | |
69 sequence_criteria.add_argument('--min_length', type=int, help='Minimum sequence length') | |
70 sequence_criteria.add_argument('--sequence_regexp', help='Regular expression pattern the header should match') | |
71 parser.add_argument('--max_length', type=int, help='Maximum sequence length') | |
72 parser.add_argument('--dedup', action='store_true', default=False, help='Whether to remove duplicate sequences') | |
73 options = parser.parse_args() | |
64 | 74 |
65 targets = [] | 75 if options.min_length is not None and options.max_length is None: |
76 options.max_length = sys.maxsize | |
77 if options.header_regexp: | |
78 regexp = re.compile(options.header_regexp) | |
79 if options.sequence_regexp: | |
80 regexp = re.compile(options.sequence_regexp) | |
66 | 81 |
67 with open(args[0]) as f_target: | 82 work_summary = {'found': 0} |
68 for line in f_target.readlines(): | |
69 targets.append(">%s" % line.strip().upper()) | |
70 | 83 |
71 print('Read target file, now looking for %d sequences.' % len(targets)) | |
72 | |
73 work_summary = {'wanted': len(targets), 'found': 0} | |
74 if options.dedup: | 84 if options.dedup: |
75 used_sequences = set() | 85 used_sequences = set() |
76 work_summary['duplicates'] = 0 | 86 work_summary['duplicates'] = 0 |
77 homd_db = FASTAReader(args[1]) | |
78 | 87 |
79 with open(args[2], "w") as output: | 88 if options.id_list: |
89 targets = [] | |
90 with open(options.id_list) as f_target: | |
91 for line in f_target.readlines(): | |
92 targets.append(line.strip().upper()) | |
93 work_summary['wanted'] = len(targets) | |
94 | |
95 homd_db = FASTAReader_gen(options.i) | |
96 if options.d: | |
97 discarded = open(options.d, 'w') | |
98 | |
99 with open(options.o, "w") as output: | |
80 for entry in homd_db: | 100 for entry in homd_db: |
81 target_matched_results = target_match(targets, entry.header) | 101 print_entry = True |
82 if target_matched_results: | 102 if options.id_list: |
83 work_summary['found'] += 1 | 103 target_matched_results = target_match(targets, entry.header) |
84 targets.remove(target_matched_results) | 104 if target_matched_results: |
85 sequence = entry.get_sequence() | 105 work_summary['found'] += 1 |
106 targets.remove(target_matched_results) | |
107 else: | |
108 print_entry = False | |
109 elif options.header_regexp: | |
110 if regexp.search(entry.header) is None: | |
111 print_entry = False | |
112 if options.min_length is not None: | |
113 sequence_length = len(entry.sequence) | |
114 if not(options.min_length <= sequence_length <= options.max_length): | |
115 print_entry = False | |
116 elif options.sequence_regexp: | |
117 if regexp.search(entry.sequence) is None: | |
118 print_entry = False | |
119 if print_entry: | |
86 if options.dedup: | 120 if options.dedup: |
87 if sequence in used_sequences: | 121 if entry.sequence in used_sequences: |
88 work_summary['duplicates'] += 1 | 122 work_summary['duplicates'] += 1 |
89 continue | 123 continue |
90 else: | 124 else: |
91 used_sequences.add(sequence) | 125 used_sequences.add(entry.sequence) |
92 print(entry.header, file=output) | 126 entry.print(output) |
93 print(sequence, file=output) | 127 elif options.d: |
128 entry.print(discarded) | |
94 | 129 |
95 print('Completed filtering.') | |
96 for parm, count in work_summary.items(): | 130 for parm, count in work_summary.items(): |
97 print('%s ==> %d' % (parm, count)) | 131 print('%s ==> %d' % (parm, count)) |
98 | 132 |
133 | |
99 if __name__ == "__main__": | 134 if __name__ == "__main__": |
100 main() | 135 main() |