comparison _modules/readsCoverage_res.py @ 0:69e8f12c8b31 draft

"planemo upload"
author bioit_sciensano
date Fri, 11 Mar 2022 15:06:20 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:69e8f12c8b31
1 ##@file readsCoverage_res.py
2 # Compact structure to store partial results of readsCoverage for later processing; used in multi machine mode and for checkpoints.
3 #
4 #@author vlegrand@pasteur.fr
5 import numpy as np
6 import os
7 import time
8
9 base_chk_fname="chk_"
10 chk_fname_sep="_"
11
12
13 ## Utility classes for testing the checkpoint implementation
14 # class checkpoint_visitor:
15 # def __str__(self):
16 # return self.__class__.__name__
17 #
18 # class checkpoint_visitor_11150_Cos5(checkpoint_visitor):
19 # def visit(self,chk_res):
20 # if chk_res.host_len!=0 or chk_res.gen!=25 or chk_res.reads_tested!=2:
21 # return False
22 # return True
23 #
24 # class checkpoint_visitor_38_Cos5(checkpoint_visitor):
25 # def visit(self,chk_res):
26 # if chk_res.host_len!=0 or chk_res.gen!=25 or chk_res.reads_tested!=2:
27 # return False
28 # return True
29
30
31
32
33
34
35 def loadArr(arr_idx0,arr_val0,arr_idx1,arr_val1,arr2D):
36 for idx, val in zip(arr_idx0, arr_val0):
37 arr2D[0][idx] = val
38
39 for idx, val in zip(arr_idx1, arr_val1):
40 arr2D[1][idx] = val
41
42
43 def loadRCRes(filename):
44 npzfile = np.load(filename)
45 gen_len=npzfile['gen_len']
46 gen_len=int(gen_len)
47 host_len=npzfile['host_len']
48 host_len=int(host_len)
49 termini_coverage_idx0 = npzfile['termini_coverage_idx0']
50 termini_coverage_val0=npzfile['termini_coverage_val0']
51 termini_coverage_idx1 = npzfile['termini_coverage_idx1']
52 termini_coverage_val1 = npzfile['termini_coverage_val1']
53
54 whole_coverage_idx0=npzfile['whole_coverage_idx0']
55 whole_coverage_val0 = npzfile['whole_coverage_val0']
56 whole_coverage_idx1 = npzfile['whole_coverage_idx1']
57 whole_coverage_val1 = npzfile['whole_coverage_val1']
58
59 paired_whole_coverage_idx0=npzfile['paired_whole_coverage_idx0']
60 paired_whole_coverage_val0 = npzfile['paired_whole_coverage_val0']
61 paired_whole_coverage_idx1 = npzfile['paired_whole_coverage_idx1']
62 paired_whole_coverage_val1 = npzfile['paired_whole_coverage_val1']
63
64 phage_hybrid_coverage_idx0=npzfile['phage_hybrid_coverage_idx0']
65 phage_hybrid_coverage_val0 = npzfile['phage_hybrid_coverage_val0']
66 phage_hybrid_coverage_idx1 = npzfile['phage_hybrid_coverage_idx0']
67 phage_hybrid_coverage_val1 = npzfile['phage_hybrid_coverage_idx1']
68
69 host_hybrid_coverage_idx0 = npzfile['host_hybrid_coverage_idx0']
70 host_hybrid_coverage_val0 = npzfile['host_hybrid_coverage_val0']
71 host_hybrid_coverage_idx1 = npzfile['host_hybrid_coverage_idx1']
72 host_hybrid_coverage_val1 = npzfile['host_hybrid_coverage_val1']
73
74 host_whole_coverage_idx0 = npzfile['host_whole_coverage_idx0']
75 host_whole_coverage_val0 = npzfile['host_whole_coverage_val0']
76 host_whole_coverage_idx1 = npzfile['host_whole_coverage_idx1']
77 host_whole_coverage_val1 = npzfile['host_whole_coverage_val1']
78
79 list_hybrid=npzfile['list_hybrid']
80 insert=npzfile['insert']
81 insert=list(insert)
82 paired_mismatch=npzfile['paired_mismatch']
83 reads_tested=npzfile['reads_tested']
84
85 termini_coverage=np.array([gen_len*[0], gen_len*[0]])
86
87 whole_coverage = np.array([gen_len*[0], gen_len*[0]])
88 paired_whole_coverage = np.array([gen_len*[0], gen_len*[0]])
89 phage_hybrid_coverage = np.array([gen_len*[0], gen_len*[0]])
90 host_hybrid_coverage = np.array([host_len*[0], host_len*[0]])
91 host_whole_coverage = np.array([host_len*[0], host_len*[0]])
92 loadArr(termini_coverage_idx0,termini_coverage_val0,termini_coverage_idx1,termini_coverage_val1,termini_coverage)
93 loadArr(whole_coverage_idx0,whole_coverage_val0,whole_coverage_idx1,whole_coverage_val1,whole_coverage)
94 loadArr(paired_whole_coverage_idx0,paired_whole_coverage_val0,paired_whole_coverage_idx1,paired_whole_coverage_val1,paired_whole_coverage)
95 loadArr(phage_hybrid_coverage_idx0,phage_hybrid_coverage_val0,phage_hybrid_coverage_idx1,phage_hybrid_coverage_val1,phage_hybrid_coverage)
96 loadArr(host_hybrid_coverage_idx0,host_hybrid_coverage_val0,host_hybrid_coverage_idx1,host_hybrid_coverage_val1,host_hybrid_coverage)
97 loadArr(host_whole_coverage_idx0,host_whole_coverage_val0,host_whole_coverage_idx1,host_whole_coverage_val1,host_whole_coverage)
98
99 res=RCRes(termini_coverage,whole_coverage,paired_whole_coverage,\
100 phage_hybrid_coverage, host_hybrid_coverage,\
101 host_whole_coverage,list_hybrid,insert,paired_mismatch,reads_tested)
102
103 return res
104
105 ##
106 # Working structure for readsCoverage (encapsulating temporary results)
107 class RCWorkingS:
108 def __init__(self,rc_res,cnt_line,read_match):
109 self.interm_res=rc_res
110 self.count_line=cnt_line
111 self.read_match=read_match
112
113 class RCRes:
114 def __init__(self,termini_coverage,whole_coverage,paired_whole_coverage,\
115 phage_hybrid_coverage, host_hybrid_coverage, \
116 host_whole_coverage,list_hybrid,insert,paired_mismatch,reads_tested):
117
118 self.termini_coverage=termini_coverage
119 self.whole_coverage=whole_coverage
120 self.paired_whole_coverage=paired_whole_coverage
121 self.phage_hybrid_coverage=phage_hybrid_coverage
122 self.host_hybrid_coverage=host_hybrid_coverage
123 self.host_whole_coverage=host_whole_coverage
124
125 self.list_hybrid=list_hybrid
126 self.insert=insert
127 self.paired_mismatch=paired_mismatch
128 self.reads_tested=reads_tested
129
130 self.gen_len = len(self.termini_coverage[0])
131 self.host_len= len(self.host_hybrid_coverage[0])
132
133 # def accept(self,a_visitor):
134 # self.vis=a_visitor
135 #
136 # def make_visit(self):
137 # self.vis.visit()
138
139 def save(self,filename):
140 termini_coverage_idx0 = np.flatnonzero(self.termini_coverage[0])
141 termini_coverage_val0 = self.termini_coverage[0][termini_coverage_idx0]
142 termini_coverage_idx1 = np.flatnonzero(self.termini_coverage[1])
143 termini_coverage_val1 = self.termini_coverage[1][termini_coverage_idx1]
144
145 whole_coverage_idx0 = np.flatnonzero(self.whole_coverage[0])
146 whole_coverage_val0 = self.whole_coverage[0][whole_coverage_idx0]
147 whole_coverage_idx1 = np.flatnonzero(self.whole_coverage[1])
148 whole_coverage_val1 = self.whole_coverage[1][whole_coverage_idx1]
149
150 paired_whole_coverage_idx0 = np.flatnonzero(self.paired_whole_coverage[0])
151 paired_whole_coverage_val0 = self.paired_whole_coverage[0][paired_whole_coverage_idx0]
152 paired_whole_coverage_idx1 = np.flatnonzero(self.paired_whole_coverage[1])
153 paired_whole_coverage_val1 = self.paired_whole_coverage[1][paired_whole_coverage_idx1]
154
155 phage_hybrid_coverage_idx0 = np.flatnonzero(self.phage_hybrid_coverage[0])
156 phage_hybrid_coverage_val0 = self.phage_hybrid_coverage[0][phage_hybrid_coverage_idx0]
157 phage_hybrid_coverage_idx1 = np.flatnonzero(self.phage_hybrid_coverage[1])
158 phage_hybrid_coverage_val1 = self.phage_hybrid_coverage[1][phage_hybrid_coverage_idx1]
159
160 host_hybrid_coverage_idx0 = np.flatnonzero(self.host_hybrid_coverage[0])
161 host_hybrid_coverage_val0 = self.host_hybrid_coverage[0][host_hybrid_coverage_idx0]
162 host_hybrid_coverage_idx1 = np.flatnonzero(self.host_hybrid_coverage[1])
163 host_hybrid_coverage_val1 = self.host_hybrid_coverage[1][host_hybrid_coverage_idx1]
164
165 host_whole_coverage_idx0 = np.flatnonzero(self.host_whole_coverage[0])
166 host_whole_coverage_val0 = self.host_whole_coverage[0][host_whole_coverage_idx0]
167 host_whole_coverage_idx1 = np.flatnonzero(self.host_whole_coverage[1])
168 host_whole_coverage_val1 = self.host_whole_coverage[1][host_whole_coverage_idx1]
169
170 np.savez(filename,gen_len=np.array(self.gen_len),host_len=np.array(self.host_len),\
171 termini_coverage_idx0=termini_coverage_idx0, termini_coverage_val0=termini_coverage_val0,\
172 termini_coverage_idx1=termini_coverage_idx1, termini_coverage_val1=termini_coverage_val1,\
173 whole_coverage_idx0=whole_coverage_idx0,whole_coverage_val0=whole_coverage_val0,\
174 whole_coverage_idx1=whole_coverage_idx1,whole_coverage_val1=whole_coverage_val1,\
175 paired_whole_coverage_idx0=paired_whole_coverage_idx0,paired_whole_coverage_val0=paired_whole_coverage_val0,\
176 paired_whole_coverage_idx1=paired_whole_coverage_idx1,paired_whole_coverage_val1=paired_whole_coverage_val1, \
177 phage_hybrid_coverage_idx0=phage_hybrid_coverage_idx0,phage_hybrid_coverage_val0=phage_hybrid_coverage_val0, \
178 phage_hybrid_coverage_idx1=phage_hybrid_coverage_idx1,phage_hybrid_coverage_val1=phage_hybrid_coverage_val1, \
179 host_hybrid_coverage_idx0=host_hybrid_coverage_idx0,host_hybrid_coverage_val0=host_hybrid_coverage_val0, \
180 host_hybrid_coverage_idx1=host_hybrid_coverage_idx1,host_hybrid_coverage_val1=host_hybrid_coverage_val1, \
181 host_whole_coverage_idx0=host_whole_coverage_idx0,host_whole_coverage_val0=host_whole_coverage_val0, \
182 host_whole_coverage_idx1=host_whole_coverage_idx1,host_whole_coverage_val1=host_whole_coverage_val1, \
183 list_hybrid=self.list_hybrid,insert=self.insert,paired_mismatch=np.array(self.paired_mismatch),\
184 reads_tested=self.reads_tested)
185
186
187 class RCCheckpoint:
188 def __init__(self,count_line,core_id,idx_seq,termini_coverage,whole_coverage,paired_whole_coverage,\
189 phage_hybrid_coverage, host_hybrid_coverage, \
190 host_whole_coverage,list_hybrid,insert,paired_mismatch,reads_tested,read_match):
191 self.count_line=count_line
192 self.core_id=core_id
193 self.idx_seq=idx_seq
194 self.read_match=read_match
195 self.res=RCRes(termini_coverage,whole_coverage,paired_whole_coverage,\
196 phage_hybrid_coverage, host_hybrid_coverage, \
197 host_whole_coverage,list_hybrid,insert,paired_mismatch,reads_tested)
198
199
200 def save(self,dir_chk,core_id,idx_refseq):
201 filename=base_chk_fname+str(self.core_id)+chk_fname_sep+str(self.idx_seq)+chk_fname_sep+\
202 str(self.count_line)+chk_fname_sep+str(self.read_match)
203 full_fname = os.path.join(dir_chk, filename)
204 self.res.save(full_fname)
205 # remove old breakpoint file
206 list_f=os.listdir(dir_chk)
207 sub_s=base_chk_fname+ str(core_id) + chk_fname_sep + str(idx_refseq) + chk_fname_sep
208 for f in list_f:
209 if f!=filename+".npz" and sub_s in f:
210 os.remove(os.path.join(dir_chk,f))
211
212
213 class RCCheckpoint_handler:
214 def __init__(self,chk_freq,dir_chk,test_mode=False):
215 self.chk_freq=chk_freq
216 self.test_mode = test_mode
217 self.start_t=0
218 self.dir_chk = dir_chk
219 # if self.test_mode == True:
220 # self.v38_C5 = checkpoint_visitor_38_Cos5()
221 # self.v11150_C5 = checkpoint_visitor_11150_Cos5()
222 if self.test_mode==True:
223 self.start_t = time.perf_counter_ns()
224 if os.path.exists(dir_chk):
225 if not (os.path.isdir(dir_chk)):
226 raise RuntimeError("dir_chk must point to a directory")
227 else:
228 os.mkdir(dir_chk)
229 elif self.chk_freq!=0:
230 if os.path.exists(dir_chk):
231 if not (os.path.isdir(dir_chk)):
232 raise RuntimeError("dir_chk must point to a directory")
233 else:
234 raise RuntimeError("dir_chk must point to an existing directory")
235
236 def getIdxSeq(self,core_id):
237 idx_seq=0
238 if self.chk_freq!=0 or self.test_mode==True:
239 list_f = os.listdir(self.dir_chk)
240 subfname = base_chk_fname+ str(core_id) + chk_fname_sep
241 chk_f = ""
242 for fname in list_f:
243 if subfname in fname:
244 chk_f = fname
245 break
246 if chk_f != "":
247 l=chk_f.split(chk_fname_sep)
248 idx_seq=int(l[2])
249 return idx_seq
250
251
252 def load(self,core_id,idx_refseq):
253 if self.chk_freq!=0 or self.test_mode==True:
254 list_f = os.listdir(self.dir_chk)
255 subfname = base_chk_fname+ str(core_id) + chk_fname_sep + str(idx_refseq) + chk_fname_sep
256 chk_f = ""
257 for fname in list_f:
258 if subfname in fname:
259 chk_f = fname
260 break
261 if chk_f != "":
262 interm_res=loadRCRes(os.path.join(self.dir_chk,chk_f))
263 # if self.test_mode==True:
264 # interm_res.accept(self.v38_C5)
265 l=chk_f.split(chk_fname_sep)
266 cnt_line=int(l[-2])
267 tmp=l[-1] # get rid of .npz extension
268 l2=tmp.split(".")
269 read_match=int(l2[0])
270 partial_res=RCWorkingS(interm_res,cnt_line,read_match)
271 # if self.test_mode:
272 # partial_res.accept(self.v38_C5)
273 # partial_res.make_visit()
274 return partial_res
275 else: # no checkpoint found for this sequence, start from beginning
276 return None
277 else:
278 return None
279
280
281 def check(self,count_line,core_id,idx_seq,termini_coverage,whole_coverage,paired_whole_coverage,\
282 phage_hybrid_coverage, host_hybrid_coverage, \
283 host_whole_coverage,list_hybrid,insert,paired_mismatch,reads_tested,read_match):
284 cur_t = time.perf_counter_ns()
285 elapsed_t = cur_t - self.start_t
286 #convert elapsed_t tp to seconds
287 elaspsed_t=elapsed_t * 0.000000001
288 if (self.test_mode==True or (self.chk_freq!=0 and (elapsed_t % self.chk_freq == 0))): # time to create checkpoint.
289 chkp=RCCheckpoint(count_line,core_id,idx_seq,termini_coverage,whole_coverage,paired_whole_coverage,\
290 phage_hybrid_coverage, host_hybrid_coverage, \
291 host_whole_coverage,list_hybrid,insert,paired_mismatch,reads_tested,read_match)
292 chkp.save(self.dir_chk,core_id,idx_seq)
293
294
295 def end(self,core_id):
296 if (self.test_mode==False and self.chk_freq!=0) :
297 # remove old breakpoint file
298 list_f = os.listdir(self.dir_chk)
299 sub_s=base_chk_fname+str(core_id)+chk_fname_sep
300 for f in list_f:
301 if sub_s in f:
302 os.remove(os.path.join(self.dir_chk, f))
303
304
305
306
307
308
309
310
311