Mercurial > repos > cpt > cpt_intersect_adjacent
comparison cpt_intersect_adj/intersect_and_adjacent.py @ 0:4c72b6accdee draft
Uploaded
author | cpt |
---|---|
date | Fri, 13 May 2022 05:05:59 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4c72b6accdee |
---|---|
1 #!/usr/bin/env python | |
2 import logging | |
3 import argparse | |
4 from intervaltree import IntervalTree, Interval | |
5 from CPT_GFFParser import gffParse, gffWrite | |
6 from Bio.SeqRecord import SeqRecord | |
7 from Bio.Seq import Seq | |
8 | |
9 logging.basicConfig(level=logging.INFO) | |
10 log = logging.getLogger(__name__) | |
11 | |
12 def validFeat(rec): | |
13 for feat in rec.features: | |
14 if feat.type != 'remark' and feat.type != 'annotation': | |
15 return True | |
16 return False | |
17 | |
18 def treeFeatures(features, window): | |
19 for feat in features: | |
20 # Interval(begin, end, data) | |
21 yield Interval( | |
22 int(feat.location.start) - int(window), | |
23 int(feat.location.end) + int(window), | |
24 feat.id, | |
25 ) | |
26 def treeFeatures_noRem(features, window): | |
27 for feat in features: | |
28 if feat.type == 'remark' or feat.type == 'annotation': | |
29 continue | |
30 # Interval(begin, end, data) | |
31 yield Interval( | |
32 int(feat.location.start) - int(window), | |
33 int(feat.location.end) + int(window), | |
34 feat.id, | |
35 ) | |
36 | |
37 | |
38 def intersect(a, b, window, stranding): | |
39 rec_a = list(gffParse(a)) | |
40 rec_b = list(gffParse(b)) | |
41 rec_a_out = [] | |
42 rec_b_out = [] | |
43 maxLen = min(len(rec_a), len(rec_b)) | |
44 iterate = 0 | |
45 | |
46 | |
47 if maxLen > 0: | |
48 while iterate < maxLen: | |
49 rec_a_i = rec_a[iterate] | |
50 rec_b_i = rec_b[iterate] | |
51 | |
52 if (not validFeat(rec_a_i)) or (not validFeat(rec_b_i)): | |
53 rec_a_out.append(SeqRecord(rec_a[iterate].seq, rec_a[iterate].id, rec_a[iterate].name, rec_a[iterate].description, rec_a[iterate].dbxrefs, [], rec_a[iterate].annotations)) | |
54 rec_b_out.append(SeqRecord(rec_b[iterate].seq, rec_b[iterate].id, rec_b[iterate].name, rec_b[iterate].description, rec_b[iterate].dbxrefs, [], rec_b[iterate].annotations)) | |
55 iterate += 1 | |
56 continue | |
57 | |
58 a_neg = [] | |
59 a_pos = [] | |
60 b_neg = [] | |
61 b_pos = [] | |
62 tree_a = [] | |
63 tree_b = [] | |
64 if stranding == True: | |
65 for feat in rec_a_i.features: | |
66 if feat.type == 'remark' or feat.type == 'annotation': | |
67 continue | |
68 if feat.strand > 0: | |
69 a_pos.append( | |
70 Interval( | |
71 int(feat.location.start) - int(window), | |
72 int(feat.location.end) + int(window), | |
73 feat.id, | |
74 ) | |
75 ) | |
76 else: | |
77 a_neg.append( | |
78 Interval( | |
79 int(feat.location.start) - int(window), | |
80 int(feat.location.end) + int(window), | |
81 feat.id, | |
82 ) | |
83 ) | |
84 | |
85 for feat in rec_b_i.features: | |
86 if feat.type == 'remark' or feat.type == 'annotation': | |
87 continue | |
88 if feat.strand > 0: | |
89 b_pos.append( | |
90 Interval( | |
91 int(feat.location.start) - int(window), | |
92 int(feat.location.end) + int(window), | |
93 feat.id, | |
94 ) | |
95 ) | |
96 else: | |
97 b_neg.append( | |
98 Interval( | |
99 int(feat.location.start) - int(window), | |
100 int(feat.location.end) + int(window), | |
101 feat.id, | |
102 ) | |
103 ) | |
104 | |
105 else: | |
106 for feat in rec_a_i.features: | |
107 if feat.type == 'remark' or feat.type == 'annotation': | |
108 continue | |
109 tree_a.append( | |
110 Interval( | |
111 int(feat.location.start) - int(window), | |
112 int(feat.location.end) + int(window), | |
113 feat.id, | |
114 ) | |
115 ) | |
116 for feat in rec_b_i.features: | |
117 if feat.type == 'remark' or feat.type == 'annotation': | |
118 continue | |
119 tree_b.append( | |
120 Interval( | |
121 int(feat.location.start) - int(window), | |
122 int(feat.location.end) + int(window), | |
123 feat.id, | |
124 ) | |
125 ) | |
126 if stranding: | |
127 # builds interval tree from Interval objects of form (start, end, id) for each feature | |
128 # tree_a = IntervalTree(list(treeFeatures_noRem(rec_a_i.features, window))) | |
129 #tree_b = IntervalTree(list(treeFeatures_noRem(rec_b_i.features, window))) | |
130 #else: | |
131 tree_a_pos = IntervalTree(a_pos) | |
132 tree_a_neg = IntervalTree(a_neg) | |
133 tree_b_pos = IntervalTree(b_pos) | |
134 tree_b_neg = IntervalTree(b_neg) | |
135 else: | |
136 tree_a = IntervalTree(tree_a) | |
137 tree_b = IntervalTree(tree_b) | |
138 | |
139 | |
140 # Used to map ids back to features later | |
141 rec_a_map = {f.id: f for f in rec_a_i.features} | |
142 rec_b_map = {f.id: f for f in rec_b_i.features} | |
143 | |
144 rec_a_hits_in_b = [] | |
145 rec_b_hits_in_a = [] | |
146 | |
147 for feature in rec_a_i.features: | |
148 # Save each feature in rec_a that overlaps a feature in rec_b | |
149 # hits = tree_b.find_range((int(feature.location.start), int(feature.location.end))) | |
150 | |
151 if feature.type == "remark" or feature.type == "annotation": | |
152 continue | |
153 | |
154 if stranding == False: | |
155 hits = tree_b[int(feature.location.start) : int(feature.location.end)] | |
156 | |
157 | |
158 # feature id is saved in interval result.data, use map to get full feature | |
159 for hit in hits: | |
160 rec_a_hits_in_b.append(rec_b_map[hit.data]) | |
161 | |
162 else: | |
163 if feature.strand > 0: | |
164 hits_pos = tree_b_pos[ | |
165 int(feature.location.start) : int(feature.location.end) | |
166 ] | |
167 for hit in hits_pos: | |
168 rec_a_hits_in_b.append(rec_b_map[hit.data]) | |
169 else: | |
170 hits_neg = tree_b_neg[ | |
171 int(feature.location.start) : int(feature.location.end) | |
172 ] | |
173 for hit in hits_neg: | |
174 rec_a_hits_in_b.append(rec_b_map[hit.data]) | |
175 | |
176 for feature in rec_b_i.features: | |
177 if feature.type == "remark" or feature.type == "annotation": | |
178 continue | |
179 | |
180 if stranding == False: | |
181 hits = tree_a[int(feature.location.start) : int(feature.location.end)] | |
182 | |
183 # feature id is saved in interval result.data, use map to get full feature | |
184 for hit in hits: | |
185 rec_b_hits_in_a.append(rec_a_map[hit.data]) | |
186 | |
187 else: | |
188 if feature.strand > 0: | |
189 hits_pos = tree_a_pos[ | |
190 int(feature.location.start) : int(feature.location.end) | |
191 ] | |
192 for hit in hits_pos: | |
193 rec_b_hits_in_a.append(rec_a_map[hit.data]) | |
194 else: | |
195 hits_neg = tree_a_neg[ | |
196 int(feature.location.start) : int(feature.location.end) | |
197 ] | |
198 for hit in hits_neg: | |
199 rec_b_hits_in_a.append(rec_a_map[hit.data]) | |
200 | |
201 # Remove duplicate features using sets | |
202 rec_a_out.append(SeqRecord(rec_a[iterate].seq, rec_a[iterate].id, rec_a[iterate].name, rec_a[iterate].description, rec_a[iterate].dbxrefs, sorted(set(rec_a_hits_in_b), key=lambda feat: feat.location.start), rec_a[iterate].annotations)) | |
203 rec_b_out.append(SeqRecord(rec_b[iterate].seq, rec_b[iterate].id, rec_b[iterate].name, rec_b[iterate].description, rec_b[iterate].dbxrefs, sorted(set(rec_b_hits_in_a), key=lambda feat: feat.location.start), rec_b[iterate].annotations)) | |
204 iterate += 1 | |
205 | |
206 else: | |
207 # If one input is empty, output two empty result files. | |
208 rec_a_out = [SeqRecord(Seq(""), "none")] | |
209 rec_b_out = [SeqRecord(Seq(""), "none")] | |
210 return rec_a_out, rec_b_out | |
211 | |
212 | |
213 if __name__ == "__main__": | |
214 parser = argparse.ArgumentParser( | |
215 description="rebase gff3 features against parent locations", epilog="" | |
216 ) | |
217 parser.add_argument("a", type=argparse.FileType("r")) | |
218 parser.add_argument("b", type=argparse.FileType("r")) | |
219 parser.add_argument( | |
220 "window", | |
221 type=int, | |
222 default=50, | |
223 help="Allows features this far away to still be considered 'adjacent'", | |
224 ) | |
225 parser.add_argument( | |
226 "-stranding", | |
227 action="store_true", | |
228 help="Only allow adjacency for same-strand features", | |
229 ) | |
230 parser.add_argument("--oa", type=str, default="a_hits_near_b.gff") | |
231 parser.add_argument("--ob", type=str, default="b_hits_near_a.gff") | |
232 args = parser.parse_args() | |
233 | |
234 b, a = intersect(args.a, args.b, args.window, args.stranding) | |
235 | |
236 with open(args.oa, "w") as handle: | |
237 for rec in a: | |
238 gffWrite([rec], handle) | |
239 | |
240 with open(args.ob, "w") as handle: | |
241 for rec in b: | |
242 gffWrite([rec], handle) |