Mercurial > repos > cpt > cpt_export_seq_unique
diff cpt_export_seq_unique/gff3_extract_sequence.py @ 0:aaed5a0c774c draft
Uploaded
author | cpt |
---|---|
date | Fri, 01 Jul 2022 13:43:49 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cpt_export_seq_unique/gff3_extract_sequence.py Fri Jul 01 13:43:49 2022 +0000 @@ -0,0 +1,275 @@ +#!/usr/bin/env python +import sys +import argparse +import logging +import uuid +from CPT_GFFParser import gffParse, gffWrite +from Bio import SeqIO +from Bio.Seq import Seq +from Bio.SeqRecord import SeqRecord +from Bio.SeqFeature import FeatureLocation, CompoundLocation +from gff3 import feature_lambda, feature_test_type, get_id + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) + + +def main(fasta, gff3, feature_filter=None, nodesc=False): + if feature_filter == "nice_cds": + from gff2gb import gff3_to_genbank as cpt_Gff2Gbk + + + for rec in cpt_Gff2Gbk(gff3, fasta, 11): + seenList = {} + if rec.seq[0] == "?": + sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") + exit(1) + for feat in sorted(rec.features, key=lambda x: x.location.start): + if feat.type != "CDS": + continue + + ind = 0 + if ( + str( + feat.qualifiers.get("locus_tag", get_id(feat)).replace(" ", "-") + ) + in seenList.keys() + ): + seenList[ + str( + feat.qualifiers.get("locus_tag", get_id(feat)).replace( + " ", "-" + ) + ) + ] += 1 + ind = seenList[ + str( + feat.qualifiers.get("locus_tag", get_id(feat)).replace( + " ", "-" + ) + ) + ] + else: + seenList[ + str( + feat.qualifiers.get("locus_tag", get_id(feat)).replace( + " ", "-" + ) + ) + ] = 1 + append = "" + if ind != 0: + append = "_" + str(ind) + + if nodesc: + description = "" + else: + feat.qualifiers["ID"] = [feat._ID] + product = feat.qualifiers.get("product", "") + description = "{1} [Location={0.location};ID={0.qualifiers[ID][0]}]".format( + feat, product + ) + yield [ + SeqRecord( + feat.extract(rec).seq, + id=str( + feat.qualifiers.get("locus_tag", get_id(feat)).replace( + " ", "-" + ) + ) + + append, + description=description, + ) + ] + + elif feature_filter == "unique_cds": + seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) + seen_ids = {} + + for rec in gffParse(gff3, base_dict=seq_dict): + noMatch = True + if "Alias" in rec.features[0].qualifiers.keys(): + lColumn = rec.features[0].qualifiers["Alias"][0] + else: + lColumn = "" + for x in seq_dict: + if x == rec.id or x == lColumn: + noMatch = False + if noMatch: + sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") + exit(1) + newfeats = [] + for feat in sorted( + feature_lambda( + rec.features, feature_test_type, {"type": "CDS"}, subfeatures=False + ), + key=lambda f: f.location.start, + ): + nid = rec.id + "____" + feat.id + if nid in seen_ids: + nid = nid + "__" + uuid.uuid4().hex + feat.qualifiers["ID"] = [nid] + newfeats.append(feat) + seen_ids[nid] = True + + if nodesc: + description = "" + else: + if feat.strand == -1: + important_data = {"Location": FeatureLocation(feat.location.start + 1, feat.location.end - feat.phase, feat.strand)} + else: + important_data = {"Location": FeatureLocation(feat.location.start + 1 + feat.phase, feat.location.end, feat.strand)} + if "Name" in feat.qualifiers: + important_data["Name"] = feat.qualifiers.get("Name", [""])[0] + + description = "[{}]".format( + ";".join( + [ + "{key}={value}".format(key=k, value=v) + for (k, v) in important_data.items() + ] + ) + ) + #if feat.id == "CPT_Privateer_006.p01": + #print(feat) + #exit() + + if isinstance(feat.location, CompoundLocation): + finSeq = "" + if feat.strand == -1: + for x in feat.location.parts: + finSeq += str((rec.seq[feat.location.start: feat.location.end - feat.phase]).reverse_complement()) + else: + for x in feat.location.parts: + finSeq += str(rec.seq[feat.location.start + feat.phase: feat.location.end]) + yield [ + SeqRecord( + finSeq, + id=nid.replace(" ", "-"), + description=description, + ) + ] + elif feat.strand == -1: + yield [ + SeqRecord( + (rec.seq[feat.location.start: feat.location.end - feat.phase]).reverse_complement(), + id=nid.replace(" ", "-"), + description=description, + ) + ] + else: + yield [ + SeqRecord( + #feat.extract(rec).seq, + rec.seq[feat.location.start + feat.phase: feat.location.end], + id=nid.replace(" ", "-"), + description=description, + ) + ] + rec.features = newfeats + rec.annotations = {} + #gffWrite([rec], sys.stdout) + else: + seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) + + for rec in gffParse(gff3, base_dict=seq_dict): + noMatch = True + if "Alias" in rec.features[0].qualifiers.keys(): + lColumn = rec.features[0].qualifiers["Alias"][0] + else: + lColumn = "" + for x in seq_dict: + if x == rec.id or x == lColumn: + noMatch = False + if noMatch: + sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") + exit(1) + for feat in sorted( + feature_lambda( + rec.features, + feature_test_type, + {"type": feature_filter}, + subfeatures=True, + ), + key=lambda f: f.location.start, + ): + id = feat.id + if len(id) == 0: + id = get_id(feat) + + if nodesc: + description = "" + else: + if feat.strand == -1: + important_data = {"Location": FeatureLocation(feat.location.start + 1, feat.location.end - feat.phase, feat.strand)} + else: + important_data = {"Location": FeatureLocation(feat.location.start + 1 + feat.phase, feat.location.end, feat.strand)} + if "Name" in feat.qualifiers: + important_data["Name"] = feat.qualifiers.get("Name", [""])[0] + + description = "[{}]".format( + ";".join( + [ + "{key}={value}".format(key=k, value=v) + for (k, v) in important_data.items() + ] + ) + ) + + if isinstance(feat.location, CompoundLocation): + finSeq = "" + if feat.strand == -1: + for x in feat.location.parts: + finSeq += str((rec.seq[x.start: x.end - feat.phase]).reverse_complement()) + else: + for x in feat.location.parts: + finSeq += str(rec.seq[x.start + feat.phase: x.end]) + yield [ + SeqRecord( + Seq(finSeq), + id=id.replace(" ", "-"), + description=description, + ) + ] + + else: + + if feat.strand == -1: + yield [ + SeqRecord( + seq=Seq(str(rec.seq[feat.location.start: feat.location.end - feat.phase])).reverse_complement(), + id=id.replace(" ", "-"), + description=description, + ) + ] + else: + yield [ + SeqRecord( + #feat.extract(rec).seq, + seq=Seq(str(rec.seq[feat.location.start + feat.phase: feat.location.end])), + id=id.replace(" ", "-"), + description=description, + ) + ] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Export corresponding sequence in genome from GFF3", epilog="" + ) + parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome") + parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 File") + parser.add_argument( + "--feature_filter", default=None, help="Filter for specific feature types" + ) + parser.add_argument( + "--nodesc", action="store_true", help="Strip description field off" + ) + args = parser.parse_args() + for seq in main(**vars(args)): + #if isinstance(seq, list): + # for x in seq: + # print(type(x.seq)) + # SeqIO.write(x, sys.stdout, "fasta") + #else: + SeqIO.write(seq, sys.stdout, "fasta")