0
|
1 #!/usr/bin/env python
|
|
2
|
|
3 '''
|
|
4 trinity_runner.py
|
|
5 This program is used as a wrapper for Trinity to allow an automatic rerun of failed jobs. It takes arguments for a typical Trinity run:
|
|
6 ~ Required args ~
|
|
7 Input files - single or paired (left and right)
|
|
8 File type (fasta, fastq)
|
|
9 Max memory - this I need to derive somehow from the dynamic runner using Galaxy slots
|
|
10
|
|
11 ~ Optional args ~
|
|
12 Output directory - this allows users to run the same job over in case it walltime'd out or failed for recoverable reasons.
|
|
13
|
|
14 --
|
|
15 Created Tuesday, 7 March 2017.
|
|
16 Carrie Ganote
|
|
17
|
|
18 Licensed to Indiana University under Creative Commons 3.0
|
|
19 '''
|
|
20 import subprocess32
|
|
21 import argparse
|
|
22 import logging as log
|
|
23 import sys
|
|
24 import os
|
|
25 import errno
|
|
26 from datetime import datetime
|
|
27
|
|
28 TRINITY_OUT_DIR = "trinity_out_dir"
|
|
29
|
|
30 def main(*args):
|
|
31 parser = argparse.ArgumentParser(description="")
|
|
32 parser.add_argument("-o","--output", help="Name of output directory")
|
|
33 parser.add_argument("-q","--seqType", help="Type of reads; fa or fq")
|
|
34 parser.add_argument("-m","--max_memory", help="How much memory to allocate? Or maybe how many cpus?")
|
|
35 parser.add_argument("-p","--mem_per_cpu", help="Memory PER CPU, in GB, in case we want to multiply mem x cpu at runtime")
|
|
36 parser.add_argument("-s","--single", help="Single read file input")
|
|
37 parser.add_argument("-l","--left", help="Left read file from paired inputs")
|
|
38 parser.add_argument("-r","--right", help="Right read file from paired inputs")
|
|
39 parser.add_argument("-v","--verbose", help="Enable debugging messages to be displayed", action='store_true')
|
|
40 parser.add_argument("-g","--log", help="Log file")
|
|
41 parser.add_argument("-t","--timing", help="Timing file, if it exists", default=None)
|
|
42 parser.add_argument("-d","--dir", help="if supplying a rerunnable job, this is the (hopefully unique) name of the directory to run it in.")
|
|
43 parser.add_argument("-u","--user", help="Username to run job under")
|
|
44 parser.add_argument("-f","--fullpath", help="if supplying a rerunnable job, this is the full path (except the user and dir names) to run the job in.")
|
|
45 parser.add_argument("-c","--CPU", help="CPUs, either a hard coded numer or from Galaxy slots")
|
|
46 # parser.add_argument("-","--", help="")
|
|
47 args = parser.parse_args()
|
|
48
|
|
49 if args.verbose:
|
|
50 log.basicConfig(format='%(message)s',level=log.DEBUG)
|
|
51 cmd = ["Trinity"]
|
|
52
|
|
53 ### Add rerun ability ###########################################
|
|
54 # This variable tells us later whether to copy the files back to the job working directory
|
|
55 copyback = False
|
|
56 if args.dir and args.user and args.fullpath:
|
|
57 cleandir = args.dir
|
|
58 chars = "\\`*_{}[]()>#+-.!$&;| "
|
|
59 for c in chars:
|
|
60 if c in cleandir:
|
|
61 cleandir = cleandir.replace(c, "_")
|
|
62 rerunPath = "%s/%s/%s" % (args.fullpath, args.user, cleandir)
|
|
63 print "Rerunpath is ",rerunPath
|
|
64 try:
|
|
65 os.makedirs(rerunPath)
|
|
66 print "Created dir ",rerunPath
|
|
67 except OSError as exc:
|
|
68 if exc.errno == errno.EEXIST and os.path.isdir(rerunPath):
|
|
69 pass
|
|
70 else:
|
|
71 raise
|
|
72 copyback = os.getcwd()
|
|
73 outdir = copyback + "/" + TRINITY_OUT_DIR
|
|
74 try:
|
|
75 os.makedirs(outdir)
|
|
76 print "Created dir ",outdir
|
|
77 except OSError as exc:
|
|
78 if exc.errno == errno.EEXIST and os.path.isdir(outdir):
|
|
79 pass
|
|
80 else:
|
|
81 raise
|
|
82 os.chdir(rerunPath)
|
|
83
|
|
84 ### Add information for reads ###################################
|
|
85 if args.left and args.right:
|
|
86 cmd += ["--left",args.left,"--right", args.right]
|
|
87 elif args.single:
|
|
88 cmd += ["--single",args.single]
|
|
89 else:
|
|
90 raise Exception ("Need input files in order to run Trinity!")
|
|
91
|
|
92 ### Add seqtype ##################################################
|
|
93 if args.seqType:
|
|
94 cmd += ["--seqType",args.seqType]
|
|
95 else:
|
|
96 raise Exception ("Please specify a file type for your reads!")
|
|
97
|
|
98 ### Memory and CPU management ####################################
|
|
99 if args.mem_per_cpu and not args.max_memory:
|
|
100 if args.CPU:
|
|
101 memry = int(args.CPU) * int(args.mem_per_cpu)
|
|
102 memstr = "%dG" % (memry)
|
|
103 cmd += ["--max_memory",memstr]
|
|
104 else:
|
|
105 memry = 2 * int(args.mem_per_cpu)
|
|
106 memstr = "%dG" % (memry)
|
|
107 cmd += ["--max_memory",memstr]
|
|
108 elif args.max_memory and not args.mem_per_cpu:
|
|
109 cmd += ["--max_memory",args.max_memory]
|
|
110 else:
|
|
111 raise Exception ("Please pick Memory per cpu, or max mem, but not both.")
|
|
112 if args.CPU:
|
|
113 cmd += ["--CPU", args.CPU]
|
|
114
|
|
115 ### Enough args, let's run it ####################################
|
|
116 print "About to write to %s" % args.log
|
|
117 out = open(args.log, 'w')
|
|
118 totalattempts = attempts = 2
|
|
119 ec = 1
|
|
120 finish = 1
|
|
121 out.write("Command is:\n%s\n" % (" ".join(cmd)))
|
|
122
|
|
123 ### There is definitely some value in running the job more than once, especially if it dies for stupid reasons.. ###
|
|
124 while ec != 0 and attempts > 0 and finish != 0:
|
|
125
|
|
126 dt = datetime.now()
|
|
127 dtstr = dt.strftime("%d/%m/%y %H:%M")
|
|
128 out.write("Beginning attempt %d of Trinity job at %s\n" % (totalattempts - attempts +1, dtstr) )
|
|
129 attempts -= 1
|
|
130 ec = subprocess32.call(cmd, shell=False, stdin=None, stdout=out, stderr=out, timeout=None)
|
|
131 out.write("Trinity exited with status %d\n" % ec)
|
|
132
|
|
133 greplog = open("greplog", 'w')
|
|
134 cmds = ["grep", 'All commands completed successfully', args.log]
|
|
135 finish = subprocess32.call(cmds,shell=False, stdin=None, stdout=greplog, stderr=greplog, timeout=None)
|
|
136 greplog.close()
|
|
137 out.write("Finished and found the success command with grep code %d\n" % finish)
|
|
138
|
|
139 if ec == 0 and args.timing is not None:
|
|
140 if copyback is not False:
|
|
141 cwd = os.getcwd()
|
|
142 dest = copyback + "/" + TRINITY_OUT_DIR + "/Trinity.fasta"
|
|
143 src = cwd + "/" + TRINITY_OUT_DIR + "/Trinity.fasta"
|
|
144 print "copying trinity outputs from %s to %s" % (src, dest)
|
|
145 os.symlink(src, dest)
|
|
146
|
|
147 #copy the timing file into the log
|
|
148 try:
|
|
149 handle = open (args.timing, 'r')
|
|
150 for line in handle:
|
|
151 out.write(line)
|
|
152 handle.close()
|
|
153 except (OSError, IOError) as e:
|
|
154 print "Oops, no timing file found? ",e
|
|
155
|
|
156
|
|
157 out.close()
|
|
158 exit (ec)
|
|
159
|
|
160 if __name__ == "__main__":
|
|
161 main(*sys.argv)
|
|
162
|