comparison wig_rebase.py @ 2:ae5f291c788d draft

planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author cpt
date Mon, 05 Jun 2023 02:54:16 +0000
parents
children
comparison
equal deleted inserted replaced
1:143624dee31b 2:ae5f291c788d
1 #!/usr/bin/env python
2 import re
3 import sys
4 import logging
5 import argparse
6 import numpy
7 from gff3 import feature_lambda, feature_test_true
8 from CPT_GFFParser import gffParse, gffWrite
9
10 log = logging.getLogger(__name__)
11 logging.basicConfig(level=logging.INFO)
12
13
14 def __update_feature_location(pos, parent, protein2dna):
15 if protein2dna:
16 pos *= 3
17 # Move back so location is correct.
18 if parent.strand > 0:
19 pos -= 3
20
21 if parent.strand >= 0:
22 new_pos = parent.start + pos
23 else:
24 new_pos = parent.end - pos
25
26 # print(start, end, ns, ne, st)
27
28 # Don't let start/stops be less than zero.
29 # Instead, we'll replace with %3 to try and keep it in the same reading
30 # frame that it should be in.
31 if new_pos < 0:
32 new_pos %= 3
33 return new_pos
34
35
36 def getGff3Locations(parent, map_by="ID"):
37 featureLocations = {}
38 recs = gffParse(parent)
39 # Only parse first.
40 rec = recs[0]
41 # Get all the feature locations in this genome
42 for feature in feature_lambda(rec.features, feature_test_true, {}):
43 id = feature.qualifiers.get(map_by, [feature.id])[0]
44 featureLocations[id] = feature.location
45 return rec, featureLocations
46
47
48 def rebase_wig(parent, wigData, protein2dna=False, map_by="ID"):
49 rec, locations = getGff3Locations(parent, map_by=map_by)
50 current_id = None
51 current_ft = None
52 # We have to store in a giant array so we can overwrite safely and don't
53 # emit multiple values.
54 values = numpy.empty(1000000)
55
56 maxFtLoc = 0
57 for line in wigData:
58 if line.startswith("track"):
59 # pass through
60 sys.stdout.write(line)
61 sys.stdout.write("variableStep chrom=%s span=1\n" % rec.id)
62 continue
63 if line.startswith("variableStep"):
64 # No passthrough
65 current_id = re.findall("chrom=([^ ]+)", line)[0]
66 try:
67 current_ft = locations[current_id]
68 except:
69 continue
70 # Update max value
71 if current_ft.end > maxFtLoc:
72 maxFtLoc = current_ft.end
73 else:
74 (pos, val) = line.strip().split()
75 pos = int(pos)
76 val = float(val)
77
78 npos = __update_feature_location(pos, current_ft, protein2dna=protein2dna)
79 values[npos] = val
80 values[npos + 1] = val
81 values[npos + 2] = val
82
83 for i in range(maxFtLoc):
84 sys.stdout.write("%s %s\n" % (i + 1, values[i]))
85
86
87 if __name__ == "__main__":
88 parser = argparse.ArgumentParser(
89 description="rebase wig data against parent locations"
90 )
91 parser.add_argument("parent", type=argparse.FileType("r"))
92 parser.add_argument("wigData", type=argparse.FileType("r"))
93 parser.add_argument(
94 "--protein2dna",
95 action="store_true",
96 help="Map protein translated results to original DNA data",
97 )
98 parser.add_argument("--map_by", help="Map by key", default="ID")
99 args = parser.parse_args()
100 rebase_wig(**vars(args))