diff filter_by_fasta_ids.py @ 2:1bd985f14938 draft

planemo upload for repository https://github.com/galaxyproteomics/tools-galaxyp/tree/master/tools/filter_by_fasta_ids commit 2bc87e917c91a3b7a43996a0f3752b8992c0c749
author galaxyp
date Sat, 28 Apr 2018 03:49:28 -0400
parents 8d15aebf55fd
children 3c623e81be77
line wrap: on
line diff
--- a/filter_by_fasta_ids.py	Tue May 24 13:05:22 2016 -0400
+++ b/filter_by_fasta_ids.py	Sat Apr 28 03:49:28 2018 -0400
@@ -1,100 +1,135 @@
 #!/usr/bin/env python
 """ A script to build specific fasta databases """
 from __future__ import print_function
-import optparse
+
+import argparse
+import re
+import sys
 
 
-# ===================================== Iterator ===============================
-class Sequence:
-    ''' Holds protein sequence information '''
-    def __init__(self):
-        self.header = ""
-        self.sequence_parts = []
+class Sequence(object):
+    def __init__(self, header, sequence_parts):
+        self.header = header
+        self.sequence_parts = sequence_parts
+        self._sequence = None
 
-    def get_sequence(self):
-        return "".join([line.rstrip().replace('\n', '').replace('\r', '') for line in self.sequence_parts])
+    @property
+    def sequence(self):
+        if self._sequence is None:
+            self._sequence = ''.join(self.sequence_parts)
+        return self._sequence
+
+    def print(self, fh=sys.stdout):
+        print(self.header, file=fh)
+        for line in self.sequence_parts:
+            print(line, file=fh)
 
 
-class FASTAReader:
-    """
-        FASTA db iterator. Returns a single FASTA sequence object.
-    """
-    def __init__(self, fasta_name):
-        self.fasta_file = open(fasta_name)
-        self.next_line = self.fasta_file.readline()
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        ''' Iteration '''
-        next_line = self.next_line
-        if not next_line:
-            raise StopIteration
-
-        seq = Sequence()
-        seq.header = next_line.rstrip().replace('\n', '').replace('\r', '')
-
-        next_line = self.fasta_file.readline()
-        while next_line and next_line[0] != '>':
-            seq.sequence_parts.append(next_line)
-            next_line = self.fasta_file.readline()
-        self.next_line = next_line
-        return seq
-
-    # Python 2/3 compat
-    next = __next__
+def FASTAReader_gen(fasta_filename):
+    with open(fasta_filename) as fasta_file:
+        line = fasta_file.readline()
+        while True:
+            if not line:
+                return
+            assert line.startswith('>'), "FASTA headers must start with >"
+            header = line.rstrip()
+            sequence_parts = []
+            line = fasta_file.readline()
+            while line and line[0] != '>':
+                sequence_parts.append(line.rstrip())
+                line = fasta_file.readline()
+            yield Sequence(header, sequence_parts)
 
 
-def target_match(target, search_entry):
+def target_match(targets, header):
     ''' Matches '''
-    search_entry = search_entry.upper()
-    for atarget in target:
-        if search_entry.find(atarget) > -1:
-            return atarget
+    # Remove '>' and initial spaces from the header
+    header = header[1:].lstrip().upper()
+    # Search for an exact match among the targets
+    if header in targets:
+        return header
+    # Try to find an exact match for the first "word" in the header
+    header = header.split()[0]
+    if header in targets:
+        return header
     return None
 
 
 def main():
     ''' the main function'''
 
-    parser = optparse.OptionParser()
-    parser.add_option('--dedup', dest='dedup', action='store_true', default=False, help='Whether to remove duplicate sequences')
-    (options, args) = parser.parse_args()
-
-    targets = []
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-i', required=True, help='Path to input FASTA file')
+    parser.add_argument('-o', required=True, help='Path to output FASTA file')
+    parser.add_argument('-d', help='Path to discarded entries file')
+    header_criteria = parser.add_mutually_exclusive_group()
+    header_criteria.add_argument('--id_list', help='Path to the ID list file')
+    header_criteria.add_argument('--header_regexp', help='Regular expression pattern the header should match')
+    sequence_criteria = parser.add_mutually_exclusive_group()
+    sequence_criteria.add_argument('--min_length', type=int, help='Minimum sequence length')
+    sequence_criteria.add_argument('--sequence_regexp', help='Regular expression pattern the sequence should match')
+    parser.add_argument('--max_length', type=int, help='Maximum sequence length')
+    parser.add_argument('--dedup', action='store_true', default=False, help='Whether to remove duplicate sequences')
+    options = parser.parse_args()
 
-    with open(args[0]) as f_target:
-        for line in f_target.readlines():
-            targets.append(">%s" % line.strip().upper())
+    if options.min_length is not None and options.max_length is None:
+        options.max_length = sys.maxsize
+    if options.header_regexp:
+        regexp = re.compile(options.header_regexp)
+    if options.sequence_regexp:
+        regexp = re.compile(options.sequence_regexp)
 
-    print('Read target file, now looking for %d sequences.' % len(targets))
+    work_summary = {'found': 0}
 
-    work_summary = {'wanted': len(targets), 'found': 0}
     if options.dedup:
         used_sequences = set()
         work_summary['duplicates'] = 0
-    homd_db = FASTAReader(args[1])
 
-    with open(args[2], "w") as output:
+    if options.id_list:
+        targets = []
+        with open(options.id_list) as f_target:
+            for line in f_target.readlines():
+                targets.append(line.strip().upper())
+        work_summary['wanted'] = len(targets)
+
+    homd_db = FASTAReader_gen(options.i)
+    if options.d:
+        discarded = open(options.d, 'w')
+
+    with open(options.o, "w") as output:
         for entry in homd_db:
-            target_matched_results = target_match(targets, entry.header)
-            if target_matched_results:
-                work_summary['found'] += 1
-                targets.remove(target_matched_results)
-                sequence = entry.get_sequence()
+            print_entry = True
+            if options.id_list:
+                target_matched_results = target_match(targets, entry.header)
+                if target_matched_results:
+                    work_summary['found'] += 1
+                    targets.remove(target_matched_results)
+                else:
+                    print_entry = False
+            elif options.header_regexp:
+                if regexp.search(entry.header) is None:
+                    print_entry = False
+            if options.min_length is not None:
+                sequence_length = len(entry.sequence)
+                if not(options.min_length <= sequence_length <= options.max_length):
+                    print_entry = False
+            elif options.sequence_regexp:
+                if regexp.search(entry.sequence) is None:
+                    print_entry = False
+            if print_entry:
                 if options.dedup:
-                    if sequence in used_sequences:
+                    if entry.sequence in used_sequences:
                         work_summary['duplicates'] += 1
                         continue
                     else:
-                        used_sequences.add(sequence)
-                print(entry.header, file=output)
-                print(sequence, file=output)
+                        used_sequences.add(entry.sequence)
+                entry.print(output)
+            elif options.d:
+                entry.print(discarded)
 
-    print('Completed filtering.')
     for parm, count in work_summary.items():
         print('%s ==> %d' % (parm, count))
 
+
 if __name__ == "__main__":
     main()