comparison genome_editor.py @ 3:134bb2d7cdfd draft

planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author cpt
date Mon, 05 Jun 2023 02:43:21 +0000
parents
children
comparison
equal deleted inserted replaced
2:787ce84e8d16 3:134bb2d7cdfd
1 #!/usr/bin/env python
2 import logging
3 import copy
4 import argparse
5 import tsv
6 from Bio import SeqIO
7 from Bio.Seq import Seq
8 from Bio.SeqFeature import FeatureLocation
9 from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature, convertSeqRec
10 from gff3 import feature_lambda, feature_test_contains
11
12 logging.basicConfig(level=logging.INFO)
13 log = logging.getLogger(__name__)
14
15
16 def mutate(gff3, fasta, changes, customSeqs, new_id):
17 # Change Language
18 # - we can only accept ONE genome as an input. (TODO: support multiple?)
19 # - we can only build ONE genome as an output. (TODO: support multiple?)
20 # - must allow selection of various regions
21 # '1,1000,+ 40,100,- custom_seq_1'
22 try:
23 custom_seqs = SeqIO.to_dict(SeqIO.parse(customSeqs, "fasta"))
24 except:
25 custom_seqs = {}
26 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta"))
27 # Pull first and onl record
28 rec = list(gffParse(gff3, base_dict=seq_dict))[0]
29 # Create a "clean" record
30 new_record = copy.deepcopy(rec)
31 new_record.id = new_id
32 new_record.seq = Seq("")
33 new_record.features = []
34 new_record.annotations = {}
35 # Process changes.
36 chain = []
37 topFeats = {}
38 covered = 0
39 for feat in rec.features:
40 if "ID" in feat.qualifiers.keys():
41 topFeats[feat.qualifiers["ID"][0]] = feat.location.start
42 for change in changes:
43 if "," in change:
44 (start, end, strand) = change.split(",")
45 start = int(start) - 1
46 end = int(end)
47
48 # Make any complaints
49 broken_feature_start = list(
50 feature_lambda(
51 rec.features,
52 feature_test_contains,
53 {"index": start},
54 subfeatures=False,
55 )
56 )
57 if len(broken_feature_start) > 0:
58 pass
59 # log.info("DANGER: Start index chosen (%s) is in the middle of a feature (%s %s). This feature will disappear from the output", start, broken_feature_start[0].id, broken_feature_start[0].location)
60 broken_feature_end = list(
61 feature_lambda(
62 rec.features,
63 feature_test_contains,
64 {"index": end},
65 subfeatures=False,
66 )
67 )
68 if len(broken_feature_end) > 0:
69 pass
70 # log.info("DANGER: End index chosen (%s) is in the middle of a feature (%s %s). This feature will disappear from the output", end, broken_feature_end[0].id, broken_feature_end[0].location)
71
72 # Ok, fetch features
73 if strand == "+":
74 tmp_req = rec[start:end]
75 else:
76 tmp_req = rec[start:end].reverse_complement(
77 id=True,
78 name=True,
79 description=True,
80 features=True,
81 annotations=True,
82 letter_annotations=True,
83 dbxrefs=True,
84 )
85 tmp_req = convertSeqRec(tmp_req)[0]
86
87 def update_location(feature, shiftS):
88 feature.location = FeatureLocation(
89 feature.location.start + shiftS,
90 feature.location.end + shiftS,
91 feature.strand,
92 )
93 for i in feature.sub_features:
94 i = update_location(i, shiftS)
95 return feature
96
97 # for feature in tmp_req.features:
98
99 chain.append(
100 [
101 rec.id,
102 start + 1,
103 end,
104 strand,
105 new_record.id,
106 len(new_record) + 1,
107 len(new_record) + (end - start),
108 "+",
109 ]
110 )
111
112 covered += len(new_record.seq)
113 print(covered)
114 new_record.seq += tmp_req.seq
115 # NB: THIS MUST USE BIOPYTHON 1.67. 1.68 Removes access to
116 # subfeatures, which means you will only get top-level features.
117 startInd = len(new_record.features)
118 new_record.features += tmp_req.features
119
120 for i in new_record.features[startInd:]:
121 i.location = FeatureLocation(
122 i.location.start + covered,
123 i.location.end + covered,
124 i.location.strand,
125 )
126 if "ID" not in i.qualifiers.keys():
127 continue
128 diffS = i.location.start - topFeats[i.qualifiers["ID"][0]]
129 subFeats = i.sub_features
130 for j in subFeats:
131 j = update_location(j, diffS)
132 else:
133 new_record.seq += custom_seqs[change].seq
134 yield new_record, chain
135
136
137 if __name__ == "__main__":
138 parser = argparse.ArgumentParser()
139 parser.add_argument("fasta", type=argparse.FileType("r"), help="Sequence")
140 parser.add_argument("gff3", type=argparse.FileType("r"), help="Annotations")
141 parser.add_argument("new_id", help="Append to ID", default="_v2")
142 parser.add_argument(
143 "--out_fasta",
144 type=argparse.FileType("w"),
145 help="Output fasta",
146 default="out.fa",
147 )
148 parser.add_argument(
149 "--out_gff3",
150 type=argparse.FileType("w"),
151 help="Output gff3",
152 default="out.gff3",
153 )
154 parser.add_argument(
155 "--out_simpleChain",
156 type=argparse.FileType("w"),
157 help="Output simple chain (i.e. not a real UCSC chain file)",
158 default="out.chain",
159 )
160 parser.add_argument("--changes", nargs="+")
161 parser.add_argument("--customSeqs", type=argparse.FileType("r"))
162 args = parser.parse_args()
163
164 for rec, chain in mutate(
165 args.gff3, args.fasta, args.changes, args.customSeqs, args.new_id
166 ):
167 # TODO: Check that this appends and doesn't overwirte
168 gffWrite([rec], args.out_gff3)
169 SeqIO.write([rec], args.out_fasta, "fasta")
170 tsv.dump(chain, args.out_simpleChain)