comparison filter_by_fasta_ids.py @ 2:1bd985f14938 draft

planemo upload for repository https://github.com/galaxyproteomics/tools-galaxyp/tree/master/tools/filter_by_fasta_ids commit 2bc87e917c91a3b7a43996a0f3752b8992c0c749
author galaxyp
date Sat, 28 Apr 2018 03:49:28 -0400
parents 8d15aebf55fd
children 3c623e81be77
comparison
equal deleted inserted replaced
1:8d15aebf55fd 2:1bd985f14938
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 """ A script to build specific fasta databases """ 2 """ A script to build specific fasta databases """
3 from __future__ import print_function 3 from __future__ import print_function
4 import optparse 4
5 import argparse
6 import re
7 import sys
5 8
6 9
class Sequence(object):
    """One FASTA record: a '>'-prefixed header line plus its sequence lines."""

    def __init__(self, header, sequence_parts):
        # Full header line including the leading '>', newline stripped.
        self.header = header
        # List of sequence lines with trailing newlines already stripped.
        self.sequence_parts = sequence_parts
        # Cache for the joined sequence; built lazily by the property below.
        self._sequence = None

    @property
    def sequence(self):
        """Return the full sequence string (lines joined), computed once."""
        if self._sequence is None:
            self._sequence = ''.join(self.sequence_parts)
        return self._sequence

    def print(self, fh=sys.stdout):
        """Write the record in FASTA format to fh (default: stdout)."""
        print(self.header, file=fh)
        for line in self.sequence_parts:
            print(line, file=fh)
17 27
def FASTAReader_gen(fasta_filename):
    """Yield one Sequence object per record of the FASTA file at fasta_filename.

    The file is read lazily, one record at a time, and is closed when the
    generator is exhausted or garbage-collected (the ``with`` block).

    Raises:
        ValueError: if a record does not start with a '>' header line.
            (Previously an ``assert``, which is stripped under ``python -O``.)
    """
    with open(fasta_filename) as fasta_file:
        line = fasta_file.readline()
        while True:
            if not line:
                return
            if not line.startswith('>'):
                raise ValueError("FASTA headers must start with >")
            header = line.rstrip()
            sequence_parts = []
            line = fasta_file.readline()
            # Accumulate sequence lines until EOF or the next header.
            while line and line[0] != '>':
                sequence_parts.append(line.rstrip())
                line = fasta_file.readline()
            yield Sequence(header, sequence_parts)
48 43
def target_match(targets, header):
    """Return the element of `targets` that `header` matches, or None.

    Matching is case-insensitive. The '>' and any leading whitespace are
    removed from the header, then an exact match of the whole remainder is
    tried first, followed by an exact match of its first whitespace-delimited
    token (the usual location of a FASTA ID).

    `targets` is expected to contain upper-case IDs.
    """
    # Remove '>' and initial spaces from the header.
    name = header[1:].lstrip().upper()
    # Search for an exact match of the whole header among the targets.
    if name in targets:
        return name
    # Try an exact match for the first "word" in the header.
    # Guard against an empty header ('>' alone), which previously raised
    # IndexError on header.split()[0].
    words = name.split()
    if words and words[0] in targets:
        return words[0]
    return None
57 57
def main():
    """Filter a FASTA file by ID list, header/sequence regexp and/or length.

    Entries that pass every supplied criterion are written to the output
    file; the rest optionally go to a "discarded" file. With --dedup,
    entries whose sequence was already emitted are dropped (and counted).
    A summary of counts is printed to stdout at the end.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', required=True, help='Path to input FASTA file')
    parser.add_argument('-o', required=True, help='Path to output FASTA file')
    parser.add_argument('-d', help='Path to discarded entries file')
    header_criteria = parser.add_mutually_exclusive_group()
    header_criteria.add_argument('--id_list', help='Path to the ID list file')
    header_criteria.add_argument('--header_regexp', help='Regular expression pattern the header should match')
    sequence_criteria = parser.add_mutually_exclusive_group()
    sequence_criteria.add_argument('--min_length', type=int, help='Minimum sequence length')
    # Fixed copy-paste typo: this pattern is matched against the sequence.
    sequence_criteria.add_argument('--sequence_regexp', help='Regular expression pattern the sequence should match')
    parser.add_argument('--max_length', type=int, help='Maximum sequence length')
    parser.add_argument('--dedup', action='store_true', default=False, help='Whether to remove duplicate sequences')
    options = parser.parse_args()

    # Normalize the length bounds so either one may be given alone.
    # (A lone --max_length was previously ignored silently.)
    if options.min_length is not None and options.max_length is None:
        options.max_length = sys.maxsize
    if options.max_length is not None and options.min_length is None:
        options.min_length = 0

    # Compile each pattern separately: --header_regexp and --sequence_regexp
    # live in different mutually-exclusive groups, so BOTH may be supplied.
    # A single shared `regexp` variable was previously overwritten by the
    # sequence pattern, breaking the header filter in that case.
    header_re = re.compile(options.header_regexp) if options.header_regexp else None
    sequence_re = re.compile(options.sequence_regexp) if options.sequence_regexp else None

    work_summary = {'found': 0}

    if options.dedup:
        used_sequences = set()
        work_summary['duplicates'] = 0

    if options.id_list:
        targets = []
        with open(options.id_list) as f_target:
            for line in f_target:
                targets.append(line.strip().upper())
        work_summary['wanted'] = len(targets)

    homd_db = FASTAReader_gen(options.i)
    # Opened without a context manager so it can be conditional; closed in
    # the finally block below (it previously leaked).
    discarded = open(options.d, 'w') if options.d else None

    try:
        with open(options.o, 'w') as output:
            for entry in homd_db:
                print_entry = True
                if options.id_list:
                    matched = target_match(targets, entry.header)
                    if matched:
                        work_summary['found'] += 1
                        # Each listed ID may only be claimed once.
                        targets.remove(matched)
                    else:
                        print_entry = False
                elif header_re is not None:
                    if header_re.search(entry.header) is None:
                        print_entry = False
                if options.min_length is not None:
                    if not (options.min_length <= len(entry.sequence) <= options.max_length):
                        print_entry = False
                if sequence_re is not None:
                    if sequence_re.search(entry.sequence) is None:
                        print_entry = False
                if print_entry:
                    if options.dedup:
                        if entry.sequence in used_sequences:
                            # Duplicates are counted but written nowhere,
                            # not even to the discarded file (as before).
                            work_summary['duplicates'] += 1
                            continue
                        used_sequences.add(entry.sequence)
                    entry.print(output)
                elif discarded is not None:
                    entry.print(discarded)
    finally:
        if discarded is not None:
            discarded.close()

    for parm, count in work_summary.items():
        print('%s ==> %d' % (parm, count))


if __name__ == "__main__":
    main()