0
|
1 #!/usr/bin/env python
|
|
2 """
|
|
3 Bisulfighter::bsf-call
|
|
4
|
|
5 Bisulfighter (http://epigenome.cbrc.jp/bisulfighter)
|
|
6 by National Institute of Advanced Industrial Science and Technology (AIST)
|
|
7 is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Unported License.
|
|
8 http://creativecommons.org/licenses/by-nc-sa/3.0/
|
|
9 """
|
|
10
|
|
11 __version__= "1.3"
|
|
12
|
|
13 import sys
|
|
14 import os
|
|
15 import glob
|
|
16 import threading
|
|
17 import subprocess
|
|
18 import Queue
|
|
19 from datetime import datetime
|
|
20 import hashlib
|
|
21 from string import maketrans
|
|
22 import gzip
|
|
23 import bz2
|
|
24 import zipfile
|
|
25 import logging
|
|
26 from time import sleep
|
|
27 from shutil import copy
|
|
28 import re
|
|
29 # import pysam
|
|
30
|
|
31 class BsfCallBase(object):
|
|
32 """
|
|
33 base class for BsfCall, LastExecutor, McDetector.
|
|
34 define functions that is used in each sub classes.
|
|
35 """
|
|
36
|
|
def splitFilePath(self, filePath):
    """
    Split a file path into (directory, file name, base name, extension,
    compression program).

    The extension is returned without its leading dot.  A trailing
    compression suffix (.gz/.gzip/.bz2/.bzip2/.zip) is peeled off first and
    reported as `prog`, so "reads.fastq.gz" yields ext "fastq", prog "gz".
    """
    dir_name, file_name = os.path.split(filePath)
    base_name, ext = os.path.splitext(file_name)
    prog = None
    # tuple membership instead of a chain of == comparisons
    if ext in (".gz", ".gzip", ".bz2", ".bzip2", ".zip"):
        prog = ext[1:]
        base_name, ext = os.path.splitext(base_name)
    if len(ext) > 1:
        # strip the leading dot
        ext = ext[1:]
    return (dir_name, file_name, base_name, ext, prog)
|
|
52
|
|
53
|
|
54 def readNameByReadFile(self, readFilePath):
|
|
55 """
|
|
56 get read name by read file path.
|
|
57 """
|
|
58 dir_name, file_name, read_name, ext, prog = self.splitFilePath(readFilePath)
|
|
59 return read_name
|
|
60
|
|
61
|
|
62 def secondReadFilePathByFirstReadFilePath(self, readFile, secondReadType = None):
|
|
63 """
|
|
64 get second read file path by first read file path.
|
|
65 if first read file path is '/path/to/read_file_1.fastq' second read file
|
|
66 path is '/path/to/read_file_2.fastq'
|
|
67 if secondReadType is specified, the extension of second read file is its
|
|
68 value.
|
|
69 """
|
|
70 fpath = ""
|
|
71 dir_name, file_name, basename, ext, prog = self.splitFilePath(readFile)
|
|
72 if secondReadType:
|
|
73 ext = ".%s" % secondReadType
|
|
74 if prog is not None:
|
|
75 fpath = "%s/%s2%s.%s" % (dir_name, basename[0:-1], ext, prog)
|
|
76 else:
|
|
77 fpath = "%s/%s2%s" % (dir_name, basename[0:-1], ext)
|
|
78 return fpath
|
|
79
|
|
80
|
|
def pairedEndReadNumbers(self):
    """Read numbers used for the forward and reverse mates of a pair."""
    forward_no, reverse_no = 1, 2
    return (forward_no, reverse_no)
|
|
83
|
|
84
|
|
def clearGap(self, seq):
    """Remove alignment gap characters ('-') from a sequence."""
    return "".join(seq.split("-"))
|
|
87
|
|
88
|
|
def complementStartPosition(self, genomeLen, subseqStart, subseqLen):
    """Map a subsequence start position onto the reverse-complement strand."""
    subseq_end = subseqStart + subseqLen
    return genomeLen - subseq_end
|
|
91
|
|
92
|
|
def complementSeq(self, seq):
    """Return the reverse complement of seq (case is preserved)."""
    # maketrans comes from the module-level `from string import maketrans`
    # (Python 2); complement each base first, then reverse the whole string.
    complemented = seq.translate(maketrans("ATGCatgc", "TACGtacg"))
    return complemented[::-1]
|
|
95
|
|
96
|
|
def mcContextType(self, genomeSeq, cBasePos, strand='+'):
    """
    Classify the methylation context (CG / CHG / CHH) of the cytosine at
    cBasePos on the given strand.  Returns None for an unknown strand or
    when the needed neighbouring bases run past the end of the sequence.
    """
    # neighbour offsets and the base that marks CG/CHG on each strand
    if strand == '+':
        near, far, mark = cBasePos + 1, cBasePos + 2, "G"
    elif strand == '-':
        near, far, mark = cBasePos - 1, cBasePos - 2, "C"
    else:
        return None
    try:
        if genomeSeq[near] == mark:
            return "CG"
        if genomeSeq[far] == mark:
            return "CHG"
        return "CHH"
    except IndexError:
        # the context window extends beyond the chromosome end
        return None
|
|
123
|
|
124
|
|
def chrSort(self, a, b):
    """
    Three-way comparison of chromosome names for sorting (-1 / 0 / 1).
    Equivalent to the Python-2-only builtin cmp(a, b) for comparable
    values, but also works on Python 3 where cmp was removed.
    """
    return (a > b) - (a < b)
|
|
127
|
|
128
|
|
def strands(self):
    """Strand symbols handled by bsf-call, forward first."""
    forward_strand, reverse_strand = "+", "-"
    return (forward_strand, reverse_strand)
|
|
131
|
|
132
|
|
def mcContextTypes(self):
    """All methylation context labels produced by mcContextType."""
    context_types = ("CG", "CHG", "CHH")
    return context_types
|
|
135
|
|
136
|
|
def bzip2File(self, filePath, wait = True, log = False):
    """
    Compress filePath with the external bzip2 command.

    When wait is False the method returns immediately and the compression
    keeps running in the background (the "done" log line is then written
    before bzip2 has actually finished).
    """
    if log:
        logging.info("bzip2 start: %s" % filePath)

    dirpath, fname = os.path.split(filePath)
    # run inside the file's directory so bzip2 writes its output there
    process = subprocess.Popen("bzip2 %s" % fname, shell = True, cwd = dirpath)
    if wait:
        process.wait()

    if log:
        logging.info("bzip2 done: %s" % filePath)
|
|
154
|
|
155
|
|
def isGzipFile(self, filePath):
    """True when the path carries a gzip suffix (.gz or .gzip)."""
    # endswith with a tuple replaces the chained slice comparisons
    return filePath.endswith((".gz", ".gzip"))
|
|
158
|
|
159
|
|
def isBzip2File(self, filePath):
    """True when the path carries a bzip2 suffix (.bz2 or .bzip2)."""
    # endswith with a tuple replaces the chained slice comparisons
    return filePath.endswith((".bz2", ".bzip2"))
|
|
162
|
|
163
|
|
def isZipFile(self, filePath):
    """True when the path carries a .zip suffix."""
    return filePath.endswith(".zip")
|
|
166
|
|
167
|
|
def isMafFile(self, filePath):
    """
    Heuristically decide whether filePath is a MAF alignment file.

    A file is accepted when its first byte is printable ASCII and the
    first alignment block looks like MAF: an "a " line followed by "s "
    lines (track/comment lines may precede it).  Returns False otherwise.
    """
    # sniff a single byte in binary mode to reject binary files early
    f = open(filePath, "rb")
    data = f.read(1)
    f.close()
    # NOTE(review): on Python 3 `data` is bytes while the pattern is str,
    # so this re.match would raise TypeError - this code assumes Python 2.
    if re.match("[\x20-\x7E]", data):
        f = open(filePath, "r")
        first_line = f.readline()
        # the first line must be a track line, a comment, or an "a " block
        if first_line[0:5] != "track" and first_line[0] != "#" and first_line[0:2] != "a ":
            f.close()
            return False

        # cnt tracks progress through the expected block structure:
        # 1 = waiting for "a ", 2 = waiting for first "s ", 3 = second "s "
        if first_line[0:2] == "a ":
            cnt = 2
        else:
            cnt = 1

        while True:
            line = f.readline()
            if line == "":
                # EOF before a complete alignment block was seen
                break
            if line[0] == "#" or line.strip() == "":
                # skip comments and blank lines without advancing cnt
                continue

            if cnt == 1 and line[0:2] != "a ":
                f.close()
                return False

            if cnt == 2 and line[0:2] != "s ":
                f.close()
                return False

            if cnt == 3:
                # two "s " lines after the "a " line confirm a MAF block
                f.close()
                if line[0:2] == "s ":
                    return True
                else:
                    return False

            cnt += 1

        f.close()
        return False
    else:
        # first byte is not printable ASCII: not a text (MAF) file
        return False
|
|
212
|
|
213
|
|
def isBamFile(self, filePath):
    """True when the file begins with the BGZF magic bytes used by BAM."""
    bgzf_magic = b"\x1f\x8b\x08\x04"

    with open(filePath, "rb") as f:
        head = f.read(4)

    return head == bgzf_magic
|
|
222
|
|
223
|
|
def isSamFile(self, filePath):
    """
    Heuristically decide whether filePath is a SAM file: either it starts
    with a recognised header tag (@HD/@SQ/@RG/@CO), or its first line is
    printable text with more than 10 tab-separated columns (the minimum
    number of mandatory SAM fields is 11).
    """
    f = open(filePath, "rb")
    data = f.read(1)
    # NOTE(review): the file is opened in binary mode but compared against
    # str literals; this matches only on Python 2 - confirm before porting.
    if data == "@":
        tag = f.read(2)
        if tag == "HD" or tag == "SQ" or tag == "RG" or tag == "CO":
            f.close()
            return True

    # no header: re-read the first byte and fall back to column counting
    f.seek(0)
    data = f.read(1)
    f.close()
    if re.match("[\x20-\x7E]", data):
        f = open(filePath, "r")
        line = f.readline()
        f.close()
        return len(line.split("\t")) > 10
    else:
        # not printable text: not SAM
        return False
|
|
243
|
|
244
|
|
def scriptDir(self):
    """Absolute path of the directory containing the running script."""
    script_path = os.path.abspath(sys.argv[0])
    return os.path.dirname(script_path)
|
|
247
|
|
248
|
|
def chrnoFromFastaDescription(self, description):
    """Chromosome name from a FASTA header line ('>chr1' -> 'chr1')."""
    # trim outer whitespace, drop the leading '>', trim any inner padding
    header = description.strip()
    return header[1:].strip()
|
|
255
|
|
256
|
|
257 def chrsByRefGenome(self, refGenome):
|
|
258 """
|
|
259 get all chromosome numbers that is included in the reference genome.
|
|
260 """
|
|
261
|
|
262 chrs = []
|
|
263 for line in open(refGenome, "r"):
|
|
264 line = line.strip()
|
|
265 if line[0] == ">":
|
|
266 chrs.append(self.chrnoFromFastaDescription(line))
|
|
267
|
|
268 return chrs
|
|
269
|
|
270
|
|
def readRefGenome(self, refGenome, refGenomeBuf, refGenomeChr):
    """
    Read the reference genome FASTA file into the caller's containers.

    refGenomeBuf: dict filled in place, chromosome name -> upper-cased
        sequence string (gaps/newlines removed).
    refGenomeChr: list filled in place with chromosome names in file order.

    A sequence line seen before any '>' header is logged as fatal (the file
    is malformed) but processing continues.
    """
    logging.info("BsfCallBase::readRefGenome: %s" % refGenome)
    chr = None
    buf = []
    fin = open(refGenome, 'r')
    for line in fin:
        if line[0] == '>':
            chr = self.chrnoFromFastaDescription(line)
            logging.info("BsfCallBase::readRefGenome: chr=%s" % chr)
            # flush the previous chromosome's sequence before starting a new one
            if len(buf) > 0:
                refGenomeBuf[refGenomeChr[-1]]=''.join(buf)
                del buf[:]
            refGenomeChr.append(chr)
        elif chr != None:
            buf.append(line.strip().upper())
        else:
            logging.fatal("BsfCallBase::readRefGenome: the specified reference genome file \"%s\" is malformed." % refGenome)
    fin.close()
    # flush the last chromosome; assumes at least one '>' header was seen
    refGenomeBuf[refGenomeChr[-1]]=''.join(buf)
    logging.info("BsfCallBase::readRefGenome: done.")
    return
|
|
296
|
|
297
|
|
def lastalOpts(self, lastOpt):
    """Convert the comma-separated --last option value to space-separated."""
    return lastOpt.replace(",", " ")
|
|
304
|
|
305
|
|
def mergeOpts(self):
    """Extra options for the merge step; none are used at present."""
    merge_options = ""
    return merge_options
|
|
308
|
|
309
|
|
def filterOpts(self, mismapProb, scoreThres, isPairedEnd):
    """
    Build the filtering option string passed to last-map-probs /
    last-pair-probs: paired-end filtering only takes the mismap threshold,
    single-end also gets the alignment score threshold.
    """
    mismap_part = "-m%s" % str(mismapProb)
    if isPairedEnd:
        return mismap_part
    return "-s%d %s" % (scoreThres, mismap_part)
|
|
323
|
|
324
|
|
325 def isPairedEnd(self, readAttr):
|
|
326 return self.pairedEndReadNumbers()[1] in readAttr
|
|
327
|
|
328
|
|
def jobIdByQsubOutput(self, qsubOutput):
    """
    Extract the job id from qsub's submission message; the id is the third
    whitespace-separated field ("Your job <id> (...) has been submitted").
    """
    return qsubOutput.strip().split()[2]
|
|
337
|
|
338
|
|
def waitForSubmitJobs(self, jobIds, checkInterval = 10):
    """
    Poll qstat until none of the given job ids appears in its output.

    Jobs whose state column contains an error flag are logged as fatal
    (once per job) but are still treated as finished.
    checkInterval: seconds to sleep between qstat polls.
    """
    error_jobs = []
    while True:
        all_done = True
        qstat = os.popen("qstat")
        for line in qstat:
            fields = line.strip().split()
            if fields[0] in jobIds:
                # fields[4] is the qstat state column
                # NOTE(review): find('E') > 0 misses a state string that
                # STARTS with 'E' (find returns 0) - confirm intended.
                if fields[4].find('E') > 0:
                    if fields[0] not in error_jobs:
                        logging.fatal("Error has occurred: Job ID=%s" % fields[0])
                        error_jobs.append(fields[0])
                else:
                    # at least one job still pending/running
                    all_done = False
                    break
        qstat.close()

        if all_done:
            break
        else:
            sleep(checkInterval)

    return
|
|
366
|
|
367
|
|
def logJobSubmit(self, msg, jobId, cmd = None):
    """Log a submitted job and, when given, the command line that was used."""
    logging.info("Submit job: %s --> Job ID = %s" % (msg, jobId))
    if cmd:
        logging.info(cmd)
|
|
372
|
|
373
|
|
def bamMapq2Mismap(self, mapq):
    """Convert a phred-scaled BAM MAPQ value to a mismap probability."""
    exponent = float(mapq) / 10
    return pow(0.1, exponent)
|
|
376
|
|
377
|
|
def getAllMappingResultFiles(self, resultDirs):
    """Collect every file found (recursively) under the given directories."""
    collected = []
    for result_dir in resultDirs:
        for root, dirs, files in os.walk(result_dir):
            for filename in files:
                logging.info("McDetector::getAllMappingResultFiles: %s" % filename)
                collected.append(os.path.join(root, filename))
    return collected
|
|
389
|
|
390
|
|
391 class BsfCall(BsfCallBase):
|
|
392 """
|
|
393 class to execute bsf-call process.
|
|
394 """
|
|
395
|
|
def __init__(self, refGenome, readFilePaths, cmdOpts):
    """
    Constructor of BsfCall.

    refGenome: path to the reference genome FASTA file.
    readFilePaths: list of read file paths (comma-separated pair per entry
        for paired-end data).
    cmdOpts: dict of command line options; "only_mcdetection" is derived
        here from "mapping_dir".

    Side effects: creates the working and mC-context directories and
    installs the file logger before any processing starts.
    """
    self.refGenome = refGenome
    self.readFilePaths = readFilePaths
    # per-read-set attribute dicts, filled by prepareForReads()
    self.reads = []
    self.opts = cmdOpts

    # directories resolved by setDataDir()
    self.dataDir = None
    self.genomeDir = None
    self.mcContextDir = None

    # open read-file handles and read counters (1: forward, 2: reverse)
    self.readInFh1 = None
    self.readInFh2 = None
    self.numReads = {1: 0, 2: 0}

    self.mappingResultDirs = []
    self.mappingResultFiles = []

    self.setDataDir()
    self.setLogger()

    logging.info("bsf-call start.")
    self.logOption()

    # self.numReadsPerFile = self.sizeForSplitRead(self.opts["split_read_size"])

    # a supplied mapping result directory means mapping is skipped and only
    # the mC-detection stage runs
    if self.opts["mapping_dir"]:
        self.opts["only_mcdetection"] = True
    else:
        self.opts["only_mcdetection"] = False
|
|
429
|
|
430
|
|
def execute(self):
    """
    Execute the whole bsf-call pipeline: index creation plus read
    mapping/filtering (unless a mapping result directory was supplied),
    followed by mC detection.  Any exception is caught and logged rather
    than propagated.
    """
    try:
        if self.opts["mapping_dir"]:
            # Only mc detection
            self.mappingResultDirs = self.opts["mapping_dir"].split(",")
            self.mappingResultFiles = self.getAllMappingResultFiles(self.mappingResultDirs)
            self.opts["mapping_result_files"] = self.mappingResultFiles
        else:
            self.makeIndexFile()
            self.prepareForReads()
            self.mappingResultDirs = self.processMapping()

        logging.debug("BsfCall:execute: mapping result directories are: %s" % ','.join(self.mappingResultDirs))
        self.processMcDetection(self.mappingResultDirs, self.opts["local_dir"])

        logging.info("bsf-call done.")
    except:
        # top-level boundary: record the full traceback in the log
        logging.exception("Exception has occurred.")
|
|
453
|
|
454
|
|
def processMapping(self):
    """
    Run read mapping and filtering for every prepared read set.

    Returns the list of per-read-set result directories.
    """
    logging.info("Mapping and filtering process start.")

    result_dirs = []
    for read_attr in self.reads:
        # one LAST mapping + filtering run per read set
        self.runLast(read_attr)
        result_dirs.append(read_attr["results_dir"])

    logging.info("Mapping and filtering process done.")

    return result_dirs
|
|
470
|
|
471
|
|
def processMcDetection(self, resultDirs, localDir = None):
    """
    Run the mC detection stage over the mapping result directories and
    write the final output.

    localDir is accepted for interface compatibility with the cluster
    subclass but is not used here (McDetector receives self.opts).
    """
    logging.info("mC detection process start.")

    mc_detector = McDetector(self.refGenome, resultDirs, self.mcContextDir, self.opts)
    mc_detector.execute(self.opts["output"], self.opts["num_threads"])

    logging.info("mC detection process done.")
|
|
483
|
|
484
|
|
def setDataDir(self):
    """
    Resolve the working directory (explicit --work-dir value or an
    auto-generated name) and create it together with the mc_contexts
    subdirectory used by the mC-detection stage.
    """
    self.dataDir = self.opts["work_dir"] or self.autoWorkDir()
    self.mcContextDir = "%s/mc_contexts" % self.dataDir

    for directory in (self.dataDir, self.mcContextDir):
        if not os.path.exists(directory):
            os.makedirs(directory)
|
|
502
|
|
503
|
|
def setLogger(self):
    """
    Attach a file handler writing bsf-call.log inside the working
    directory to the root logger, at INFO level.
    """
    log_level = logging.INFO
    # log_level = logging.DEBUG

    log_file = "%s/bsf-call.log" % self.dataDir
    file_logger = logging.FileHandler(filename=log_file)

    file_logger.setLevel(log_level)
    file_logger.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))

    # NOTE(review): handlers accumulate on the root logger if this is
    # called more than once in a process - confirm single-call usage.
    logging.getLogger().addHandler(file_logger)
    logging.getLogger().setLevel(log_level)
|
|
520
|
|
521
|
|
def logOption(self):
    """
    Write all option values and arguments to the log; the layout differs
    depending on whether only mC detection is executed.
    """
    if self.opts["mapping_dir"]:
        logging.info("Mapping result directory is specified. Only mC detection is executed.")
        logging.info(" Mapping result directory: %s" % self.opts["mapping_dir"])
        logging.info(" Reference genome: %s" % self.refGenome)
        # logging.info(" Read BAM file: %s" % ("Yes" if self.opts["read_bam"] else "No"))
        # logging.info(" Read SAM file: %s" % ("Yes" if self.opts["read_sam"] else "No"))
    else:
        logging.info("Reference genome: %s" % self.refGenome)
        logging.info("Read files: %s" % self.readFilePaths)
        logging.info("Working directory: %s" % self.dataDir)
        logging.info("Options:")
        logging.info(" Threshold of the alignment score at filtering: %d" % self.opts["aln_score_thres"])
        # logging.info(" Paired-end direction: %s" % self.opts["pe_direction"])
        # logging.info(" Options for LAST: %s" % self.opts["last_opts"])

    # options common to both modes
    logging.info(" Threshold of read coverate: %d" % self.opts["coverage"])
    logging.info(" Threshold of mC ratio: %s" % str(self.opts["lower_bound"]))
    logging.info(" Threshold of the mismap probability at filtering: %s" % str(self.opts["aln_mismap_prob_thres"]))
    logging.info(" Working directory: %s" % self.dataDir)
    logging.info(" Local directory: %s" % self.opts["local_dir"])
    logging.info(" Output file: %s" % (self.opts["output"] if self.opts["output"] else "(stdout)"))
    # logging.info(" Use cluster: %s" % ("Yes" if self.opts["use_cluster"] else "No"))
    # logging.info(" Queue name: %s" % self.opts["queue_list"])
    logging.info(" Number of threads: %d" % self.opts["num_threads"])
    # logging.info(" Split read size: %s" % self.opts["split_read_size"])
|
|
552
|
|
553
|
|
def prepareForReads(self):
    """
    For each read set, create the base/reads/results directories and build
    the read-attribute dict consumed by the mapping stage; the dicts are
    appended to self.reads.
    """
    for read_no, read_path in enumerate(self.readFilePaths):
        readNo = read_no + 1
        logging.info("Preparations for a read file start: %d: %s" % (readNo, read_path))

        data_dir = "%s/%d" % (self.dataDir, readNo)
        read = {"base_dir": data_dir, "path": read_path, "reads_dir": data_dir + "/reads", "results_dir": data_dir + "/results"}

        if not os.path.exists(data_dir):
            os.makedirs(data_dir)
            os.makedirs(read["reads_dir"])
            os.makedirs(read["results_dir"])

        # paired-end mates arrive as a comma-separated path pair; entry 1
        # is the forward mate, entry 2 (when present) the reverse mate
        pe_no = self.pairedEndReadNumbers()[0]
        for readpath in read_path.split(","):
            dir_name, file_name, base_name, ext, prog = self.splitFilePath(readpath)
            file_type = self.checkReadFileType(readpath)
            read[pe_no] = {"name": base_name, "fname": file_name, "type": file_type, "path": readpath}
            pe_no += 1

        is_paired_end = self.isPairedEnd(read)
        logging.info("Paired-end: %s" % is_paired_end)
        logging.info(" Forward: %s" % read[1]["path"])
        if is_paired_end:
            logging.info(" Reverse: %s" % read[2]["path"])

        logging.info("Preparations for a read file done")
        self.reads.append(read)
    return
|
|
587
|
|
588
|
|
def runLast(self, readAttr):
    """
    Run the LAST mapping + filtering pipeline for one read set, choosing
    the paired-end or single-end executor.
    """
    is_paired_end = self.isPairedEnd(readAttr)
    filter_option = self.filterOpts(self.opts["aln_mismap_prob_thres"], self.opts["aln_score_thres"], is_paired_end)

    if is_paired_end:
        logging.info('BsfCall::runLast: PairedEnd')
        last_exec = LastExecutorPairedEnd(self.refGenome, self.dataDir, readAttr["reads_dir"], readAttr["results_dir"], self.opts["num_threads"])
    else:
        logging.info('BsfCall::runLast: Not PairedEnd')
        last_exec = LastExecutorSingle(self.refGenome, self.dataDir, readAttr["reads_dir"], readAttr["results_dir"])

    # last_exec.execute(readAttr, self.opts["num_threads"], self.lastalOpts(self.opts["last_opts"]), self.mergeOpts(), filter_option)
    # last_exec.execute(readAttr, 1, self.lastalOpts(self.opts["last_opts"]), self.mergeOpts(), filter_option)
    # NOTE(review): thread count and lastal options are hard-coded here to
    # 1 and "" (see the disabled lines above) - confirm whether the values
    # from self.opts should be honoured instead.
    last_exec.execute(readAttr, 1, "", self.mergeOpts(), filter_option)
|
|
607
|
|
608
|
|
def makeIndexFile(self):
    """
    Build the lastdb bisulfite indexes (forward and reverse strand) next
    to the reference genome, skipping any direction whose .prj file
    already exists.
    """
    directions = [d for d in ("f", "r")
                  if not os.path.exists("%s.%s.prj" % (self.refGenome, d))]

    if directions:
        logging.info("Make index file start.")
        last_executor = LastExecutor(self.refGenome, self.dataDir)
        # run the two lastdb processes in parallel when multithreaded
        last_executor.lastdb(directions, self.opts["num_threads"] > 1)
        logging.info("Make index file done.")
|
|
625
|
|
626
|
|
def checkReadFileType(self, readFilePath):
    """
    Determine the read file type ("sra", "fastq" or "fasta").

    Decides by file extension first (ignoring a trailing .gz) and falls
    back to sniffing the first character of the file ('@' = fastq,
    '>' = fasta, anything else = sra).
    """
    name, ext = os.path.splitext(readFilePath)
    if ext == ".gz":
        name, ext = os.path.splitext(name)

    if len(ext) > 1:
        ext = ext[1:]

    # BUG FIX: the original condition was `ext == "sra" or "fastq" or
    # "fasta"`, which is always true, so every extension was returned
    # verbatim and the content-sniffing branches below were unreachable.
    if ext in ("sra", "fastq", "fasta"):
        file_type = ext
    elif ext == "fa":
        file_type = "fasta"
    else:
        f = open(readFilePath, "r")
        first_char = f.read(1)
        f.close()
        if first_char == "@":
            file_type = "fastq"
        elif first_char == ">":
            file_type = "fasta"
        else:
            file_type = "sra"

    return file_type
|
|
657
|
|
658
|
|
def splitedReadFilePath(self, outputDir, start, end, readDirection, ext):
    """Path of one split read chunk: <dir>/<start>-<end>_<direction>.<ext>."""
    chunk_name = "%010d-%010d_%d.%s" % (start, end, readDirection, ext)
    return "%s/%s" % (outputDir, chunk_name)
|
|
665
|
|
666
|
|
def fastqDumpedFilePath(self, outputDir, readName, readDirection = None):
    """
    Output file path used by the fastq-dump command, optionally suffixed
    with the read direction ("_1"/"_2").
    """
    path = "%s/%s" % (outputDir, readName)
    if readDirection:
        path = "%s_%d" % (path, readDirection)

    return path + ".fastq"
|
|
677
|
|
678
|
|
def autoWorkDir(self):
    """
    Build a unique working-directory name from the current timestamp plus
    a hash fingerprint of the inputs and options.
    """
    now = datetime.now()

    # fingerprint = all read paths + reference genome + every option pair
    fingerprint = ",".join(self.readFilePaths) + self.refGenome
    for key, value in self.opts.items():
        fingerprint += ("%s:%s" % (key, value))
    digest = hashlib.md5(fingerprint).hexdigest()

    return "%s-%06d-%s" % (now.strftime("%Y%m%d-%H%M%S"), now.microsecond, digest[0:16])
|
|
692
|
|
693
|
|
694 class BsfCallCluster(BsfCall):
|
|
695 """
|
|
696 class to execute bsf-call process on pc cluster.
|
|
697 """
|
|
698
|
|
def __init__(self, refGenome, readFilePaths, cmdOpts):
    """
    Constructor of BsfCallCluster: the normal BsfCall setup plus the
    cluster-aware LAST executor used to submit mapping jobs via qsub.
    """
    BsfCall.__init__(self, refGenome, readFilePaths, cmdOpts)

    self.lastExecutor = LastExecutorCluster(self.refGenome, self.opts)
    # ids of mapping jobs already submitted; processMapping() waits on these
    self.mappingJobIds = []
|
|
708
|
|
709
|
|
def processMapping(self):
    """
    On a cluster the mapping/filtering jobs were already submitted while
    the read files were being split, so only wait here for those jobs to
    finish, then return the executor's result directories.
    """
    logging.info("Waiting for all jobs to finish.")

    self.waitForSubmitJobs(self.mappingJobIds)

    logging.info("Mapping and filtering process done.")

    return self.lastExecutor.resultDirs
|
|
724
|
|
725
|
|
def processMcDetection(self, resultDirs, localDir = None):
    """
    Submit one mC-detection job per chromosome to the cluster, wait for
    all of them, then write the merged output.

    localDir is accepted for interface compatibility but not used here.
    """
    logging.info("mC detection process start.")

    chrs = self.chrsByRefGenome(self.refGenome)
    job_ids = []
    for chr_no in chrs:
        job_id = self.submitMcDetectionJob(resultDirs, chr_no)
        job_ids.append(job_id)
        # stagger submissions by one second
        sleep(1)
    logging.info("Submitted jobs: %s" % ",".join(job_ids))

    self.waitForSubmitJobs(job_ids)

    # the per-chromosome jobs wrote the context files; this instance only
    # collects and outputs them
    mc_detector = McDetector(self.refGenome, resultDirs, self.mcContextDir, self.opts)

    mc_detector.chrs = chrs
    mc_detector.output(self.opts["output"])

    logging.info("mC detection process done.")
|
|
750
|
|
751
|
|
def submitMcDetectionJob(self, resultDirs, chrNo):
    """
    qsub one mc-detector.py job for a single chromosome and return the
    job id parsed from the qsub output.
    """
    argv = self.qsubRemoteCommandArgv(resultDirs, chrNo)
    # double-quote every remote argument for the shell
    cmd = self.qsubCommand(chrNo, " ".join(map((lambda s: '"' + s + '"'), argv)))

    qsub = os.popen(cmd)
    out = qsub.read()
    qsub.close()

    job_id = self.jobIdByQsubOutput(out)
    self.logJobSubmit("mC detection job: %s: Mapping result directories: %s" % (chrNo, resultDirs), job_id)

    return job_id
|
|
768
|
|
769
|
|
def qsubRemoteCommandArgv(self, resultDirs, chrNo):
    """
    Build the argument list handed to mc-detector.py for one chromosome
    job: genome, result dirs, context dir, chromosome, thresholds, input
    format, and optionally the local directory.
    """
    opts = self.opts
    argv = [
        self.refGenome,
        ",".join(resultDirs),
        self.mcContextDir,
        chrNo,
        str(opts["only_mcdetection"]),
        str(opts["lower_bound"]),
        str(opts["coverage"]),
        str(opts["aln_mismap_prob_thres"]),
    ]

    # if self.opts["read_bam"]:
    #     argv.append("BAM")
    # elif self.opts["read_sam"]:
    #     argv.append("SAM")
    # else:
    #     argv.append("MAF")
    # mapping results are currently always read as MAF
    argv.append("MAF")

    if opts["local_dir"]:
        argv.append(opts["local_dir"])

    return argv
|
|
798
|
|
799
|
|
def qsubCommand(self, chrNo, cmdArgs):
    """
    Build the qsub command line that runs mc-detector.py for one
    chromosome, with per-chromosome stdout/stderr files in the mC-context
    directory and an optional queue selection.
    """
    remote_cmd = os.path.join(self.scriptDir(), "mc-detector.py")
    out_file = os.path.join(self.mcContextDir, "mc-detector-%s.out" % chrNo)
    err_file = os.path.join(self.mcContextDir, "mc-detector-%s.err" % chrNo)

    queue_opt = ""
    if self.opts["queue_list"]:
        queue_opt = "-q %s " % self.opts["queue_list"]

    return "qsub -o %s -e %s %s-pe openmpi %d -cwd -b y %s %s" % (
        out_file, err_file, queue_opt, self.opts['num_threads'], remote_cmd, cmdArgs)
|
|
815
|
|
816
|
|
def afterProcessSplitRead(self, readFile, readAttr = None):
    """
    Hook called after one split read file has been written; on a cluster
    this submits the mapping/filtering job for that file and records the
    job id for the later wait.
    """
    if self.isPairedEnd(readAttr):
        fpath, ext = os.path.splitext(readFile)
        # submit only once the reverse ("..._2") mate has been written, so
        # both mates of the pair exist on disk
        if fpath[-1:] == "2":
            # NOTE(review): read_file (the forward mate's path) is computed
            # here but executeOneRead is passed readFile (the reverse
            # mate) - confirm which one was intended.
            read_file = "%s1.%s" % (fpath[0:-1], readAttr[1]["type"])
            job_id = self.lastExecutor.executeOneRead(readFile, readAttr)
            self.mappingJobIds.append(job_id)
    else:
        job_id = self.lastExecutor.executeOneRead(readFile, readAttr)
        self.mappingJobIds.append(job_id)
|
|
833
|
|
834
|
|
835 class LastExecutor(BsfCallBase):
|
|
836 """
|
|
837 class to run LAST programs to map read and filtering.
|
|
838 """
|
|
839
|
|
def __init__(self, refGenome, baseDir = ".", readsDir = None, resultsDir = None, numThreads = 1):
    """
    Constructor of LastExecutor.

    refGenome: path to the reference genome FASTA (index files live
        alongside it as <refGenome>.f / <refGenome>.r).
    baseDir: working directory of the run.
    readsDir: directory holding the (split) read files.
    resultsDir: directory receiving the mapping result files.
    numThreads: worker/parallelism count used by the pair mapping script.
    """
    self.refGenome = refGenome
    self.baseDir = baseDir
    self.readsDir = readsDir
    self.resultsDir = resultsDir
    # work queue of read files shared by the worker threads (Python 2 Queue)
    self.queue = Queue.Queue()
    self.lock = threading.Lock()
    self.numThreads = numThreads
|
|
852
|
|
853
|
|
def execute(self, readAttr, numThreads = 1, lastalOpts = "", mergeOpts = "", filterOpts = ""):
    """
    Queue the read file(s), start numThreads daemon worker threads that
    run the mapping/filtering pipeline, and join them all.  Exits the
    process when nothing could be queued.
    """
    self.enqueue(readAttr)

    if self.queue.qsize()==0:
        # nothing to map: treated as a fatal setup error
        logging.fatal("LastExecutor::execute: Error: queue size=%d" % self.queue.qsize())
        sys.exit(1)

    logging.info("Queued %d files." % self.queue.qsize())

    threads = []
    for i in range(numThreads):
        t = threading.Thread(target=self.worker, args=(readAttr, lastalOpts, mergeOpts, filterOpts))
        # daemon threads so a dying main thread does not hang the process
        t.daemon = True
        threads.append(t)
        t.start()

    i = 1
    for thread in threads:
        logging.info("Waiting for %d th thread." % i)
        thread.join()
        logging.info("Joined %d th thread." % i)
        i+=1
|
|
882
|
|
883
|
|
def worker(self, readAttr, lastalOpts, mergeOpts, filterOpts):
    """
    Thread worker: drain the shared queue and run the LAST
    mapping/filtering pipeline for each dequeued read file.
    """
    if self.queue.empty():
        logging.info("LastExecutor::worker: Queue is empty.")
    while True:
        # BUG FIX: the original paired empty() with get_nowait(), which
        # races when several workers run - another thread can drain the
        # queue between the check and the get, raising Queue.Empty.
        try:
            fpath = self.queue.get_nowait()
        except Queue.Empty:
            break
        self.runLast(fpath, readAttr, lastalOpts, mergeOpts, filterOpts)
    return
|
|
896
|
|
897
|
|
def runLast(self, readFile, readAttr, lastalOpts, mergeOpts, filterOpts, rmInFiles = True):
    """
    Execute the batch mapping/filtering shell command for one read file;
    anything the command writes to stderr is logged as fatal.
    """
    cmd = self.batchCmd(readFile, readAttr, lastalOpts, mergeOpts, filterOpts, rmInFiles)
    logging.info("LastExecutor::runLast: command=%s" % cmd)
    p = subprocess.Popen(cmd, shell = True, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
    # BUG FIX: communicate() drains both pipes while waiting for the
    # process.  The original called p.wait() first, which can deadlock
    # once the child fills the stdout/stderr pipe buffers.
    error_msg = p.communicate()[1]
    if len(error_msg) > 0:
        logging.fatal(error_msg)
|
|
912
|
|
913
|
|
def enqueue(self, readAttr):
    """Queue the read file for the worker threads."""
    # earlier revisions queued every split chunk matching *_1.<type> here;
    # now the whole forward read file is a single work item
    self.queue.put(readAttr[1]["path"])
|
|
923
|
|
924
|
|
def lastdb(self, directions, parallel = False):
    """
    Run lastdb to build the bisulfite index of the reference genome for
    each given strand direction ('f'/'r').

    parallel: start all lastdb processes at once instead of one by one;
    each command's combined stdout/stderr is logged when non-empty.
    """
    cmds = [self.lastdbCmd(direction) for direction in directions]

    if parallel:
        processes = []
        for cmd in cmds:
            logging.info(cmd)
            p = subprocess.Popen(cmd, shell = True, stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
            processes.append(p)
        for p in processes:
            # BUG FIX: communicate() drains the pipe while waiting; the
            # original wait()-then-read order can deadlock when lastdb
            # produces enough output to fill the pipe buffer.
            out_data = p.communicate()[0]
            if len(out_data) > 0:
                logging.info(out_data)
    else:
        for cmd in cmds:
            logging.info(cmd)
            p = subprocess.Popen(cmd, shell = True, stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
            out_data = p.communicate()[0]
            if len(out_data) > 0:
                logging.info(out_data)
|
|
955
|
|
956
|
|
def lastdbCmd(self, direction):
    """
    lastdb command building the bisulfite index for one strand direction
    ('f' or 'r'); -uBISF / -uBISR select the builtin bisulfite seeding
    schemes, and the index is written next to the reference genome.
    """
    # return "lastdb -w2 -u bisulfite_%s.seed %s.%s %s" % (direction, self.refGenome, direction, self.refGenome)
    return "lastdb -uBIS%s %s.%s %s" % (direction.upper(), self.refGenome, direction, self.refGenome)
|
|
963
|
|
964
|
|
def lastalBsCmd(self, readFile, opts = ""):
    """
    Shell command running last-bisulfite.sh for one read file against the
    forward and reverse indexes, redirecting the MAF result into the
    results directory.  The opts parameter is currently unused.
    """
    # s_opt = self.lastalSopt(direction)
    read_name = self.readNameByReadFile(readFile)
    # TMPDIR points the script's scratch files at the reads directory
    return "TMPDIR=%s last-bisulfite.sh %s.%s %s.%s %s > %s" % (self.readsDir, self.refGenome, 'f', self.refGenome, 'r', readFile, self.mappingResultFilePath(read_name))
|
|
972
|
|
def lastalBsPairCmd(self, readFile1, readFile2, opts = ""):
    """
    Shell command running last-bisulfite-pair.sh on a mate pair against
    the forward and reverse indexes, redirecting the MAF result into the
    results directory.  The opts parameter is currently unused.
    """
    # BUG FIX: the original referenced an undefined name `readFile`
    # (NameError at runtime); the result file is named after the first
    # mate's read name.
    read_name = self.readNameByReadFile(readFile1)
    return "PARALLEL=-j%d TMPDIR=%s last-bisulfite-pair.sh %s.%s %s.%s %s %s > %s" % (self.numThreads, self.readsDir, self.refGenome, 'f', self.refGenome, 'r', readFile1, readFile2, self.mappingResultFilePath(read_name))
|
|
980
|
|
981
|
|
982 # def mergeCmd(self, forwardFile, reverseFile, outputFile, opts = "", rmInFiles = True):
|
|
def mergeCmd(self, inputFile, outputFile, opts = "", rmInFiles = True):
    """
    Command merging lastal batch output into outputFile, optionally
    deleting the input file afterwards.  The opts parameter is unused.
    """
    pieces = ["last-merge-batches %s > %s" % (inputFile, outputFile)]
    if rmInFiles:
        pieces.append("rm %s" % inputFile)
    return "; ".join(pieces)
|
|
991
|
|
992
|
|
993 # def mappingAndMergeCmd(self, readFile, lastalOpts = "", mergeOpts = "", rmInFiles = True):
|
|
def mappingPairCmd(self, readFile1, readFile2, lastalOpts = "", mergeOpts = "", rmInFiles = True):
    """
    Build the read-mapping command for a paired-end read.

    readFile1, readFile2 -- the two mate read files
    lastalOpts           -- extra lastal options, prepended to the -Q option
    mergeOpts, rmInFiles -- accepted for interface compatibility; not used

    Bug fix: the original body referenced an undefined name ``readFile``
    (NameError whenever this method was called); file-type detection now
    uses ``readFile1``.  Unused locals (read_name, mapping_file) removed.
    """
    stem, ext = os.path.splitext(readFile1)
    if ext == ".gz":
        # look through a .gz suffix to find the underlying file type
        stem, ext = os.path.splitext(stem)
    # the lastal -Q option depends on whether the reads are fasta or fastq
    lastal_opt = "%s %s" % (lastalOpts, self.lastalQopt(ext[1:]))
    return self.lastalBsPairCmd(readFile1, readFile2, lastal_opt)
|
|
1007
|
|
1008
|
|
1009 # def mappingResultFilePath(self, readName, direction):
|
|
def mappingResultFilePath(self, readName):
    """
    Return the mapping-result (.maf) path for a read name, under the
    results directory.
    """
    return "{0}/{1}.maf".format(self.resultsDir, readName)
|
|
1016
|
|
1017
|
|
def mergeResultFilePath(self, readName):
    """
    Return the merge-result (.merge.maf) path for a read name, under the
    results directory.
    """
    return "{0}/{1}.merge.maf".format(self.resultsDir, readName)
|
|
1024
|
|
1025
|
|
def filterResultFilePath(self, readName):
    """
    Return the filtering-result (.maf) path for a read name, under the
    results directory.  Note it coincides with mappingResultFilePath.
    """
    return "{0}/{1}.maf".format(self.resultsDir, readName)
|
|
1032
|
|
1033
|
|
def lastalSopt(self, direction):
    """
    Map a strand direction to the lastal -s option.

    direction -- 'f' -> '-s1', 'r' -> '-s0'; anything else yields ''.
    """
    return {"f": "-s1", "r": "-s0"}.get(direction, "")
|
|
1046
|
|
1047
|
|
def lastalQopt(self, fileType):
    """
    Map a read file type to the lastal -Q option.

    fileType -- 'fasta' -> '-Q0', 'fastq' -> '-Q1'; anything else yields ''.
    """
    return {"fasta": "-Q0", "fastq": "-Q1"}.get(fileType, "")
|
|
1060
|
|
1061
|
|
class LastExecutorCluster(LastExecutor):
    """
    class to run LAST programs on pc cluster.

    Instead of running mapping locally, each read is wrapped into a qsub job
    (Grid Engine style) that runs mapping-s.sh / mapping-p.sh remotely.
    """

    def __init__(self, refGenome, bsfCallOpts):
        """
        constructor of LastExecutorCluster

        refGenome   -- path to the reference genome
        bsfCallOpts -- bsf-call option dictionary; keys used here include
                       last_opts, aln_mismap_prob_thres, aln_score_thres,
                       queue_list
        NOTE(review): LastExecutor.__init__ is NOT called here, unlike the
        other subclasses -- confirm the base attributes are not needed.
        """

        self.refGenome = refGenome
        self.opts = bsfCallOpts

        # results directories seen so far (one per read set)
        self.resultDirs = []


    def executeOneRead(self, readFile, readAttr):
        """
        execute read mapping and filtering process with specified read.
        on pc cluster, submit mapping and filtering job and return job id.
        """

        if readAttr["results_dir"] not in self.resultDirs:
            self.resultDirs.append(readAttr["results_dir"])

        lastal_opts = self.lastalOpts(self.opts["last_opts"])
        merge_opts = self.mergeOpts()
        filter_opts = self.filterOpts(self.opts["aln_mismap_prob_thres"], self.opts["aln_score_thres"], self.isPairedEnd(readAttr))
        job_id = self.submitJob(readFile, readAttr, lastal_opts, merge_opts, filter_opts, self.opts["queue_list"])

        return job_id


    def submitJob(self, readFile, readAttr, lastalOpts = "", mergeOpts = "", filterOpts = "", queueName = None):
        """
        submit read mapping and filtering process job.

        Builds a qsub command line and reads the job id back from its stdout.
        Returns the job id (as extracted by jobIdByQsubOutput), or whatever
        that helper yields on failure.
        """

        job_id = None

        # [0:-2] strips the pair suffix (e.g. "_1") from the read name --
        # presumably read files are named <name>_1/<name>_2; TODO confirm
        read_name = self.readNameByReadFile(readFile)[0:-2]
        out_file = self.qsubStdoutFilePath(readAttr["base_dir"], read_name)
        err_file = self.qsubStderrFilePath(readAttr["base_dir"], read_name)
        remote_cmd = self.remoteCommand(readAttr)
        # each argument is double-quoted so option strings survive the shell
        remote_cmd_args = " ".join(map((lambda s: '"' + s + '"'), self.remoteCommandArgv(read_name, readAttr, lastalOpts, filterOpts)))

        if queueName:
            cmd = "qsub -o %s -e %s -q %s -cwd %s %s" % (out_file, err_file, queueName, remote_cmd, remote_cmd_args)
        else:
            cmd = "qsub -o %s -e %s -cwd %s %s" % (out_file, err_file, remote_cmd, remote_cmd_args)

        # NOTE(review): os.popen runs through the shell; paths/options with
        # shell metacharacters would be interpreted -- consider subprocess
        # with an argument list.
        qsub = os.popen(cmd)
        out = qsub.read()
        qsub.close()

        job_id = self.jobIdByQsubOutput(out)

        dir_path, file_name, base_name, ext, prog = self.splitFilePath(readFile)
        self.logJobSubmit("Mapping and filtering: read: %s/%s" % (dir_path, base_name[0:-2]), job_id)

        return job_id


    def remoteCommand(self, readAttr):
        """
        get read mapping and filtering command path to submit by qsub command.
        mapping-p.sh for paired-end reads, mapping-s.sh for single-end.
        """

        if self.isPairedEnd(readAttr):
            return os.path.join(self.scriptDir(), "mapping-p.sh")
        else:
            return os.path.join(self.scriptDir(), "mapping-s.sh")


    def remoteCommandArgv(self, readName, readAttr, lastalOpts, filterOpts):
        """
        get read mapping and filtering command arguments.

        Layout expected by mapping-[sp].sh: reads dir, results dir, genome,
        filter options, then per-mate (read name, file type, lastal options).
        NOTE(review): readAttr is indexed both by string keys and by the
        integer keys 1/2 (per-mate attribute dicts) -- confirm the caller
        builds it that way.
        """

        argv = []

        argv.append(readAttr["reads_dir"])
        argv.append(readAttr["results_dir"])
        argv.append(self.refGenome)
        argv.append(filterOpts)

        argv.append(readName)
        argv.append(readAttr[1]["type"])
        argv.append("%s %s" % (lastalOpts, self.lastalQopt(readAttr[1]["type"])))

        if self.isPairedEnd(readAttr):
            # second mate gets the same (suffix-stripped) read name
            argv.append(readName)
            argv.append(readAttr[2]["type"])
            argv.append("%s %s" % (lastalOpts, self.lastalQopt(readAttr[2]["type"])))

        return argv


    def qsubStdoutFilePath(self, dirPath, readName):
        """
        get qsub command standard output file path.
        """

        return "%s/mapping_%s.out" % (dirPath, readName)


    def qsubStderrFilePath(self, dirPath, readName):
        """
        get qsub command standard error file path.
        """

        return "%s/mapping_%s.err" % (dirPath, readName)
|
|
1174
|
|
1175
|
|
class LastExecutorSingle(LastExecutor):
    """
    Runs the LAST mapping + filtering pipeline for single-end reads.
    """

    def __init__(self, refGenome, baseDir, readsDir, resultsDir):
        """Delegate all initialization to the LastExecutor base class."""
        LastExecutor.__init__(self, refGenome, baseDir, readsDir, resultsDir)


    def batchCmd(self, readFile1, readAttr, lastalOpts = "", mergeOpts = "", filterOpts = "", rmInFiles = True):
        """
        Return the shell command mapping one single-end read file via
        last-bisulfite.sh.  readAttr and the remaining keyword options are
        accepted for interface compatibility with the paired-end executor
        but do not affect the command built here.
        """
        read_name = self.readNameByReadFile(readFile1)
        # NOTE(review): computed but never used by the returned command
        self.filterResultFilePath(read_name[0:-2])
        template = "TMPDIR={0} last-bisulfite.sh {1}.f {1}.r {2} > {3}/{4}.maf"
        return template.format(self.readsDir, self.refGenome, readFile1, self.resultsDir, read_name)


    def filterCmd(self, inputFile, outputFile, opts = "", rmInFile = True):
        """
        Return the last-map-probs filtering command; when rmInFile is true
        a step removing the input file is chained after it.
        """
        steps = ["last-map-probs %s %s > %s" % (opts, inputFile, outputFile)]
        if rmInFile:
            steps.append("rm %s" % inputFile)
        return "; ".join(steps)
|
|
1215
|
|
1216
|
|
class LastExecutorPairedEnd(LastExecutor):
    """
    class to run LAST programs to map paired-end read and filtering.
    """

    def __init__(self, refGenome, baseDir, readsDir, resultsDir, numThreads):
        """
        constructor of LastExecutorPairedEnd

        numThreads -- degree of parallelism passed to GNU parallel
                      (PARALLEL=-jN) by batchCmd.

        Bug fix: numThreads was accepted but never stored, so batchCmd's use
        of self.numThreads raised AttributeError; it is now kept on self.
        """
        LastExecutor.__init__(self, refGenome, baseDir, readsDir, resultsDir)
        self.numThreads = numThreads


    def batchCmd(self, readFile1, readAttr, lastalOpts = "", mergeOpts = "", filterOpts = "", rmInFiles = True):
        """
        get batch command to map read and filtering.

        Both mate files are taken from readAttr[1]["path"] / readAttr[2]["path"]
        (readFile1 and the remaining options are accepted for interface
        compatibility) and mapped with last-bisulfite-paired.sh; the output
        MAF is named "<name1>,<name2>.maf" under the results directory.
        """
        read_name1 = self.readNameByReadFile(readAttr[1]["path"])
        read_name2 = self.readNameByReadFile(readAttr[2]["path"])
        return "PARALLEL=-j%d TMPDIR=%s last-bisulfite-paired.sh %s.f %s.r %s %s > %s/%s,%s.maf" % (self.numThreads, self.readsDir, self.refGenome, self.refGenome, readAttr[1]["path"], readAttr[2]["path"], self.resultsDir, read_name1, read_name2)


    def filterCmd(self, inputFile, outputFile, opts = "", rmInFile = True):
        """
        get filter command (last-pair-probs); when rmInFile is true a step
        removing the input file is chained after it.
        """
        cmd = "last-pair-probs %s %s > %s" % (opts, inputFile, outputFile)
        if rmInFile:
            cmd += "; rm %s" % inputFile

        return cmd
|
|
1255
|
|
1256
|
|
class McDetector(BsfCallBase):
    """
    class to execute mC detection process.

    Parses LAST mapping results (MAF; BAM/SAM paths exist but are disabled),
    writes per-file mC-context evidence as *.bsf files under a local working
    directory, then merges and sorts them into the final tab-separated output
    (chr, pos, strand, context, ratio, coverage).
    Written for Python 2 (old-style except clauses, integer division).
    """

    def __init__(self, refGenome, resultDirs, mcContextDir, options):
        """
        constructor of McDetector

        refGenome    -- path to the reference genome FASTA
        resultDirs   -- directories holding mapping result files
        mcContextDir -- directory for the final mC context output
        options      -- bsf-call option dict; keys read here: lower_bound,
                        coverage, only_mcdetection, aln_mismap_prob_thres,
                        mapping_result_files (optional), local_dir
        """

        self.refGenome = refGenome
        self.refGenomeBuf = {}   # chromosome name -> sequence, filled by readRefGenome
        self.refGenomeChr = []   # chromosome names, filled by readRefGenome

        self.mappingResultDirs = resultDirs
        self.mcContextDir = mcContextDir
        self.lowerBound = options["lower_bound"]       # minimum mC ratio to report
        self.coverageThreshold = options["coverage"]   # minimum coverage to report
        self.onlyMcDetection = options["only_mcdetection"]

        self.opts = options

        self.mappingResultFiles = []

        if self.onlyMcDetection:
            self.mismapThreshold = options["aln_mismap_prob_thres"]
            # self.readBam = options["read_bam"]
            # self.readSam = options["read_sam"]
            # BAM/SAM input is currently disabled (pysam import is commented
            # out at the top of the file).
            self.readBam = False
            self.readSam = False
            if "mapping_result_files" in options:
                self.mappingResultFiles = options["mapping_result_files"]
            else:
                self.mappingResultFiles = self.getAllMappingResultFiles(resultDirs)
        else:
            # full pipeline: fixed, very strict mismap threshold
            self.mismapThreshold = 1e-10
            self.readBam = False
            self.readSam = False
            self.mappingResultFiles = self.getAllMappingResultFiles(resultDirs)

        if len(self.mappingResultFiles)==0:
            logging.fatal("McDetector::__init__: error: no mapping result file found.")
            sys.exit(1)

        # intermediate *.bsf files go to local_dir when given (e.g. a fast
        # local disk), otherwise straight to the output directory
        if options["local_dir"]:
            self.localDir = options["local_dir"]
        else:
            self.localDir = mcContextDir

        self.readRefGenome(self.refGenome, self.refGenomeBuf, self.refGenomeChr)

        logging.debug("McDetector::__init__: mappingResultDirs=%s" % ','.join(self.mappingResultDirs))


    def execute(self, outputFile, numWorkers = 1):
        """
        execute mC detection process and output result.

        numWorkers -- accepted for interface compatibility; not used here.
        """

        self.process()
        self.output(outputFile)


    def process(self):
        """
        execute mC detection process.

        Parses every mapping result file into per-file *.bsf evidence files
        under self.localDir.
        """

        logging.info("mC detection process start")

        if len(self.mappingResultFiles)==0:
            logging.fatal("McDetector::process: error: no mapping result found.")
            sys.exit(1)

        for mapping_result_file in self.mappingResultFiles:
            logging.info("Parsing mapping result file: %s" % mapping_result_file)
            if self.onlyMcDetection:
                if not (self.readBam or self.readSam):
                    # only gzip/MAF inputs are parsed; others are skipped silently
                    if self.isGzipFile(mapping_result_file) or self.isMafFile(mapping_result_file):
                        self.processMafFile(mapping_result_file)
                else:
                    # BAM or SAM
                    if self.readBam and self.isBamFile(mapping_result_file):
                        self.processSamFile(mapping_result_file)
                    elif self.readSam and self.isSamFile(mapping_result_file):
                        self.processSamFile(mapping_result_file)
            else:
                self.processMafFile(mapping_result_file)

        logging.info("Parsing mapping result file done")

        if self.mcContextDir != self.localDir:
            # NOTE(review): self.targetChr is never assigned in this class's
            # __init__ -- confirm it is set elsewhere before this branch runs,
            # otherwise this raises AttributeError.
            copy(self.mcContextLocalFilePath(self.targetChr), self.mcContextDir)


    def output(self, outputFile):
        """
        output mC detection result.

        Sorts all non-empty *.bsf files together with an external `sort -k1`
        (grouping rows by chromosome) and emits one block per chromosome.
        """

        logging.info("McDetector::output: outputFile=%s" % outputFile)
        popen_args = ['sort', '-k1']
        list = glob.glob("%s/*.bsf" % self.localDir)   # NOTE: shadows builtin `list`
        if len(list)==0:
            logging.fatal("McDetect::output: no *._bsf_ files found in %s" % self.localDir)
            sys.exit(1)
        for bsf_file in list:
            if os.path.getsize(bsf_file) > 0:   # empty files would confuse sort's arg list
                popen_args.append(bsf_file)
                logging.info("McDetector::output: added bsf_file=\"%s\"" % bsf_file)
        logging.debug("McDetector::output: popen_args=%s" % ' '.join(popen_args))
        fout = open(outputFile, 'w')
        if len(popen_args) > 2:
            pipe = subprocess.Popen(popen_args, stdout=subprocess.PIPE)
            block = []   # consecutive rows of the current chromosome
            for line in pipe.stdout:
                # row format written by extractMcContextsByOneRead
                (chr, ctx_pos, strand, mc_ctx, base, conf) = line.strip().split("\t")
                ctx_pos = int(ctx_pos)
                conf = float(conf)
                if len(block)==0 or block[-1][0]==chr:
                    block.append([chr, ctx_pos, strand, mc_ctx, base, conf])
                else:
                    # chromosome changed: flush the finished block
                    self.outputOneChrBlock(fout, block)
                    del block[:]
                    block.append([chr, ctx_pos, strand, mc_ctx, base, conf])
            if len(block) > 0:
                self.outputOneChrBlock(fout, block)
            pipe.stdout.close()
        else:
            logging.info("McDetector::output: no result files found.")
        fout.close()


    def outputOneChrBlock(self, fout, block):
        """
        output mC detection result for one chromosome.

        block rows are [chr, pos, strand, context, base, conf]; they are
        grouped by (pos, strand, context), reduced to (coverage, mC ratio),
        and written when both thresholds are met.
        """

        chr = block[0][0]
        logging.info("McDetector::outputOneChrBlock: chr=%s" % chr)
        mc_contexts = {}   # pos -> strand -> context -> [[base, conf], ...]
        for b in sorted(block, key=lambda block: block[1]):
            try:
                ctx_pos, strand, mc_ctx, base, conf = b[1:]
                if not ctx_pos in mc_contexts:
                    mc_contexts[ctx_pos] = {}
                if not strand in mc_contexts[ctx_pos]:
                    mc_contexts[ctx_pos][strand] = {}
                if not mc_ctx in mc_contexts[ctx_pos][strand]:
                    mc_contexts[ctx_pos][strand][mc_ctx] = []
                mc_contexts[ctx_pos][strand][mc_ctx].append([base, conf])
            except ValueError, e:
                # NOTE(review): `fpath` and `line` are undefined in this scope;
                # this log statement itself would raise NameError if reached.
                logging.warning("McDetect::outputOneChrBlock: value error: %s: %s -> %s" % (fpath, line.strip(), e.args[0]))

        num_entry = 0
        if len(mc_contexts.keys()) > 0:
            for pos in sorted(mc_contexts.keys()):
                for strand in mc_contexts[pos].keys():
                    for mc_ctx in mc_contexts[pos][strand].keys():
                        coverage, mc_ratio = self.calcCoverage(mc_contexts[pos][strand][mc_ctx], strand)
                        # keep only sites passing both coverage and ratio thresholds
                        if coverage >= self.coverageThreshold and mc_ratio >= self.lowerBound:
                            fout.write("%s\t%d\t%s\t%s\t%g\t%d\n" % (chr, pos, strand, mc_ctx, mc_ratio, coverage))
                            num_entry += 1
                        else:
                            logging.info("McDetect::outputOneChrBlock: rejected: chr=%s pos=%d strand=%s coverage=%g mc_ratio=%g" % (chr, pos, strand, coverage, mc_ratio))
        # self.bzip2File(fpath, False)
        logging.info("McDetector::outputOneChrBlock: number of entries %s" % num_entry)


    def processMafFile(self, resultFile):
        """
        read mapping result file, and output mC context data file.

        Expects 4-line MAF blocks in the order a/s/s/q (alignment header,
        reference line, read line, quality line) and writes one *.bsf line
        per cytosine context found in accepted alignments.
        """

        file_name = self.splitFilePath(resultFile)[1]
        outputFile = "%s/%s.bsf" % (self.localDir, file_name)
        try:
            fin = open(resultFile, 'r')
            fout = open(outputFile, 'w')
            block = []   # current 4-line MAF record
            for line in fin:
                # skip comments, probability ('p') lines and blank lines
                if line[0] == '#' or line[0] == 'p' or line[0] == "\n":
                    continue
                if line[0] == 'a' or line[0] == 's' or line[0] == 'q':
                    block.append(line.strip())
                if len(block)==4:
                    if block[0][0]=='a' and block[1][0]=='s' and block[2][0]=='s' and block[3][0]=='q':
                        # 'a' line looks like "a score=... mismap=<p>"; the
                        # second '=' split yields the mismap probability
                        mismap_prob = float(block[0].split('=')[2])
                        if mismap_prob <= self.mismapThreshold:
                            b1 = block[1].split()
                            chr = b1[1]
                            ref_seq = b1[-1].upper()
                            ref_start = int(b1[2]) # 0-based position
                            read_seq = block[2].split()[-1]
                            read_len = len(read_seq)
                            read_qual = block[3].split()[-1]
                            # infer bisulfite strand from C->T vs G->A mismatches
                            strand = self.findStrandFromAlignment(chr, ref_start, read_seq, read_len)
                            # strand = block[2].split()[4]
                            if strand == '+' or strand == '-':
                                lines = self.extractMcContextsByOneRead(chr, strand, mismap_prob, ref_seq, ref_start, read_seq, read_qual, read_len)
                                for line in lines:
                                    fout.write(line)
                                logging.debug("processMafFile: a maf block(%s) is successfully captured." % strand)
                            elif strand == '+-':
                                # ambiguous strand: count the read for both strands
                                for st in ('+', '-'):
                                    lines = self.extractMcContextsByOneRead(chr, st, mismap_prob, ref_seq, ref_start, read_seq, read_qual, read_len)
                                    for line in lines:
                                        fout.write(line)
                                logging.debug("processMafFile: a maf block(%s) is successfully captured." % strand)
                            else:
                                logging.debug("processMafFile: a maf block does not show strand-specific info.")
                        else:
                            logging.info("processMafFile: alignment \"%s\" has greater mismap prob. than the threshold." % block[0])
                        del block[:]
                    else:
                        logging.fatal("processMafFile: error: unexpected malformed maf block is found.")
                        logging.fatal("block 1: \"%s\"\n" % block[0])
                        logging.fatal("block 2: \"%s\"\n" % block[1])
                        logging.fatal("block 3: \"%s\"\n" % block[2])
                        logging.fatal("block 4: \"%s\"\n" % block[3])
                        sys.exit(1)
            fin.close()
            fout.close()

            # leftover lines mean the file did not end on a block boundary
            if len(block) > 0:
                logging.fatal("McDetect::processMafFile: error: possible malformed MAF file.")
                for b in block:
                    logging.fatal(b)
                sys.exit(1)
        except IOError:
            logging.fatal("McDetect::processMafFile: error: unable to read a MAF file \"%s\"" % resultFile)
            sys.exit(1)
        return


    def processSamFile(self, samFile):
        """
        read mapping BAM/SAM file, and output mC context data file.

        NOTE(review): currently dead code -- self.readBam/readSam are forced
        False in __init__ and the pysam import is commented out, so `pysam`
        would be a NameError here.  Also `read_qual` below is undefined, and
        extractMcContextsByOneRead is called with an argument list that does
        not match its signature -- this path needs rework before re-enabling.
        """

        logging.info("Process BAM/SAM file start: %s" % samFile)

        samfile = None
        try:
            if self.readBam:
                samfile = pysam.Samfile(samFile, "rb")
            else:
                samfile = pysam.Samfile(samFile, "r")

            counter = 1
            for aln in samfile.fetch(until_eof = True):
                samaln = SamAlnParser(samfile, aln)
                samaln.setRefGenome(self.targetChr, self.targetSeqD, self.targetSeqLen)
                samaln.parse()

                chr = samaln.referenceName()

                # only alignments on the current target chromosome are used
                if (chr is None) or (chr != self.targetChr):
                    continue

                if aln.mapq:
                    mismap = self.bamMapq2Mismap(aln.mapq)
                    if mismap - self.mismapThreshold > 0:
                        continue

                read_seq = samaln.alnReadSeq.replace("t", "C")
                self.extractMcContextsByOneRead(chr, samaln.strand, 0, samaln.alnRefSeq.upper(), samaln.alnRefStart, self.targetSeqLen, read_seq, read_qual)

                # flush accumulated context data periodically to bound memory
                counter += 1
                if counter > 500000:
                    self.outputMcContextData()
                    counter = 1

            samfile.close()
            self.outputMcContextData()
        except Exception, e:
            logging.fatal(samFile)
            logging.fatal("  %s" % str(type(e)))
            logging.fatal("  %s" % str(e))
            if samfile:
                samfile.close()

    def extractMcContextsByOneRead(self, chr, strand, mismapProb, refSeq, refStart, readSeq, readQual, readLen):
        """
        extract mC context by one read.

        Walks the gapped alignment; for every reference C (+ strand) or G
        (- strand) emits one tab-separated evidence line:
        chr, position, strand, context type, observed base, confidence,
        where confidence = (1 - mismap) * (1 - base error probability).
        """

        lines = []
        if strand == '+':
            real_pos = refStart   # ungapped reference coordinate
            for i in range(readLen):
                if readSeq[i] == '-':
                    continue   # read gap: reference position does not advance here either
                # if refSeq[i] == 'C' and (readSeq[i] == 'C' or readSeq[i] == 'T'):
                if refSeq[i] == 'C':
                    baseConf = self.qualityCharToErrorProb(readQual[i])
                    ctx_type = self.mcContextType(self.refGenomeBuf[chr], real_pos, strand)
                    line = "%s\t%d\t%s\t%s\t%s\t%g\n" % (chr, real_pos, strand, ctx_type, readSeq[i], (1-mismapProb) * (1-baseConf))
                    # logging.debug("extractMcContextsByOneRead: line=%s" % line)
                    lines.append(line)
                real_pos += 1
        if strand == '-':
            real_pos = refStart
            for i in range(readLen):
                if readSeq[i] == '-':
                    continue
                # if refSeq[i] == 'G' and (readSeq[i] == 'G' or readSeq[i] == 'A'):
                if refSeq[i] == 'G':
                    baseConf = self.qualityCharToErrorProb(readQual[i])
                    ctx_type = self.mcContextType(self.refGenomeBuf[chr], real_pos, strand)
                    # NOTE(review): read_base is computed but the line below
                    # writes readSeq[i] -- presumably the complemented base was
                    # intended here; confirm before changing output format.
                    read_base = self.complementaryBase(readSeq[i])
                    line = "%s\t%d\t%s\t%s\t%s\t%g\n" % (chr, real_pos, strand, ctx_type, readSeq[i], (1-mismapProb) * (1-baseConf))
                    # logging.debug("extractMcContextsByOneRead: line=%s" % line)
                    lines.append(line)
                real_pos += 1
        return lines


    def complementaryBase(self, base):
        """
        compute a complemetary base (case-preserving); exits on any other
        character.
        """

        if base == 'A': return 'T'
        if base == 'C': return 'G'
        if base == 'G': return 'C'
        if base == 'T': return 'A'
        if base == 'N': return 'N'
        if base == 'a': return 't'
        if base == 'c': return 'g'
        if base == 'g': return 'c'
        if base == 't': return 'a'
        if base == 'n': return 'n'
        sys.exit(1)


    def findStrandFromAlignment(self, chr, ref_start, read_seq, read_len):
        """
        Infer the bisulfite strand of an aligned read by counting C->T
        evidence (+ strand) versus G->A evidence (- strand) against the
        reference.  Returns '+', '-', '+-' (tie) or '.' (no evidence).
        """
        plus_sup = 0        # C->T mismatches supporting the + strand
        minus_sup = 0       # G->A mismatches supporting the - strand
        other_mismatch = 0  # any other mismatch
        base_size = 0       # total mismatches considered
        ref_seq = self.refGenomeBuf[chr]
        for i in range(read_len):
            j = ref_start + i
            if ref_seq[j]=='C' and read_seq[i]=='T':
                plus_sup += 1
                base_size += 1
            elif ref_seq[j]=='G' and read_seq[i]=='A':
                minus_sup += 1
                base_size += 1
            elif (ref_seq[j]!='-' and read_seq[i]!='-') and ref_seq[j]!=read_seq[i]:
                other_mismatch += 1
                base_size += 1
        if base_size==0:
            return '.'
        mismatch_rate = float(other_mismatch)/float(base_size)  # computed but unused below
        # if plus_sup > minus_sup and plus_sup > other_mismatch:
        # if plus_sup > minus_sup and mismatch_rate < 0.05:
        if plus_sup > minus_sup:
            return '+'
        # if minus_sup > plus_sup and minus_sup > other_mismatch:
        # if minus_sup > plus_sup and mismatch_rate < 0.05:
        if minus_sup > plus_sup:
            return '-'
        if plus_sup == minus_sup:
            return '+-'
        return '.'


    def __extractMcContextsByOneRead(self, chr, strand, mismapProb, refSeq, refStart, refSeqLen, readSeq, readQual):
        """
        extract mC context by one read.

        NOTE(review): name-mangled older variant, apparently superseded by
        extractMcContextsByOneRead above; it references attributes
        (targetSeqD, targetSeqC, mcDetectData) that __init__ does not set,
        and calls mcContextType with two arguments where the public variant
        passes three -- treat as legacy code.
        """

        logging.debug("extractMcContextsByOneRead(%s, %s, %g, %s, %d, %d, %s, %s)" % (chr, strand, mismapProb, refSeq, refStart, refSeqLen, readSeq, readQual))

        nogap_refseq = self.clearGap(refSeq)
        bases = list(refSeq)
        last_pos = len(nogap_refseq) - 1
        pos = -1
        while True:
            try:
                # scan gapped reference for the next 'C'
                pos = bases.index("C", pos + 1)
                num_gaps = refSeq.count("-", 0, pos)
                real_pos = pos - num_gaps   # ungapped offset within the aligned slice
                ctx_type = self.mcContextType(nogap_refseq, real_pos)
                ctx_pos = refStart + real_pos
                if ctx_type == None:
                    # context extends past the aligned slice: look it up in the
                    # whole target sequence instead
                    if strand == "+":
                        ctx_type = self.mcContextType(self.targetSeqD, ctx_pos)
                    elif strand == "-":
                        ctx_type = self.mcContextType(self.targetSeqC, ctx_pos)
                    if ctx_type == None:
                        continue
                if strand == "-":
                    # convert to forward-strand coordinates
                    ctx_pos = refSeqLen - ctx_pos - 1
                line = "%d\t%s\t%s\t%s\n" % (ctx_pos, strand, ctx_type, readSeq[pos])
                base_name = self.mcContextFileBaseName(ctx_pos)
                if base_name not in self.mcDetectData:
                    self.mcDetectData[base_name] = []
                self.mcDetectData[base_name].append(line)
            except IndexError:
                logging.debug("extractMcContextsByOneRead#IndexError: %s %d %s %s %s %d" % (chr, ctx_pos, strand, ctx_type, readSeq, pos))
            except ValueError:
                # no further 'C' in the sequence
                break

    def qualityCharToErrorProb(self, qualityChar):
        """
        convert FASTQ (Sanger) quality character to error probability.
        P = 10 ** (-(ord(q) - 33) / 10), i.e. Phred+33 encoding.
        """
        return 10**((ord('!')-ord(qualityChar))*0.1)

    def outputMcContextData(self):
        """
        output mC context data to file.

        Appends the buffered lines in self.mcDetectData to their per-bucket
        files and clears the buffer.
        """

        logging.info("Output mC detection data start.")

        for base_name in self.mcDetectData.keys():
            fpath = self.mcContextFilePathByName(base_name)
            logging.debug("%s: %d" % (fpath, len(self.mcDetectData[base_name])))
            fo = open(fpath, "a")
            fo.write("".join(self.mcDetectData[base_name]))
            fo.close()
        self.mcDetectData.clear()

        logging.info("Output mC detection data done.")


    def mcContextHash(self):
        """
        Build an empty nested dict keyed strand -> context type -> {}.
        """
        h = {}
        for strand in self.strands():
            h[strand] = {}
            for mc_ctx in self.mcContextTypes():
                h[strand][mc_ctx] = {}

        return h


    def isC(self, seq):
        """Return True when the first base of seq is 'C' (case-insensitive)."""
        return seq[0:1].upper() == "C"


    def isT(self, seq):
        """Return True when the first base of seq is 'T' (case-insensitive)."""
        return seq[0:1].upper() == "T"


    def calcCoverage(self, seqAry, strand):
        """
        count the number of C and T (or G and A), calculate mC ratio.

        seqAry -- list of [base, confidence] pairs for one site
        Returns (coverage, ratio) where coverage is the number of entries and
        ratio is the confidence-weighted C/(C+T) fraction; (0, 0) when there
        is no C/T (or G/A) evidence at all.
        """

        num_c = 0.0
        num_t = 0.0
        num_all = 0
        (C, T) = ('C', 'T')
        if strand == '-':
            # on the minus strand methylation evidence is G (kept) vs A (converted)
            (C, T) = ('G', 'A')
        for i in seqAry:
            num_all += 1
            if i[0] == C:
                num_c += i[1]   # confidence-weighted counts
            elif i[0] == T:
                num_t += i[1]

        num_ct = num_c + num_t
        if num_all == 0 or num_ct == 0:
            return (0, 0)
        # return (num_all, num_c/num_all)
        return (num_all, num_c/num_ct)


    def mafMismapValue(self, aLine):
        """
        get mismap value by maf "a" line.
        Returns the float after "mismap=", or None when the field is absent.
        """

        mismap_fields = filter((lambda s: s.startswith('mismap=')), aLine.split())
        if len(mismap_fields) > 0:
            return float(mismap_fields[0][7:])
        else:
            return None


    def mcContextFilePath(self, pos):
        """
        get mC context data file path with specified position.
        """

        return self.mcContextFilePathByName(self.mcContextFileBaseName(pos))


    def mcContextFilePathByName(self, name):
        """
        get mC context data file path with specified name.
        NOTE(review): depends on self.targetChr being set externally.
        """

        return "%s/%s/%s.tsv" % (self.localDir, self.targetChr, name)


    def mcContextFileBaseName(self, pos):
        """
        get mC context data file name with specified chromosome number.
        Positions are bucketed into 100kb bins (Python 2 integer division).
        """

        return "%010d" % (int(pos) / 100000)


    def mcContextLocalFilePath(self, chrNo):
        """
        get local mC context data file path with specified chromosome number.
        """

        return "%s/%s.tsv" % (self.localDir, chrNo)


    def mcContextGlobalFilePath(self, chrNo):
        """
        get global mC context data file path with specified chromosome number.
        """

        return "%s/%s.tsv" % (self.mcContextDir, chrNo)
|
|
1780
|
|
1781
|
|
1782 class SamAlnParser(BsfCallBase):
|
|
1783
|
|
def __init__(self, samfile, aln):
    """
    constructor of SamAlnParser.

    samfile -- the open pysam Samfile the alignment came from (used to
               resolve reference names by tid)
    aln     -- one pysam alignment record
    """
    self.samfile = samfile
    self.aln = aln

    # reference genome info, attached later via setRefGenome()
    self.refName = None
    self.refSeq = None
    self.refSeqLen = None

    # '+' or '-', decided by parse()
    self.strand = None

    # reference side of the gapped alignment, filled by parse()
    self.alnRefStart = None
    self.alnRefSeq = None
    self.alnRefSeqLen = None

    # read side of the gapped alignment, filled by parse()
    self.alnReadSeq = None
|
|
1799
|
|
1800
|
|
def setRefGenome(self, name, sequence, length = None):
    """
    Attach the reference sequence this alignment is parsed against.
    length defaults to len(sequence) when not given.
    """
    self.refName = name
    self.refSeq = sequence
    self.refSeqLen = len(sequence) if length is None else length
|
|
1808
|
|
1809
|
|
def referenceName(self):
    """
    Return the name of the reference this alignment maps to, or None for
    an unmapped record (tid < 0).
    """
    tid = self.aln.tid
    return self.samfile.getrname(tid) if tid >= 0 else None
|
|
1815
|
|
1816
|
|
def getStrand(self):
    """Return '-' for a reverse-strand alignment, '+' otherwise."""
    return "-" if self.aln.is_reverse else "+"
|
|
1822
|
|
1823
|
|
def parseCigar(self, cigar):
    """
    Split a CIGAR string into a list of {'op': letter, 'num': count}
    entries, in order of appearance.
    """
    entries = []
    for token in re.findall('\d+[A-Z=]', cigar):
        entries.append({"op": token[-1:], "num": int(token[:-1])})
    return entries
|
|
1826
|
|
1827
|
|
def alignmentSequences(self, refSeqPos, readSeq, cigars):
    """
    Rebuild the gapped reference/read alignment strings from a parsed CIGAR.

    refSeqPos -- start offset of the alignment within self.refSeq
    readSeq   -- the raw (unaligned) read sequence
    cigars    -- output of parseCigar: list of {'op', 'num'} entries
    Returns {"reference": <gapped ref>, "read": <gapped read>}.
    """
    refs = []           # pieces of the gapped reference sequence
    rest_refseq = 0     # deleted/skipped reference bases, appended after the loop

    reads = []          # pieces of the gapped read sequence
    read_pos = 0        # cursor into readSeq

    for cigar in cigars:
        if cigar["op"] == "M":
            # alignment match/mismatch: consume both reference and read
            refs.append(self.refSeq[refSeqPos:refSeqPos+cigar["num"]])
            refSeqPos += cigar["num"]
            reads.append(readSeq[read_pos:read_pos+cigar["num"]])
            read_pos += cigar["num"]
        elif cigar["op"] == "I":
            # insertion to the reference: gap on the reference side
            refs.append("-" * cigar["num"])
            reads.append(readSeq[read_pos:read_pos+cigar["num"]])
            read_pos += cigar["num"]
        elif cigar["op"] == "P":
            # padding: gap on both sides
            refs.append("-" * cigar["num"])
            reads.append("-" * cigar["num"])
        elif cigar["op"] == "D" or cigar["op"] == "N":
            # deletion/skip: gap on the read side.  NOTE(review): the matching
            # reference bases are not emitted in place but accumulated and
            # appended once after the loop -- confirm this is intended for
            # alignments with several D/N operations.
            rest_refseq += cigar["num"]
            reads.append("-" * cigar["num"])
        elif cigar["op"] == "S":
            # soft clip: consume read bases without emitting anything
            read_pos += cigar["num"]

    if rest_refseq > 0:
        refs.append(self.refSeq[refSeqPos:refSeqPos+rest_refseq])

    return {"reference": "".join(refs), "read": "".join(reads)}
|
|
1858
|
|
1859
|
|
1860 def parse(self):
|
|
1861 self.strand = self.getStrand()
|
|
1862
|
|
1863 self.alnRefStart = self.aln.pos
|
|
1864 read_seq = self.aln.seq
|
|
1865 cigars = self.parseCigar(self.aln.cigarstring)
|
|
1866 alignment = self.alignmentSequences(self.alnRefStart, read_seq, cigars)
|
|
1867
|
|
1868 if self.strand == "+":
|
|
1869 self.alnRefSeq = alignment["reference"]
|
|
1870 self.alnReadSeq = alignment["read"]
|
|
1871 elif self.strand == "-":
|
|
1872 nogap_refseq = self.clearGap(alignment["reference"])
|
|
1873 self.alnRefStart = self.complementStartPosition(self.refSeqLen, self.alnRefStart, len(nogap_refseq))
|
|
1874 self.alnRefSeq = self.complementSeq(alignment["reference"])
|
|
1875 self.alnReadSeq = self.complementSeq(alignment["read"])
|
|
1876
|