Mercurial > repos > galaxyp > filter_by_fasta_ids
view filter_by_fasta_ids.py @ 1:8d15aebf55fd draft
planemo upload commit 88309fbfadbafe82f2d8fb7b96468799f2421e30
author | galaxyp |
---|---|
date | Tue, 24 May 2016 13:05:22 -0400 |
parents | |
children | 1bd985f14938 |
line wrap: on
line source
#!/usr/bin/env python """ A script to build specific fasta databases """ from __future__ import print_function import optparse # ===================================== Iterator =============================== class Sequence: ''' Holds protein sequence information ''' def __init__(self): self.header = "" self.sequence_parts = [] def get_sequence(self): return "".join([line.rstrip().replace('\n', '').replace('\r', '') for line in self.sequence_parts]) class FASTAReader: """ FASTA db iterator. Returns a single FASTA sequence object. """ def __init__(self, fasta_name): self.fasta_file = open(fasta_name) self.next_line = self.fasta_file.readline() def __iter__(self): return self def __next__(self): ''' Iteration ''' next_line = self.next_line if not next_line: raise StopIteration seq = Sequence() seq.header = next_line.rstrip().replace('\n', '').replace('\r', '') next_line = self.fasta_file.readline() while next_line and next_line[0] != '>': seq.sequence_parts.append(next_line) next_line = self.fasta_file.readline() self.next_line = next_line return seq # Python 2/3 compat next = __next__ def target_match(target, search_entry): ''' Matches ''' search_entry = search_entry.upper() for atarget in target: if search_entry.find(atarget) > -1: return atarget return None def main(): ''' the main function''' parser = optparse.OptionParser() parser.add_option('--dedup', dest='dedup', action='store_true', default=False, help='Whether to remove duplicate sequences') (options, args) = parser.parse_args() targets = [] with open(args[0]) as f_target: for line in f_target.readlines(): targets.append(">%s" % line.strip().upper()) print('Read target file, now looking for %d sequences.' % len(targets)) work_summary = {'wanted': len(targets), 'found': 0} if options.dedup: used_sequences = set() work_summary['duplicates'] = 0 homd_db = FASTAReader(args[1]) with open(args[2], "w") as output: for entry in homd_db: target_matched_results = target_match(targets, entry.header) if target_matched_results: work_summary['found'] += 1 targets.remove(target_matched_results) sequence = entry.get_sequence() if options.dedup: if sequence in used_sequences: work_summary['duplicates'] += 1 continue else: used_sequences.add(sequence) print(entry.header, file=output) print(sequence, file=output) print('Completed filtering.') for parm, count in work_summary.items(): print('%s ==> %d' % (parm, count)) if __name__ == "__main__": main()