view cpt_export_seq_unique/gff3_extract_sequence.py @ 0:aaed5a0c774c draft

Uploaded
author cpt
date Fri, 01 Jul 2022 13:43:49 +0000
parents
children
line wrap: on
line source

#!/usr/bin/env python
import sys
import argparse
import logging
import uuid
from CPT_GFFParser import gffParse, gffWrite
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.SeqFeature import FeatureLocation, CompoundLocation
from gff3 import feature_lambda, feature_test_type, get_id

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


def main(fasta, gff3, feature_filter=None, nodesc=False):
    if feature_filter == "nice_cds":
        from gff2gb import gff3_to_genbank as cpt_Gff2Gbk
        

        for rec in cpt_Gff2Gbk(gff3, fasta, 11):
            seenList = {}
            if rec.seq[0] == "?":
                sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n")
                exit(1)
            for feat in sorted(rec.features, key=lambda x: x.location.start):
                if feat.type != "CDS":
                    continue

                ind = 0
                if (
                    str(
                        feat.qualifiers.get("locus_tag", get_id(feat)).replace(" ", "-")
                    )
                    in seenList.keys()
                ):
                    seenList[
                        str(
                            feat.qualifiers.get("locus_tag", get_id(feat)).replace(
                                " ", "-"
                            )
                        )
                    ] += 1
                    ind = seenList[
                        str(
                            feat.qualifiers.get("locus_tag", get_id(feat)).replace(
                                " ", "-"
                            )
                        )
                    ]
                else:
                    seenList[
                        str(
                            feat.qualifiers.get("locus_tag", get_id(feat)).replace(
                                " ", "-"
                            )
                        )
                    ] = 1
                append = ""
                if ind != 0:
                    append = "_" + str(ind)

                if nodesc:
                    description = ""
                else:
                    feat.qualifiers["ID"] = [feat._ID]
                    product = feat.qualifiers.get("product", "")
                    description = "{1} [Location={0.location};ID={0.qualifiers[ID][0]}]".format(
                        feat, product
                    )
                yield [
                    SeqRecord(
                        feat.extract(rec).seq,
                        id=str(
                            feat.qualifiers.get("locus_tag", get_id(feat)).replace(
                                " ", "-"
                            )
                        )
                        + append,
                        description=description,
                    )
                ]

    elif feature_filter == "unique_cds":
        seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta"))
        seen_ids = {}

        for rec in gffParse(gff3, base_dict=seq_dict):
            noMatch = True
            if "Alias" in rec.features[0].qualifiers.keys():
                lColumn = rec.features[0].qualifiers["Alias"][0]
            else:
                lColumn = ""
            for x in seq_dict:
                if x == rec.id or x == lColumn:
                    noMatch = False
            if noMatch:
                sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n")
                exit(1)
            newfeats = []
            for feat in sorted(
                feature_lambda(
                    rec.features, feature_test_type, {"type": "CDS"}, subfeatures=False
                ),
                key=lambda f: f.location.start,
            ):
                nid = rec.id + "____" + feat.id
                if nid in seen_ids:
                    nid = nid + "__" + uuid.uuid4().hex
                feat.qualifiers["ID"] = [nid]
                newfeats.append(feat)
                seen_ids[nid] = True

                if nodesc:
                    description = ""
                else:
                    if feat.strand == -1:
                      important_data = {"Location": FeatureLocation(feat.location.start + 1, feat.location.end - feat.phase, feat.strand)}
                    else:
                      important_data = {"Location": FeatureLocation(feat.location.start + 1 + feat.phase, feat.location.end, feat.strand)}
                    if "Name" in feat.qualifiers:
                        important_data["Name"] = feat.qualifiers.get("Name", [""])[0]

                    description = "[{}]".format(
                        ";".join(
                            [
                                "{key}={value}".format(key=k, value=v)
                                for (k, v) in important_data.items()
                            ]
                        )
                    )
                #if feat.id == "CPT_Privateer_006.p01":
                #print(feat)
                #exit()
                
                if isinstance(feat.location, CompoundLocation):
                  finSeq = ""
                  if feat.strand == -1:
                    for x in feat.location.parts:
                      finSeq += str((rec.seq[feat.location.start: feat.location.end - feat.phase]).reverse_complement())
                  else:
                    for x in feat.location.parts:
                      finSeq += str(rec.seq[feat.location.start + feat.phase: feat.location.end])
                  yield [
                    SeqRecord(
                        finSeq,
                        id=nid.replace(" ", "-"),
                        description=description,
                    )
                  ]
                elif feat.strand == -1:
                  yield [
                    SeqRecord(
                        (rec.seq[feat.location.start: feat.location.end - feat.phase]).reverse_complement(),
                        id=nid.replace(" ", "-"),
                        description=description,
                    )
                  ]
                else:
                  yield [
                    SeqRecord(
                        #feat.extract(rec).seq,
                        rec.seq[feat.location.start + feat.phase: feat.location.end],
                        id=nid.replace(" ", "-"),
                        description=description,
                    )
                  ]
            rec.features = newfeats
            rec.annotations = {}
            #gffWrite([rec], sys.stdout)
    else:
        seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta"))
        
        for rec in gffParse(gff3, base_dict=seq_dict):
            noMatch = True
            if "Alias" in rec.features[0].qualifiers.keys():
                lColumn = rec.features[0].qualifiers["Alias"][0]
            else:
                lColumn = ""
            for x in seq_dict:
                if x == rec.id or x == lColumn:
                    noMatch = False
            if noMatch:
                sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n")
                exit(1)
            for feat in sorted(
                feature_lambda(
                    rec.features,
                    feature_test_type,
                    {"type": feature_filter},
                    subfeatures=True,
                ),
                key=lambda f: f.location.start,
            ):
                id = feat.id
                if len(id) == 0:
                    id = get_id(feat)

                if nodesc:
                    description = ""
                else:
                    if feat.strand == -1:
                      important_data = {"Location": FeatureLocation(feat.location.start + 1, feat.location.end - feat.phase, feat.strand)}
                    else:
                      important_data = {"Location": FeatureLocation(feat.location.start + 1 + feat.phase, feat.location.end, feat.strand)}
                    if "Name" in feat.qualifiers:
                        important_data["Name"] = feat.qualifiers.get("Name", [""])[0]

                    description = "[{}]".format(
                        ";".join(
                            [
                                "{key}={value}".format(key=k, value=v)
                                for (k, v) in important_data.items()
                            ]
                        )
                    )

                if isinstance(feat.location, CompoundLocation):
                  finSeq = ""
                  if feat.strand == -1:
                    for x in feat.location.parts:
                      finSeq += str((rec.seq[x.start: x.end - feat.phase]).reverse_complement())
                  else:
                    for x in feat.location.parts:
                      finSeq += str(rec.seq[x.start + feat.phase: x.end])
                  yield [
                    SeqRecord(
                        Seq(finSeq),
                        id=id.replace(" ", "-"),
                        description=description,
                    )
                  ]

                else:

                  if feat.strand == -1:
                    yield [
                      SeqRecord(
                          seq=Seq(str(rec.seq[feat.location.start: feat.location.end - feat.phase])).reverse_complement(),
                          id=id.replace(" ", "-"),
                          description=description,
                      )
                    ]
                  else:
                    yield [
                      SeqRecord(
                          #feat.extract(rec).seq,
                          seq=Seq(str(rec.seq[feat.location.start + feat.phase: feat.location.end])),
                          id=id.replace(" ", "-"),
                          description=description,
                      )
                    ]


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Export corresponding sequence in genome from GFF3", epilog=""
    )
    parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome")
    parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 File")
    parser.add_argument(
        "--feature_filter", default=None, help="Filter for specific feature types"
    )
    parser.add_argument(
        "--nodesc", action="store_true", help="Strip description field off"
    )
    args = parser.parse_args()
    for seq in main(**vars(args)):
        #if isinstance(seq, list):
        #  for x in seq:
        #    print(type(x.seq))
        #    SeqIO.write(x, sys.stdout, "fasta")
        #else:
          SeqIO.write(seq, sys.stdout, "fasta")