view filter_by_fasta_ids.py @ 2:1bd985f14938 draft

planemo upload for repository https://github.com/galaxyproteomics/tools-galaxyp/tree/master/tools/filter_by_fasta_ids commit 2bc87e917c91a3b7a43996a0f3752b8992c0c749
author galaxyp
date Sat, 28 Apr 2018 03:49:28 -0400
parents 8d15aebf55fd
children 3c623e81be77
line wrap: on
line source

#!/usr/bin/env python
""" A script to build specific fasta databases """
from __future__ import print_function

import argparse
import re
import sys


class Sequence(object):
    """A single FASTA record: a header line plus its sequence lines."""

    def __init__(self, header, sequence_parts):
        self.header = header                  # '>'-prefixed header line, newline stripped
        self.sequence_parts = sequence_parts  # sequence as it was split across file lines
        self._sequence = None                 # cache for the lazily-joined full sequence

    @property
    def sequence(self):
        """The full sequence with internal line breaks removed (joined once, then cached)."""
        if self._sequence is None:
            self._sequence = ''.join(self.sequence_parts)
        return self._sequence

    def print(self, fh=sys.stdout):
        """Write the record to *fh* in FASTA format, preserving the original line splits."""
        for text in [self.header] + self.sequence_parts:
            print(text, file=fh)


def FASTAReader_gen(fasta_filename):
    """Yield one Sequence per FASTA record in *fasta_filename*.

    The input file is closed automatically when the generator is exhausted
    (or garbage-collected).

    Raises:
        ValueError: if a record does not begin with a '>' header line.
            (Previously this was an ``assert``, which is silently stripped
            when Python runs with ``-O``, letting malformed input through.)
    """
    with open(fasta_filename) as fasta_file:
        line = fasta_file.readline()
        while line:
            # Every record must start at a header line.
            if not line.startswith('>'):
                raise ValueError("FASTA headers must start with >")
            header = line.rstrip()
            sequence_parts = []
            line = fasta_file.readline()
            # Accumulate sequence lines until the next header or EOF.
            while line and not line.startswith('>'):
                sequence_parts.append(line.rstrip())
                line = fasta_file.readline()
            yield Sequence(header, sequence_parts)


def target_match(targets, header):
    """Return the entry of *targets* matched by FASTA *header*, or None.

    Matching is case-insensitive: the header (leading '>' and surrounding
    whitespace removed) is upper-cased before lookup, so *targets* is
    expected to hold upper-case IDs. The whole header is tried first, then
    its first whitespace-delimited word.
    """
    # Remove '>' and initial spaces from the header.
    header = header[1:].lstrip().upper()
    # Search for an exact match of the full header among the targets.
    if header in targets:
        return header
    # Fall back to an exact match of the first "word" in the header.
    # A header that is empty after stripping (e.g. a bare '>' line) has no
    # words; the original code crashed here with IndexError on split()[0].
    words = header.split()
    if words and words[0] in targets:
        return words[0]
    return None


def main():
    """Filter a FASTA file by header and/or sequence criteria.

    Reads the input FASTA (-i), writes entries passing all selected filters
    to the output file (-o), optionally writes rejected entries to a
    discarded-entries file (-d), and prints a summary of counts to stdout.
    """

    parser = argparse.ArgumentParser()
    parser.add_argument('-i', required=True, help='Path to input FASTA file')
    parser.add_argument('-o', required=True, help='Path to output FASTA file')
    parser.add_argument('-d', help='Path to discarded entries file')
    header_criteria = parser.add_mutually_exclusive_group()
    header_criteria.add_argument('--id_list', help='Path to the ID list file')
    header_criteria.add_argument('--header_regexp', help='Regular expression pattern the header should match')
    sequence_criteria = parser.add_mutually_exclusive_group()
    sequence_criteria.add_argument('--min_length', type=int, help='Minimum sequence length')
    sequence_criteria.add_argument('--sequence_regexp', help='Regular expression pattern the sequence should match')
    parser.add_argument('--max_length', type=int, help='Maximum sequence length')
    parser.add_argument('--dedup', action='store_true', default=False, help='Whether to remove duplicate sequences')
    options = parser.parse_args()

    if options.min_length is not None and options.max_length is None:
        # A minimum without a maximum means "no upper bound".
        options.max_length = sys.maxsize
    # Compile the two patterns into separate variables: --header_regexp and
    # --sequence_regexp belong to DIFFERENT mutually exclusive groups, so
    # both may be supplied at once. The previous code stored both in one
    # `regexp` variable, so the sequence pattern silently clobbered the
    # header pattern and the header filter matched against the wrong regex.
    header_re = re.compile(options.header_regexp) if options.header_regexp else None
    sequence_re = re.compile(options.sequence_regexp) if options.sequence_regexp else None

    work_summary = {'found': 0}

    if options.dedup:
        used_sequences = set()
        work_summary['duplicates'] = 0

    if options.id_list:
        # Kept as a list (not a set) on purpose: duplicate IDs each count
        # toward 'wanted' and each can match a separate FASTA entry.
        with open(options.id_list) as f_target:
            targets = [line.strip().upper() for line in f_target]
        work_summary['wanted'] = len(targets)

    homd_db = FASTAReader_gen(options.i)
    # The discarded file was previously never closed; close it in `finally`.
    discarded = open(options.d, 'w') if options.d else None

    try:
        with open(options.o, 'w') as output:
            for entry in homd_db:
                print_entry = True
                # Header criterion (at most one of id_list / header_regexp).
                if options.id_list:
                    matched = target_match(targets, entry.header)
                    if matched:
                        work_summary['found'] += 1
                        # Consume the target so each ID matches only once.
                        targets.remove(matched)
                    else:
                        print_entry = False
                elif header_re is not None:
                    if header_re.search(entry.header) is None:
                        print_entry = False
                # Sequence criterion (at most one of min_length / sequence_regexp).
                if options.min_length is not None:
                    if not options.min_length <= len(entry.sequence) <= options.max_length:
                        print_entry = False
                elif sequence_re is not None:
                    if sequence_re.search(entry.sequence) is None:
                        print_entry = False
                if print_entry:
                    if options.dedup:
                        if entry.sequence in used_sequences:
                            work_summary['duplicates'] += 1
                            continue
                        used_sequences.add(entry.sequence)
                    entry.print(output)
                elif discarded is not None:
                    entry.print(discarded)
    finally:
        if discarded is not None:
            discarded.close()

    for parm, count in work_summary.items():
        print('%s ==> %d' % (parm, count))


# Run the filter only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()