Mercurial > repos > iuc > concoct_extract_fasta_bins
view coverage_table.py @ 1:a04028a8181d draft
"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/concoct commit 40a09cbfd6052f7b0295946621db1bdf58228b09"
author | iuc |
---|---|
date | Sun, 13 Mar 2022 08:44:08 +0000 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python import argparse import gzip from functools import partial from Bio import SeqIO def generate_coverage_table(input_fasta, input_tabular, gzipped, output): # Read input file into a dict and return everything # in the table format required by CONCOCT. gc_and_len_dict = get_gc_and_len_dict(input_fasta, gzipped) assert(len(gc_and_len_dict) > 0) bed_coverage_dict = get_bed_coverage_dict(input_tabular) with open(output, 'w') as fh: # Output the header. fh.write("contig\tlength") t = tuple(range(len(bed_coverage_dict))) fh.write("\tcov_mean_sample_%d\n" % len(t)) # Output the content. for acc in gc_and_len_dict: # Fasta stats. fh.write("%s\t%s" % (acc, gc_and_len_dict[acc]['length'])) # Mean try: # Coverage mean fh.write("\t%f" % (bed_coverage_dict[acc]["cov_mean"])) except KeyError: # No reads mapped to this contig fh.write("\t0") fh.write("\n") def get_bed_coverage_dict(input_tabular): # Ddetermine mean coverage and percentage covered # for each contig, returning a dict with fasta id # as key and percentage covered and cov_mean as keys # for the inner dict. out_dict = {} with open(input_tabular, 'r') as fh: for line in fh: line = line.rstrip('\r\n') cols = line.split('\t') try: d = out_dict[cols[0]] except KeyError: d = {} out_dict[cols[0]] = d if int(cols[1]) == 0: d["percentage_covered"] = 100 - float(cols[4]) * 100.0 else: d["cov_mean"] = d.get("cov_mean", 0) + int(cols[1]) * float(cols[4]) return out_dict def get_gc_and_len_dict(input_fasta, gzipped): # Creates a dictionary with the fasta id as key # and GC and length as keys for the inner dictionary. if gzipped: _open = partial(gzip.open, mode='rt') else: _open = open out_dict = {} with _open(input_fasta) as input_fh: for rec in SeqIO.parse(input_fh, "fasta"): out_dict[rec.id] = {} out_dict[rec.id]["length"] = len(rec.seq) return out_dict parser = argparse.ArgumentParser(description=__doc__) parser.add_argument('--input_tabular', action='store', dest='input_tabular', help='bedtools genomeCoverageBed bed file') parser.add_argument('--input_fasta', action='store', dest='input_fasta', help='Contigs fasta file') parser.add_argument("--gzipped", action="store_true", dest="gzipped", default=False, help="input_fasta is gzipped") parser.add_argument('--output', action='store', dest='output', help='Output file') args = parser.parse_args() generate_coverage_table(args.input_fasta, args.input_tabular, args.gzipped, args.output)