comparison _modules/utilities.py @ 0:69e8f12c8b31 draft

"planemo upload"
author bioit_sciensano
date Fri, 11 Mar 2022 15:06:20 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:69e8f12c8b31
1 ## @file utilities.py
2 #
3 # Gather here utility methods for phageterm. Used in both CPU and GPU version.
4 #from string import maketrans
5 import re
6 import random
7 import sys
8
9 import numpy as np
10 import datetime
11
12 if sys.version_info < (3,):
13 import string
14 TRANSTAB = string.maketrans("ACGTN", "TGCAN")
15 else:
16 TRANSTAB = str.maketrans("ACGTN", "TGCAN")
17
18 def checkReportTitle(report_title):
19 """Normalise report title (take out any special char)"""
20 default_title="Analysis_"
21 right_now=datetime.datetime.now()
22 default_title+=str(right_now.month)
23 default_title+=str(right_now.day)
24 default_title+="_"
25 default_title+=str(right_now.hour)
26 default_title+=str(right_now.minute)
27 titleNorm = ""
28 charok = list(range(48,58)) + list(range(65,91)) + list(range(97,123)) + [45,95]
29 for char in report_title:
30 if ord(char) in charok:
31 titleNorm += char
32 if len(titleNorm) > 1:
33 return titleNorm[:20]
34 else:
35 return default
36
37 ### SEQUENCE manipulation function
38 def changeCase(seq):
39 """Change lower case to UPPER CASE for a sequence string."""
40 return seq.upper()
41
42
43 def reverseComplement(seq, transtab=str.maketrans('ATGCN', 'TACGN')):
44 """Reverse Complement a sequence."""
45 return changeCase(seq).translate(transtab)[::-1]
46
47 def longest_common_substring(read, refseq):
48 """Longest common substring between two strings."""
49 m = [[0] * (1 + len(refseq)) for i in range(1 + len(read))]
50 longest, x_longest = 0, 0
51 for x in range(1, 1 + len(read)):
52 for y in range(1, 1 + len(refseq)):
53 if read[x - 1] == refseq[y - 1]:
54 m[x][y] = m[x - 1][y - 1] + 1
55 if m[x][y] > longest:
56 longest = m[x][y]
57 x_longest = x
58 else:
59 m[x][y] = 0
60 return read[x_longest - longest: x_longest]
61
62 def hybridCoverage(read, sequence, hybrid_coverage, start, end):
63 """Return hybrid coverage."""
64 aligned_part_only = longest_common_substring(read, sequence[start:end])
65 for i in range(start, min(len(sequence),start+len(aligned_part_only))):
66 hybrid_coverage[i]+=1
67 return hybrid_coverage
68
69 ## Determines if readPart maps against Sequence.
70 #
71 # @param readPart A part of a read (seed characters usually)
72 # @param sequence (a contig)
73 # It choses randomly a mapping position amongst all mappings found.
74 # It returns 2 numbers: the start and stop position of the chosen mapping location.
75 def applyCoverage(readPart, sequence):
76 """Return a random match of a read onto the sequence. """
77 position = []
78 for pos in re.finditer(readPart,sequence):
79 position.append(pos)
80 if len(position) > 0:
81 match = random.choice(position)
82 return match.start(), match.end()
83 else:
84 return -1, -1
85
86 def correctEdge(coverage, edge):
87 """Correction of the Edge coverage. """
88 correctCov = np.array([len(coverage[0])*[0], len(coverage[0])*[0]])
89 End = len(coverage[0])
90 covSta = range(edge)
91 covEnd = range(End-edge,End)
92 for i in range(len(coverage)):
93 for j in range(len(coverage[i])):
94 correctCov[i][j] = coverage[i][j]
95 for k in covSta:
96 correctCov[i][k+edge] += coverage[i][k+End-edge]
97 for l in covEnd:
98 correctCov[i][l-edge] += coverage[i][l-End+edge]
99 return correctCov
100
101 # utility class for storing results of decisionProcess function
102 class DecisionProcessOutput:
103 def __init__(self, Redundant, Permuted, P_class, P_type, P_seqcoh, P_concat,
104 P_orient, P_left, P_right, Mu_like):
105 pass
106