view extract_genomic_dna.py @ 11:80414c33a59a draft default tip

planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/extract_genomic_dna commit 6db2d98b513e4980788fcba49d809c91e5750296
author iuc
date Thu, 21 Nov 2024 07:20:29 +0000
parents e400dcbc60d0
children
line wrap: on
line source

#!/usr/bin/env python
from __future__ import print_function

import argparse
import os

import bx.seq.nib
import bx.seq.twobit
from bx.intervals.io import Comment, Header

import extract_genomic_dna_utils as egdu  # noqa: I100,I202

parser = argparse.ArgumentParser()
parser.add_argument("--input_format", dest="input_format", help="Input dataset format")
parser.add_argument("--input", dest="input", help="Input dataset")
parser.add_argument("--genome", dest="genome", help="Input dataset genome build")
parser.add_argument(
    "--interpret_features",
    dest="interpret_features",
    default=None,
    help="Interpret features if input format is gff",
)
parser.add_argument("--columns", dest="columns", help="Columns to use in input file")
parser.add_argument(
    "--reference_genome_source",
    dest="reference_genome_source",
    help="Source of reference genome file",
)
parser.add_argument(
    "--reference_genome", dest="reference_genome", help="Reference genome file"
)
parser.add_argument("--output_format", dest="output_format", help="Output format")
parser.add_argument(
    "--fasta_header_type",
    dest="fasta_header_type",
    default=None,
    help="Fasta header format",
)
parser.add_argument(
    "--fasta_header_delimiter",
    dest="fasta_header_delimiter",
    default=None,
    help="Fasta header field delimiter",
)
parser.add_argument("--output", dest="output", help="Output dataset")
args = parser.parse_args()

input_is_gff = args.input_format == "gff"
interpret_features = input_is_gff and args.interpret_features == "yes"
if len(args.columns.split(",")) == 5:
    # Bed file.
    chrom_col, start_col, end_col, strand_col, name_col = egdu.parse_cols_arg(
        args.columns
    )
else:
    # Gff file.
    chrom_col, start_col, end_col, strand_col = egdu.parse_cols_arg(args.columns)
    name_col = False

if args.reference_genome_source == "history":
    seq_path = egdu.convert_to_twobit(args.reference_genome)
else:
    seq_path = args.reference_genome
seq_dir = os.path.split(seq_path)[0]

includes_strand_col = strand_col >= 0
strand = None
nibs = {}
skipped_lines = 0
first_invalid_line = 0
invalid_lines = []
warnings = []
warning = ""
twobitfile = None
line_count = 1
file_iterator = open(args.input)
if interpret_features:
    file_iterator = egdu.GFFReaderWrapper(file_iterator, fix_strand=False)
out = open(args.output, "wt")

for feature in file_iterator:
    # Ignore comments, headers.
    if isinstance(feature, (Header, Comment)):
        line_count += 1
        continue
    name = ""
    if interpret_features:
        # Processing features.
        egdu.convert_gff_coords_to_bed(feature)
        chrom = feature.chrom
        start = feature.start
        end = feature.end
        strand = feature.strand
    else:
        # Processing lines, either interval or GFF format.
        line = feature.rstrip("\r\n")
        if line and not line.startswith("#"):
            fields = line.split("\t")
            try:
                chrom = fields[chrom_col]
                start = int(fields[start_col])
                end = int(fields[end_col])
                if name_col:
                    name = fields[name_col]
                if input_is_gff:
                    start, end = egdu.convert_gff_coords_to_bed([start, end])
                if includes_strand_col:
                    strand = fields[strand_col]
            except Exception:
                warning = "Invalid chrom, start or end column values. "
                warnings.append(warning)
                if not invalid_lines:
                    invalid_lines = egdu.get_lines(feature)
                    first_invalid_line = line_count
                skipped_lines += len(invalid_lines)
                continue
            if start > end:
                warning = "Invalid interval, start '%d' > end '%d'.  " % (start, end)
                warnings.append(warning)
                if not invalid_lines:
                    invalid_lines = egdu.get_lines(feature)
                    first_invalid_line = line_count
                skipped_lines += len(invalid_lines)
                continue
            if strand not in ["+", "-"]:
                strand = "+"
            sequence = ""
        else:
            continue
    # Open sequence file and get sequence for feature/interval.
    if os.path.exists("%s/%s.nib" % (seq_dir, chrom)):
        if chrom in nibs:
            nib = nibs[chrom]
        else:
            nibs[chrom] = nib = bx.seq.nib.NibFile(
                open("%s/%s.nib" % (seq_path, chrom))
            )
        try:
            sequence = nib.get(start, end - start)
        except Exception:
            warning = (
                "Unable to fetch the sequence from '%d' to '%d' for build '%s'. "
                % (start, end - start, args.genome)
            )
            warnings.append(warning)
            if not invalid_lines:
                invalid_lines = egdu.get_lines(feature)
                first_invalid_line = line_count
            skipped_lines += len(invalid_lines)
            continue
    elif os.path.isfile(seq_path):
        if not (twobitfile):
            twobitfile = bx.seq.twobit.TwoBitFile(open(seq_path))
        try:
            if interpret_features:
                # Create sequence from intervals within a feature.
                sequence = ""
                for interval in feature.intervals:
                    sequence += twobitfile[interval.chrom][
                        interval.start: interval.end
                    ]
            else:
                sequence = twobitfile[chrom][start:end]
        except Exception:
            warning = (
                "Unable to fetch the sequence from '%d' to '%d' for chrom '%s'. "
                % (start, end - start, chrom)
            )
            warnings.append(warning)
            if not invalid_lines:
                invalid_lines = egdu.get_lines(feature)
                first_invalid_line = line_count
            skipped_lines += len(invalid_lines)
            continue
    else:
        warning = "Chromosome by name '%s' was not found for build '%s'. " % (
            chrom,
            args.genome,
        )
        warnings.append(warning)
        if not invalid_lines:
            invalid_lines = egdu.get_lines(feature)
            first_invalid_line = line_count
        skipped_lines += len(invalid_lines)
        continue
    if sequence == "":
        warning = (
            "Chrom: '%s', start: '%d', end: '%d' is either invalid or not present in build '%s'. "
            % (chrom, start, end, args.genome)
        )
        warnings.append(warning)
        if not invalid_lines:
            invalid_lines = egdu.get_lines(feature)
            first_invalid_line = line_count
        skipped_lines += len(invalid_lines)
        continue
    if includes_strand_col and strand == "-":
        sequence = egdu.reverse_complement(sequence)
    if args.output_format == "fasta":
        if input_is_gff:
            start, end = egdu.convert_bed_coords_to_gff([start, end])
        if args.fasta_header_type == "bedtools_getfasta_default":
            out.write(
                ">%s\n"
                % egdu.get_bedtools_getfasta_default_header(
                    str(chrom), str(start), str(end), strand, includes_strand_col
                )
            )
        else:
            # args.fasta_header_type == "char_delimited":
            fields = [args.genome, str(chrom), str(start), str(end), strand]
            field_delimiter = egdu.get_fasta_header_delimiter(
                args.fasta_header_delimiter
            )
            meta_data = field_delimiter.join(fields)
            if name.strip():
                out.write(">%s %s\n" % (meta_data, name))
            else:
                out.write(">%s\n" % meta_data)
        c = 0
        sequence_length = len(sequence)
        while c < sequence_length:
            b = min(c + 50, sequence_length)
            out.write("%s\n" % str(sequence[c:b]))
            c = b
    else:
        # output_format == "interval".
        if interpret_features:
            meta_data = "\t".join(
                [
                    feature.chrom,
                    "galaxy_extract_genomic_dna",
                    "interval",
                    str(feature.start),
                    str(feature.end),
                    feature.score,
                    feature.strand,
                    ".",
                    egdu.gff_attributes_to_str(feature.attributes, "GTF"),
                ]
            )
        else:
            # Here fields was set up around line 73.
            meta_data = "\t".join(fields)
        if input_is_gff:
            format_str = '%s seq "%s";\n'
        else:
            format_str = "%s\t%s\n"
        out.write(format_str % (meta_data, str(sequence)))
    # Update line count.
    if isinstance(feature, egdu.GFFFeature):
        line_count += len(feature.intervals)
    else:
        line_count += 1
out.close()

if warnings:
    warn_msg = "%d warnings, 1st is: " % len(warnings)
    warn_msg += warnings[0]
    print(warn_msg)
if skipped_lines:
    # Error message includes up to the first 10 skipped lines.
    print(
        'Skipped %d invalid lines, 1st is #%d, "%s"'
        % (skipped_lines, first_invalid_line, "\n".join(invalid_lines[:10]))
    )

if args.reference_genome_source == "history":
    os.remove(seq_path)