Mercurial > repos > cpt > cpt_wig_rebase
view wig_rebase.py @ 2:ae5f291c788d draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author | cpt |
---|---|
date | Mon, 05 Jun 2023 02:54:16 +0000 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python import re import sys import logging import argparse import numpy from gff3 import feature_lambda, feature_test_true from CPT_GFFParser import gffParse, gffWrite log = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) def __update_feature_location(pos, parent, protein2dna): if protein2dna: pos *= 3 # Move back so location is correct. if parent.strand > 0: pos -= 3 if parent.strand >= 0: new_pos = parent.start + pos else: new_pos = parent.end - pos # print(start, end, ns, ne, st) # Don't let start/stops be less than zero. # Instead, we'll replace with %3 to try and keep it in the same reading # frame that it should be in. if new_pos < 0: new_pos %= 3 return new_pos def getGff3Locations(parent, map_by="ID"): featureLocations = {} recs = gffParse(parent) # Only parse first. rec = recs[0] # Get all the feature locations in this genome for feature in feature_lambda(rec.features, feature_test_true, {}): id = feature.qualifiers.get(map_by, [feature.id])[0] featureLocations[id] = feature.location return rec, featureLocations def rebase_wig(parent, wigData, protein2dna=False, map_by="ID"): rec, locations = getGff3Locations(parent, map_by=map_by) current_id = None current_ft = None # We have to store in a giant array so we can overwrite safely and don't # emit multiple values. values = numpy.empty(1000000) maxFtLoc = 0 for line in wigData: if line.startswith("track"): # pass through sys.stdout.write(line) sys.stdout.write("variableStep chrom=%s span=1\n" % rec.id) continue if line.startswith("variableStep"): # No passthrough current_id = re.findall("chrom=([^ ]+)", line)[0] try: current_ft = locations[current_id] except: continue # Update max value if current_ft.end > maxFtLoc: maxFtLoc = current_ft.end else: (pos, val) = line.strip().split() pos = int(pos) val = float(val) npos = __update_feature_location(pos, current_ft, protein2dna=protein2dna) values[npos] = val values[npos + 1] = val values[npos + 2] = val for i in range(maxFtLoc): sys.stdout.write("%s %s\n" % (i + 1, values[i])) if __name__ == "__main__": parser = argparse.ArgumentParser( description="rebase wig data against parent locations" ) parser.add_argument("parent", type=argparse.FileType("r")) parser.add_argument("wigData", type=argparse.FileType("r")) parser.add_argument( "--protein2dna", action="store_true", help="Map protein translated results to original DNA data", ) parser.add_argument("--map_by", help="Map by key", default="ID") args = parser.parse_args() rebase_wig(**vars(args))