Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/planemo/cwl/cwl2script/cwl2script.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:26e78fe6e8c4 |
---|---|
1 import argparse | |
2 import cwltool.main | |
3 import sys | |
4 import os | |
5 import schema_salad | |
6 import logging | |
7 from cwltool.process import checkRequirements, shortname, adjustFiles | |
8 import shellescape | |
9 import re | |
10 import copy | |
11 import json | |
12 | |
# Predicate: returns a match object when a string is empty or contains
# whitespace or a character that is special to the shell, and None otherwise.
_SHELL_UNSAFE_PATTERN = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
needs_shell_quoting = _SHELL_UNSAFE_PATTERN.search

# Predicate: returns a match object when a string contains one of the glob
# metacharacters `[`, `]`, `*` or `?`, and None otherwise.
_GLOB_META_PATTERN = re.compile(r"""[\[\]\*?]""")
glob_metacharacters = _GLOB_META_PATTERN.search
15 | |
def maybe_quote(arg):
    """Return ``arg`` shell-quoted, but only when quoting is required.

    Strings for which ``needs_shell_quoting`` finds no match are returned
    unchanged; all others are passed through ``shellescape.quote``.
    """
    if needs_shell_quoting(arg):
        return shellescape.quote(arg)
    return arg
18 | |
def generateScriptForTool(tool, job, outdir):
    """Render the shell commands for the first job produced by ``tool``.

    Jobs are requested with ``tool.job(job, "", None, outdir=outdir)`` and
    only the first one yielded is used.

    Returns a tuple ``(script, outdir, tmpdir)`` where ``script`` is the
    shell text that creates the job's output and temporary directories, runs
    the command line (with optional stdin/stdout redirection) and removes the
    temporary directory. ``outdir`` and ``tmpdir`` are the job's own
    directories. Returns ``None`` when no job is yielded.
    """
    for first_job in tool.job(job, "", None, outdir=outdir):
        break
    else:
        # The iterator produced nothing: no script can be generated.
        return None

    quoted_outdir = maybe_quote(first_job.outdir)
    quoted_tmpdir = maybe_quote(first_job.tmpdir)
    command = " ".join([maybe_quote(word) for word in first_job.command_line])

    if first_job.stdin:
        stdin_redirect = ' < %s' % maybe_quote(first_job.stdin)
    else:
        stdin_redirect = ''

    if first_job.stdout:
        # stdout is captured into a file inside the job's output directory.
        stdout_target = os.path.join(first_job.outdir, first_job.stdout)
        stdout_redirect = ' > %s' % maybe_quote(stdout_target)
    else:
        stdout_redirect = ''

    template = (
        "mkdir -p %s # output directory\n"
        "mkdir -p %s # temporary directory\n"
        "%s%s%s\n"
        "rm -r %s # clean up temporary directory\n"
    )
    script = template % (
        quoted_outdir,
        quoted_tmpdir,
        command,
        stdin_redirect,
        stdout_redirect,
        quoted_tmpdir,
    )
    return (script, first_job.outdir, first_job.tmpdir)
31 | |
32 | |
def generateScriptForWorkflow(cwlwf, cwljob, outdir):
    """Generate a ``/bin/sh`` script that runs the steps of a workflow in order.

    ``cwlwf`` must expose ``tool`` (a mapping with ``"id"``, ``"inputs"`` and
    ``"outputs"``) and ``steps``; each step must expose ``tool`` and
    ``embedded_tool``. ``cwljob`` maps the short names of the workflow inputs
    to their values. ``outdir`` is accepted for interface compatibility but
    is not used: each step is generated with ``outdir=None``.

    Steps are emitted once all of their ``source`` inputs are available,
    either from the workflow inputs or from the outputs of steps that were
    already emitted. After all steps, the script moves the workflow output
    files into the current directory, removes the per-step output
    directories and echoes the final output object as JSON.

    Returns the script as a single string.

    Raises ``Exception`` when a step output is not of type ``File``, when an
    output glob contains glob metacharacters, or when no further step can be
    scheduled while workflow outputs are still unsatisfied.
    """
    promises = {}
    jobs = {}
    script = ["#!/bin/sh",
              "",
              "# Workflow generated from %s using cwl2script" % (cwlwf.tool["id"]),
              "",  # the comma matters: without it this literal merges with "set -x"
              "set -x",
              ""]

    outdirs = []

    # Workflow inputs are available from the start.
    for inp in cwlwf.tool["inputs"]:
        promises[inp["id"]] = (cwlwf, cwljob[shortname(inp["id"])])

    alloutputs_fulfilled = False
    while not alloutputs_fulfilled:
        # Iteratively go over the workflow steps, adding jobs to the script as
        # their dependencies are fulfilled by upstream workflow inputs or
        # step outputs.  Loop exits when the workflow outputs are satisfied.

        alloutputs_fulfilled = True

        progress = False
        for step in cwlwf.steps:
            if step.tool["id"] in jobs:
                # This step has already been written to the script.
                continue

            stepinputs_fulfilled = True
            for inp in step.tool["inputs"]:
                if "source" in inp and inp["source"] not in promises:
                    stepinputs_fulfilled = False
            if not stepinputs_fulfilled:
                continue

            jobobj = {}

            # TODO: Handle multiple inbound links
            # TODO: Handle scatter/gather
            # (both are discussed in section 5.1.2 in CWL spec draft-2)

            script.append("# Run step %s" % step.tool["id"])

            for inp in step.tool["inputs"]:
                if "source" in inp:
                    jobobj[shortname(inp["id"])] = promises[inp["source"]][1]
                    script.append("# depends on step %s" % promises[inp["source"]][0].tool["id"])
                elif "default" in inp:
                    jobobj[shortname(inp["id"])] = copy.copy(inp["default"])

            (wfjob, joboutdir, _jobtmpdir) = generateScriptForTool(step.embedded_tool, jobobj, None)
            outdirs.append(joboutdir)

            jobs[step.tool["id"]] = True

            script.append(wfjob)

            # Record where each step output will be found so that downstream
            # steps and the workflow outputs can refer to it.
            for out in step.tool["outputs"]:
                for toolout in step.embedded_tool.tool["outputs"]:
                    if shortname(toolout["id"]) == shortname(out["id"]):
                        if toolout["type"] != "File":
                            raise Exception("Only supports file outputs")
                        if glob_metacharacters(toolout["outputBinding"]["glob"]):
                            raise Exception("Only support glob with concrete filename.")
                        promises[out["id"]] = (step, {"class": "File", "path": os.path.join(joboutdir, toolout["outputBinding"]["glob"])})
            progress = True

        for out in cwlwf.tool["outputs"]:
            if "source" in out:
                if out["source"] not in promises:
                    alloutputs_fulfilled = False

        # Nothing new could be scheduled and outputs are still missing:
        # bail out instead of looping forever.
        if not alloutputs_fulfilled and not progress:
            raise Exception("Not making progress")

    outobj = {}
    script.append("# Move output files to the current directory")

    for out in cwlwf.tool["outputs"]:
        f = promises[out["source"]][1]
        script.append("mv %s ." % (maybe_quote(f["path"])))
        # After the move the file lives in the current directory.
        f["path"] = os.path.basename(f["path"])

        if f.get("secondaryFiles"):
            script.append("mv %s ." % (' '.join([maybe_quote(sf["path"]) for sf in f["secondaryFiles"]])))
            for sf in f["secondaryFiles"]:
                sf["path"] = os.path.basename(sf["path"])

        outobj[shortname(out["id"])] = f

    script.append("")
    if outdirs:
        # `rm -r` without operands is an error, so only emit it when at least
        # one step directory was created.
        script.append("# Clean up staging output directories")
        script.append("rm -r %s" % (' '.join([maybe_quote(od) for od in outdirs])))
        script.append("")

    script.append("# Generate final output object")
    # The JSON is placed inside single quotes; escape any embedded single
    # quote so that it cannot terminate the shell string early.
    final_json = json.dumps(outobj, indent=4).replace("'", "'\"'\"'")
    script.append("echo '%s'" % final_json)

    return "\n".join(script)
131 | |
132 | |
133 | |
# Process requirements that are accepted; main() passes this list to
# checkRequirements() together with the loaded tool description.
supportedProcessRequirements = [
    "SchemaDefRequirement",
]
135 | |
def main(args=None):
    """Command line entry point: print a shell script for a CWL tool or workflow.

    ``args`` is the list of command line arguments (without the program
    name); when ``None``, argparse falls back to ``sys.argv[1:]``.

    The job document is loaded, missing inputs are filled from their
    ``default`` values, and then either the conformance-test result is
    written to stdout as JSON, or the generated script is printed.

    Returns 0 on success, 33 when the tool declares an unsupported
    requirement, 1 when a command line tool yields no job, or the integer
    returned by ``cwltool.main.load_tool`` when loading fails.

    Raises ``Exception`` when a required input has neither a value in the
    job document nor a default.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("cwltool", type=str)
    parser.add_argument("cwljob", type=str)

    parser.add_argument("--conformance-test", action="store_true")
    parser.add_argument("--no-container", action="store_true")
    parser.add_argument("--basedir", type=str)
    parser.add_argument("--outdir", type=str, default=os.getcwd())

    options = parser.parse_args(args)

    uri = "file://" + os.path.abspath(options.cwljob)

    if options.conformance_test:
        loader = schema_salad.ref_resolver.Loader({})
    else:
        loader = schema_salad.ref_resolver.Loader({
            "@base": uri,
            "path": {
                "@type": "@id"
            }
        })

    job, _ = loader.resolve_ref(uri)

    t = cwltool.main.load_tool(options.cwltool, False, False, cwltool.workflow.defaultMakeTool, True)

    # load_tool reports failure by returning an integer exit status.
    if isinstance(t, int):
        return t

    try:
        checkRequirements(t.tool, supportedProcessRequirements)
    except Exception as e:
        logging.error(e)
        return 33

    for inp in t.tool["inputs"]:
        name = shortname(inp["id"])
        if name in job:
            continue
        if "default" in inp:
            job[name] = copy.copy(inp["default"])
        elif inp["type"][0] == "null":
            # Input type starts with "null": it may be left unset.
            continue
        else:
            raise Exception("Missing inputs `%s`" % name)

    if options.conformance_test:
        sys.stdout.write(json.dumps(cwltool.main.single_job_executor(t, job, options.basedir, options, conformance_test=True), indent=4))
        return 0

    if not options.basedir:
        options.basedir = os.path.dirname(os.path.abspath(options.cwljob))

    outdir = options.outdir

    # print(x) with a single argument behaves the same on Python 2 and 3.
    if t.tool["class"] == "Workflow":
        print(generateScriptForWorkflow(t, job, outdir))
    elif t.tool["class"] == "CommandLineTool":
        generated = generateScriptForTool(t, job, outdir)
        if generated is None:
            logging.error("No job was generated for `%s`", options.cwltool)
            return 1
        # generateScriptForTool returns (script, outdir, tmpdir); only the
        # script text belongs on stdout.
        print(generated[0])

    return 0
198 | |
# Script entry point: run main() on the command line arguments and use its
# return value as the process exit status.
if __name__ == "__main__":
    exit_status = main(sys.argv[1:])
    sys.exit(exit_status)