Mercurial > repos > galaxyp > utilities
view filter_by_an_id.py @ 0:9156a440afed draft default tip
Improved some datatype handling
author | galaxyp |
---|---|
date | Thu, 20 Jun 2013 11:09:24 -0400 |
parents | |
children |
line wrap: on
line source
""" A script to build specific fasta databases """ import sys import logging #===================================== Iterator =============================== class Sequence: ''' Holds protein sequence information ''' def __init__(self): self.header = "" self.sequence_parts = [] def get_sequence(self): return "".join([line.rstrip().replace('\n','').replace('\r','') for line in self.sequence_parts]) class FASTAReader: """ FASTA db iterator. Returns a single FASTA sequence object. """ def __init__(self, fasta_name): self.fasta_file = open(fasta_name) self.next_line = self.fasta_file.readline() def __iter__(self): return self def next(self): ''' Iteration ''' #while True: # line = self.fasta_file.readline() # if not line: # raise StopIteration # if line[0] == '>': # break next_line = self.next_line if not next_line: raise StopIteration seq = Sequence() seq.header = next_line.rstrip().replace('\n','').replace('\r','') next_line = self.fasta_file.readline() while next_line and next_line[0] != '>': #tail = self.fasta_file.tell() #line = self.fasta_file.readline() #if not line: # break #if line[0] == '>': # self.fasta_file.seek(tail) # break seq.sequence_parts.append(next_line) next_line = self.fasta_file.readline() self.next_line = next_line return seq #============================================================================== def target_match(target, search_entry): ''' Matches ''' search_entry = search_entry.upper() for atarget in target: if search_entry.find(atarget) > -1: return atarget return None def main(): ''' the main function''' logging.basicConfig(filename='filter_fasta_log', level=logging.INFO, format='%(asctime)s :: %(levelname)s :: %(message)s',) used_sequences = set() work_summary = {'wanted': 0, 'found':0, 'duplicates':0} targets = [] f_target = open(sys.argv[1]) for line in f_target.readlines(): targets.append(">%s" % line.strip().upper()) f_target.close() logging.info('Read target file and am now looking for %d %s', len(targets), 'sequences.') work_summary['wanted'] = len(targets) homd_db = FASTAReader(sys.argv[2]) i = 0 output = open(sys.argv[3], "w") try: for entry in homd_db: target_matched_results = target_match(targets, entry.header) if target_matched_results: work_summary['found'] += 1 targets.remove(target_matched_results) sequence = entry.get_sequence() if sequence in used_sequences: work_summary['duplicates'] += 1 else: used_sequences.add(sequence) print >>output, entry.header print >>output, sequence finally: output.close() logging.info('Completed filtering') for parm, count in work_summary.iteritems(): logging.info('%s ==> %d', parm, count) if __name__ == "__main__": main()