Mercurial > repos > yutaka-saito > bsfcall
diff bin/bsfcall.py @ 0:06f8460885ff
migrate from GitHub
author | yutaka-saito |
---|---|
date | Sun, 19 Apr 2015 20:51:13 +0900 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/bin/bsfcall.py Sun Apr 19 20:51:13 2015 +0900 @@ -0,0 +1,1876 @@ +#!/usr/bin/env python +""" +Bisulfighter::bsf-call + +Bisulfighter (http://epigenome.cbrc.jp/bisulfighter) +by National Institute of Advanced Industrial Science and Technology (AIST) +is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Unported License. +http://creativecommons.org/licenses/by-nc-sa/3.0/ +""" + +__version__= "1.3" + +import sys +import os +import glob +import threading +import subprocess +import Queue +from datetime import datetime +import hashlib +from string import maketrans +import gzip +import bz2 +import zipfile +import logging +from time import sleep +from shutil import copy +import re +# import pysam + +class BsfCallBase(object): + """ + base class for BsfCall, LastExecutor, McDetector. + define functions that is used in each sub classes. + """ + + def splitFilePath(self, filePath): + """ + split file path to directory path, file name (with file extension), + file name (without file extension) and file extension. + if extension of filePath is '.gz', '.gz' extension is ignored. + """ + dir_name, file_name = os.path.split(filePath) + base_name, ext = os.path.splitext(file_name) + prog = None + if (ext == '.gz' or ext == '.gzip' or ext == '.bz2' or ext == '.bzip2' or ext == '.zip'): + prog = ext[1:] + base_name, ext = os.path.splitext(base_name) + if len(ext) > 1: + ext = ext[1:] + return (dir_name, file_name, base_name, ext, prog) + + + def readNameByReadFile(self, readFilePath): + """ + get read name by read file path. + """ + dir_name, file_name, read_name, ext, prog = self.splitFilePath(readFilePath) + return read_name + + + def secondReadFilePathByFirstReadFilePath(self, readFile, secondReadType = None): + """ + get second read file path by first read file path. + if first read file path is '/path/to/read_file_1.fastq' second read file + path is '/path/to/read_file_2.fastq' + if secondReadType is specified, the extension of second read file is its + value. + """ + fpath = "" + dir_name, file_name, basename, ext, prog = self.splitFilePath(readFile) + if secondReadType: + ext = ".%s" % secondReadType + if prog is not None: + fpath = "%s/%s2%s.%s" % (dir_name, basename[0:-1], ext, prog) + else: + fpath = "%s/%s2%s" % (dir_name, basename[0:-1], ext) + return fpath + + + def pairedEndReadNumbers(self): + return (1, 2) + + + def clearGap(self, seq): + return seq.replace("-", "") + + + def complementStartPosition(self, genomeLen, subseqStart, subseqLen): + return genomeLen - subseqStart - subseqLen + + + def complementSeq(self, seq): + return seq.translate(maketrans("ATGCatgc", "TACGtacg"))[::-1] + + + def mcContextType(self, genomeSeq, cBasePos, strand='+'): + """ + get mC context type (CG, CHG, CHH) by genome sequence and C base position. + if no mC context found, return None. + """ + + try: + if strand == '+': + if genomeSeq[cBasePos + 1] == "G": + return "CG" + else: + if genomeSeq[cBasePos + 2] == "G": + return "CHG" + else: + return "CHH" + elif strand == '-': + if genomeSeq[cBasePos - 1] == "C": + return "CG" + else: + if genomeSeq[cBasePos - 2] == "C": + return "CHG" + else: + return "CHH" + return None + except IndexError: + return None + + + def chrSort(self, a, b): + return cmp(a, b) + + + def strands(self): + return ("+", "-") + + + def mcContextTypes(self): + return ("CG", "CHG", "CHH") + + + def bzip2File(self, filePath, wait = True, log = False): + """ + bzip2 file. If wait argument is False, without waiting for bzip2 process to be + completed, this function returns immediately. + """ + + if log: + logging.info("bzip2 start: %s" % filePath) + + dirpath, fname = os.path.split(filePath) + cmd = "bzip2 %s" % fname + p = subprocess.Popen(cmd, shell = True, cwd = dirpath) + if wait: + p.wait() + + if log: + logging.info("bzip2 done: %s" % filePath) + + + def isGzipFile(self, filePath): + return filePath[-3:] == ".gz" or filePath[-5:] == ".gzip" + + + def isBzip2File(self, filePath): + return filePath[-4:] == ".bz2" or filePath[-6:] == ".bzip2" + + + def isZipFile(self, filePath): + return filePath[-4:] == ".zip" + + + def isMafFile(self, filePath): + f = open(filePath, "rb") + data = f.read(1) + f.close() + if re.match("[\x20-\x7E]", data): + f = open(filePath, "r") + first_line = f.readline() + if first_line[0:5] != "track" and first_line[0] != "#" and first_line[0:2] != "a ": + f.close() + return False + + if first_line[0:2] == "a ": + cnt = 2 + else: + cnt = 1 + + while True: + line = f.readline() + if line == "": + break + if line[0] == "#" or line.strip() == "": + continue + + if cnt == 1 and line[0:2] != "a ": + f.close() + return False + + if cnt == 2 and line[0:2] != "s ": + f.close() + return False + + if cnt == 3: + f.close() + if line[0:2] == "s ": + return True + else: + return False + + cnt += 1 + + f.close() + return False + else: + return False + + + def isBamFile(self, filePath): + bgzf_magic = b"\x1f\x8b\x08\x04" + + f = open(filePath, "rb") + data = f.read(4) + f.close() + + return data == bgzf_magic + + + def isSamFile(self, filePath): + f = open(filePath, "rb") + data = f.read(1) + if data == "@": + tag = f.read(2) + if tag == "HD" or tag == "SQ" or tag == "RG" or tag == "CO": + f.close() + return True + + f.seek(0) + data = f.read(1) + f.close() + if re.match("[\x20-\x7E]", data): + f = open(filePath, "r") + line = f.readline() + f.close() + return len(line.split("\t")) > 10 + else: + return False + + + def scriptDir(self): + return os.path.dirname(os.path.abspath(sys.argv[0])) + + + def chrnoFromFastaDescription(self, description): + """ + get chromosome number by fasta description line. + """ + + return description.strip()[1:].strip() + + + def chrsByRefGenome(self, refGenome): + """ + get all chromosome numbers that is included in the reference genome. + """ + + chrs = [] + for line in open(refGenome, "r"): + line = line.strip() + if line[0] == ">": + chrs.append(self.chrnoFromFastaDescription(line)) + + return chrs + + + def readRefGenome(self, refGenome, refGenomeBuf, refGenomeChr): + """ + read the reference genome fasta file. + """ + + logging.info("BsfCallBase::readRefGenome: %s" % refGenome) + chr = None + buf = [] + fin = open(refGenome, 'r') + for line in fin: + if line[0] == '>': + chr = self.chrnoFromFastaDescription(line) + logging.info("BsfCallBase::readRefGenome: chr=%s" % chr) + if len(buf) > 0: + refGenomeBuf[refGenomeChr[-1]]=''.join(buf) + del buf[:] + refGenomeChr.append(chr) + elif chr != None: + buf.append(line.strip().upper()) + else: + logging.fatal("BsfCallBase::readRefGenome: the specified reference genome file \"%s\" is malformed." % refGenome) + fin.close() + refGenomeBuf[refGenomeChr[-1]]=''.join(buf) + logging.info("BsfCallBase::readRefGenome: done.") + return + + + def lastalOpts(self, lastOpt): + """ + get options for lastal by bsf-call --last option + """ + + return " ".join(lastOpt.split(",")) + + + def mergeOpts(self): + return "" + + + def filterOpts(self, mismapProb, scoreThres, isPairedEnd): + """ + get filtering option. this option is specified to last-map-probs or + last-pair-probs. + """ + + option = "" + if isPairedEnd: + option = "-m%s" % str(mismapProb) + else: + option = "-s%d -m%s" % (scoreThres, str(mismapProb)) + + return option + + + def isPairedEnd(self, readAttr): + return self.pairedEndReadNumbers()[1] in readAttr + + + def jobIdByQsubOutput(self, qsubOutput): + """ + get job id submitted by qsub command and its output. + """ + + fields = qsubOutput.strip().split() + + return fields[2] + + + def waitForSubmitJobs(self, jobIds, checkInterval = 10): + """ + wait for all jobs that have been submitted with qsub command to finish. + """ + + error_jobs = [] + while True: + all_done = True + qstat = os.popen("qstat") + for line in qstat: + fields = line.strip().split() + if fields[0] in jobIds: + if fields[4].find('E') > 0: + if fields[0] not in error_jobs: + logging.fatal("Error has occurred: Job ID=%s" % fields[0]) + error_jobs.append(fields[0]) + else: + all_done = False + break + qstat.close() + + if all_done: + break + else: + sleep(checkInterval) + + return + + + def logJobSubmit(self, msg, jobId, cmd = None): + logging.info("Submit job: %s --> Job ID = %s" % (msg, jobId)) + if cmd: + logging.info(cmd) + + + def bamMapq2Mismap(self, mapq): + return pow(0.1, (float(mapq) / 10)) + + + def getAllMappingResultFiles(self, resultDirs): + mapping_result_files = [] + + for result_dir in resultDirs: + for root, dirs, files in os.walk(result_dir): + for filename in files: + logging.info("McDetector::getAllMappingResultFiles: %s" % filename) + mapping_result_file = os.path.join(root, filename) + mapping_result_files.append(mapping_result_file) + + return mapping_result_files + + +class BsfCall(BsfCallBase): + """ + class to execute bsf-call process. + """ + + def __init__(self, refGenome, readFilePaths, cmdOpts): + """ + constructor of BsfCall + """ + + self.refGenome = refGenome + self.readFilePaths = readFilePaths + self.reads = [] + self.opts = cmdOpts + + self.dataDir = None + self.genomeDir = None + self.mcContextDir = None + + self.readInFh1 = None + self.readInFh2 = None + self.numReads = {1: 0, 2: 0} + + self.mappingResultDirs = [] + self.mappingResultFiles = [] + + self.setDataDir() + self.setLogger() + + logging.info("bsf-call start.") + self.logOption() + + # self.numReadsPerFile = self.sizeForSplitRead(self.opts["split_read_size"]) + + if self.opts["mapping_dir"]: + self.opts["only_mcdetection"] = True + else: + self.opts["only_mcdetection"] = False + + + def execute(self): + """ + execute bsf-call process. + """ + + try: + if self.opts["mapping_dir"]: + # Only mc detection + self.mappingResultDirs = self.opts["mapping_dir"].split(",") + self.mappingResultFiles = self.getAllMappingResultFiles(self.mappingResultDirs) + self.opts["mapping_result_files"] = self.mappingResultFiles + else: + self.makeIndexFile() + self.prepareForReads() + self.mappingResultDirs = self.processMapping() + + logging.debug("BsfCall:execute: mapping result directories are: %s" % ','.join(self.mappingResultDirs)) + self.processMcDetection(self.mappingResultDirs, self.opts["local_dir"]) + + logging.info("bsf-call done.") + except: + logging.exception("Exception has occurred.") + + + def processMapping(self): + """ + run read mapping and filtering process. + """ + + logging.info("Mapping and filtering process start.") + + result_dirs = [] + for read_attr in self.reads: + self.runLast(read_attr) + result_dirs.append(read_attr["results_dir"]) + + logging.info("Mapping and filtering process done.") + + return result_dirs + + + def processMcDetection(self, resultDirs, localDir = None): + """ + run mC detection process. + """ + + logging.info("mC detection process start.") + + mc_detector = McDetector(self.refGenome, resultDirs, self.mcContextDir, self.opts) + mc_detector.execute(self.opts["output"], self.opts["num_threads"]) + + logging.info("mC detection process done.") + + + def setDataDir(self): + """ + create directries to store the files of the bsf-call process. + """ + + if self.opts["work_dir"]: + self.dataDir = self.opts["work_dir"] + else: + self.dataDir = self.autoWorkDir() + + self.mcContextDir = "%s/mc_contexts" % self.dataDir + + if not os.path.exists(self.dataDir): + os.makedirs(self.dataDir) + + if not os.path.exists(self.mcContextDir): + os.makedirs(self.mcContextDir) + + + def setLogger(self): + """ + create logger to store the logs of the bsf-call process. + """ + + log_level = logging.INFO + # log_level = logging.DEBUG + + log_file = "%s/bsf-call.log" % self.dataDir + file_logger = logging.FileHandler(filename=log_file) + + file_logger.setLevel(log_level) + file_logger.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s')) + + logging.getLogger().addHandler(file_logger) + logging.getLogger().setLevel(log_level) + + + def logOption(self): + """ + output bsf-call option values and arguments to the log. + """ + + if self.opts["mapping_dir"]: + logging.info("Mapping result directory is specified. Only mC detection is executed.") + logging.info(" Mapping result directory: %s" % self.opts["mapping_dir"]) + logging.info(" Reference genome: %s" % self.refGenome) + # logging.info(" Read BAM file: %s" % ("Yes" if self.opts["read_bam"] else "No")) + # logging.info(" Read SAM file: %s" % ("Yes" if self.opts["read_sam"] else "No")) + else: + logging.info("Reference genome: %s" % self.refGenome) + logging.info("Read files: %s" % self.readFilePaths) + logging.info("Working directory: %s" % self.dataDir) + logging.info("Options:") + logging.info(" Threshold of the alignment score at filtering: %d" % self.opts["aln_score_thres"]) + # logging.info(" Paired-end direction: %s" % self.opts["pe_direction"]) + # logging.info(" Options for LAST: %s" % self.opts["last_opts"]) + + logging.info(" Threshold of read coverate: %d" % self.opts["coverage"]) + logging.info(" Threshold of mC ratio: %s" % str(self.opts["lower_bound"])) + logging.info(" Threshold of the mismap probability at filtering: %s" % str(self.opts["aln_mismap_prob_thres"])) + logging.info(" Working directory: %s" % self.dataDir) + logging.info(" Local directory: %s" % self.opts["local_dir"]) + logging.info(" Output file: %s" % (self.opts["output"] if self.opts["output"] else "(stdout)")) + # logging.info(" Use cluster: %s" % ("Yes" if self.opts["use_cluster"] else "No")) + # logging.info(" Queue name: %s" % self.opts["queue_list"]) + logging.info(" Number of threads: %d" % self.opts["num_threads"]) + # logging.info(" Split read size: %s" % self.opts["split_read_size"]) + + + def prepareForReads(self): + """ + create directories to store split reads and result files. + """ + + for read_no, read_path in enumerate(self.readFilePaths): + readNo = read_no + 1 + logging.info("Preparations for a read file start: %d: %s" % (readNo, read_path)) + + data_dir = "%s/%d" % (self.dataDir, readNo) + read = {"base_dir": data_dir, "path": read_path, "reads_dir": data_dir + "/reads", "results_dir": data_dir + "/results"} + + if not os.path.exists(data_dir): + os.makedirs(data_dir) + os.makedirs(read["reads_dir"]) + os.makedirs(read["results_dir"]) + + pe_no = self.pairedEndReadNumbers()[0] + for readpath in read_path.split(","): + dir_name, file_name, base_name, ext, prog = self.splitFilePath(readpath) + file_type = self.checkReadFileType(readpath) + read[pe_no] = {"name": base_name, "fname": file_name, "type": file_type, "path": readpath} + pe_no += 1 + + is_paired_end = self.isPairedEnd(read) + logging.info("Paired-end: %s" % is_paired_end) + logging.info(" Forward: %s" % read[1]["path"]) + if is_paired_end: + logging.info(" Reverse: %s" % read[2]["path"]) + + logging.info("Preparations for a read file done") + self.reads.append(read) + return + + + def runLast(self, readAttr): + """ + run LAST programs to map reads and filtering. + """ + + is_paired_end = self.isPairedEnd(readAttr) + filter_option = self.filterOpts(self.opts["aln_mismap_prob_thres"], self.opts["aln_score_thres"], is_paired_end) + + if is_paired_end: + logging.info('BsfCall::runLast: PairedEnd') + last_exec = LastExecutorPairedEnd(self.refGenome, self.dataDir, readAttr["reads_dir"], readAttr["results_dir"], self.opts["num_threads"]) + else: + logging.info('BsfCall::runLast: Not PairedEnd') + last_exec = LastExecutorSingle(self.refGenome, self.dataDir, readAttr["reads_dir"], readAttr["results_dir"]) + + # last_exec.execute(readAttr, self.opts["num_threads"], self.lastalOpts(self.opts["last_opts"]), self.mergeOpts(), filter_option) + # last_exec.execute(readAttr, 1, self.lastalOpts(self.opts["last_opts"]), self.mergeOpts(), filter_option) + last_exec.execute(readAttr, 1, "", self.mergeOpts(), filter_option) + + + def makeIndexFile(self): + """ + create index file of reference genome. + """ + + directions = [] + if not os.path.exists("%s.f.prj" % self.refGenome): + directions.append("f") + if not os.path.exists("%s.r.prj" % self.refGenome): + directions.append("r") + + if len(directions) > 0: + logging.info("Make index file start.") + last_executor = LastExecutor(self.refGenome, self.dataDir) + last_executor.lastdb(directions, self.opts["num_threads"] > 1) + logging.info("Make index file done.") + + + def checkReadFileType(self, readFilePath): + """ + get read file type. + """ + + name, ext = os.path.splitext(readFilePath) + if ext == ".gz": + name, ext = os.path.splitext(name) + + if len(ext) > 1: + ext = ext[1:] + + file_type = None + + if ext == "sra" or "fastq" or "fasta": + file_type = ext + elif ext == "fa": + file_type = "fasta" + else: + f = open(readFilePath, "r") + first_char = f.read(1) + if first_char == "@": + file_type = "fastq" + elif first_char == ">": + file_type = "fasta" + else: + file_type = "sra" + f.close() + + return file_type + + + def splitedReadFilePath(self, outputDir, start, end, readDirection, ext): + """ + get splitted read file path. + """ + + return "%s/%010d-%010d_%d.%s" % (outputDir, start, end, readDirection, ext) + + + def fastqDumpedFilePath(self, outputDir, readName, readDirection = None): + """ + get output file path for fastq-dump command. + """ + + path = "%s/%s" % (outputDir, readName) + if readDirection: + path += "_%d" % readDirection + + return path + ".fastq" + + + def autoWorkDir(self): + """ + get working directory path determined automatically. + """ + + now = datetime.now() + + s = ",".join(self.readFilePaths) + self.refGenome + for key, value in self.opts.items(): + s += ("%s:%s" % (key, value)) + h = hashlib.md5(s).hexdigest() + + return "%s-%06d-%s" % (now.strftime("%Y%m%d-%H%M%S"), now.microsecond, h[0:16]) + + +class BsfCallCluster(BsfCall): + """ + class to execute bsf-call process on pc cluster. + """ + + def __init__(self, refGenome, readFilePaths, cmdOpts): + """ + constructor of BsfCallCluster + """ + + BsfCall.__init__(self, refGenome, readFilePaths, cmdOpts) + + self.lastExecutor = LastExecutorCluster(self.refGenome, self.opts) + self.mappingJobIds = [] + + + def processMapping(self): + """ + if bsf-call is executed on cluster, mapping and filtering job is submitted + when read file is splitted. + therefore, in this function, only wait for all jobs to finish. + """ + + logging.info("Waiting for all jobs to finish.") + + self.waitForSubmitJobs(self.mappingJobIds) + + logging.info("Mapping and filtering process done.") + + return self.lastExecutor.resultDirs + + + def processMcDetection(self, resultDirs, localDir = None): + """ + for each chromosome number, submit mC detection process job to the cluster. + after all jobs have been finished, output mC detection result. + """ + + logging.info("mC detection process start.") + + chrs = self.chrsByRefGenome(self.refGenome) + job_ids = [] + for chr_no in chrs: + job_id = self.submitMcDetectionJob(resultDirs, chr_no) + job_ids.append(job_id) + sleep(1) + logging.info("Submitted jobs: %s" % ",".join(job_ids)) + + self.waitForSubmitJobs(job_ids) + + mc_detector = McDetector(self.refGenome, resultDirs, self.mcContextDir, self.opts) + + mc_detector.chrs = chrs + mc_detector.output(self.opts["output"]) + + logging.info("mC detection process done.") + + + def submitMcDetectionJob(self, resultDirs, chrNo): + """ + submit mC detection process job to the cluster. + """ + + argv = self.qsubRemoteCommandArgv(resultDirs, chrNo) + cmd = self.qsubCommand(chrNo, " ".join(map((lambda s: '"' + s + '"'), argv))) + + qsub = os.popen(cmd) + out = qsub.read() + qsub.close() + + job_id = self.jobIdByQsubOutput(out) + self.logJobSubmit("mC detection job: %s: Mapping result directories: %s" % (chrNo, resultDirs), job_id) + + return job_id + + + def qsubRemoteCommandArgv(self, resultDirs, chrNo): + """ + get arguments for mC detection program (mc-detector.py). + """ + + argv = [] + + argv.append(self.refGenome) + argv.append(",".join(resultDirs)) + argv.append(self.mcContextDir) + argv.append(chrNo) + argv.append(str(self.opts["only_mcdetection"])) + argv.append(str(self.opts["lower_bound"])) + argv.append(str(self.opts["coverage"])) + argv.append(str(self.opts["aln_mismap_prob_thres"])) + + # if self.opts["read_bam"]: + # argv.append("BAM") + # elif self.opts["read_sam"]: + # argv.append("SAM") + # else: + # argv.append("MAF") + argv.append("MAF") + + if self.opts["local_dir"]: + argv.append(self.opts["local_dir"]) + + return argv + + + def qsubCommand(self, chrNo, cmdArgs): + """ + get qsub command to submit mC detection job. + """ + + remote_cmd = os.path.join(self.scriptDir(), "mc-detector.py") + out_file = os.path.join(self.mcContextDir, "mc-detector-%s.out" % chrNo) + err_file = os.path.join(self.mcContextDir, "mc-detector-%s.err" % chrNo) + + if self.opts["queue_list"]: + cmd = "qsub -o %s -e %s -q %s -pe openmpi %d -cwd -b y %s %s" % (out_file, err_file, self.opts["queue_list"], self.opts['num_threads'], remote_cmd, cmdArgs) + else: + cmd = "qsub -o %s -e %s -pe openmpi %d -cwd -b y %s %s" % (out_file, err_file, self.opts['num_threads'], remote_cmd, cmdArgs) + + return cmd + + + def afterProcessSplitRead(self, readFile, readAttr = None): + """ + this function is called after output splitted one read file. + if bsf-call is executed on pc cluster, mapping and filtering job is submitted + after output one splitted read file. + """ + + if self.isPairedEnd(readAttr): + fpath, ext = os.path.splitext(readFile) + if fpath[-1:] == "2": + read_file = "%s1.%s" % (fpath[0:-1], readAttr[1]["type"]) + job_id = self.lastExecutor.executeOneRead(readFile, readAttr) + self.mappingJobIds.append(job_id) + else: + job_id = self.lastExecutor.executeOneRead(readFile, readAttr) + self.mappingJobIds.append(job_id) + + +class LastExecutor(BsfCallBase): + """ + class to run LAST programs to map read and filtering. + """ + + def __init__(self, refGenome, baseDir = ".", readsDir = None, resultsDir = None, numThreads = 1): + """ + constructor of LastExecutor + """ + + self.refGenome = refGenome + self.baseDir = baseDir + self.readsDir = readsDir + self.resultsDir = resultsDir + self.queue = Queue.Queue() + self.lock = threading.Lock() + self.numThreads = numThreads + + + def execute(self, readAttr, numThreads = 1, lastalOpts = "", mergeOpts = "", filterOpts = ""): + """ + enqueue all splited read files to the queue. + create and start threads to execute read mapping and filtering process. + wait for all threads to finish. + """ + + self.enqueue(readAttr) + + if self.queue.qsize()==0: + logging.fatal("LastExecutor::execute: Error: queue size=%d" % self.queue.qsize()) + sys.exit(1) + + logging.info("Queued %d files." % self.queue.qsize()) + + threads = [] + for i in range(numThreads): + t = threading.Thread(target=self.worker, args=(readAttr, lastalOpts, mergeOpts, filterOpts)) + t.daemon = True + threads.append(t) + t.start() + + i = 1 + for thread in threads: + logging.info("Waiting for %d th thread." % i) + thread.join() + logging.info("Joined %d th thread." % i) + i+=1 + + + def worker(self, readAttr, lastalOpts, mergeOpts, filterOpts): + """ + thread worker to execute LAST. + dequeue read file path from the queue and execute read mapping filtering + process. + """ + if self.queue.empty(): + logging.info("LastExecutor::worker: Queue is empty.") + while not self.queue.empty(): + fpath = self.queue.get_nowait() + self.runLast(fpath, readAttr, lastalOpts, mergeOpts, filterOpts) + return + + + def runLast(self, readFile, readAttr, lastalOpts, mergeOpts, filterOpts, rmInFiles = True): + """ + execute LAST programs to map read and filtering. + """ + + cmd = self.batchCmd(readFile, readAttr, lastalOpts, mergeOpts, filterOpts, rmInFiles) + logging.info("LastExecutor::runLast: command=%s" % cmd) + p = subprocess.Popen(cmd, shell = True, stdout = subprocess.PIPE, stderr = subprocess.PIPE) + # out, error = p.communicate() + p.wait() + + error_msg = p.communicate()[1] + if len(error_msg) > 0: + logging.fatal(error_msg) + + + def enqueue(self, readAttr): + """ + enqueue all splitted read files to the queue. + """ + + # logging.info("%s/*_1.%s.bz2" % (self.readsDir, readAttr[1]["type"])) + # for read_file in glob.glob("%s/*_1.%s" % (self.readsDir, readAttr[1]["type"])): + # self.queue.put(read_file) + self.queue.put(readAttr[1]["path"]) + + + def lastdb(self, directions, parallel = False): + """ + execute lastdb command to create index file of reference genome. + """ + + cmds = [] + for direction in directions: + cmds.append(self.lastdbCmd(direction)) + + if parallel: + processes = [] + for cmd in cmds: + logging.info(cmd) + p = subprocess.Popen(cmd, shell = True, stdout = subprocess.PIPE, stderr = subprocess.STDOUT) + out = p.stdout + processes.append({"p": p, "out": out}) + for process in processes: + process["p"].wait() + out_data = process["out"].read() + if len(out_data) > 0: + logging.info(out_data) + else: + for cmd in cmds: + logging.info(cmd) + p = subprocess.Popen(cmd, shell = True, stdout = subprocess.PIPE, stderr = subprocess.STDOUT) + out = p.stdout + p.wait() + out_data = out.read() + if len(out_data) > 0: + logging.info(out_data) + + + def lastdbCmd(self, direction): + """ + get lastdb command to create index file of reference genome. + """ + # return "lastdb -w2 -u bisulfite_%s.seed %s.%s %s" % (direction, self.refGenome, direction, self.refGenome) + return "lastdb -uBIS%s %s.%s %s" % (direction.upper(), self.refGenome, direction, self.refGenome) + + + def lastalBsCmd(self, readFile, opts = ""): + """ + get lastal command to map read. + """ + # s_opt = self.lastalSopt(direction) + read_name = self.readNameByReadFile(readFile) + return "TMPDIR=%s last-bisulfite.sh %s.%s %s.%s %s > %s" % (self.readsDir, self.refGenome, 'f', self.refGenome, 'r', readFile, self.mappingResultFilePath(read_name)) + + def lastalBsPairCmd(self, readFile1, readFile2, opts = ""): + """ + get lastal command to map read. + """ + # s_opt = self.lastalSopt(direction) + read_name = self.readNameByReadFile(readFile) + return "PARALLEL=-j%d TMPDIR=%s last-bisulfite-pair.sh %s.%s %s.%s %s %s > %s" % (self.numThreads, self.readsDir, self.refGenome, 'f', self.refGenome, 'r', readFile1, readFile2, self.mappingResultFilePath(read_name)) + + + # def mergeCmd(self, forwardFile, reverseFile, outputFile, opts = "", rmInFiles = True): + def mergeCmd(self, inputFile, outputFile, opts = "", rmInFiles = True): + """ + get command to merge lastal output. + """ + cmd = "last-merge-batches %s > %s" % (inputFile, outputFile) + if rmInFiles: + cmd += "; rm %s" % inputFile + return cmd + + + # def mappingAndMergeCmd(self, readFile, lastalOpts = "", mergeOpts = "", rmInFiles = True): + def mappingPairCmd(self, readFile1, readFile2, lastalOpts = "", mergeOpts = "", rmInFiles = True): + """ + get read mapping and filtering command. + """ + read_name = self.readNameByReadFile(readFile) + n, ext = os.path.splitext(readFile) + if ext == ".gz": + n, ext = os.path.splitext(n) + lastal_qopt = self.lastalQopt(ext[1:]) + lastal_opt = "%s %s" % (lastalOpts, lastal_qopt) + mapping_file = self.mappingResultFilePath(read_name) + + return self.lastalBsPairCmd(readFile1, readFile2, lastal_opt) + + + # def mappingResultFilePath(self, readName, direction): + def mappingResultFilePath(self, readName): + """ + get read mapping result file path. + """ + + return "%s/%s.maf" % (self.resultsDir, readName) + + + def mergeResultFilePath(self, readName): + """ + get merge result file path. + """ + + return "%s/%s.merge.maf" % (self.resultsDir, readName) + + + def filterResultFilePath(self, readName): + """ + get filtering result file path. + """ + + return "%s/%s.maf" % (self.resultsDir, readName) + + + def lastalSopt(self, direction): + """ + get -s option for lastal. + """ + + opt = "" + if direction == "f": + opt = "-s1" + elif direction == "r": + opt = "-s0" + + return opt + + + def lastalQopt(self, fileType): + """ + get -Q option for lastal. + """ + + opt = "" + if fileType == "fasta": + opt = "-Q0" + elif fileType == "fastq": + opt = "-Q1" + + return opt + + +class LastExecutorCluster(LastExecutor): + """ + class to run LAST programs on pc cluster. + """ + + def __init__(self, refGenome, bsfCallOpts): + """ + constructor of LastExecutorCluster + """ + + self.refGenome = refGenome + self.opts = bsfCallOpts + + self.resultDirs = [] + + + def executeOneRead(self, readFile, readAttr): + """ + execute read mapping and filtering process with specified read. + on pc cluster, submit mapping and filtering job and return job id. + """ + + if readAttr["results_dir"] not in self.resultDirs: + self.resultDirs.append(readAttr["results_dir"]) + + lastal_opts = self.lastalOpts(self.opts["last_opts"]) + merge_opts = self.mergeOpts() + filter_opts = self.filterOpts(self.opts["aln_mismap_prob_thres"], self.opts["aln_score_thres"], self.isPairedEnd(readAttr)) + job_id = self.submitJob(readFile, readAttr, lastal_opts, merge_opts, filter_opts, self.opts["queue_list"]) + + return job_id + + + def submitJob(self, readFile, readAttr, lastalOpts = "", mergeOpts = "", filterOpts = "", queueName = None): + """ + submit read mapping and filtering process job. + """ + + job_id = None + + read_name = self.readNameByReadFile(readFile)[0:-2] + out_file = self.qsubStdoutFilePath(readAttr["base_dir"], read_name) + err_file = self.qsubStderrFilePath(readAttr["base_dir"], read_name) + remote_cmd = self.remoteCommand(readAttr) + remote_cmd_args = " ".join(map((lambda s: '"' + s + '"'), self.remoteCommandArgv(read_name, readAttr, lastalOpts, filterOpts))) + + if queueName: + cmd = "qsub -o %s -e %s -q %s -cwd %s %s" % (out_file, err_file, queueName, remote_cmd, remote_cmd_args) + else: + cmd = "qsub -o %s -e %s -cwd %s %s" % (out_file, err_file, remote_cmd, remote_cmd_args) + + qsub = os.popen(cmd) + out = qsub.read() + qsub.close() + + job_id = self.jobIdByQsubOutput(out) + + dir_path, file_name, base_name, ext, prog = self.splitFilePath(readFile) + self.logJobSubmit("Mapping and filtering: read: %s/%s" % (dir_path, base_name[0:-2]), job_id) + + return job_id + + + def remoteCommand(self, readAttr): + """ + get read mapping and filtering command path to submit by qsub command. + """ + + if self.isPairedEnd(readAttr): + return os.path.join(self.scriptDir(), "mapping-p.sh") + else: + return os.path.join(self.scriptDir(), "mapping-s.sh") + + + def remoteCommandArgv(self, readName, readAttr, lastalOpts, filterOpts): + """ + get read mapping and filtering command arguments. + """ + + argv = [] + + argv.append(readAttr["reads_dir"]) + argv.append(readAttr["results_dir"]) + argv.append(self.refGenome) + argv.append(filterOpts) + + argv.append(readName) + argv.append(readAttr[1]["type"]) + argv.append("%s %s" % (lastalOpts, self.lastalQopt(readAttr[1]["type"]))) + + if self.isPairedEnd(readAttr): + argv.append(readName) + argv.append(readAttr[2]["type"]) + argv.append("%s %s" % (lastalOpts, self.lastalQopt(readAttr[2]["type"]))) + + return argv + + + def qsubStdoutFilePath(self, dirPath, readName): + """ + get qsub command standard output file path. + """ + + return "%s/mapping_%s.out" % (dirPath, readName) + + + def qsubStderrFilePath(self, dirPath, readName): + """ + get qsub command standard error file path. + """ + + return "%s/mapping_%s.err" % (dirPath, readName) + + +class LastExecutorSingle(LastExecutor): + """ + class to run LAST programs to map single read and filtering. + """ + + def __init__(self, refGenome, baseDir, readsDir, resultsDir): + """ + constructor of LastExecutorSingle + """ + + LastExecutor.__init__(self, refGenome, baseDir, readsDir, resultsDir) + + + def batchCmd(self, readFile1, readAttr, lastalOpts = "", mergeOpts = "", filterOpts = "", rmInFiles = True): + """ + get batch command to map read and filtering. + """ + read_name = self.readNameByReadFile(readFile1) + out_file = self.filterResultFilePath(read_name[0:-2]) + return "TMPDIR=%s last-bisulfite.sh %s.f %s.r %s > %s/%s.maf" % (self.readsDir, self.refGenome, self.refGenome, readFile1, self.resultsDir, read_name) + # cmds = [] + # cmds.append(self.mappingAndMergeCmd(readFile, lastalOpts, mergeOpts, rmInFiles)) + # cmds.append(self.mappingPairCmd(readFile1, readFile2, lastalOpts, mergeOpts, rmInFiles)) + # cmds.append(self.filterCmd(self.mergeResultFilePath(read_name), out_file, filterOpts, rmInFiles)) + # cmds.append(self.filterCmd(self.mappingResultFilePath(read_name), out_file, filterOpts, rmInFiles)) + # cmds.append("bzip2 %s" % out_file) + # return "; ".join(cmds) + + + def filterCmd(self, inputFile, outputFile, opts = "", rmInFile = True): + """ + get filter command. + """ + + cmd = "last-map-probs %s %s > %s" % (opts, inputFile, outputFile) + if rmInFile: + cmd += "; rm %s" % inputFile + + return cmd + + +class LastExecutorPairedEnd(LastExecutor): + """ + class to run LAST programs to map paired-end read and filtering. + """ + + def __init__(self, refGenome, baseDir, readsDir, resultsDir, numThreads): + """ + constructor of LastExecutorPairedEnd + """ + LastExecutor.__init__(self, refGenome, baseDir, readsDir, resultsDir) + + + def batchCmd(self, readFile1, readAttr, lastalOpts = "", mergeOpts = "", filterOpts = "", rmInFiles = True): + """ + get batch command to map read and filtering. + """ + # readFile2 = self.secondReadFilePathByFirstReadFilePath(readFile1, readAttr[2]["type"]) + # read_name1 = self.readNameByReadFile(readFile1) + # read_name2 = self.readNameByReadFile(read_files[1]) + # merge_result_file = "%s %s" % (self.mergeResultFilePath(read_name1), self.mergeResultFilePath(read_name2)) + # mapping_result_file = "%s %s" % (self.mappingResultFilePath(read_name1), self.mappingResultFilePath(read_name2)) + # out_file = self.filterResultFilePath(read_name1[0:-2]) + # return "TMPDIR=%s last-bisulfite-paired.sh %s.f %s.r %s %s > %s" % (self.baseDir, self.refGenome, self.refGenome, readFile1, readFile2, out_file) + read_name1 = self.readNameByReadFile(readAttr[1]["path"]) + read_name2 = self.readNameByReadFile(readAttr[2]["path"]) + return "PARALLEL=-j%d TMPDIR=%s last-bisulfite-paired.sh %s.f %s.r %s %s > %s/%s,%s.maf" % (self.numThreads, self.readsDir, self.refGenome, self.refGenome, readAttr[1]["path"], readAttr[2]["path"], self.resultsDir, read_name1, read_name2) + + + def filterCmd(self, inputFile, outputFile, opts = "", rmInFile = True): + """ + get filter command. + """ + + cmd = "last-pair-probs %s %s > %s" % (opts, inputFile, outputFile) + if rmInFile: + cmd += "; rm %s" % inputFile + + return cmd + + +class McDetector(BsfCallBase): + """ + class to execute mC detection process. + """ + + def __init__(self, refGenome, resultDirs, mcContextDir, options): + """ + constructor of McDetector + """ + + self.refGenome = refGenome + self.refGenomeBuf = {} + self.refGenomeChr = [] + + self.mappingResultDirs = resultDirs + self.mcContextDir = mcContextDir + self.lowerBound = options["lower_bound"] + self.coverageThreshold = options["coverage"] + self.onlyMcDetection = options["only_mcdetection"] + + self.opts = options + + self.mappingResultFiles = [] + + if self.onlyMcDetection: + self.mismapThreshold = options["aln_mismap_prob_thres"] + # self.readBam = options["read_bam"] + # self.readSam = options["read_sam"] + self.readBam = False + self.readSam = False + if "mapping_result_files" in options: + self.mappingResultFiles = options["mapping_result_files"] + else: + self.mappingResultFiles = self.getAllMappingResultFiles(resultDirs) + else: + self.mismapThreshold = 1e-10 + self.readBam = False + self.readSam = False + self.mappingResultFiles = self.getAllMappingResultFiles(resultDirs) + + if len(self.mappingResultFiles)==0: + logging.fatal("McDetector::__init__: error: no mapping result file found.") + sys.exit(1) + + if options["local_dir"]: + self.localDir = options["local_dir"] + else: + self.localDir = mcContextDir + + self.readRefGenome(self.refGenome, self.refGenomeBuf, self.refGenomeChr) + + logging.debug("McDetector::__init__: mappingResultDirs=%s" % ','.join(self.mappingResultDirs)) + + + def execute(self, outputFile, numWorkers = 1): + """ + execute mC detection process and output result. + """ + + self.process() + self.output(outputFile) + + + def process(self): + """ + execute mC detection process. + """ + + logging.info("mC detection process start") + + if len(self.mappingResultFiles)==0: + logging.fatal("McDetector::process: error: no mapping result found.") + sys.exit(1) + + for mapping_result_file in self.mappingResultFiles: + logging.info("Parsing mapping result file: %s" % mapping_result_file) + if self.onlyMcDetection: + if not (self.readBam or self.readSam): + if self.isGzipFile(mapping_result_file) or self.isMafFile(mapping_result_file): + self.processMafFile(mapping_result_file) + else: + # BAM or SAM + if self.readBam and self.isBamFile(mapping_result_file): + self.processSamFile(mapping_result_file) + elif self.readSam and self.isSamFile(mapping_result_file): + self.processSamFile(mapping_result_file) + else: + self.processMafFile(mapping_result_file) + + logging.info("Parsing mapping result file done") + + if self.mcContextDir != self.localDir: + copy(self.mcContextLocalFilePath(self.targetChr), self.mcContextDir) + + + def output(self, outputFile): + """ + output mC detection result. + """ + + logging.info("McDetector::output: outputFile=%s" % outputFile) + popen_args = ['sort', '-k1'] + list = glob.glob("%s/*.bsf" % self.localDir) + if len(list)==0: + logging.fatal("McDetect::output: no *._bsf_ files found in %s" % self.localDir) + sys.exit(1) + for bsf_file in list: + if os.path.getsize(bsf_file) > 0: + popen_args.append(bsf_file) + logging.info("McDetector::output: added bsf_file=\"%s\"" % bsf_file) + logging.debug("McDetector::output: popen_args=%s" % ' '.join(popen_args)) + fout = open(outputFile, 'w') + if len(popen_args) > 2: + pipe = subprocess.Popen(popen_args, stdout=subprocess.PIPE) + block = [] + for line in pipe.stdout: + (chr, ctx_pos, strand, mc_ctx, base, conf) = line.strip().split("\t") + ctx_pos = int(ctx_pos) + conf = float(conf) + if len(block)==0 or block[-1][0]==chr: + block.append([chr, ctx_pos, strand, mc_ctx, base, conf]) + else: + self.outputOneChrBlock(fout, block) + del block[:] + block.append([chr, ctx_pos, strand, mc_ctx, base, conf]) + if len(block) > 0: + self.outputOneChrBlock(fout, block) + pipe.stdout.close() + else: + logging.info("McDetector::output: no result files found.") + fout.close() + + + def outputOneChrBlock(self, fout, block): + """ + output mC detection result for one chromosome. + """ + + chr = block[0][0] + logging.info("McDetector::outputOneChrBlock: chr=%s" % chr) + mc_contexts = {} + for b in sorted(block, key=lambda block: block[1]): + try: + ctx_pos, strand, mc_ctx, base, conf = b[1:] + if not ctx_pos in mc_contexts: + mc_contexts[ctx_pos] = {} + if not strand in mc_contexts[ctx_pos]: + mc_contexts[ctx_pos][strand] = {} + if not mc_ctx in mc_contexts[ctx_pos][strand]: + mc_contexts[ctx_pos][strand][mc_ctx] = [] + mc_contexts[ctx_pos][strand][mc_ctx].append([base, conf]) + except ValueError, e: + logging.warning("McDetect::outputOneChrBlock: value error: %s: %s -> %s" % (fpath, line.strip(), e.args[0])) + + num_entry = 0 + if len(mc_contexts.keys()) > 0: + for pos in sorted(mc_contexts.keys()): + for strand in mc_contexts[pos].keys(): + for mc_ctx in mc_contexts[pos][strand].keys(): + coverage, mc_ratio = self.calcCoverage(mc_contexts[pos][strand][mc_ctx], strand) + if coverage >= self.coverageThreshold and mc_ratio >= self.lowerBound: + fout.write("%s\t%d\t%s\t%s\t%g\t%d\n" % (chr, pos, strand, mc_ctx, mc_ratio, coverage)) + num_entry += 1 + else: + logging.info("McDetect::outputOneChrBlock: rejected: chr=%s pos=%d strand=%s coverage=%g mc_ratio=%g" % (chr, pos, strand, coverage, mc_ratio)) + # self.bzip2File(fpath, False) + logging.info("McDetector::outputOneChrBlock: number of entries %s" % num_entry) + + + def processMafFile(self, resultFile): + """ + read mapping result file, and output mC context data file. + """ + + file_name = self.splitFilePath(resultFile)[1] + outputFile = "%s/%s.bsf" % (self.localDir, file_name) + try: + fin = open(resultFile, 'r') + fout = open(outputFile, 'w') + block = [] + for line in fin: + if line[0] == '#' or line[0] == 'p' or line[0] == "\n": + continue + if line[0] == 'a' or line[0] == 's' or line[0] == 'q': + block.append(line.strip()) + if len(block)==4: + if block[0][0]=='a' and block[1][0]=='s' and block[2][0]=='s' and block[3][0]=='q': + mismap_prob = float(block[0].split('=')[2]) + if mismap_prob <= self.mismapThreshold: + b1 = block[1].split() + chr = b1[1] + ref_seq = b1[-1].upper() + ref_start = int(b1[2]) # 0-based position + read_seq = block[2].split()[-1] + read_len = len(read_seq) + read_qual = block[3].split()[-1] + strand = self.findStrandFromAlignment(chr, ref_start, read_seq, read_len) + # strand = block[2].split()[4] + if strand == '+' or strand == '-': + lines = self.extractMcContextsByOneRead(chr, strand, mismap_prob, ref_seq, ref_start, read_seq, read_qual, read_len) + for line in lines: + fout.write(line) + logging.debug("processMafFile: a maf block(%s) is successfully captured." % strand) + elif strand == '+-': + for st in ('+', '-'): + lines = self.extractMcContextsByOneRead(chr, st, mismap_prob, ref_seq, ref_start, read_seq, read_qual, read_len) + for line in lines: + fout.write(line) + logging.debug("processMafFile: a maf block(%s) is successfully captured." % strand) + else: + logging.debug("processMafFile: a maf block does not show strand-specific info.") + else: + logging.info("processMafFile: alignment \"%s\" has greater mismap prob. than the threshold." % block[0]) + del block[:] + else: + logging.fatal("processMafFile: error: unexpected malformed maf block is found.") + logging.fatal("block 1: \"%s\"\n" % block[0]) + logging.fatal("block 2: \"%s\"\n" % block[1]) + logging.fatal("block 3: \"%s\"\n" % block[2]) + logging.fatal("block 4: \"%s\"\n" % block[3]) + sys.exit(1) + fin.close() + fout.close() + + if len(block) > 0: + logging.fatal("McDetect::processMafFile: error: possible malformed MAF file.") + for b in block: + logging.fatal(b) + sys.exit(1) + except IOError: + logging.fatal("McDetect::processMafFile: error: unable to read a MAF file \"%s\"" % resultFile) + sys.exit(1) + return + + + def processSamFile(self, samFile): + """ + read mapping BAM/SAM file, and output mC context data file. + """ + + logging.info("Process BAM/SAM file start: %s" % samFile) + + samfile = None + try: + if self.readBam: + samfile = pysam.Samfile(samFile, "rb") + else: + samfile = pysam.Samfile(samFile, "r") + + counter = 1 + for aln in samfile.fetch(until_eof = True): + samaln = SamAlnParser(samfile, aln) + samaln.setRefGenome(self.targetChr, self.targetSeqD, self.targetSeqLen) + samaln.parse() + + chr = samaln.referenceName() + + if (chr is None) or (chr != self.targetChr): + continue + + if aln.mapq: + mismap = self.bamMapq2Mismap(aln.mapq) + if mismap - self.mismapThreshold > 0: + continue + + read_seq = samaln.alnReadSeq.replace("t", "C") + self.extractMcContextsByOneRead(chr, samaln.strand, 0, samaln.alnRefSeq.upper(), samaln.alnRefStart, self.targetSeqLen, read_seq, read_qual) + + counter += 1 + if counter > 500000: + self.outputMcContextData() + counter = 1 + + samfile.close() + self.outputMcContextData() + except Exception, e: + logging.fatal(samFile) + logging.fatal(" %s" % str(type(e))) + logging.fatal(" %s" % str(e)) + if samfile: + samfile.close() + + def extractMcContextsByOneRead(self, chr, strand, mismapProb, refSeq, refStart, readSeq, readQual, readLen): + """ + extract mC context by one read. + """ + + lines = [] + if strand == '+': + real_pos = refStart + for i in range(readLen): + if readSeq[i] == '-': + continue + # if refSeq[i] == 'C' and (readSeq[i] == 'C' or readSeq[i] == 'T'): + if refSeq[i] == 'C': + baseConf = self.qualityCharToErrorProb(readQual[i]) + ctx_type = self.mcContextType(self.refGenomeBuf[chr], real_pos, strand) + line = "%s\t%d\t%s\t%s\t%s\t%g\n" % (chr, real_pos, strand, ctx_type, readSeq[i], (1-mismapProb) * (1-baseConf)) + # logging.debug("extractMcContextsByOneRead: line=%s" % line) + lines.append(line) + real_pos += 1 + if strand == '-': + real_pos = refStart + for i in range(readLen): + if readSeq[i] == '-': + continue + # if refSeq[i] == 'G' and (readSeq[i] == 'G' or readSeq[i] == 'A'): + if refSeq[i] == 'G': + baseConf = self.qualityCharToErrorProb(readQual[i]) + ctx_type = self.mcContextType(self.refGenomeBuf[chr], real_pos, strand) + read_base = self.complementaryBase(readSeq[i]) + line = "%s\t%d\t%s\t%s\t%s\t%g\n" % (chr, real_pos, strand, ctx_type, readSeq[i], (1-mismapProb) * (1-baseConf)) + # logging.debug("extractMcContextsByOneRead: line=%s" % line) + lines.append(line) + real_pos += 1 + return lines + + + def complementaryBase(self, base): + """ + compute a complemetary base. + """ + + if base == 'A': return 'T' + if base == 'C': return 'G' + if base == 'G': return 'C' + if base == 'T': return 'A' + if base == 'N': return 'N' + if base == 'a': return 't' + if base == 'c': return 'g' + if base == 'g': return 'c' + if base == 't': return 'a' + if base == 'n': return 'n' + sys.exit(1) + + + def findStrandFromAlignment(self, chr, ref_start, read_seq, read_len): + plus_sup = 0 + minus_sup = 0 + other_mismatch = 0 + base_size = 0 + ref_seq = self.refGenomeBuf[chr] + for i in range(read_len): + j = ref_start + i + if ref_seq[j]=='C' and read_seq[i]=='T': + plus_sup += 1 + base_size += 1 + elif ref_seq[j]=='G' and read_seq[i]=='A': + minus_sup += 1 + base_size += 1 + elif (ref_seq[j]!='-' and read_seq[i]!='-') and ref_seq[j]!=read_seq[i]: + other_mismatch += 1 + base_size += 1 + if base_size==0: + return '.' + mismatch_rate = float(other_mismatch)/float(base_size) + # if plus_sup > minus_sup and plus_sup > other_mismatch: + # if plus_sup > minus_sup and mismatch_rate < 0.05: + if plus_sup > minus_sup: + return '+' + # if minus_sup > plus_sup and minus_sup > other_mismatch: + # if minus_sup > plus_sup and mismatch_rate < 0.05: + if minus_sup > plus_sup: + return '-' + if plus_sup == minus_sup: + return '+-' + return '.' + + + def __extractMcContextsByOneRead(self, chr, strand, mismapProb, refSeq, refStart, refSeqLen, readSeq, readQual): + """ + extract mC context by one read. + """ + + logging.debug("extractMcContextsByOneRead(%s, %s, %g, %s, %d, %d, %s, %s)" % (chr, strand, mismapProb, refSeq, refStart, refSeqLen, readSeq, readQual)) + + nogap_refseq = self.clearGap(refSeq) + bases = list(refSeq) + last_pos = len(nogap_refseq) - 1 + pos = -1 + while True: + try: + pos = bases.index("C", pos + 1) + num_gaps = refSeq.count("-", 0, pos) + real_pos = pos - num_gaps + ctx_type = self.mcContextType(nogap_refseq, real_pos) + ctx_pos = refStart + real_pos + if ctx_type == None: + if strand == "+": + ctx_type = self.mcContextType(self.targetSeqD, ctx_pos) + elif strand == "-": + ctx_type = self.mcContextType(self.targetSeqC, ctx_pos) + if ctx_type == None: + continue + if strand == "-": + ctx_pos = refSeqLen - ctx_pos - 1 + line = "%d\t%s\t%s\t%s\n" % (ctx_pos, strand, ctx_type, readSeq[pos]) + base_name = self.mcContextFileBaseName(ctx_pos) + if base_name not in self.mcDetectData: + self.mcDetectData[base_name] = [] + self.mcDetectData[base_name].append(line) + except IndexError: + logging.debug("extractMcContextsByOneRead#IndexError: %s %d %s %s %s %d" % (chr, ctx_pos, strand, ctx_type, readSeq, pos)) + except ValueError: + break + + def qualityCharToErrorProb(self, qualityChar): + """ + convert FASTQ (Sanger) quality character to error probability. + """ + return 10**((ord('!')-ord(qualityChar))*0.1) + + def outputMcContextData(self): + """ + output mC context data to file. + """ + + logging.info("Output mC detection data start.") + + for base_name in self.mcDetectData.keys(): + fpath = self.mcContextFilePathByName(base_name) + logging.debug("%s: %d" % (fpath, len(self.mcDetectData[base_name]))) + fo = open(fpath, "a") + fo.write("".join(self.mcDetectData[base_name])) + fo.close() + self.mcDetectData.clear() + + logging.info("Output mC detection data done.") + + + def mcContextHash(self): + h = {} + for strand in self.strands(): + h[strand] = {} + for mc_ctx in self.mcContextTypes(): + h[strand][mc_ctx] = {} + + return h + + + def isC(self, seq): + return seq[0:1].upper() == "C" + + + def isT(self, seq): + return seq[0:1].upper() == "T" + + + def calcCoverage(self, seqAry, strand): + """ + count the number of C and T (or G and A), calculate mC ratio. + """ + + num_c = 0.0 + num_t = 0.0 + num_all = 0 + (C, T) = ('C', 'T') + if strand == '-': + (C, T) = ('G', 'A') + for i in seqAry: + num_all += 1 + if i[0] == C: + num_c += i[1] + elif i[0] == T: + num_t += i[1] + + num_ct = num_c + num_t + if num_all == 0 or num_ct == 0: + return (0, 0) + # return (num_all, num_c/num_all) + return (num_all, num_c/num_ct) + + + def mafMismapValue(self, aLine): + """ + get mismap value by maf "a" line. + """ + + mismap_fields = filter((lambda s: s.startswith('mismap=')), aLine.split()) + if len(mismap_fields) > 0: + return float(mismap_fields[0][7:]) + else: + return None + + + def mcContextFilePath(self, pos): + """ + get mC context data file path with specified position. + """ + + return self.mcContextFilePathByName(self.mcContextFileBaseName(pos)) + + + def mcContextFilePathByName(self, name): + """ + get mC context data file path with specified name. + """ + + return "%s/%s/%s.tsv" % (self.localDir, self.targetChr, name) + + + def mcContextFileBaseName(self, pos): + """ + get mC context data file name with specified chromosome number. + """ + + return "%010d" % (int(pos) / 100000) + + + def mcContextLocalFilePath(self, chrNo): + """ + get local mC context data file path with specified chromosome number. + """ + + return "%s/%s.tsv" % (self.localDir, chrNo) + + + def mcContextGlobalFilePath(self, chrNo): + """ + get global mC context data file path with specified chromosome number. + """ + + return "%s/%s.tsv" % (self.mcContextDir, chrNo) + + +class SamAlnParser(BsfCallBase): + + def __init__(self, samfile, aln): + self.samfile = samfile + self.aln = aln + + self.refName = None + self.refSeq = None + self.refSeqLen = None + + self.strand = None + + self.alnRefStart = None + self.alnRefSeq = None + self.alnRefSeqLen = None + + self.alnReadSeq = None + + + def setRefGenome(self, name, sequence, length = None): + if length is None: + length = len(sequence) + + self.refName = name + self.refSeq = sequence + self.refSeqLen = length + + + def referenceName(self): + if self.aln.tid >= 0: + return self.samfile.getrname(self.aln.tid) + else: + return None + + + def getStrand(self): + if self.aln.is_reverse: + return "-" + else: + return "+" + + + def parseCigar(self, cigar): + return [{"op": v[-1:], "num": int(v[:-1])} for v in re.findall('\d+[A-Z=]', cigar)] + + + def alignmentSequences(self, refSeqPos, readSeq, cigars): + refs = [] + rest_refseq = 0 + + reads = [] + read_pos = 0 + + for cigar in cigars: + if cigar["op"] == "M": + refs.append(self.refSeq[refSeqPos:refSeqPos+cigar["num"]]) + refSeqPos += cigar["num"] + reads.append(readSeq[read_pos:read_pos+cigar["num"]]) + read_pos += cigar["num"] + elif cigar["op"] == "I": + refs.append("-" * cigar["num"]) + reads.append(readSeq[read_pos:read_pos+cigar["num"]]) + read_pos += cigar["num"] + elif cigar["op"] == "P": + refs.append("-" * cigar["num"]) + reads.append("-" * cigar["num"]) + elif cigar["op"] == "D" or cigar["op"] == "N": + rest_refseq += cigar["num"] + reads.append("-" * cigar["num"]) + elif cigar["op"] == "S": + read_pos += cigar["num"] + + if rest_refseq > 0: + refs.append(self.refSeq[refSeqPos:refSeqPos+rest_refseq]) + + return {"reference": "".join(refs), "read": "".join(reads)} + + + def parse(self): + self.strand = self.getStrand() + + self.alnRefStart = self.aln.pos + read_seq = self.aln.seq + cigars = self.parseCigar(self.aln.cigarstring) + alignment = self.alignmentSequences(self.alnRefStart, read_seq, cigars) + + if self.strand == "+": + self.alnRefSeq = alignment["reference"] + self.alnReadSeq = alignment["read"] + elif self.strand == "-": + nogap_refseq = self.clearGap(alignment["reference"]) + self.alnRefStart = self.complementStartPosition(self.refSeqLen, self.alnRefStart, len(nogap_refseq)) + self.alnRefSeq = self.complementSeq(alignment["reference"]) + self.alnReadSeq = self.complementSeq(alignment["read"]) +