Mercurial > repos > cpt > cpt_genome_editor
view genome_editor.py @ 3:134bb2d7cdfd draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author | cpt |
---|---|
date | Mon, 05 Jun 2023 02:43:21 +0000 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python import logging import copy import argparse import tsv from Bio import SeqIO from Bio.Seq import Seq from Bio.SeqFeature import FeatureLocation from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature, convertSeqRec from gff3 import feature_lambda, feature_test_contains logging.basicConfig(level=logging.INFO) log = logging.getLogger(__name__) def mutate(gff3, fasta, changes, customSeqs, new_id): # Change Language # - we can only accept ONE genome as an input. (TODO: support multiple?) # - we can only build ONE genome as an output. (TODO: support multiple?) # - must allow selection of various regions # '1,1000,+ 40,100,- custom_seq_1' try: custom_seqs = SeqIO.to_dict(SeqIO.parse(customSeqs, "fasta")) except: custom_seqs = {} seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) # Pull first and onl record rec = list(gffParse(gff3, base_dict=seq_dict))[0] # Create a "clean" record new_record = copy.deepcopy(rec) new_record.id = new_id new_record.seq = Seq("") new_record.features = [] new_record.annotations = {} # Process changes. chain = [] topFeats = {} covered = 0 for feat in rec.features: if "ID" in feat.qualifiers.keys(): topFeats[feat.qualifiers["ID"][0]] = feat.location.start for change in changes: if "," in change: (start, end, strand) = change.split(",") start = int(start) - 1 end = int(end) # Make any complaints broken_feature_start = list( feature_lambda( rec.features, feature_test_contains, {"index": start}, subfeatures=False, ) ) if len(broken_feature_start) > 0: pass # log.info("DANGER: Start index chosen (%s) is in the middle of a feature (%s %s). This feature will disappear from the output", start, broken_feature_start[0].id, broken_feature_start[0].location) broken_feature_end = list( feature_lambda( rec.features, feature_test_contains, {"index": end}, subfeatures=False, ) ) if len(broken_feature_end) > 0: pass # log.info("DANGER: End index chosen (%s) is in the middle of a feature (%s %s). This feature will disappear from the output", end, broken_feature_end[0].id, broken_feature_end[0].location) # Ok, fetch features if strand == "+": tmp_req = rec[start:end] else: tmp_req = rec[start:end].reverse_complement( id=True, name=True, description=True, features=True, annotations=True, letter_annotations=True, dbxrefs=True, ) tmp_req = convertSeqRec(tmp_req)[0] def update_location(feature, shiftS): feature.location = FeatureLocation( feature.location.start + shiftS, feature.location.end + shiftS, feature.strand, ) for i in feature.sub_features: i = update_location(i, shiftS) return feature # for feature in tmp_req.features: chain.append( [ rec.id, start + 1, end, strand, new_record.id, len(new_record) + 1, len(new_record) + (end - start), "+", ] ) covered += len(new_record.seq) print(covered) new_record.seq += tmp_req.seq # NB: THIS MUST USE BIOPYTHON 1.67. 1.68 Removes access to # subfeatures, which means you will only get top-level features. startInd = len(new_record.features) new_record.features += tmp_req.features for i in new_record.features[startInd:]: i.location = FeatureLocation( i.location.start + covered, i.location.end + covered, i.location.strand, ) if "ID" not in i.qualifiers.keys(): continue diffS = i.location.start - topFeats[i.qualifiers["ID"][0]] subFeats = i.sub_features for j in subFeats: j = update_location(j, diffS) else: new_record.seq += custom_seqs[change].seq yield new_record, chain if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("fasta", type=argparse.FileType("r"), help="Sequence") parser.add_argument("gff3", type=argparse.FileType("r"), help="Annotations") parser.add_argument("new_id", help="Append to ID", default="_v2") parser.add_argument( "--out_fasta", type=argparse.FileType("w"), help="Output fasta", default="out.fa", ) parser.add_argument( "--out_gff3", type=argparse.FileType("w"), help="Output gff3", default="out.gff3", ) parser.add_argument( "--out_simpleChain", type=argparse.FileType("w"), help="Output simple chain (i.e. not a real UCSC chain file)", default="out.chain", ) parser.add_argument("--changes", nargs="+") parser.add_argument("--customSeqs", type=argparse.FileType("r")) args = parser.parse_args() for rec, chain in mutate( args.gff3, args.fasta, args.changes, args.customSeqs, args.new_id ): # TODO: Check that this appends and doesn't overwirte gffWrite([rec], args.out_gff3) SeqIO.write([rec], args.out_fasta, "fasta") tsv.dump(chain, args.out_simpleChain)