view screen_abricate_report.py @ 4:22247b1a59d5 draft default tip

"planemo upload for repository https://github.com/public-health-bioinformatics/galaxy_tools/blob/master/tools/screen_abricate_report commit ae10b1f55b2356bea735a994619288f7575430cb"
author public-health-bioinformatics
date Fri, 03 Jan 2020 14:30:22 -0500
parents 2262e531c50b
children
line wrap: on
line source

#!/usr/bin/env python

from __future__ import print_function

import argparse
import csv
import re


class Range(object):
    """
    Used to limit the min_coverage and min_identity args to range 0.0 - 100.0
    """
    def __init__(self, start, end):
        self.start = start
        self.end = end

    def __eq__(self, other):
        return self.start <= other <= self.end

    def __contains__(self, item):
        return self.__eq__(item)

    def __iter__(self):
        yield self

    def __repr__(self):
        return str(self.start) + " - " + str(self.end)


def parse_screen_file(screen_file):
    screen = []
    with open(screen_file) as f:
        reader = csv.DictReader(f, delimiter="\t", quotechar='"')
        for row in reader:
            screen.append(row)
    return screen


def get_fieldnames(input_file):
    with open(input_file) as f:
        reader = csv.DictReader(f, delimiter="\t", quotechar='"')
        row = next(reader)
    fieldnames = row.keys()
    return fieldnames


def detect_gene(abricate_report_row, regex, min_coverage, min_identity):
    gene_of_interest = bool(re.search(regex, abricate_report_row['GENE']))
    sufficient_coverage = float(abricate_report_row['%COVERAGE']) >= min_coverage
    sufficient_identity = float(abricate_report_row['%IDENTITY']) >= min_identity
    if gene_of_interest and sufficient_coverage and sufficient_identity:
        return True
    else:
        return False


def main(args):
    screen = parse_screen_file(args.screening_file)
    gene_detection_status_fieldnames = ['gene_name', 'detected', 'alleles']
    with open(args.abricate_report, 'r') as f1, \
            open(args.screened_report, 'w') as f2, \
            open(args.gene_detection_status, 'w') as f3:
        abricate_report_reader = csv.DictReader(f1, delimiter="\t", quotechar='"')
        screened_report_writer = csv.DictWriter(f2, delimiter="\t", quotechar='"',
                                                fieldnames=abricate_report_reader.fieldnames)
        gene_detection_status_writer = csv.DictWriter(f3, delimiter="\t", quotechar='"',
                                                      fieldnames=gene_detection_status_fieldnames)
        screened_report_writer.writeheader()
        gene_detection_status_writer.writeheader()

        for gene in screen:
            gene_detection_status = {
                'gene_name': gene['gene_name'],
                'detected': False,
                'alleles': None,
            }
            for abricate_report_row in abricate_report_reader:
                if detect_gene(abricate_report_row, gene['regex'], args.min_coverage, args.min_identity):
                    gene_detection_status['detected'] = True
                    if not gene_detection_status['alleles']:
                        gene_detection_status['alleles'] = abricate_report_row['GENE']
                    else:
                        gene_detection_status['alleles'] = ",".join([
                            gene_detection_status['alleles'],
                            abricate_report_row['GENE'],
                        ])
                    screened_report_writer.writerow(abricate_report_row)
            gene_detection_status_writer.writerow(gene_detection_status)
            f1.seek(0)  # return file pointer to start of abricate report
            next(abricate_report_reader)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("abricate_report", help="Input: Abricate report to screen (tsv)")
    parser.add_argument("--screening_file", help="Input: List of genes to screen for (tsv)")
    parser.add_argument("--screened_report",
                        help=("Output: Screened abricate report, including only genes of interest (tsv)"))
    parser.add_argument("--gene_detection_status",
                        help=("Output: detection status for all genes listed in the screening file (tsv)"))
    parser.add_argument("--min_coverage", type=float,  default=90.0,
                        choices=Range(0.0, 100.0),
                        help=("Minimum percent coverage (default 90.0)"))
    parser.add_argument("--min_identity", type=float, default=90.0,
                        choices=Range(0.0, 100.0),
                        help=("Minimum percent identity (default 90.0)"))
    parser.add_argument("-v", "--version", action='version', version='%(prog)s 0.4.0')
    args = parser.parse_args()
    main(args)