Mercurial > repos > galaxyp > filter_by_fasta_ids
diff filter_by_fasta_ids.py @ 2:1bd985f14938 draft
planemo upload for repository https://github.com/galaxyproteomics/tools-galaxyp/tree/master/tools/filter_by_fasta_ids commit 2bc87e917c91a3b7a43996a0f3752b8992c0c749
author | galaxyp |
---|---|
date | Sat, 28 Apr 2018 03:49:28 -0400 |
parents | 8d15aebf55fd |
children | 3c623e81be77 |
line wrap: on
line diff
--- a/filter_by_fasta_ids.py Tue May 24 13:05:22 2016 -0400 +++ b/filter_by_fasta_ids.py Sat Apr 28 03:49:28 2018 -0400 @@ -1,100 +1,135 @@ #!/usr/bin/env python """ A script to build specific fasta databases """ from __future__ import print_function -import optparse + +import argparse +import re +import sys -# ===================================== Iterator =============================== -class Sequence: - ''' Holds protein sequence information ''' - def __init__(self): - self.header = "" - self.sequence_parts = [] +class Sequence(object): + def __init__(self, header, sequence_parts): + self.header = header + self.sequence_parts = sequence_parts + self._sequence = None - def get_sequence(self): - return "".join([line.rstrip().replace('\n', '').replace('\r', '') for line in self.sequence_parts]) + @property + def sequence(self): + if self._sequence is None: + self._sequence = ''.join(self.sequence_parts) + return self._sequence + + def print(self, fh=sys.stdout): + print(self.header, file=fh) + for line in self.sequence_parts: + print(line, file=fh) -class FASTAReader: - """ - FASTA db iterator. Returns a single FASTA sequence object. 
def FASTAReader_gen(fasta_filename):
    """Yield a Sequence object for each record in the FASTA file at *fasta_filename*.

    Raises ValueError if a record's first line does not start with '>'.
    """
    with open(fasta_filename) as fasta_file:
        line = fasta_file.readline()
        while True:
            if not line:
                return
            if not line.startswith('>'):
                # Was an assert; a real exception survives `python -O`.
                raise ValueError("FASTA headers must start with >")
            header = line.rstrip()
            sequence_parts = []
            line = fasta_file.readline()
            while line and line[0] != '>':
                sequence_parts.append(line.rstrip())
                line = fasta_file.readline()
            yield Sequence(header, sequence_parts)


def target_match(targets, header):
    """Return the entry of *targets* matched by *header*, or None.

    The header is compared upper-cased, with the leading '>' and any
    initial spaces removed: first the whole header, then only its first
    whitespace-delimited word.
    """
    # Remove '>' and initial spaces from the header
    header = header[1:].lstrip().upper()
    # Search for an exact match among the targets
    if header in targets:
        return header
    # Try an exact match for the first "word" in the header.  A bare '>'
    # header has no words; the original code raised IndexError here.
    words = header.split()
    if words and words[0] in targets:
        return words[0]
    return None


def main():
    """Filter a FASTA file by ID list or header regexp, and/or by sequence length or regexp."""
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', required=True, help='Path to input FASTA file')
    parser.add_argument('-o', required=True, help='Path to output FASTA file')
    parser.add_argument('-d', help='Path to discarded entries file')
    header_criteria = parser.add_mutually_exclusive_group()
    header_criteria.add_argument('--id_list', help='Path to the ID list file')
    header_criteria.add_argument('--header_regexp', help='Regular expression pattern the header should match')
    sequence_criteria = parser.add_mutually_exclusive_group()
    sequence_criteria.add_argument('--min_length', type=int, help='Minimum sequence length')
    sequence_criteria.add_argument('--sequence_regexp', help='Regular expression pattern the sequence should match')
    parser.add_argument('--max_length', type=int, help='Maximum sequence length')
    parser.add_argument('--dedup', action='store_true', default=False, help='Whether to remove duplicate sequences')
    options = parser.parse_args()

    # --min_length with no --max_length means "no upper bound".
    if options.min_length is not None and options.max_length is None:
        options.max_length = sys.maxsize
    # Compile each pattern into its own variable.  The two regexp options
    # live in different mutually-exclusive groups, so both can be given at
    # once; the original reused one `regexp` name and then matched headers
    # against the *sequence* pattern.
    header_regexp = re.compile(options.header_regexp) if options.header_regexp else None
    sequence_regexp = re.compile(options.sequence_regexp) if options.sequence_regexp else None

    work_summary = {'found': 0}

    if options.dedup:
        used_sequences = set()
        work_summary['duplicates'] = 0

    if options.id_list:
        targets = []
        with open(options.id_list) as f_target:
            for line in f_target:
                targets.append(line.strip().upper())
        work_summary['wanted'] = len(targets)

    homd_db = FASTAReader_gen(options.i)
    # The discarded-entries file was never closed in the original; keep it
    # open for the whole loop and close it in `finally`.
    discarded = open(options.d, 'w') if options.d else None
    try:
        with open(options.o, 'w') as output:
            for entry in homd_db:
                print_entry = True
                # Header criterion: ID list or header regexp (exclusive).
                if options.id_list:
                    matched_target = target_match(targets, entry.header)
                    if matched_target:
                        work_summary['found'] += 1
                        # Each listed ID matches at most once; duplicates in
                        # the ID list allow further matches.
                        targets.remove(matched_target)
                    else:
                        print_entry = False
                elif header_regexp is not None:
                    if header_regexp.search(entry.header) is None:
                        print_entry = False
                # Sequence criterion: length range or sequence regexp (exclusive).
                if options.min_length is not None:
                    if not (options.min_length <= len(entry.sequence) <= options.max_length):
                        print_entry = False
                elif sequence_regexp is not None:
                    if sequence_regexp.search(entry.sequence) is None:
                        print_entry = False
                if print_entry:
                    if options.dedup:
                        if entry.sequence in used_sequences:
                            # Duplicates are counted but written nowhere,
                            # not even to the discarded file.
                            work_summary['duplicates'] += 1
                            continue
                        used_sequences.add(entry.sequence)
                    entry.print(output)
                elif discarded is not None:
                    entry.print(discarded)
    finally:
        if discarded is not None:
            discarded.close()

    for parm, count in work_summary.items():
        print('%s ==> %d' % (parm, count))


if __name__ == "__main__":
    main()