Mercurial > repos > cpt > cpt_intersect_adj
comparison intersect_and_adjacent.py @ 0:923d2528480d draft
planemo upload commit 8500f316f8325380aa5d0000fbd13dd96012b1bb
| author | cpt |
|---|---|
| date | Thu, 08 Aug 2024 04:14:38 +0000 |
| parents | |
| children | 10000414e916 |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:923d2528480d |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 import logging | |
| 3 import argparse | |
| 4 from intervaltree import IntervalTree, Interval | |
| 5 from CPT_GFFParser import gffParse, gffWrite | |
| 6 from Bio.SeqRecord import SeqRecord | |
| 7 from Bio.Seq import Seq | |
| 8 | |
| 9 logging.basicConfig(level=logging.INFO) | |
| 10 log = logging.getLogger(__name__) | |
| 11 | |
| 12 | |
| 13 def validFeat(rec): | |
| 14 for feat in rec.features: | |
| 15 if feat.type != "remark" and feat.type != "annotation": | |
| 16 return True | |
| 17 return False | |
| 18 | |
| 19 | |
| 20 def treeFeatures(features, window): | |
| 21 for feat in features: | |
| 22 # Interval(begin, end, data) | |
| 23 yield Interval( | |
| 24 int(feat.location.start) - int(window), | |
| 25 int(feat.location.end) + int(window), | |
| 26 feat.id, | |
| 27 ) | |
| 28 | |
| 29 | |
| 30 def treeFeatures_noRem(features, window): | |
| 31 for feat in features: | |
| 32 if feat.type == "remark" or feat.type == "annotation": | |
| 33 continue | |
| 34 # Interval(begin, end, data) | |
| 35 yield Interval( | |
| 36 int(feat.location.start) - int(window), | |
| 37 int(feat.location.end) + int(window), | |
| 38 feat.id, | |
| 39 ) | |
| 40 | |
| 41 | |
| 42 def intersect(a, b, window, stranding): | |
| 43 rec_a = list(gffParse(a)) | |
| 44 rec_b = list(gffParse(b)) | |
| 45 rec_a_out = [] | |
| 46 rec_b_out = [] | |
| 47 maxLen = min(len(rec_a), len(rec_b)) | |
| 48 iterate = 0 | |
| 49 | |
| 50 if maxLen > 0: | |
| 51 while iterate < maxLen: | |
| 52 rec_a_i = rec_a[iterate] | |
| 53 rec_b_i = rec_b[iterate] | |
| 54 | |
| 55 if (not validFeat(rec_a_i)) or (not validFeat(rec_b_i)): | |
| 56 rec_a_out.append( | |
| 57 SeqRecord( | |
| 58 rec_a[iterate].seq, | |
| 59 rec_a[iterate].id, | |
| 60 rec_a[iterate].name, | |
| 61 rec_a[iterate].description, | |
| 62 rec_a[iterate].dbxrefs, | |
| 63 [], | |
| 64 rec_a[iterate].annotations, | |
| 65 ) | |
| 66 ) | |
| 67 rec_b_out.append( | |
| 68 SeqRecord( | |
| 69 rec_b[iterate].seq, | |
| 70 rec_b[iterate].id, | |
| 71 rec_b[iterate].name, | |
| 72 rec_b[iterate].description, | |
| 73 rec_b[iterate].dbxrefs, | |
| 74 [], | |
| 75 rec_b[iterate].annotations, | |
| 76 ) | |
| 77 ) | |
| 78 iterate += 1 | |
| 79 continue | |
| 80 | |
| 81 a_neg = [] | |
| 82 a_pos = [] | |
| 83 b_neg = [] | |
| 84 b_pos = [] | |
| 85 tree_a = [] | |
| 86 tree_b = [] | |
| 87 if stranding == True: | |
| 88 for feat in rec_a_i.features: | |
| 89 if feat.type == "remark" or feat.type == "annotation": | |
| 90 continue | |
| 91 if feat.strand > 0: | |
| 92 a_pos.append( | |
| 93 Interval( | |
| 94 int(feat.location.start) - int(window), | |
| 95 int(feat.location.end) + int(window), | |
| 96 feat.id, | |
| 97 ) | |
| 98 ) | |
| 99 else: | |
| 100 a_neg.append( | |
| 101 Interval( | |
| 102 int(feat.location.start) - int(window), | |
| 103 int(feat.location.end) + int(window), | |
| 104 feat.id, | |
| 105 ) | |
| 106 ) | |
| 107 | |
| 108 for feat in rec_b_i.features: | |
| 109 if feat.type == "remark" or feat.type == "annotation": | |
| 110 continue | |
| 111 if feat.strand > 0: | |
| 112 b_pos.append( | |
| 113 Interval( | |
| 114 int(feat.location.start) - int(window), | |
| 115 int(feat.location.end) + int(window), | |
| 116 feat.id, | |
| 117 ) | |
| 118 ) | |
| 119 else: | |
| 120 b_neg.append( | |
| 121 Interval( | |
| 122 int(feat.location.start) - int(window), | |
| 123 int(feat.location.end) + int(window), | |
| 124 feat.id, | |
| 125 ) | |
| 126 ) | |
| 127 | |
| 128 else: | |
| 129 for feat in rec_a_i.features: | |
| 130 if feat.type == "remark" or feat.type == "annotation": | |
| 131 continue | |
| 132 tree_a.append( | |
| 133 Interval( | |
| 134 int(feat.location.start) - int(window), | |
| 135 int(feat.location.end) + int(window), | |
| 136 feat.id, | |
| 137 ) | |
| 138 ) | |
| 139 for feat in rec_b_i.features: | |
| 140 if feat.type == "remark" or feat.type == "annotation": | |
| 141 continue | |
| 142 tree_b.append( | |
| 143 Interval( | |
| 144 int(feat.location.start) - int(window), | |
| 145 int(feat.location.end) + int(window), | |
| 146 feat.id, | |
| 147 ) | |
| 148 ) | |
| 149 if stranding: | |
| 150 # builds interval tree from Interval objects of form (start, end, id) for each feature | |
| 151 # tree_a = IntervalTree(list(treeFeatures_noRem(rec_a_i.features, window))) | |
| 152 # tree_b = IntervalTree(list(treeFeatures_noRem(rec_b_i.features, window))) | |
| 153 # else: | |
| 154 tree_a_pos = IntervalTree(a_pos) | |
| 155 tree_a_neg = IntervalTree(a_neg) | |
| 156 tree_b_pos = IntervalTree(b_pos) | |
| 157 tree_b_neg = IntervalTree(b_neg) | |
| 158 else: | |
| 159 tree_a = IntervalTree(tree_a) | |
| 160 tree_b = IntervalTree(tree_b) | |
| 161 | |
| 162 # Used to map ids back to features later | |
| 163 rec_a_map = {f.id: f for f in rec_a_i.features} | |
| 164 rec_b_map = {f.id: f for f in rec_b_i.features} | |
| 165 | |
| 166 rec_a_hits_in_b = [] | |
| 167 rec_b_hits_in_a = [] | |
| 168 | |
| 169 for feature in rec_a_i.features: | |
| 170 # Save each feature in rec_a that overlaps a feature in rec_b | |
| 171 # hits = tree_b.find_range((int(feature.location.start), int(feature.location.end))) | |
| 172 | |
| 173 if feature.type == "remark" or feature.type == "annotation": | |
| 174 continue | |
| 175 | |
| 176 if stranding == False: | |
| 177 hits = tree_b[ | |
| 178 int(feature.location.start) : int(feature.location.end) | |
| 179 ] | |
| 180 | |
| 181 # feature id is saved in interval result.data, use map to get full feature | |
| 182 for hit in hits: | |
| 183 rec_a_hits_in_b.append(rec_b_map[hit.data]) | |
| 184 | |
| 185 else: | |
| 186 if feature.strand > 0: | |
| 187 hits_pos = tree_b_pos[ | |
| 188 int(feature.location.start) : int(feature.location.end) | |
| 189 ] | |
| 190 for hit in hits_pos: | |
| 191 rec_a_hits_in_b.append(rec_b_map[hit.data]) | |
| 192 else: | |
| 193 hits_neg = tree_b_neg[ | |
| 194 int(feature.location.start) : int(feature.location.end) | |
| 195 ] | |
| 196 for hit in hits_neg: | |
| 197 rec_a_hits_in_b.append(rec_b_map[hit.data]) | |
| 198 | |
| 199 for feature in rec_b_i.features: | |
| 200 if feature.type == "remark" or feature.type == "annotation": | |
| 201 continue | |
| 202 | |
| 203 if stranding == False: | |
| 204 hits = tree_a[ | |
| 205 int(feature.location.start) : int(feature.location.end) | |
| 206 ] | |
| 207 | |
| 208 # feature id is saved in interval result.data, use map to get full feature | |
| 209 for hit in hits: | |
| 210 rec_b_hits_in_a.append(rec_a_map[hit.data]) | |
| 211 | |
| 212 else: | |
| 213 if feature.strand > 0: | |
| 214 hits_pos = tree_a_pos[ | |
| 215 int(feature.location.start) : int(feature.location.end) | |
| 216 ] | |
| 217 for hit in hits_pos: | |
| 218 rec_b_hits_in_a.append(rec_a_map[hit.data]) | |
| 219 else: | |
| 220 hits_neg = tree_a_neg[ | |
| 221 int(feature.location.start) : int(feature.location.end) | |
| 222 ] | |
| 223 for hit in hits_neg: | |
| 224 rec_b_hits_in_a.append(rec_a_map[hit.data]) | |
| 225 | |
| 226 # Remove duplicate features using sets | |
| 227 rec_a_out.append( | |
| 228 SeqRecord( | |
| 229 rec_a[iterate].seq, | |
| 230 rec_a[iterate].id, | |
| 231 rec_a[iterate].name, | |
| 232 rec_a[iterate].description, | |
| 233 rec_a[iterate].dbxrefs, | |
| 234 sorted(set(rec_a_hits_in_b), key=lambda feat: feat.location.start), | |
| 235 rec_a[iterate].annotations, | |
| 236 ) | |
| 237 ) | |
| 238 rec_b_out.append( | |
| 239 SeqRecord( | |
| 240 rec_b[iterate].seq, | |
| 241 rec_b[iterate].id, | |
| 242 rec_b[iterate].name, | |
| 243 rec_b[iterate].description, | |
| 244 rec_b[iterate].dbxrefs, | |
| 245 sorted(set(rec_b_hits_in_a), key=lambda feat: feat.location.start), | |
| 246 rec_b[iterate].annotations, | |
| 247 ) | |
| 248 ) | |
| 249 iterate += 1 | |
| 250 | |
| 251 else: | |
| 252 # If one input is empty, output two empty result files. | |
| 253 rec_a_out = [SeqRecord(Seq(""), "none")] | |
| 254 rec_b_out = [SeqRecord(Seq(""), "none")] | |
| 255 return rec_a_out, rec_b_out | |
| 256 | |
| 257 | |
| 258 if __name__ == "__main__": | |
| 259 parser = argparse.ArgumentParser( | |
| 260 description="rebase gff3 features against parent locations", epilog="" | |
| 261 ) | |
| 262 parser.add_argument("a", type=argparse.FileType("r")) | |
| 263 parser.add_argument("b", type=argparse.FileType("r")) | |
| 264 parser.add_argument( | |
| 265 "window", | |
| 266 type=int, | |
| 267 default=50, | |
| 268 help="Allows features this far away to still be considered 'adjacent'", | |
| 269 ) | |
| 270 parser.add_argument( | |
| 271 "-stranding", | |
| 272 action="store_true", | |
| 273 help="Only allow adjacency for same-strand features", | |
| 274 ) | |
| 275 parser.add_argument("--oa", type=str, default="a_hits_near_b.gff") | |
| 276 parser.add_argument("--ob", type=str, default="b_hits_near_a.gff") | |
| 277 args = parser.parse_args() | |
| 278 | |
| 279 b, a = intersect(args.a, args.b, args.window, args.stranding) | |
| 280 | |
| 281 with open(args.oa, "w") as handle: | |
| 282 for rec in a: | |
| 283 gffWrite([rec], handle) | |
| 284 | |
| 285 with open(args.ob, "w") as handle: | |
| 286 for rec in b: | |
| 287 gffWrite([rec], handle) |
