diff cpt_export_seq_unique/gff3_extract_sequence.py @ 0:aaed5a0c774c draft

Uploaded
author cpt
date Fri, 01 Jul 2022 13:43:49 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cpt_export_seq_unique/gff3_extract_sequence.py	Fri Jul 01 13:43:49 2022 +0000
@@ -0,0 +1,275 @@
+#!/usr/bin/env python
+import sys
+import argparse
+import logging
+import uuid
+from CPT_GFFParser import gffParse, gffWrite
+from Bio import SeqIO
+from Bio.Seq import Seq
+from Bio.SeqRecord import SeqRecord
+from Bio.SeqFeature import FeatureLocation, CompoundLocation
+from gff3 import feature_lambda, feature_test_type, get_id
+
+logging.basicConfig(level=logging.INFO)
+log = logging.getLogger(__name__)
+
+
+def main(fasta, gff3, feature_filter=None, nodesc=False):
+    if feature_filter == "nice_cds":
+        from gff2gb import gff3_to_genbank as cpt_Gff2Gbk
+        
+
+        for rec in cpt_Gff2Gbk(gff3, fasta, 11):
+            seenList = {}
+            if rec.seq[0] == "?":
+                sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n")
+                exit(1)
+            for feat in sorted(rec.features, key=lambda x: x.location.start):
+                if feat.type != "CDS":
+                    continue
+
+                ind = 0
+                if (
+                    str(
+                        feat.qualifiers.get("locus_tag", get_id(feat)).replace(" ", "-")
+                    )
+                    in seenList.keys()
+                ):
+                    seenList[
+                        str(
+                            feat.qualifiers.get("locus_tag", get_id(feat)).replace(
+                                " ", "-"
+                            )
+                        )
+                    ] += 1
+                    ind = seenList[
+                        str(
+                            feat.qualifiers.get("locus_tag", get_id(feat)).replace(
+                                " ", "-"
+                            )
+                        )
+                    ]
+                else:
+                    seenList[
+                        str(
+                            feat.qualifiers.get("locus_tag", get_id(feat)).replace(
+                                " ", "-"
+                            )
+                        )
+                    ] = 1
+                append = ""
+                if ind != 0:
+                    append = "_" + str(ind)
+
+                if nodesc:
+                    description = ""
+                else:
+                    feat.qualifiers["ID"] = [feat._ID]
+                    product = feat.qualifiers.get("product", "")
+                    description = "{1} [Location={0.location};ID={0.qualifiers[ID][0]}]".format(
+                        feat, product
+                    )
+                yield [
+                    SeqRecord(
+                        feat.extract(rec).seq,
+                        id=str(
+                            feat.qualifiers.get("locus_tag", get_id(feat)).replace(
+                                " ", "-"
+                            )
+                        )
+                        + append,
+                        description=description,
+                    )
+                ]
+
+    elif feature_filter == "unique_cds":
+        seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta"))
+        seen_ids = {}
+
+        for rec in gffParse(gff3, base_dict=seq_dict):
+            noMatch = True
+            if "Alias" in rec.features[0].qualifiers.keys():
+                lColumn = rec.features[0].qualifiers["Alias"][0]
+            else:
+                lColumn = ""
+            for x in seq_dict:
+                if x == rec.id or x == lColumn:
+                    noMatch = False
+            if noMatch:
+                sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n")
+                exit(1)
+            newfeats = []
+            for feat in sorted(
+                feature_lambda(
+                    rec.features, feature_test_type, {"type": "CDS"}, subfeatures=False
+                ),
+                key=lambda f: f.location.start,
+            ):
+                nid = rec.id + "____" + feat.id
+                if nid in seen_ids:
+                    nid = nid + "__" + uuid.uuid4().hex
+                feat.qualifiers["ID"] = [nid]
+                newfeats.append(feat)
+                seen_ids[nid] = True
+
+                if nodesc:
+                    description = ""
+                else:
+                    if feat.strand == -1:
+                      important_data = {"Location": FeatureLocation(feat.location.start + 1, feat.location.end - feat.phase, feat.strand)}
+                    else:
+                      important_data = {"Location": FeatureLocation(feat.location.start + 1 + feat.phase, feat.location.end, feat.strand)}
+                    if "Name" in feat.qualifiers:
+                        important_data["Name"] = feat.qualifiers.get("Name", [""])[0]
+
+                    description = "[{}]".format(
+                        ";".join(
+                            [
+                                "{key}={value}".format(key=k, value=v)
+                                for (k, v) in important_data.items()
+                            ]
+                        )
+                    )
+                #if feat.id == "CPT_Privateer_006.p01":
+                #print(feat)
+                #exit()
+                
+                if isinstance(feat.location, CompoundLocation):
+                  finSeq = ""
+                  if feat.strand == -1:
+                    for x in feat.location.parts:
+                      finSeq += str((rec.seq[feat.location.start: feat.location.end - feat.phase]).reverse_complement())
+                  else:
+                    for x in feat.location.parts:
+                      finSeq += str(rec.seq[feat.location.start + feat.phase: feat.location.end])
+                  yield [
+                    SeqRecord(
+                        finSeq,
+                        id=nid.replace(" ", "-"),
+                        description=description,
+                    )
+                  ]
+                elif feat.strand == -1:
+                  yield [
+                    SeqRecord(
+                        (rec.seq[feat.location.start: feat.location.end - feat.phase]).reverse_complement(),
+                        id=nid.replace(" ", "-"),
+                        description=description,
+                    )
+                  ]
+                else:
+                  yield [
+                    SeqRecord(
+                        #feat.extract(rec).seq,
+                        rec.seq[feat.location.start + feat.phase: feat.location.end],
+                        id=nid.replace(" ", "-"),
+                        description=description,
+                    )
+                  ]
+            rec.features = newfeats
+            rec.annotations = {}
+            #gffWrite([rec], sys.stdout)
+    else:
+        seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta"))
+        
+        for rec in gffParse(gff3, base_dict=seq_dict):
+            noMatch = True
+            if "Alias" in rec.features[0].qualifiers.keys():
+                lColumn = rec.features[0].qualifiers["Alias"][0]
+            else:
+                lColumn = ""
+            for x in seq_dict:
+                if x == rec.id or x == lColumn:
+                    noMatch = False
+            if noMatch:
+                sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n")
+                exit(1)
+            for feat in sorted(
+                feature_lambda(
+                    rec.features,
+                    feature_test_type,
+                    {"type": feature_filter},
+                    subfeatures=True,
+                ),
+                key=lambda f: f.location.start,
+            ):
+                id = feat.id
+                if len(id) == 0:
+                    id = get_id(feat)
+
+                if nodesc:
+                    description = ""
+                else:
+                    if feat.strand == -1:
+                      important_data = {"Location": FeatureLocation(feat.location.start + 1, feat.location.end - feat.phase, feat.strand)}
+                    else:
+                      important_data = {"Location": FeatureLocation(feat.location.start + 1 + feat.phase, feat.location.end, feat.strand)}
+                    if "Name" in feat.qualifiers:
+                        important_data["Name"] = feat.qualifiers.get("Name", [""])[0]
+
+                    description = "[{}]".format(
+                        ";".join(
+                            [
+                                "{key}={value}".format(key=k, value=v)
+                                for (k, v) in important_data.items()
+                            ]
+                        )
+                    )
+
+                if isinstance(feat.location, CompoundLocation):
+                  finSeq = ""
+                  if feat.strand == -1:
+                    for x in feat.location.parts:
+                      finSeq += str((rec.seq[x.start: x.end - feat.phase]).reverse_complement())
+                  else:
+                    for x in feat.location.parts:
+                      finSeq += str(rec.seq[x.start + feat.phase: x.end])
+                  yield [
+                    SeqRecord(
+                        Seq(finSeq),
+                        id=id.replace(" ", "-"),
+                        description=description,
+                    )
+                  ]
+
+                else:
+
+                  if feat.strand == -1:
+                    yield [
+                      SeqRecord(
+                          seq=Seq(str(rec.seq[feat.location.start: feat.location.end - feat.phase])).reverse_complement(),
+                          id=id.replace(" ", "-"),
+                          description=description,
+                      )
+                    ]
+                  else:
+                    yield [
+                      SeqRecord(
+                          #feat.extract(rec).seq,
+                          seq=Seq(str(rec.seq[feat.location.start + feat.phase: feat.location.end])),
+                          id=id.replace(" ", "-"),
+                          description=description,
+                      )
+                    ]
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description="Export corresponding sequence in genome from GFF3", epilog=""
+    )
+    parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome")
+    parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 File")
+    parser.add_argument(
+        "--feature_filter", default=None, help="Filter for specific feature types"
+    )
+    parser.add_argument(
+        "--nodesc", action="store_true", help="Strip description field off"
+    )
+    args = parser.parse_args()
+    for seq in main(**vars(args)):
+        #if isinstance(seq, list):
+        #  for x in seq:
+        #    print(type(x.seq))
+        #    SeqIO.write(x, sys.stdout, "fasta")
+        #else:
+          SeqIO.write(seq, sys.stdout, "fasta")