Mercurial > repos > yufei-luo > s_mart
view commons/tools/LaunchMatcherInParallel.py @ 19:9bcfa7936eec
Deleted selected files
author | m-zytnicki |
---|---|
date | Mon, 29 Apr 2013 03:23:29 -0400 |
parents | 94ab73e8a190 |
children |
line wrap: on
line source
#!/usr/bin/env python # Copyright INRA (Institut National de la Recherche Agronomique) # http://www.inra.fr # http://urgi.versailles.inra.fr # # This software is governed by the CeCILL license under French law and # abiding by the rules of distribution of free software. You can use, # modify and/ or redistribute the software under the terms of the CeCILL # license as circulated by CEA, CNRS and INRIA at the following URL # "http://www.cecill.info". # # As a counterpart to the access to the source code and rights to copy, # modify and redistribute granted by the license, users are provided only # with a limited warranty and the software's author, the holder of the # economic rights, and the successive licensors have only limited # liability. # # In this respect, the user's attention is drawn to the risks associated # with loading, using, modifying and/or developing or reproducing the # software by the user in light of its specific status of free software, # that may mean that it is complicated to manipulate, and that also # therefore means that it is reserved for developers and experienced # professionals having in-depth computer knowledge. Users are therefore # encouraged to load and test the software's suitability as regards their # requirements in conditions enabling the security of their systems and/or # data to be ensured and, more generally, to use and operate it in the # same conditions as regards security. # # The fact that you are presently reading this means that you have had # knowledge of the CeCILL license and that you accept its terms. from commons.core.LoggerFactory import LoggerFactory from commons.core.sql.DbFactory import DbFactory from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory from commons.core.launcher.Launcher import Launcher from commons.core.launcher.LauncherUtils import LauncherUtils from commons.core.utils.FileUtils import FileUtils from commons.core.utils.RepetOptionParser import RepetOptionParser from commons.core.checker.ConfigChecker import ConfigRules, ConfigChecker from commons.core.coord.AlignUtils import AlignUtils import shutil import os LOG_DEPTH = "repet.tools" class LaunchMatcherInParallel(object): def __init__(self, align="", queryFileName="", subjectFileName="", evalue="1e-10", doJoin=False, keepConflict=False, prefix="", alignPattern = ".*\.align", \ config = "", groupId = "", maxFileSize = 1000000, mergeResults=True, workingDir="tmpMatcher", doClean = False, verbosity = 0): self._alignFileName = align self._queryFileName = queryFileName self.setSubjectFileName(subjectFileName) self.setOutPrefix(prefix) self._alignPattern = alignPattern self._doJoin = doJoin self._eValue = evalue self._keepConflict = keepConflict self._configFileName = config self.setGroupId(groupId) self._maxFileSize = maxFileSize self._mergeResults = mergeResults self._doClean = doClean self._workingDir = workingDir self._verbosity = verbosity self._log = LoggerFactory.createLogger("%s.%s" % (LOG_DEPTH, self.__class__.__name__), self._verbosity) self._jobSectionName = "jobs" def setAttributesFromCmdLine(self): description = "Launch Matcher in parallel." epilog = "\nExample 1: launch without verbosity and keep temporary files.\n" epilog += "\t$ python LaunchMatcherInParallel.py -a in.align -v 0" epilog += "\n\t" epilog += "\nExample 2: launch with verbosity to have errors (level 1) and basic information (level 2), and delete temporary files.\n" epilog += "\t$ python LaunchMatcherInParallel.py -a in.align -q query.fa -s subject.fa -o query -c -v 2" parser = RepetOptionParser(description = description, epilog = epilog) parser.add_option("-a", "--align", dest = "align", action = "store", type = "string", help = "input align file name [compulsory] [format: align]", default = "") parser.add_option("-q", "--query", dest = "query", action = "store", type = "string", help = "query fasta file name [optional] [format: fasta]", default = "") parser.add_option("-s", "--subject", dest = "subject", action = "store", type = "string", help = "subject fasta file name [optional] [format: fasta]", default = "") parser.add_option("-e", "--evalue", dest = "evalue", action = "store", type = "string", help = "E-value filter [default: 1e10]", default = "1e-10") parser.add_option("-j", "--join", dest = "doJoin", action = "store_true", help = "join matches [default: False]", default = False) parser.add_option("-k", "--keepConflict",dest = "keepConflict", action = "store_true", help = "keep conflicting subjects [default: False]", default = False) parser.add_option("-o", "--outPrefix", dest = "outPrefix", action = "store", type = "string", help = "output file prefix [default: align file name]", default = "") parser.add_option("-p", "--alignPattern",dest = "alignPattern", action = "store", type = "string", help = "align file pattern [default: .*\.align]", default = ".*\.align") parser.add_option("-n", "--maxFileSize",dest = "maxFileSize", action = "store", type = "int", help = "max file size (1 file for 1 job) [default: 100000]", default = 10000) parser.add_option("-m", "--notMergeResults",dest = "notMergeResults", action = "store_false", help = "don't merge results files [default: True]", default = True) parser.add_option("-w", "--workingDir",dest = "workingDir", action = "store", type = "string", help = "working directory [default: tmpMatcher]", default = "tmpMatcher") parser.add_option("-c", "--clean", dest = "doClean", action = "store_true", help = "clean temporary files [default: False]", default = False) parser.add_option("-v", "--verbosity", dest = "verbosity", action = "store", type = "int", help = "verbosity [default: 1]", default = 1) options = parser.parse_args()[0] self._setAttributesFromOptions(options) def _setAttributesFromOptions(self, options): self.setAlignFileName(options.align) self.setQueryFileName(options.query) self.setSubjectFileName(options.subject) self.setEvalue(options.evalue) self.setDoJoin(options.doJoin) self.setKeepConflicts(options.keepConflict) self.setOutPrefix(options.outPrefix) self.setAlignPattern(options.alignPattern) self.setMaxFileSize(options.maxFileSize) self.setMergeResults(options.notMergeResults) self.setWorkingDir(options.workingDir) self.setDoClean(options.doClean) self.setVerbosity(options.verbosity) def setAlignFileName(self, alignFileName): self._alignFileName = alignFileName def setQueryFileName(self, queryFileName): self._queryFileName = queryFileName def setSubjectFileName(self, subjectFileName): self._subjectFileName = subjectFileName def setEvalue(self, evalue): self._eValue = evalue def setDoJoin(self, doJoin): self._doJoin = doJoin def setKeepConflicts(self, keepConflict): self._keepConflict = keepConflict def setOutPrefix(self, outPrefix): if outPrefix == "": self._outPrefix = self._alignFileName else: self._outPrefix = outPrefix def setAlignPattern(self, alignPattern): self._alignPattern = alignPattern def setGroupId(self, groupId): if groupId == "": self._groupId = "Matcher_%s" % os.getpid() else: self._groupId = groupId def setMaxFileSize(self, maxFileSize): self._maxFileSize = maxFileSize def setMergeResults(self, mergeResults): self._mergeResults = mergeResults def setWorkingDir(self, workingDir): self._workingDir = workingDir def setDoClean(self, doClean): self._doClean = doClean def setVerbosity(self, verbosity): self._verbosity = verbosity def _checkOptions(self): if self._alignFileName == "": self._logAndRaise("ERROR: Missing input align file name") def _logAndRaise(self, errorMsg): self._log.error(errorMsg) raise Exception(errorMsg) def _checkConfig(self): iConfigRules = ConfigRules() iConfigRules.addRuleSection(section=self._jobSectionName, mandatory=True) iConfigRules.addRuleOption(section=self._jobSectionName, option ="resources", mandatory=True, type="string") iConfigRules.addRuleOption(section=self._jobSectionName, option ="tmpDir", mandatory=True, type="string") iConfigRules.addRuleOption(section=self._jobSectionName, option ="copy", mandatory=True, type="bool") iConfigRules.addRuleOption(section=self._jobSectionName, option ="clean", mandatory=True, type="bool") iConfigChecker = ConfigChecker(self._configFileName, iConfigRules) self._iConfig = iConfigChecker.getConfig() self._setAttributesFromConfig() def _setAttributesFromConfig(self): self._resources = self._iConfig.get(self._jobSectionName, "resources") self._tmpDir = self._iConfig.get(self._jobSectionName, "tmpDir") self._isCopyOnNode = self._iConfig.get(self._jobSectionName, "copy") self._doClean = self._iConfig.get(self._jobSectionName, "clean") if self._isCopyOnNode and not self._tmpDir: self._isCopyOnNode = False self._log.debug("The copy option is: %s." % self._isCopyOnNode) def _getLaunchMatcherCmd(self, iLauncher, file): lArgs = [] lArgs.append("-a %s" % file) if self._queryFileName: lArgs.append("-q %s" % self._queryFileName) if self._subjectFileName: lArgs.append("-s %s" % self._subjectFileName) lArgs.append("-e %s" % self._eValue) lArgs.append("-o %s" % file) if self._doJoin: lArgs.append("-j") if self._keepConflict: lArgs.append("-k") lArgs.append("-v %i" % (self._verbosity - 1)) return iLauncher.getSystemCommand("LaunchMatcher.py", lArgs) def _splitAlignFilePerSeq(self): lAlign = AlignUtils.getAlignListFromFile(self._alignFileName) lAlignList = AlignUtils.splitAlignListByQueryName(lAlign) inputFileNameWithoutExtension = os.path.splitext(os.path.basename(self._alignFileName))[0] AlignUtils.createAlignFiles(lAlignList, inputFileNameWithoutExtension, self._workingDir) def _writeTabHeader(self, outTabFileName): with open(outTabFileName, 'w') as f: f.write("query.name\tquery.start\tquery.end\tquery.length\tquery.length.%\tmatch.length.%\tsubject.name\tsubject.start\tsubject.end\tsubject.length\tsubject.length.%\tE.value\tScore\tIdentity\tpath\n") def run(self): LoggerFactory.setLevel(self._log, self._verbosity) self._checkConfig() self._checkOptions() self._log.info("START LaunchMatcherInParallel") self._log.debug("Align file name: %s" % self._alignFileName) self._log.debug("Query file name: %s" % self._queryFileName) self._log.debug("Subject file name: %s" % self._subjectFileName) if not os.path.exists(self._workingDir): os.makedirs(self._workingDir) else: self._doClean = False self._splitAlignFilePerSeq() os.chdir(self._workingDir) os.symlink("../%s" % self._queryFileName, self._queryFileName) if self._queryFileName != self._subjectFileName: os.symlink("../%s" % self._subjectFileName, self._subjectFileName) cDir = os.getcwd() if not self._tmpDir: self._tmpDir = cDir acronym = "Matcher" iDb = DbFactory.createInstance() jobdb = TableJobAdaptatorFactory.createInstance(iDb, "jobs") iLauncher = Launcher(jobdb, os.getcwd(), "", "", cDir, self._tmpDir, "jobs", self._resources, self._groupId, acronym, chooseTemplateWithCopy = self._isCopyOnNode) lCmdsTuples = [] lCmdSize = [] lCmdCopy = [] lFiles = FileUtils.getFileNamesList(".", self._alignPattern) lFileSizeTuples = [] for fileName in lFiles: fileSize = os.path.getsize(fileName) lFileSizeTuples.append((fileName, fileSize)) lFileSizeList = LauncherUtils.createHomogeneousSizeList(lFileSizeTuples, self._maxFileSize) for lFiles in lFileSizeList: lCmds = [] lCmdStart = [] lCmdFinish = [] if self._queryFileName: lCmdStart.append("os.symlink(\"%s/%s\", \"%s\")" % (cDir, self._queryFileName, self._queryFileName)) if self._subjectFileName and self._subjectFileName != self._queryFileName: lCmdStart.append("os.symlink(\"%s/%s\", \"%s\")" % (cDir, self._subjectFileName, self._subjectFileName)) for file in lFiles: lCmds.append(self._getLaunchMatcherCmd(iLauncher, file)) lCmdStart.append("os.symlink(\"%s/%s\", \"%s\")" % (cDir, file, file)) lCmdFinish.append("if os.path.exists(\"%s.match.path\"):" % file) lCmdFinish.append("\tshutil.move(\"%s.match.path\", \"%s/.\" )" % (file, cDir)) lCmdFinish.append("if os.path.exists(\"%s.match.tab\"):" % file) lCmdFinish.append("\tshutil.move(\"%s.match.tab\", \"%s/.\" )" % (file, cDir)) lCmdsTuples.append(iLauncher.prepareCommands_withoutIndentation(lCmds, lCmdStart, lCmdFinish, lCmdSize, lCmdCopy)) iLauncher.runLauncherForMultipleJobs("Matcher", lCmdsTuples, self._doClean, self._isCopyOnNode) if self._mergeResults: FileUtils.catFilesByPattern("*.match.path", "../%s.match.path" % self._outPrefix) if self._queryFileName or self._subjectFileName: outTabFileName = "../%s.match.tab" % self._outPrefix self._writeTabHeader(outTabFileName) FileUtils.catFilesByPattern("*.match.tab", outTabFileName, skipHeaders = True) os.chdir("..") if self._doClean and self._mergeResults: self._log.warning("Working directory will be cleaned") shutil.rmtree(self._workingDir) self._log.info("END LaunchMatchInParallel") if __name__ == "__main__": iLaunch = LaunchMatcherInParallel() iLaunch.setAttributesFromCmdLine() iLaunch.run()