Mercurial > repos > yufei-luo > s_mart
diff commons/tools/srptBlasterMatcher.py @ 18:94ab73e8a190
Uploaded
author | m-zytnicki |
---|---|
date | Mon, 29 Apr 2013 03:20:15 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/commons/tools/srptBlasterMatcher.py Mon Apr 29 03:20:15 2013 -0400 @@ -0,0 +1,448 @@ +#!/usr/bin/env python + +""" +This program takes a query directory as input, +then launches Blaster and/or Matcher on each file in it, +finally results are optionally gathered in a single file. +""" + +import os +import sys +import getopt +import logging +import glob +import ConfigParser + +from pyRepet.launcher.programLauncher import programLauncher +from pyRepet.launcher import Launcher +from pyRepet.sql.RepetJobMySQL import RepetJob +from commons.core.coord.Align import Align + + +def help(): + print + print "usage: %s [ options ]" % ( sys.argv[0].split("/")[-1] ) + print "options:" + print " -h: this help" + print " -g: name of the group identifier (same for all the jobs)" + print " -q: name of the query directory" + print " -S: suffix in the query directory (default='*.fa' for Blaster, '*.align' for Matcher)" + print " -s: absolute path to the subject databank" + print " -Q: resources needed on the cluster)" + print " -d: absolute path to the temporary directory" + print " -m: mix of Blaster and/or Matcher" + print " 1: launch Blaster only" + print " 2: launch Matcher only (on '*.align' query files)" + print " 3: launch Blaster+Matcher in the same job" + print " -B: parameters for Blaster (e.g. \"-a -n tblastx\")" + print " -M: parameters for Matcher (e.g. \"-j\")" + print " -Z: collect all the results into a single file" + print " align (after Blaster)" + print " path/tab (after Matcher)" + print " -C: configuration file from TEdenovo or TEannot pipeline" + print " -t: name of the table recording the jobs (default=jobs)" + print " -p: absolute path to project directory (if jobs management via files)" + print " -c: clean (remove job launch files and job stdout)" + print " -v: verbose (default=0/1/2)" + print + + +def filterRedundantMatches( inFile, outFile ): + """ + When a pairwise alignment is launched ~ all-by-all (ie one batch against all chunks), + one filters the redundant matches. For instance we keep 'chunk3-1-100-chunk7-11-110-...' + and we discards 'chunk7-11-110-chunk3-1-100-...'. + Also we keep 'chunk5-1-100-chunk5-11-110-...' and we discards + 'chunk5-11-110-chunk5-1-100-...'. + For this of course the results need to be sorted by query, on plus strand, + and in ascending coordinates (always the case with Blaster). + """ + inFileHandler = open( inFile, "r" ) + outFileHandler = open( outFile, "w" ) + iAlign = Align() + countMatches = 0 + tick = 100000 + while True: + line = inFileHandler.readline() + if line == "": + break + countMatches += 1 + iAlign.setFromString( line ) + if "chunk" not in iAlign.range_query.seqname \ + or "chunk" not in iAlign.range_subject.seqname: + print "ERROR: 'chunk' not in seqname" + sys.exit(1) + if int(iAlign.range_query.seqname.split("chunk")[1]) < int(iAlign.range_subject.seqname.split("chunk")[1]): + iAlign.write( outFileHandler ) + elif int(iAlign.range_query.seqname.split("chunk")[1]) == int(iAlign.range_subject.seqname.split("chunk")[1]): + if iAlign.range_query.getMin() < iAlign.range_subject.getMin(): + iAlign.write( outFileHandler ) + if countMatches % tick == 0: # need to free buffer frequently as file can be big + outFileHandler.flush() + os.fsync( outFileHandler.fileno() ) + inFileHandler.close() + outFileHandler.close() + + +def runCollect( groupid, collect, allByAll ): + """ + Gather the results of each job in a single job and adapt path ID if necessary. + """ + if verbose > 0: + print "concatenate the results of each job"; sys.stdout.flush() + + # retrieve the list of the files + lFiles = glob.glob( "*.%s" % ( collect ) ) + lFiles.sort() + + # concatenate all the individual files + if os.path.exists( "%s.%s_tmp" % ( groupid, collect ) ): + os.remove( "%s.%s_tmp" % ( groupid, collect ) ) + for resFile in lFiles: + prg = "cat" + cmd = prg + cmd += " %s >> %s.%s_tmp" % ( resFile, groupid, collect ) + pL.launch( prg, cmd ) + if clean == True: + prg = "rm" + cmd = prg + cmd += " -f %s" % ( resFile ) + pL.launch( prg, cmd ) + + if os.path.exists( "%s.%s" % ( groupid, collect ) ): + os.remove( "%s.%s" % ( groupid, collect ) ) + + if collect == "align": + if not allByAll: + os.system( "mv %s.%s_tmp %s.%s" % ( groupid, collect, groupid, collect ) ) + else: + filterRedundantMatches( "%s.%s_tmp" % ( groupid, collect ), + "%s.%s" % ( groupid, collect ) ) + + # adapt the path IDs if necessary + elif collect == "path": + prg = os.environ["REPET_PATH"] + "/bin/" + if os.path.exists( prg + "pathnum2id" ): + prg += "pathnum2id" + cmd = prg + cmd += " -i %s.path_tmp" % ( groupid ) + cmd += " -o %s.path" % ( groupid ) + cmd += " -v %i" % ( verbose - 1 ) + pL.launch( prg, cmd ) + else: + prg += "pathnum2id.py" + cmd = prg + cmd += " -i %s.path_tmp" % ( groupid ) + cmd += " -o %s.path" % ( groupid ) + pL.launch( prg, cmd ) + + elif collect == "tab": + prg = os.environ["REPET_PATH"] + "/bin/" + if os.path.exists( prg + "tabnum2id" ): + prg += "tabnum2id" + cmd = prg + cmd += " -i %s.tab_tmp" % ( groupid ) + cmd += " -o %s.tab" % ( groupid ) + cmd += " -v %i" % ( verbose - 1 ) + pL.launch( prg, cmd ) + else: + prg += "tabnum2id.py" + cmd = prg + cmd += " -i %s.tab_tmp" % ( groupid ) + cmd += " -o %s.tab" % ( groupid ) + pL.launch( prg, cmd ) + + if clean == True: + os.remove( "%s.%s_tmp" % ( groupid, collect ) ) + + +def main(): + """ + This program takes a query directory as input, + then launches Blaster and/or Matcher on each file in it, + finally results are optionally gathered in a single file. + """ + + groupid = "" + queryDir = "" + patternSuffix = "" + subjectBank = "" + queue = "" + tmpDir = "" + mix = "" + paramBlaster = "" + paramMatcher = "" + collect = "" + configFileName = "" + jobTable = "jobs" + projectDir = "" + global clean + clean = False + global verbose + verbose = 0 + allByAll = False + + try: + opts, args = getopt.getopt(sys.argv[1:],"hg:q:S:s:Q:d:m:B:M:Z:C:t:p:cv:") + except getopt.GetoptError, err: + print str(err) + help() + sys.exit(1) + for o,a in opts: + if o == "-h": + help() + sys.exit(0) + elif o == "-g": + groupid = a + elif o == "-q": + queryDir = a + if queryDir[-1] == "/": + queryDir = queryDir[:-1] + elif o == "-S": + patternSuffix = a + elif o == "-s": + subjectBank = a + elif o == "-Q": + queue = a + elif o == "-d": + tmpDir = a + if tmpDir[-1] == "/": + tmpDir = tmpDir[:-1] + elif o == "-m": + mix = a + elif o == "-B": + paramBlaster = a + elif o == "-M": + paramMatcher = a + elif o == "-C": + configFileName = a + elif o == "-Z": + collect = a + elif o == "-t": + jobTable = a + elif o == "-p": + projectDir = a + elif o == "-c": + clean = True + elif o == "-v": + verbose = int(a) + + if groupid == "": + print "ERROR: missing group identifier (-g)" + help() + sys.exit(1) + + if queryDir == "": + print "ERROR: missing query directory (-q)" + help() + sys.exit(1) + + if mix in [ "1", "3" ] and subjectBank == "": + print "ERROR: missing subject bank for Blaster" + sys.exit(1) + + if os.environ["REPET_JOBS"] == "files" and projectDir == "": + print "ERROR: missing compulsory options for jobs management via files" + help() + sys.exit(1) + + if not os.path.exists( queryDir ): + print "ERROR: can't find query directory '%s'" % ( queryDir ) + sys.exit(1) + + if verbose > 0: + print "START %s" % ( sys.argv[0].split("/")[-1] ) + sys.stdout.flush() + + + logFileName = "%s_pid%s.log" % ( groupid, os.getpid() ) + handler = logging.FileHandler( logFileName ) + formatter = logging.Formatter( "%(asctime)s %(levelname)s: %(message)s" ) + handler.setFormatter( formatter ) + logging.getLogger('').addHandler( handler ) + logging.getLogger('').setLevel( logging.DEBUG ) + logging.info( "started" ) + + + if configFileName != "": + if not os.path.exists( configFileName ): + print "ERROR: configuration file '%s' doesn't exist" % ( configFileName ) + sys.exit(1) + config = ConfigParser.ConfigParser() + config.readfp( open(configFileName) ) + host = config.get("repet_env","repet_host") + user = config.get("repet_env","repet_user") + passwd = config.get("repet_env","repet_pw") + dbname = config.get("repet_env","repet_db") + else: + host = os.environ["REPET_HOST"] + user = os.environ["REPET_USER"] + passwd = os.environ["REPET_PW"] + dbname = os.environ["REPET_DB"] + + if os.environ["REPET_JOBS"] == "files": + jobdb = RepetJob( dbname = projectDir + "/" + os.environ["REPET_DB"] ) + elif os.environ["REPET_JOBS"] == "MySQL": + jobdb = RepetJob( user, host, passwd, dbname ) + else: + print "ERROR: REPET_JOBS is '%s'" % ( os.environ["REPET_JOBS"] ) + sys.exit(1) + + + currentDir = os.getcwd() + if tmpDir == "": + tmpDir = currentDir + + global pL + pL = programLauncher() + + if "-a" in paramBlaster: + allByAll = True + + + # if Blaster will be launched, prepare the subject data if necessary + if mix != "2" and not os.path.exists( "%s_cut" % ( subjectBank ) ): + if verbose > 0: + print "prepare subject bank '%s'..." % ( subjectBank.split("/")[-1] ) + sys.stdout.flush() + prg = "blaster" + cmd = prg + cmd += " -q %s" % ( subjectBank ) + cmd += " %s" % ( paramBlaster ) + cmd += " -P" + pL.launch( prg, cmd ) + + + # launch Blaster only (on '*.fa' in queryDir) + if mix == "1": + launcher = Launcher.BlasterLauncher( jobdb, queryDir, subjectBank, paramBlaster, currentDir, tmpDir, jobTable, queue, groupid, "Blaster" ) + if patternSuffix == "": + patternSuffix = "*.fa" + launcher.run( patternSuffix, verbose ) + if clean == True: + launcher.clean() + + # launch Matcher only (on '*.align' in queryDir; don't use '-q' or '-s', only '-m') + elif mix == "2": + launcher = Launcher.MatcherLauncher( jobdb, queryDir, subjectBank, paramMatcher, currentDir, tmpDir, jobTable, queue, groupid, "Matcher" ) + if patternSuffix == "": + patternSuffix = "*.align" + launcher.run( patternSuffix, verbose ) + if clean == True: + launcher.clean() + + # launch Blaster+Matcher (on '*.fa' in queryDir) + elif mix == "3": + launcher = Launcher.Launcher( jobdb, queryDir, subjectBank, paramBlaster, currentDir, tmpDir, jobTable, queue, groupid, "BlasterMatcher" ) + launcher.beginRun() + if patternSuffix == "": + patternSuffix = "*.fa" + lFaFiles = glob.glob( queryDir + "/" + patternSuffix ) + if len(lFaFiles) == 0: + print "ERROR: query directory '%s' is empty of suffix '%s'" % ( queryDir, patternSuffix ) + sys.exit(1) + count = 0 + for inFaName in lFaFiles: + count += 1 + launcher.acronyme = "BlasterMatcher_%i" % ( count ) + launcher.job.jobname = launcher.acronyme + prefix = os.path.basename( inFaName ) + cmd_start = "" + cmd_start += "if not os.path.exists( \"" + prefix + "\" ):\n" + cmd_start += "\tos.system( \"cp " + inFaName + " .\" )\n" + launchB = "" + launchB += "blaster" + launchB += " -q %s" % ( prefix ) + launchB += " -s %s" % ( subjectBank ) + launchB += " -B %s" % ( prefix ) + if paramBlaster != "": + launchB += " %s" % ( paramBlaster ) + cleanB = "" + cleanB += "if not os.path.exists( \"%s/%s.param\" ):\n" % ( currentDir, prefix ) + cleanB += "\tos.system( \"mv %s.param %s\" )\n" % ( prefix, currentDir ) + cleanB += "if os.path.exists( \"" + prefix + ".Nstretch.map\" ):\n" + cleanB += "\tos.remove( \"" + prefix + ".Nstretch.map\" )\n" + cleanB += "if os.path.exists( \"" + prefix + "_cut\" ):\n" + cleanB += "\tos.system( \"rm -f " + prefix + "_cut*\" )\n" + cleanB += "if os.path.exists( \"" + prefix + ".raw\" ):\n" + cleanB += "\tos.remove( \"" + prefix + ".raw\" )\n" + cleanB += "if os.path.exists( \"" + prefix + ".seq_treated\" ):\n" + cleanB += "\tos.remove( \"" + prefix + ".seq_treated\" )\n" + launchM = "" + launchM += os.environ["REPET_PATH"] + "/bin/matcher" + launchM += " -m %s.align" % ( prefix ) + launchM += " -q %s" % ( prefix ) + launchM += " -s %s" % ( subjectBank ) + if paramMatcher != "": + launchM += " %s" % ( paramMatcher ) + cleanM = "" + s = "" + if "-a" in paramMatcher: + s = "match" + else: + s = "clean_match" + if collect == "path": + cleanM += "if not os.path.exists( \"%s/%s.align.%s.path\" ):\n" % ( currentDir, prefix, s ) + cleanM += "\tos.system( \"mv %s.align.%s.path %s\" )\n" % ( prefix, s, currentDir ) + cleanM += "if os.path.exists( \"" + prefix + ".align."+s+".tab\" ):\n" + cleanM += "\tos.remove( \"" + prefix + ".align."+s+".tab\" )\n" + elif collect == "tab": + cleanM += "if not os.path.exists( \"%s/%s.align.%s.tab\" ):\n" % ( currentDir, prefix, s ) + cleanM += "\tos.system( \"mv %s.align.%s.tab %s\" )\n" % ( prefix, s, currentDir ) + cleanM += "if os.path.exists( \"" + prefix + ".align."+s+".path\" ):\n" + cleanM += "\tos.remove( \"" + prefix + ".align."+s+".path\" )\n" + cleanM += "if not os.path.exists( \"%s/%s.align.%s.param\" ):\n" % ( currentDir, prefix, s ) + cleanM += "\tos.system( \"mv %s.align.%s.param %s\" )\n" % ( prefix, s, currentDir ) + if tmpDir != currentDir: + cleanM += "if os.path.exists( \"%s\" ):\n" % ( prefix ) + cleanM += "\tos.remove( \"%s\" )\n" % ( prefix ) + if clean == True: + cleanM += "if os.path.exists( \"" + prefix + ".align\" ):\n" + cleanM += "\tos.remove( \"" + prefix + ".align\" )\n" + else: + cleanM += "if not os.path.exists( \"%s/%s.align\" ):\n" % ( currentDir, prefix ) + cleanM += "\tos.system( \"mv %s.align %s\" )\n" % ( prefix, currentDir ) + cleanM += "if os.path.exists( \"" + prefix + ".align."+s+".fa\" ):\n" + cleanM += "\tos.remove( \"" + prefix + ".align."+s+".fa\" )\n" + cleanM += "if os.path.exists( \"" + prefix + ".align."+s+".map\" ):\n" + cleanM += "\tos.remove( \"" + prefix + ".align."+s+".map\" )\n" + cmd_start += "print \"" + launchB + "\"; sys.stdout.flush()\n" + cmd_start += "log = os.system( \"" + launchB + "\" )\n" + cmd_start += "if log != 0:\n" + cmd_start += launcher.cmd_test( launcher.job, "error", loop=1 ) + cmd_start += "\tsys.exit(1)\n" + cmd_start += cleanB + cmd_start += "print \"" + launchM + "\"; sys.stdout.flush()\n" + cmd_start += "log = os.system( \"" + launchM + "\" )\n" + cmd_start += cleanM + launcher.runSingleJob( cmd_start ) + launcher.acronyme = "BlasterMatcher" + launcher._nbJobs = count + launcher.endRun() + if clean == True: + launcher.clean( "BlasterMatcher_*" ) + + else: + print "ERROR: option '-m %s' not recognized" % ( mix ) + sys.exit(1) + + + if collect != "": + if collect in [ "align", "path", "tab" ]: + runCollect( groupid, collect, allByAll ) + else: + print "ERROR: collect '%s' not implemented" % ( collect ) + sys.exit(1) + + + logging.info( "finished" ) + + if verbose > 0: + print "END %s" % ( sys.argv[0].split("/")[-1] ) + sys.stdout.flush() + + return 0 + + +if __name__ == "__main__": + main()