diff shm_csr.py @ 90:6809c63d9161 draft

"planemo upload commit fd64827ff6e63df008f6f50ddb8576ad2b1dbb26"
author rhpvorderman
date Tue, 25 Jan 2022 11:28:29 +0000
parents 729738462297
children 385dea3c6cb5
line wrap: on
line diff
--- a/shm_csr.py	Fri Nov 05 13:41:03 2021 +0000
+++ b/shm_csr.py	Tue Jan 25 11:28:29 2022 +0000
@@ -2,15 +2,94 @@
 import logging
 import sys
 import os
-import re
+import typing
+from typing import Optional
 
 from collections import defaultdict
 
+REGION_FILTERS = ("leader", "FR1", "CDR1", "FR2", "CDR2")
+
+
+class Mutation(typing.NamedTuple):
+	"""Represent a mutation type as a tuple"""
+	frm: str  # 'from' is a reserved python keyword.
+	where: int
+	to: str
+	frmAA: Optional[str] = None
+	whereAA: Optional[int] = None
+	toAA: Optional[str] = None
+	thing: Optional[str] = None  # '(---)' or '(+-+)' etc. No idea
+
+	@classmethod
+	def from_string(cls, string: str):
+		# Complete mutation example: a88>g,I30>V(+ - +)
+		# Only nucleotide example: g303>t
+		# Including codon change:
+		# t169>g,Y57>D(- - -); Y57 tat 169-171 [ta 169-170]>D gac
+		# Including codon change (synonumous mutation):
+		# c114>t, Y38; Y38 tac 112-114 [tact 112-115]>Y tat
+		if ',' in string:
+			nucleotide_change, aa_change = string.split(',', maxsplit=1)  # type: str, Optional[str]
+		else:
+			nucleotide_change = string
+			aa_change = None
+		frm_part, to = nucleotide_change.split('>', maxsplit=1)
+		frm = frm_part[0]
+		where = int(frm_part[1:])
+
+		if aa_change is None:
+			return cls(frm, where, to)
+
+		aa_change = aa_change.strip()
+		# The part after semicolon indicates the codon change. This part may
+		# not be present.
+		semi_colon_index = aa_change.find(";")
+		if semi_colon_index == -1:
+			codon_change = ""
+		else:
+			codon_change = aa_change[semi_colon_index:]
+			aa_change = aa_change[:semi_colon_index]
+		change_operator_index = aa_change.find(">")
+		if change_operator_index == -1:
+			# Synonymous change
+			frmAA_part = aa_change
+			toAA_part = ""
+		else:
+			frmAA_part, toAA_part = aa_change.split('>', maxsplit=1)  # type: str, str
+		frmAA = frmAA_part[0]
+		whereAA = int(frmAA_part[1:])
+		if toAA_part:
+			brace_start = toAA_part.index('(')
+			toAA = toAA_part[:brace_start]
+			thing = toAA_part[brace_start:] + codon_change
+		else:
+			# Synonymous mutation
+			toAA = frmAA
+			thing = codon_change
+		return cls(frm, where, to, frmAA, whereAA, toAA, thing)
+
+
+class Hotspot(typing.NamedTuple):
+	start: int
+	end: int
+	region: str
+
+	@classmethod
+	def from_string(cls, string):
+		# Example: aa,40-41(FR1)
+		sequence, rest = string.split(',')  # type: str, str
+		brace_pos = rest.index('(')
+		numbers = rest[:brace_pos]
+		start, end = numbers.split('-')
+		region = rest[brace_pos + 1:-1]  # Remove the braces
+		return cls(int(start), int(end), region)
+
+
 def main():
 	parser = argparse.ArgumentParser()
 	parser.add_argument("--input", help="The '7_V-REGION-mutation-and-AA-change-table' and '10_V-REGION-mutation-hotspots' merged together, with an added 'best_match' annotation")
 	parser.add_argument("--genes", help="The genes available in the 'best_match' column")
-	parser.add_argument("--empty_region_filter", help="Where does the sequence start?", choices=['leader', 'FR1', 'CDR1', 'FR2'])
+	parser.add_argument("--empty_region_filter", help="Where does the sequence start?", choices=REGION_FILTERS)
 	parser.add_argument("--output", help="Output file")
 
 	args = parser.parse_args()
@@ -23,12 +102,7 @@
 	genedic = dict()
 
 	mutationdic = dict()
-	mutationMatcher = re.compile("^(.)(\d+).(.),?[ ]?(.)?(\d+)?.?(.)?(.?.?.?.?.?)?")
-	mutationMatcher = re.compile("^([actg])(\d+).([actg]),?[ ]?([A-Z])?(\d+)?.?([A-Z])?(.*)?")
-	mutationMatcher = re.compile("^([actg])(\d+).([actg]),?[ ]?([A-Z])?(\d+)?[>]?([A-Z;])?(.*)?")
-	mutationMatcher = re.compile(r"^([nactg])(\d+).([nactg]),?[ ]?([A-Z*])?(\d+)?[>]?([A-Z*;])?(.*)?")
 	NAMatchResult = (None, None, None, None, None, None, '')
-	geneMatchers = {gene: re.compile("^" + gene + ".*") for gene in genes}
 	linecount = 0
 
 	IDIndex = 0
@@ -42,20 +116,14 @@
 	IDlist = []
 	mutationList = []
 	mutationListByID = {}
-	cdr1LengthDic = {}
-	cdr2LengthDic = {}
+	cdr1AALengthDic = {}
+	cdr2AALengthDic = {}
 
-	fr1LengthDict = {}
-	fr2LengthDict = {}
-	fr3LengthDict = {}
+	LengthDic = {}
 
 	cdr1LengthIndex = 0
 	cdr2LengthIndex = 0
 
-	fr1SeqIndex = 0
-	fr2SeqIndex = 0
-	fr3SeqIndex = 0
-
 	tandem_sum_by_class = defaultdict(int)
 	expected_tandem_sum_by_class = defaultdict(float)
 
@@ -70,11 +138,13 @@
 				fr2Index = linesplt.index("FR2.IMGT")
 				cdr2Index = linesplt.index("CDR2.IMGT")
 				fr3Index = linesplt.index("FR3.IMGT")
-				cdr1LengthIndex = linesplt.index("CDR1.IMGT.length")
-				cdr2LengthIndex = linesplt.index("CDR2.IMGT.length")
-				fr1SeqIndex = linesplt.index("FR1.IMGT.seq")
-				fr2SeqIndex = linesplt.index("FR2.IMGT.seq")
-				fr3SeqIndex = linesplt.index("FR3.IMGT.seq")
+				fr1LengthIndex = linesplt.index("FR1.IMGT.Nb.of.nucleotides")
+				fr2LengthIndex = linesplt.index("FR2.IMGT.Nb.of.nucleotides")
+				fr3LengthIndex = linesplt.index("FR3.IMGT.Nb.of.nucleotides")
+				cdr1LengthIndex = linesplt.index("CDR1.IMGT.Nb.of.nucleotides")
+				cdr2LengthIndex = linesplt.index("CDR2.IMGT.Nb.of.nucleotides")
+				cdr1AALengthIndex = linesplt.index("CDR1.IMGT.length")
+				cdr2AALengthIndex = linesplt.index("CDR2.IMGT.length")
 				first = False
 				continue
 			linecount += 1
@@ -84,50 +154,38 @@
 			
 			mutationdic[ID + "_FR1"] = []
 			if len(linesplt[fr1Index]) > 5 and empty_region_filter == "leader":
-				mutationdic[ID + "_FR1"] = [mutationMatcher.match(x).groups() for x in linesplt[fr1Index].split("|") if x]
+				mutationdic[ID + "_FR1"] = [Mutation.from_string(x) for x in linesplt[fr1Index].split("|") if x]
 
 			mutationdic[ID + "_CDR1"] = []
 			if len(linesplt[cdr1Index]) > 5 and empty_region_filter in ["leader", "FR1"]:
-				mutationdic[ID + "_CDR1"] = [mutationMatcher.match(x).groups() for x in linesplt[cdr1Index].split("|") if x]
+				mutationdic[ID + "_CDR1"] = [Mutation.from_string(x) for x in linesplt[cdr1Index].split("|") if x]
 
 			mutationdic[ID + "_FR2"] = []
 			if len(linesplt[fr2Index]) > 5 and empty_region_filter in ["leader", "FR1", "CDR1"]:
-				mutationdic[ID + "_FR2"] = [mutationMatcher.match(x).groups() for x in linesplt[fr2Index].split("|") if x]
+				mutationdic[ID + "_FR2"] = [Mutation.from_string(x) for x in linesplt[fr2Index].split("|") if x]
 
 			mutationdic[ID + "_CDR2"] = []
 			if len(linesplt[cdr2Index]) > 5:
-				mutationdic[ID + "_CDR2"] = [mutationMatcher.match(x).groups() for x in linesplt[cdr2Index].split("|") if x]
+				mutationdic[ID + "_CDR2"] = [Mutation.from_string(x) for x in linesplt[cdr2Index].split("|") if x]
 			
 			mutationdic[ID + "_FR2-CDR2"] = mutationdic[ID + "_FR2"] + mutationdic[ID + "_CDR2"]
 
 			mutationdic[ID + "_FR3"] = []
 			if len(linesplt[fr3Index]) > 5:
-				mutationdic[ID + "_FR3"] = [mutationMatcher.match(x).groups() for x in linesplt[fr3Index].split("|") if x]
+				mutationdic[ID + "_FR3"] = [Mutation.from_string(x) for x in linesplt[fr3Index].split("|") if x]
 				
 			mutationList += mutationdic[ID + "_FR1"] + mutationdic[ID + "_CDR1"] + mutationdic[ID + "_FR2"] + mutationdic[ID + "_CDR2"] + mutationdic[ID + "_FR3"]
 			mutationListByID[ID] = mutationdic[ID + "_FR1"] + mutationdic[ID + "_CDR1"] + mutationdic[ID + "_FR2"] + mutationdic[ID + "_CDR2"] + mutationdic[ID + "_FR3"]
 
-			try:
-				cdr1Length = int(linesplt[cdr1LengthIndex])
-			except:
-				cdr1Length = 0
-			
-			try:
-				cdr2Length = int(linesplt[cdr2LengthIndex])
-			except:
-				cdr2Length = 0
+			fr1Length = int(linesplt[fr1LengthIndex])
+			fr2Length = int(linesplt[fr2LengthIndex])
+			fr3Length = int(linesplt[fr3LengthIndex])
+			cdr1Length = int(linesplt[cdr1LengthIndex])
+			cdr2Length = int(linesplt[cdr2LengthIndex])
+			LengthDic[ID] = (fr1Length, cdr1Length, fr2Length, cdr2Length, fr3Length)
 
-			#print linesplt[fr2SeqIndex]
-			fr1Length = len(linesplt[fr1SeqIndex]) if empty_region_filter == "leader" else 0
-			fr2Length = len(linesplt[fr2SeqIndex]) if empty_region_filter in ["leader", "FR1", "CDR1"] else 0
-			fr3Length = len(linesplt[fr3SeqIndex])
-
-			cdr1LengthDic[ID] = cdr1Length
-			cdr2LengthDic[ID] = cdr2Length
-
-			fr1LengthDict[ID] = fr1Length
-			fr2LengthDict[ID] = fr2Length
-			fr3LengthDict[ID] = fr3Length
+			cdr1AALengthDic[ID] = int(linesplt[cdr1AALengthIndex])
+			cdr2AALengthDic[ID] = int(linesplt[cdr2AALengthIndex])
 
 			IDlist += [ID]
 	print("len(mutationdic) =", len(mutationdic))
@@ -155,10 +213,20 @@
 	tandem_file = os.path.join(os.path.dirname(outfile), "tandems_by_id.txt")
 	with open(tandem_file, 'w') as o:
 		highest_tandem_length = 0
+		# LengthDic stores length as a tuple
+		# (fr1Length, cdr1Length, fr2Length, cdr2Length, fr3Length)
+		# To get the total length, we can sum(region_lengths)
+		# To get the total length for leader:
+		# sum(region_lengths[0:]) (Equivalent to everything)
+		# sum(region_lengths[1:]) Gets everything except FR1 etc.
+		# We determine the position to start summing below.
+		# This returns 0 for leader, 1 for FR1 etc.
+		length_start_pos = REGION_FILTERS.index(empty_region_filter)
 
 		o.write("Sequence.ID\tnumber_of_mutations\tnumber_of_tandems\tregion_length\texpected_tandems\tlongest_tandem\ttandems\n")
 		for ID in IDlist:
 			mutations = mutationListByID[ID]
+			region_length = sum(LengthDic[ID][length_start_pos:])
 			if len(mutations) == 0:
 				continue
 			last_mut = max(mutations, key=lambda x: int(x[1]))
@@ -194,19 +262,16 @@
 				if highest_tandem_length < len(tandem_muts):
 					highest_tandem_length = len(tandem_muts)
 
-			region_length = fr1LengthDict[ID] + cdr1LengthDic[ID] + fr2LengthDict[ID] + cdr2LengthDic[ID] + fr3LengthDict[ID]
 			longest_tandem = max(tandem_muts, key=lambda x: x[1]) if len(tandem_muts) else (0, 0)
 			num_mutations = mutations_by_id_dic[ID] # len(mutations)
 			f_num_mutations = float(num_mutations)
 			num_tandem_muts = len(tandem_muts)
 			expected_tandem_muts = f_num_mutations * (f_num_mutations - 1.0) / float(region_length)
-			o.write("{0}\t{1}\t{2}\t{3}\t{4}\t{5}\t{6}\n".format(ID,
-																str(num_mutations),
-																str(num_tandem_muts),
-																str(region_length),
-																str(round(expected_tandem_muts, 2)),
-																str(longest_tandem[1]),
-																str(tandem_muts)))
+			# String format and round disagree slightly (see 3.605).
+			# So round before formatting.
+			o.write(f"{ID}\t{num_mutations}\t{num_tandem_muts}\t{region_length}\t"
+					f"{round(expected_tandem_muts, 2):.2f}\t"  
+					f"{longest_tandem[1]}\t{tandem_muts}\n")
 			gene = genedic[ID]
 			if gene.find("unmatched") == -1:
 				tandem_sum_by_class[gene] += num_tandem_muts
@@ -301,11 +366,11 @@
 	absentAACDR2Dic[9] = [60]
 
 	absentAA = [len(IDlist)] * (AALength-1)
-	for k, cdr1Length in cdr1LengthDic.items():
+	for k, cdr1Length in cdr1AALengthDic.items():
 		for c in absentAACDR1Dic[cdr1Length]:
 			absentAA[c] -= 1
 
-	for k, cdr2Length in cdr2LengthDic.items():
+	for k, cdr2Length in cdr2AALengthDic.items():
 		for c in absentAACDR2Dic[cdr2Length]:
 			absentAA[c] -= 1
 
@@ -315,11 +380,11 @@
 		o.write("ID\tcdr1length\tcdr2length\tbest_match\t" + "\t".join([str(x) for x in range(1,AALength)]) + "\n")
 		for ID in IDlist:
 			absentAAbyID = [1] * (AALength-1)
-			cdr1Length = cdr1LengthDic[ID]
+			cdr1Length = cdr1AALengthDic[ID]
 			for c in absentAACDR1Dic[cdr1Length]:
 				absentAAbyID[c] -= 1
 
-			cdr2Length = cdr2LengthDic[ID]
+			cdr2Length = cdr2AALengthDic[ID]
 			for c in absentAACDR2Dic[cdr2Length]:
 				absentAAbyID[c] -= 1
 			o.write(ID + "\t" + str(cdr1Length) + "\t" + str(cdr2Length) + "\t" + genedic[ID] + "\t" + "\t".join([str(x) for x in absentAAbyID]) + "\n")
@@ -333,7 +398,6 @@
 			o.write("TW (%)," + ("0,0,0\n" * len(genes)))
 		sys.exit()
 
-	hotspotMatcher = re.compile("[actg]+,(\d+)-(\d+)\((.*)\)")
 	RGYWCount = {}
 	WRCYCount = {}
 	WACount = {}
@@ -358,14 +422,10 @@
 			linesplt = line.split("\t")
 			gene = linesplt[best_matchIndex]
 			ID = linesplt[IDIndex]
-			RGYW = [(int(x), int(y), z) for (x, y, z) in
-					[hotspotMatcher.match(x).groups() for x in linesplt[aggctatIndex].split("|") if x]]
-			WRCY = [(int(x), int(y), z) for (x, y, z) in
-					[hotspotMatcher.match(x).groups() for x in linesplt[atagcctIndex].split("|") if x]]
-			WA = [(int(x), int(y), z) for (x, y, z) in
-				[hotspotMatcher.match(x).groups() for x in linesplt[ataIndex].split("|") if x]]
-			TW = [(int(x), int(y), z) for (x, y, z) in
-				[hotspotMatcher.match(x).groups() for x in linesplt[tatIndex].split("|") if x]]
+			RGYW = [Hotspot.from_string(x) for x in linesplt[aggctatIndex].split("|") if x]
+			WRCY = [Hotspot.from_string(x) for x in linesplt[atagcctIndex].split("|") if x]
+			WA = [Hotspot.from_string(x) for x in linesplt[ataIndex].split("|") if x]
+			TW = [Hotspot.from_string(x) for x in linesplt[tatIndex].split("|") if x]
 			RGYWCount[ID], WRCYCount[ID], WACount[ID], TWCount[ID] = 0, 0, 0, 0
 
 			with open(os.path.join(os.path.dirname(os.path.abspath(infile)), "RGYW.txt"), 'a') as out_handle:
@@ -417,7 +477,7 @@
 								out_handle.write("{0}\n".format(
 									"\t".join([
 										ID,
-										where,
+										str(where),
 										region,
 										frm,
 										to,
@@ -483,11 +543,10 @@
 				o.write(typ + " (%)")
 				curr = dic[typ]
 				for gene in genes:
-					geneMatcher = geneMatchers[gene]
-					if valuedic[gene + "_" + fname] is 0:
+					if valuedic[gene + "_" + fname] == 0:
 						o.write(",0,0,0")
 					else:
-						x, y, z = get_xyz([curr[x] for x in [y for y, z in genedic.items() if geneMatcher.match(z)]], gene, func, fname)
+						x, y, z = get_xyz([curr[x] for x in [y for y, z in genedic.items() if z.startswith(gene)]], gene, func, fname)
 						o.write("," + x + "," + y + "," + z)
 				x, y, z = get_xyz([y for x, y in curr.items() if not genedic[x].startswith("unmatched")], "total", func, fname)
 				#x, y, z = get_xyz([y for x, y in curr.iteritems()], "total", func, fname)