Mercurial > repos > trinity_ctat > ctat_trinity_rnaseq
comparison ctat_trinity_wrapper.py @ 0:045dadbbb0a2 draft default tip
Upload ctat tools.
author | trinity_ctat |
---|---|
date | Tue, 17 Jul 2018 11:50:42 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:045dadbbb0a2 |
---|---|
1 #!/usr/bin/env python | |
2 | |
3 ''' | |
4 trinity_runner.py | |
5 This program is used as a wrapper for Trinity to allow an automatic rerun of failed jobs. It takes arguments for a typical Trinity run: | |
6 ~ Required args ~ | |
7 Input files - single or paired (left and right) | |
8 File type (fasta, fastq) | |
9 Max memory - this I need to derive somehow from the dynamic runner using Galaxy slots | |
10 | |
11 ~ Optional args ~ | |
12 Output directory - this allows users to run the same job over in case it walltime'd out or failed for recoverable reasons. | |
13 | |
14 -- | |
15 Created Tuesday, 7 March 2017. | |
16 Carrie Ganote | |
17 | |
18 Licensed to Indiana University under Creative Commons 3.0 | |
19 ''' | |
20 import subprocess32 | |
21 import argparse | |
22 import logging as log | |
23 import sys | |
24 import os | |
25 import errno | |
26 from datetime import datetime | |
27 | |
28 TRINITY_OUT_DIR = "trinity_out_dir" | |
29 | |
30 def main(*args): | |
31 parser = argparse.ArgumentParser(description="") | |
32 parser.add_argument("-o","--output", help="Name of output directory") | |
33 parser.add_argument("-q","--seqType", help="Type of reads; fa or fq") | |
34 parser.add_argument("-m","--max_memory", help="How much memory to allocate? Or maybe how many cpus?") | |
35 parser.add_argument("-p","--mem_per_cpu", help="Memory PER CPU, in GB, in case we want to multiply mem x cpu at runtime") | |
36 parser.add_argument("-s","--single", help="Single read file input") | |
37 parser.add_argument("-l","--left", help="Left read file from paired inputs") | |
38 parser.add_argument("-r","--right", help="Right read file from paired inputs") | |
39 parser.add_argument("-v","--verbose", help="Enable debugging messages to be displayed", action='store_true') | |
40 parser.add_argument("-g","--log", help="Log file") | |
41 parser.add_argument("-t","--timing", help="Timing file, if it exists", default=None) | |
42 parser.add_argument("-d","--dir", help="if supplying a rerunnable job, this is the (hopefully unique) name of the directory to run it in.") | |
43 parser.add_argument("-u","--user", help="Username to run job under") | |
44 parser.add_argument("-f","--fullpath", help="if supplying a rerunnable job, this is the full path (except the user and dir names) to run the job in.") | |
45 parser.add_argument("-c","--CPU", help="CPUs, either a hard coded numer or from Galaxy slots") | |
46 # parser.add_argument("-","--", help="") | |
47 args = parser.parse_args() | |
48 | |
49 if args.verbose: | |
50 log.basicConfig(format='%(message)s',level=log.DEBUG) | |
51 cmd = ["Trinity"] | |
52 | |
53 ### Add rerun ability ########################################### | |
54 # This variable tells us later whether to copy the files back to the job working directory | |
55 copyback = False | |
56 if args.dir and args.user and args.fullpath: | |
57 cleandir = args.dir | |
58 chars = "\\`*_{}[]()>#+-.!$&;| " | |
59 for c in chars: | |
60 if c in cleandir: | |
61 cleandir = cleandir.replace(c, "_") | |
62 rerunPath = "%s/%s/%s" % (args.fullpath, args.user, cleandir) | |
63 print "Rerunpath is ",rerunPath | |
64 try: | |
65 os.makedirs(rerunPath) | |
66 print "Created dir ",rerunPath | |
67 except OSError as exc: | |
68 if exc.errno == errno.EEXIST and os.path.isdir(rerunPath): | |
69 pass | |
70 else: | |
71 raise | |
72 copyback = os.getcwd() | |
73 outdir = copyback + "/" + TRINITY_OUT_DIR | |
74 try: | |
75 os.makedirs(outdir) | |
76 print "Created dir ",outdir | |
77 except OSError as exc: | |
78 if exc.errno == errno.EEXIST and os.path.isdir(outdir): | |
79 pass | |
80 else: | |
81 raise | |
82 os.chdir(rerunPath) | |
83 | |
84 ### Add information for reads ################################### | |
85 if args.left and args.right: | |
86 cmd += ["--left",args.left,"--right", args.right] | |
87 elif args.single: | |
88 cmd += ["--single",args.single] | |
89 else: | |
90 raise Exception ("Need input files in order to run Trinity!") | |
91 | |
92 ### Add seqtype ################################################## | |
93 if args.seqType: | |
94 cmd += ["--seqType",args.seqType] | |
95 else: | |
96 raise Exception ("Please specify a file type for your reads!") | |
97 | |
98 ### Memory and CPU management #################################### | |
99 if args.mem_per_cpu and not args.max_memory: | |
100 if args.CPU: | |
101 memry = int(args.CPU) * int(args.mem_per_cpu) | |
102 memstr = "%dG" % (memry) | |
103 cmd += ["--max_memory",memstr] | |
104 else: | |
105 memry = 2 * int(args.mem_per_cpu) | |
106 memstr = "%dG" % (memry) | |
107 cmd += ["--max_memory",memstr] | |
108 elif args.max_memory and not args.mem_per_cpu: | |
109 cmd += ["--max_memory",args.max_memory] | |
110 else: | |
111 raise Exception ("Please pick Memory per cpu, or max mem, but not both.") | |
112 if args.CPU: | |
113 cmd += ["--CPU", args.CPU] | |
114 | |
115 ### Enough args, let's run it #################################### | |
116 print "About to write to %s" % args.log | |
117 out = open(args.log, 'w') | |
118 totalattempts = attempts = 2 | |
119 ec = 1 | |
120 finish = 1 | |
121 out.write("Command is:\n%s\n" % (" ".join(cmd))) | |
122 | |
123 ### There is definitely some value in running the job more than once, especially if it dies for stupid reasons.. ### | |
124 while ec != 0 and attempts > 0 and finish != 0: | |
125 | |
126 dt = datetime.now() | |
127 dtstr = dt.strftime("%d/%m/%y %H:%M") | |
128 out.write("Beginning attempt %d of Trinity job at %s\n" % (totalattempts - attempts +1, dtstr) ) | |
129 attempts -= 1 | |
130 ec = subprocess32.call(cmd, shell=False, stdin=None, stdout=out, stderr=out, timeout=None) | |
131 out.write("Trinity exited with status %d\n" % ec) | |
132 | |
133 greplog = open("greplog", 'w') | |
134 cmds = ["grep", 'All commands completed successfully', args.log] | |
135 finish = subprocess32.call(cmds,shell=False, stdin=None, stdout=greplog, stderr=greplog, timeout=None) | |
136 greplog.close() | |
137 out.write("Finished and found the success command with grep code %d\n" % finish) | |
138 | |
139 if ec == 0 and args.timing is not None: | |
140 if copyback is not False: | |
141 cwd = os.getcwd() | |
142 dest = copyback + "/" + TRINITY_OUT_DIR + "/Trinity.fasta" | |
143 src = cwd + "/" + TRINITY_OUT_DIR + "/Trinity.fasta" | |
144 print "copying trinity outputs from %s to %s" % (src, dest) | |
145 os.symlink(src, dest) | |
146 | |
147 #copy the timing file into the log | |
148 try: | |
149 handle = open (args.timing, 'r') | |
150 for line in handle: | |
151 out.write(line) | |
152 handle.close() | |
153 except (OSError, IOError) as e: | |
154 print "Oops, no timing file found? ",e | |
155 | |
156 | |
157 out.close() | |
158 exit (ec) | |
159 | |
160 if __name__ == "__main__": | |
161 main(*sys.argv) | |
162 |