comparison commons/tools/srptBlasterMatcher.py @ 18:94ab73e8a190

Uploaded
author m-zytnicki
date Mon, 29 Apr 2013 03:20:15 -0400
parents
children
comparison
equal deleted inserted replaced
17:b0e8584489e6 18:94ab73e8a190
1 #!/usr/bin/env python
2
3 """
4 This program takes a query directory as input,
5 then launches Blaster and/or Matcher on each file in it,
6 finally results are optionally gathered in a single file.
7 """
8
9 import os
10 import sys
11 import getopt
12 import logging
13 import glob
14 import ConfigParser
15
16 from pyRepet.launcher.programLauncher import programLauncher
17 from pyRepet.launcher import Launcher
18 from pyRepet.sql.RepetJobMySQL import RepetJob
19 from commons.core.coord.Align import Align
20
21
22 def help():
23 print
24 print "usage: %s [ options ]" % ( sys.argv[0].split("/")[-1] )
25 print "options:"
26 print " -h: this help"
27 print " -g: name of the group identifier (same for all the jobs)"
28 print " -q: name of the query directory"
29 print " -S: suffix in the query directory (default='*.fa' for Blaster, '*.align' for Matcher)"
30 print " -s: absolute path to the subject databank"
31 print " -Q: resources needed on the cluster)"
32 print " -d: absolute path to the temporary directory"
33 print " -m: mix of Blaster and/or Matcher"
34 print " 1: launch Blaster only"
35 print " 2: launch Matcher only (on '*.align' query files)"
36 print " 3: launch Blaster+Matcher in the same job"
37 print " -B: parameters for Blaster (e.g. \"-a -n tblastx\")"
38 print " -M: parameters for Matcher (e.g. \"-j\")"
39 print " -Z: collect all the results into a single file"
40 print " align (after Blaster)"
41 print " path/tab (after Matcher)"
42 print " -C: configuration file from TEdenovo or TEannot pipeline"
43 print " -t: name of the table recording the jobs (default=jobs)"
44 print " -p: absolute path to project directory (if jobs management via files)"
45 print " -c: clean (remove job launch files and job stdout)"
46 print " -v: verbose (default=0/1/2)"
47 print
48
49
50 def filterRedundantMatches( inFile, outFile ):
51 """
52 When a pairwise alignment is launched ~ all-by-all (ie one batch against all chunks),
53 one filters the redundant matches. For instance we keep 'chunk3-1-100-chunk7-11-110-...'
54 and we discards 'chunk7-11-110-chunk3-1-100-...'.
55 Also we keep 'chunk5-1-100-chunk5-11-110-...' and we discards
56 'chunk5-11-110-chunk5-1-100-...'.
57 For this of course the results need to be sorted by query, on plus strand,
58 and in ascending coordinates (always the case with Blaster).
59 """
60 inFileHandler = open( inFile, "r" )
61 outFileHandler = open( outFile, "w" )
62 iAlign = Align()
63 countMatches = 0
64 tick = 100000
65 while True:
66 line = inFileHandler.readline()
67 if line == "":
68 break
69 countMatches += 1
70 iAlign.setFromString( line )
71 if "chunk" not in iAlign.range_query.seqname \
72 or "chunk" not in iAlign.range_subject.seqname:
73 print "ERROR: 'chunk' not in seqname"
74 sys.exit(1)
75 if int(iAlign.range_query.seqname.split("chunk")[1]) < int(iAlign.range_subject.seqname.split("chunk")[1]):
76 iAlign.write( outFileHandler )
77 elif int(iAlign.range_query.seqname.split("chunk")[1]) == int(iAlign.range_subject.seqname.split("chunk")[1]):
78 if iAlign.range_query.getMin() < iAlign.range_subject.getMin():
79 iAlign.write( outFileHandler )
80 if countMatches % tick == 0: # need to free buffer frequently as file can be big
81 outFileHandler.flush()
82 os.fsync( outFileHandler.fileno() )
83 inFileHandler.close()
84 outFileHandler.close()
85
86
87 def runCollect( groupid, collect, allByAll ):
88 """
89 Gather the results of each job in a single job and adapt path ID if necessary.
90 """
91 if verbose > 0:
92 print "concatenate the results of each job"; sys.stdout.flush()
93
94 # retrieve the list of the files
95 lFiles = glob.glob( "*.%s" % ( collect ) )
96 lFiles.sort()
97
98 # concatenate all the individual files
99 if os.path.exists( "%s.%s_tmp" % ( groupid, collect ) ):
100 os.remove( "%s.%s_tmp" % ( groupid, collect ) )
101 for resFile in lFiles:
102 prg = "cat"
103 cmd = prg
104 cmd += " %s >> %s.%s_tmp" % ( resFile, groupid, collect )
105 pL.launch( prg, cmd )
106 if clean == True:
107 prg = "rm"
108 cmd = prg
109 cmd += " -f %s" % ( resFile )
110 pL.launch( prg, cmd )
111
112 if os.path.exists( "%s.%s" % ( groupid, collect ) ):
113 os.remove( "%s.%s" % ( groupid, collect ) )
114
115 if collect == "align":
116 if not allByAll:
117 os.system( "mv %s.%s_tmp %s.%s" % ( groupid, collect, groupid, collect ) )
118 else:
119 filterRedundantMatches( "%s.%s_tmp" % ( groupid, collect ),
120 "%s.%s" % ( groupid, collect ) )
121
122 # adapt the path IDs if necessary
123 elif collect == "path":
124 prg = os.environ["REPET_PATH"] + "/bin/"
125 if os.path.exists( prg + "pathnum2id" ):
126 prg += "pathnum2id"
127 cmd = prg
128 cmd += " -i %s.path_tmp" % ( groupid )
129 cmd += " -o %s.path" % ( groupid )
130 cmd += " -v %i" % ( verbose - 1 )
131 pL.launch( prg, cmd )
132 else:
133 prg += "pathnum2id.py"
134 cmd = prg
135 cmd += " -i %s.path_tmp" % ( groupid )
136 cmd += " -o %s.path" % ( groupid )
137 pL.launch( prg, cmd )
138
139 elif collect == "tab":
140 prg = os.environ["REPET_PATH"] + "/bin/"
141 if os.path.exists( prg + "tabnum2id" ):
142 prg += "tabnum2id"
143 cmd = prg
144 cmd += " -i %s.tab_tmp" % ( groupid )
145 cmd += " -o %s.tab" % ( groupid )
146 cmd += " -v %i" % ( verbose - 1 )
147 pL.launch( prg, cmd )
148 else:
149 prg += "tabnum2id.py"
150 cmd = prg
151 cmd += " -i %s.tab_tmp" % ( groupid )
152 cmd += " -o %s.tab" % ( groupid )
153 pL.launch( prg, cmd )
154
155 if clean == True:
156 os.remove( "%s.%s_tmp" % ( groupid, collect ) )
157
158
159 def main():
160 """
161 This program takes a query directory as input,
162 then launches Blaster and/or Matcher on each file in it,
163 finally results are optionally gathered in a single file.
164 """
165
166 groupid = ""
167 queryDir = ""
168 patternSuffix = ""
169 subjectBank = ""
170 queue = ""
171 tmpDir = ""
172 mix = ""
173 paramBlaster = ""
174 paramMatcher = ""
175 collect = ""
176 configFileName = ""
177 jobTable = "jobs"
178 projectDir = ""
179 global clean
180 clean = False
181 global verbose
182 verbose = 0
183 allByAll = False
184
185 try:
186 opts, args = getopt.getopt(sys.argv[1:],"hg:q:S:s:Q:d:m:B:M:Z:C:t:p:cv:")
187 except getopt.GetoptError, err:
188 print str(err)
189 help()
190 sys.exit(1)
191 for o,a in opts:
192 if o == "-h":
193 help()
194 sys.exit(0)
195 elif o == "-g":
196 groupid = a
197 elif o == "-q":
198 queryDir = a
199 if queryDir[-1] == "/":
200 queryDir = queryDir[:-1]
201 elif o == "-S":
202 patternSuffix = a
203 elif o == "-s":
204 subjectBank = a
205 elif o == "-Q":
206 queue = a
207 elif o == "-d":
208 tmpDir = a
209 if tmpDir[-1] == "/":
210 tmpDir = tmpDir[:-1]
211 elif o == "-m":
212 mix = a
213 elif o == "-B":
214 paramBlaster = a
215 elif o == "-M":
216 paramMatcher = a
217 elif o == "-C":
218 configFileName = a
219 elif o == "-Z":
220 collect = a
221 elif o == "-t":
222 jobTable = a
223 elif o == "-p":
224 projectDir = a
225 elif o == "-c":
226 clean = True
227 elif o == "-v":
228 verbose = int(a)
229
230 if groupid == "":
231 print "ERROR: missing group identifier (-g)"
232 help()
233 sys.exit(1)
234
235 if queryDir == "":
236 print "ERROR: missing query directory (-q)"
237 help()
238 sys.exit(1)
239
240 if mix in [ "1", "3" ] and subjectBank == "":
241 print "ERROR: missing subject bank for Blaster"
242 sys.exit(1)
243
244 if os.environ["REPET_JOBS"] == "files" and projectDir == "":
245 print "ERROR: missing compulsory options for jobs management via files"
246 help()
247 sys.exit(1)
248
249 if not os.path.exists( queryDir ):
250 print "ERROR: can't find query directory '%s'" % ( queryDir )
251 sys.exit(1)
252
253 if verbose > 0:
254 print "START %s" % ( sys.argv[0].split("/")[-1] )
255 sys.stdout.flush()
256
257
258 logFileName = "%s_pid%s.log" % ( groupid, os.getpid() )
259 handler = logging.FileHandler( logFileName )
260 formatter = logging.Formatter( "%(asctime)s %(levelname)s: %(message)s" )
261 handler.setFormatter( formatter )
262 logging.getLogger('').addHandler( handler )
263 logging.getLogger('').setLevel( logging.DEBUG )
264 logging.info( "started" )
265
266
267 if configFileName != "":
268 if not os.path.exists( configFileName ):
269 print "ERROR: configuration file '%s' doesn't exist" % ( configFileName )
270 sys.exit(1)
271 config = ConfigParser.ConfigParser()
272 config.readfp( open(configFileName) )
273 host = config.get("repet_env","repet_host")
274 user = config.get("repet_env","repet_user")
275 passwd = config.get("repet_env","repet_pw")
276 dbname = config.get("repet_env","repet_db")
277 else:
278 host = os.environ["REPET_HOST"]
279 user = os.environ["REPET_USER"]
280 passwd = os.environ["REPET_PW"]
281 dbname = os.environ["REPET_DB"]
282
283 if os.environ["REPET_JOBS"] == "files":
284 jobdb = RepetJob( dbname = projectDir + "/" + os.environ["REPET_DB"] )
285 elif os.environ["REPET_JOBS"] == "MySQL":
286 jobdb = RepetJob( user, host, passwd, dbname )
287 else:
288 print "ERROR: REPET_JOBS is '%s'" % ( os.environ["REPET_JOBS"] )
289 sys.exit(1)
290
291
292 currentDir = os.getcwd()
293 if tmpDir == "":
294 tmpDir = currentDir
295
296 global pL
297 pL = programLauncher()
298
299 if "-a" in paramBlaster:
300 allByAll = True
301
302
303 # if Blaster will be launched, prepare the subject data if necessary
304 if mix != "2" and not os.path.exists( "%s_cut" % ( subjectBank ) ):
305 if verbose > 0:
306 print "prepare subject bank '%s'..." % ( subjectBank.split("/")[-1] )
307 sys.stdout.flush()
308 prg = "blaster"
309 cmd = prg
310 cmd += " -q %s" % ( subjectBank )
311 cmd += " %s" % ( paramBlaster )
312 cmd += " -P"
313 pL.launch( prg, cmd )
314
315
316 # launch Blaster only (on '*.fa' in queryDir)
317 if mix == "1":
318 launcher = Launcher.BlasterLauncher( jobdb, queryDir, subjectBank, paramBlaster, currentDir, tmpDir, jobTable, queue, groupid, "Blaster" )
319 if patternSuffix == "":
320 patternSuffix = "*.fa"
321 launcher.run( patternSuffix, verbose )
322 if clean == True:
323 launcher.clean()
324
325 # launch Matcher only (on '*.align' in queryDir; don't use '-q' or '-s', only '-m')
326 elif mix == "2":
327 launcher = Launcher.MatcherLauncher( jobdb, queryDir, subjectBank, paramMatcher, currentDir, tmpDir, jobTable, queue, groupid, "Matcher" )
328 if patternSuffix == "":
329 patternSuffix = "*.align"
330 launcher.run( patternSuffix, verbose )
331 if clean == True:
332 launcher.clean()
333
334 # launch Blaster+Matcher (on '*.fa' in queryDir)
335 elif mix == "3":
336 launcher = Launcher.Launcher( jobdb, queryDir, subjectBank, paramBlaster, currentDir, tmpDir, jobTable, queue, groupid, "BlasterMatcher" )
337 launcher.beginRun()
338 if patternSuffix == "":
339 patternSuffix = "*.fa"
340 lFaFiles = glob.glob( queryDir + "/" + patternSuffix )
341 if len(lFaFiles) == 0:
342 print "ERROR: query directory '%s' is empty of suffix '%s'" % ( queryDir, patternSuffix )
343 sys.exit(1)
344 count = 0
345 for inFaName in lFaFiles:
346 count += 1
347 launcher.acronyme = "BlasterMatcher_%i" % ( count )
348 launcher.job.jobname = launcher.acronyme
349 prefix = os.path.basename( inFaName )
350 cmd_start = ""
351 cmd_start += "if not os.path.exists( \"" + prefix + "\" ):\n"
352 cmd_start += "\tos.system( \"cp " + inFaName + " .\" )\n"
353 launchB = ""
354 launchB += "blaster"
355 launchB += " -q %s" % ( prefix )
356 launchB += " -s %s" % ( subjectBank )
357 launchB += " -B %s" % ( prefix )
358 if paramBlaster != "":
359 launchB += " %s" % ( paramBlaster )
360 cleanB = ""
361 cleanB += "if not os.path.exists( \"%s/%s.param\" ):\n" % ( currentDir, prefix )
362 cleanB += "\tos.system( \"mv %s.param %s\" )\n" % ( prefix, currentDir )
363 cleanB += "if os.path.exists( \"" + prefix + ".Nstretch.map\" ):\n"
364 cleanB += "\tos.remove( \"" + prefix + ".Nstretch.map\" )\n"
365 cleanB += "if os.path.exists( \"" + prefix + "_cut\" ):\n"
366 cleanB += "\tos.system( \"rm -f " + prefix + "_cut*\" )\n"
367 cleanB += "if os.path.exists( \"" + prefix + ".raw\" ):\n"
368 cleanB += "\tos.remove( \"" + prefix + ".raw\" )\n"
369 cleanB += "if os.path.exists( \"" + prefix + ".seq_treated\" ):\n"
370 cleanB += "\tos.remove( \"" + prefix + ".seq_treated\" )\n"
371 launchM = ""
372 launchM += os.environ["REPET_PATH"] + "/bin/matcher"
373 launchM += " -m %s.align" % ( prefix )
374 launchM += " -q %s" % ( prefix )
375 launchM += " -s %s" % ( subjectBank )
376 if paramMatcher != "":
377 launchM += " %s" % ( paramMatcher )
378 cleanM = ""
379 s = ""
380 if "-a" in paramMatcher:
381 s = "match"
382 else:
383 s = "clean_match"
384 if collect == "path":
385 cleanM += "if not os.path.exists( \"%s/%s.align.%s.path\" ):\n" % ( currentDir, prefix, s )
386 cleanM += "\tos.system( \"mv %s.align.%s.path %s\" )\n" % ( prefix, s, currentDir )
387 cleanM += "if os.path.exists( \"" + prefix + ".align."+s+".tab\" ):\n"
388 cleanM += "\tos.remove( \"" + prefix + ".align."+s+".tab\" )\n"
389 elif collect == "tab":
390 cleanM += "if not os.path.exists( \"%s/%s.align.%s.tab\" ):\n" % ( currentDir, prefix, s )
391 cleanM += "\tos.system( \"mv %s.align.%s.tab %s\" )\n" % ( prefix, s, currentDir )
392 cleanM += "if os.path.exists( \"" + prefix + ".align."+s+".path\" ):\n"
393 cleanM += "\tos.remove( \"" + prefix + ".align."+s+".path\" )\n"
394 cleanM += "if not os.path.exists( \"%s/%s.align.%s.param\" ):\n" % ( currentDir, prefix, s )
395 cleanM += "\tos.system( \"mv %s.align.%s.param %s\" )\n" % ( prefix, s, currentDir )
396 if tmpDir != currentDir:
397 cleanM += "if os.path.exists( \"%s\" ):\n" % ( prefix )
398 cleanM += "\tos.remove( \"%s\" )\n" % ( prefix )
399 if clean == True:
400 cleanM += "if os.path.exists( \"" + prefix + ".align\" ):\n"
401 cleanM += "\tos.remove( \"" + prefix + ".align\" )\n"
402 else:
403 cleanM += "if not os.path.exists( \"%s/%s.align\" ):\n" % ( currentDir, prefix )
404 cleanM += "\tos.system( \"mv %s.align %s\" )\n" % ( prefix, currentDir )
405 cleanM += "if os.path.exists( \"" + prefix + ".align."+s+".fa\" ):\n"
406 cleanM += "\tos.remove( \"" + prefix + ".align."+s+".fa\" )\n"
407 cleanM += "if os.path.exists( \"" + prefix + ".align."+s+".map\" ):\n"
408 cleanM += "\tos.remove( \"" + prefix + ".align."+s+".map\" )\n"
409 cmd_start += "print \"" + launchB + "\"; sys.stdout.flush()\n"
410 cmd_start += "log = os.system( \"" + launchB + "\" )\n"
411 cmd_start += "if log != 0:\n"
412 cmd_start += launcher.cmd_test( launcher.job, "error", loop=1 )
413 cmd_start += "\tsys.exit(1)\n"
414 cmd_start += cleanB
415 cmd_start += "print \"" + launchM + "\"; sys.stdout.flush()\n"
416 cmd_start += "log = os.system( \"" + launchM + "\" )\n"
417 cmd_start += cleanM
418 launcher.runSingleJob( cmd_start )
419 launcher.acronyme = "BlasterMatcher"
420 launcher._nbJobs = count
421 launcher.endRun()
422 if clean == True:
423 launcher.clean( "BlasterMatcher_*" )
424
425 else:
426 print "ERROR: option '-m %s' not recognized" % ( mix )
427 sys.exit(1)
428
429
430 if collect != "":
431 if collect in [ "align", "path", "tab" ]:
432 runCollect( groupid, collect, allByAll )
433 else:
434 print "ERROR: collect '%s' not implemented" % ( collect )
435 sys.exit(1)
436
437
438 logging.info( "finished" )
439
440 if verbose > 0:
441 print "END %s" % ( sys.argv[0].split("/")[-1] )
442 sys.stdout.flush()
443
444 return 0
445
446
447 if __name__ == "__main__":
448 main()