Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/cwltool/sandboxjs.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author | shellac |
---|---|
date | Mon, 01 Jun 2020 08:59:25 -0400 |
parents | 79f47841a781 |
children |
comparison
equal
deleted
inserted
replaced
4:79f47841a781 | 5:9b1c78e6ba9c |
---|---|
1 """Evaluate CWL Javascript Expressions in a sandbox.""" | |
2 from __future__ import absolute_import | |
3 | |
4 import errno | |
5 import json | |
6 import os | |
7 import re | |
8 import select | |
9 import sys | |
10 import threading | |
11 from io import BytesIO | |
12 from typing import cast, Any, Dict, List, Optional, Tuple, Union | |
13 | |
14 import six | |
15 from future.utils import raise_from | |
16 from pkg_resources import resource_stream | |
17 from typing_extensions import Text # pylint: disable=unused-import | |
18 # move to a regular typing import when Python 3.3-3.6 is no longer supported | |
19 | |
20 from schema_salad.utils import json_dumps | |
21 | |
22 from .loghandler import _logger | |
23 from .utils import onWindows, processes_to_kill, subprocess | |
24 try: | |
25 import queue # type: ignore | |
26 except ImportError: | |
27 import Queue as queue # type: ignore | |
28 | |
29 | |
class JavascriptException(Exception):
    """Raised when no usable JavaScript engine can be started or an expression fails to evaluate."""
32 | |
33 | |
# Any value that decoding a JSON document can produce.
JSON = Union[Dict[Text, Any], List[Any], Text, int, float, bool, None]

# Per-thread storage; the engine processes are cached on it as ``localdata.procs``.
localdata = threading.local()

# minimum acceptable version of nodejs engine
minimum_node_version_str = '0.10.26'
# Flipped to True once the ``node:slim`` Docker image is known to be present.
have_node_slim = False
# Seconds a JavaScript evaluation may run before its process is killed.
default_timeout = 20
42 | |
def check_js_threshold_version(working_alias):
    # type: (str) -> bool
    """
    Check whether the Node.js engine ``working_alias`` meets the minimum version.

    Runs ``<working_alias> -v`` and compares the reported version with
    ``minimum_node_version_str`` component by component.

    Only the leading digits of each dot-separated component are used, so
    pre-release/nightly suffixes (e.g. ``v15.0.0-nightly20200601``) no longer
    raise ``ValueError``.  Parsing stops at the first component without a
    leading digit; output with no parsable version compares as too old.

    https://github.com/nodejs/node/blob/master/CHANGELOG.md#nodejs-changelog

    :param working_alias: name or path of the node executable.
    :returns: True if the engine version is >= the minimum version.
    :raises subprocess.CalledProcessError, OSError: if ``-v`` cannot be run.
    """
    def _version_ints(version_str):
        # type: (Text) -> List[int]
        """Parse e.g. 'v4.2.6\\n' -> [4, 2, 6]."""
        numbers = []  # type: List[int]
        for component in version_str.strip().lstrip('v').split('.'):
            match = re.match(r'\d+', component)
            if match is None:
                break
            numbers.append(int(match.group(0)))
        return numbers

    current_version_str = subprocess.check_output(
        [working_alias, "-v"]).decode('utf-8')

    current_version = _version_ints(current_version_str)
    minimum_node_version = _version_ints(minimum_node_version_str)

    # Lists compare lexicographically, which matches version ordering here.
    return current_version >= minimum_node_version
58 | |
59 | |
def new_js_proc(js_text, force_docker_pull=False):
    # type: (Text, bool) -> subprocess.Popen
    """
    Start a Node.js process that evaluates ``js_text``.

    Engine selection:

    - A local engine is preferred. ``nodejs`` and ``node`` are tried in order.
      The first one that answers ``--eval`` is used only if it passes
      :func:`check_js_threshold_version`. It is version-checked *before* being
      started, so an outdated engine is never left running behind the Docker
      fallback.
    - Otherwise ``docker run node:slim`` is used. The image is pulled when
      ``docker images`` does not list it or ``force_docker_pull`` is set;
      this check only happens while ``have_node_slim`` is still False.

    The started process has piped stdin/stdout/stderr and is appended to
    ``processes_to_kill``.

    :param js_text: JavaScript source passed to ``node --eval``.
    :param force_docker_pull: pull the Docker image even if it is present.
    :returns: the ``subprocess.Popen`` handle of the engine process.
    :raises JavascriptException: if only an outdated local Node.js was found
        and Docker could not be used, or if no engine could be started at all.
    """
    global have_node_slim

    nodejs = None  # type: Optional[subprocess.Popen]
    # Set when a local node answered but is older than the minimum version.
    local_node_too_old = False
    trynodes = ("nodejs", "node")
    for n in trynodes:
        try:
            if subprocess.check_output(
                    [n, "--eval", "process.stdout.write('t')"]).decode('utf-8') != "t":
                continue
            if check_js_threshold_version(n):
                nodejs = subprocess.Popen([n, "--eval", js_text],
                                          stdin=subprocess.PIPE,
                                          stdout=subprocess.PIPE,
                                          stderr=subprocess.PIPE)
                processes_to_kill.append(nodejs)
            else:
                local_node_too_old = True
            break
        except (subprocess.CalledProcessError, OSError):
            pass

    if nodejs is None:
        try:
            nodeimg = "node:slim"

            if not have_node_slim:
                dockerimgs = subprocess.check_output(["docker", "images", "-q", nodeimg]).decode('utf-8')
                # if output is an empty string
                if (len(dockerimgs.split("\n")) <= 1) or force_docker_pull:
                    # pull node:slim docker container
                    nodejsimg = subprocess.check_output(["docker", "pull", nodeimg]).decode('utf-8')
                    _logger.info("Pulled Docker image %s %s", nodeimg, nodejsimg)
                have_node_slim = True
            nodejs = subprocess.Popen(["docker", "run",
                                       "--attach=STDIN", "--attach=STDOUT", "--attach=STDERR",
                                       "--sig-proxy=true", "--interactive",
                                       "--rm", nodeimg, "node", "--eval", js_text],
                                      stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            processes_to_kill.append(nodejs)
        except OSError as e:
            # A missing docker binary (ENOENT) just means the fallback is unavailable.
            if e.errno != errno.ENOENT:
                raise
        except subprocess.CalledProcessError:
            pass

    if nodejs is None:
        # docker failed, but nodejs is installed on system but the version is below the required version
        if local_node_too_old:
            # One message string (the original passed two args, so str(exc) was a tuple).
            raise JavascriptException(
                u'cwltool requires minimum v{} version of Node.js engine. '
                u'Try updating: https://docs.npmjs.com/getting-started/installing-node'.format(
                    minimum_node_version_str))
        # docker failed and nodejs not on system
        raise JavascriptException(
            u"cwltool requires Node.js engine to evaluate and validate "
            u"Javascript expressions, but couldn't find it. Tried {}, "
            u"docker run node:slim".format(u", ".join(trynodes)))

    return nodejs
124 | |
# Sentinel line expected at the end of both stdout and stderr once the engine
# has finished one evaluation; exec_js_process reads until both end with it.
PROCESS_FINISHED_STR = "r1cepzbhUTxtykz5XTC4\n"
126 | |
def exec_js_process(js_text,  # type: Text
                    timeout=default_timeout,  # type: float
                    js_console=False,  # type: bool
                    context=None,  # type: Optional[Text]
                    force_docker_pull=False,  # type: bool
                    ):
    # type: (...) -> Tuple[int, Text, Text]
    """
    Send ``js_text`` to a Node.js engine process and collect its output.

    Engine processes are cached per thread in ``localdata.procs``, keyed by the
    engine script name (plus ``context`` when one is given). They are reused
    while still running. On Windows a fresh process is started for every call
    and killed afterwards.

    :param js_text: program to evaluate; written JSON-encoded as one line on
        the engine's stdin.
    :param timeout: seconds after which the engine process is killed.
    :param js_console: use the engine variant with javascript console support
        (logged as not for production use).
    :param context: when given, a context-aware engine is used. A newly
        started one first receives ``context`` (JSON-encoded) on stdin.
    :param force_docker_pull: forwarded to :func:`new_js_proc`.
    :returns: ``(returncode, stdout, stderr)``.
        ``returncode`` is ``0`` unless the process reports a non-zero exit
        status. In that case it is ``-1`` if the timeout kill was triggered,
        otherwise the exit status.
        The trailing completion marker is removed from stdout/stderr when
        present.
    :raises NotImplementedError: if both ``js_console`` and ``context`` are set.
    """
    if not hasattr(localdata, "procs"):
        localdata.procs = {}

    if js_console and context is not None:
        raise NotImplementedError("js_console=True and context not implemented")

    if js_console:
        js_engine = 'cwlNodeEngineJSConsole.js'
        _logger.warning(
            "Running with support for javascript console in expressions (DO NOT USE IN PRODUCTION)")
    elif context is not None:
        js_engine = "cwlNodeEngineWithContext.js"
    else:
        js_engine = 'cwlNodeEngine.js'

    created_new_process = False

    if context is not None:
        nodejs = localdata.procs.get((js_engine, context))
    else:
        nodejs = localdata.procs.get(js_engine)

    if nodejs is None \
            or nodejs.poll() is not None \
            or onWindows():
        res = resource_stream(__name__, js_engine)
        try:
            js_engine_code = res.read().decode('utf-8')
        finally:
            # The original never closed this handle.
            res.close()

        created_new_process = True

        new_proc = new_js_proc(js_engine_code, force_docker_pull=force_docker_pull)

        if context is None:
            localdata.procs[js_engine] = new_proc
            nodejs = new_proc
        else:
            localdata.procs[(js_engine, context)] = new_proc
            nodejs = new_proc

    killed = []

    def terminate():  # type: () -> None
        """Kill the node process if it exceeds timeout limit."""
        try:
            killed.append(True)
            nodejs.kill()
        except OSError:
            pass

    timer = threading.Timer(timeout, terminate)
    timer.daemon = True
    timer.start()

    stdin_text = u""
    if created_new_process and context is not None:
        stdin_text = json_dumps(context) + "\n"
    stdin_text += json_dumps(js_text) + "\n"

    stdin_buf = BytesIO(stdin_text.encode('utf-8'))
    stdout_buf = BytesIO()
    stderr_buf = BytesIO()

    rselect = [nodejs.stdout, nodejs.stderr]  # type: List[BytesIO]
    wselect = [nodejs.stdin]  # type: List[BytesIO]

    def process_finished():  # type: () -> bool
        return stdout_buf.getvalue().decode('utf-8').endswith(PROCESS_FINISHED_STR) and \
            stderr_buf.getvalue().decode('utf-8').endswith(PROCESS_FINISHED_STR)

    # On windows system standard input/output are not handled properly by select module
    # (modules like pywin32, msvcrt, gevent don't work either)
    if sys.platform == 'win32':
        READ_BYTES_SIZE = 512

        # creating queue for reading from a thread to queue
        input_queue = queue.Queue()
        output_queue = queue.Queue()
        error_queue = queue.Queue()

        # To tell threads that output has ended and threads can safely exit
        no_more_output = threading.Lock()
        no_more_output.acquire()
        no_more_error = threading.Lock()
        no_more_error.acquire()

        # put constructed command to input queue which then will be passed to nodejs's stdin
        def put_input(input_queue):
            while True:
                buf = stdin_buf.read(READ_BYTES_SIZE)
                if buf:
                    input_queue.put(buf)
                else:
                    break

        # get the output from nodejs's stdout and continue till output ends
        def get_output(output_queue):
            while not no_more_output.acquire(False):
                buf = os.read(nodejs.stdout.fileno(), READ_BYTES_SIZE)
                if buf:
                    output_queue.put(buf)

        # get the output from nodejs's stderr and continue till error output ends
        def get_error(error_queue):
            while not no_more_error.acquire(False):
                buf = os.read(nodejs.stderr.fileno(), READ_BYTES_SIZE)
                if buf:
                    error_queue.put(buf)

        # Threads managing nodejs.stdin, nodejs.stdout and nodejs.stderr respectively
        input_thread = threading.Thread(target=put_input, args=(input_queue,))
        input_thread.daemon = True
        input_thread.start()
        output_thread = threading.Thread(target=get_output, args=(output_queue,))
        output_thread.daemon = True
        output_thread.start()
        error_thread = threading.Thread(target=get_error, args=(error_queue,))
        error_thread.daemon = True
        error_thread.start()

        finished = False

        while not finished and timer.is_alive():
            try:
                if nodejs.stdin in wselect:
                    if not input_queue.empty():
                        os.write(nodejs.stdin.fileno(), input_queue.get())
                    elif not input_thread.is_alive():
                        wselect = []
                if nodejs.stdout in rselect:
                    if not output_queue.empty():
                        stdout_buf.write(output_queue.get())

                if nodejs.stderr in rselect:
                    if not error_queue.empty():
                        stderr_buf.write(error_queue.get())

                if process_finished() and error_queue.empty() and output_queue.empty():
                    finished = True
                    no_more_output.release()
                    no_more_error.release()
            except OSError:
                break

    else:
        # Upper bound for one select() wait, so the timer is re-checked even
        # when no pipe becomes ready.
        select_interval = 1.0
        while not process_finished() and timer.is_alive():
            if not rselect:
                # stdout and stderr both reached EOF: the engine exited without
                # printing the completion marker; stop instead of spinning
                # until the timeout and being misreported as "killed".
                break
            rready, wready, _ = select.select(rselect, wselect, [], select_interval)
            try:
                if nodejs.stdin in wready:
                    buf = stdin_buf.read(select.PIPE_BUF)
                    if buf:
                        os.write(nodejs.stdin.fileno(), buf)
                    else:
                        # All input sent. An idle pipe is always writable, so
                        # keeping stdin here would make select() return
                        # immediately and busy-spin this loop.
                        wselect = []
                for pipe, sink in ((nodejs.stdout, stdout_buf),
                                   (nodejs.stderr, stderr_buf)):
                    if pipe in rready:
                        buf = os.read(pipe.fileno(), select.PIPE_BUF)
                        if buf:
                            sink.write(buf)
                        else:
                            # EOF: a closed pipe stays readable forever.
                            rselect.remove(pipe)
            except OSError:
                break
        if not rselect:
            # The engine closed its output; reap it so poll() below sees its
            # real exit status. The still-running timer bounds this wait.
            nodejs.wait()
    timer.cancel()

    stdin_buf.close()

    marker = PROCESS_FINISHED_STR.encode('utf-8')

    def without_marker(data):  # type: (bytes) -> bytes
        # Drop the marker plus the one byte before it (presumably the newline
        # ending the result), but only when the marker is really there; after
        # a timeout or crash it is missing and blind slicing would cut off
        # real diagnostic output.
        if data.endswith(marker):
            return data[:-len(marker) - 1]
        return data

    stdoutdata = without_marker(stdout_buf.getvalue())
    stderrdata = without_marker(stderr_buf.getvalue())

    exit_status = nodejs.poll()
    if exit_status not in (None, 0):
        returncode = -1 if killed else nodejs.returncode
    else:
        returncode = 0
    # On windows currently a new instance of nodejs process is used due to
    # problem with blocking on read operation on windows
    if onWindows():
        nodejs.kill()

    return returncode, stdoutdata.decode('utf-8'), stderrdata.decode('utf-8')
315 | |
def code_fragment_to_js(jscript, jslib=""):
    # type: (Text, Text) -> Text
    """
    Wrap a CWL expression fragment into a self-invoking strict-mode program.

    A string longer than one character whose first character is ``{`` is
    treated as a function body and used verbatim. Anything else is treated as
    an expression and wrapped as ``{return (<expr>);}``. ``jslib`` is placed
    on its own line between ``"use strict";`` and the function invocation.
    """
    is_function_body = (isinstance(jscript, six.string_types)
                        and len(jscript) > 1
                        and jscript[0] == '{')
    body = jscript if is_function_body else "{return (%s);}" % jscript
    return u"\"use strict\";\n{}\n(function(){})()".format(jslib, body)
325 | |
def execjs(js,  # type: Text
           jslib,  # type: Text
           timeout,  # type: float
           force_docker_pull=False,  # type: bool
           debug=False,  # type: bool
           js_console=False  # type: bool
           ):  # type: (...) -> JSON
    """
    Evaluate the CWL expression ``js`` with ``jslib`` prepended and decode the result.

    :param js: expression or ``{...}`` function body (see code_fragment_to_js).
    :param jslib: library code inserted before the expression.
    :param timeout: seconds before the engine process is killed.
    :param force_docker_pull: forwarded to the engine launcher.
    :param debug: include the numbered script (last 99 lines) in error messages.
    :param js_console: log ``[log]``/``[err]`` console lines from stderr.
    :returns: the JSON-decoded stdout of the engine.
    :raises JavascriptException: on a non-zero return code (prefixed with
        "Long-running script killed" when it is -1) or non-JSON stdout.
    """
    program = code_fragment_to_js(js, jslib)

    returncode, stdout, stderr = exec_js_process(
        program, timeout, js_console=js_console,
        force_docker_pull=force_docker_pull)

    if js_console and stderr is not None:
        console_lines = re.findall(
            r'^[[](?:log|err)[]].*$', stderr, flags=re.MULTILINE)
        _logger.info("Javascript console output:")
        _logger.info("----------------------------------------")
        _logger.info('\n'.join(console_lines))
        _logger.info("----------------------------------------")

    def stdfmt(data):  # type: (Text) -> Text
        # Multi-line output starts on its own line in the message.
        return "\n" + data.strip() if "\n" in data else data

    def fn_linenum():  # type: () -> Text
        # Number the script lines, keeping at most the last 99.
        all_lines = program.splitlines()
        first = max(len(all_lines) - 99, 0)
        return u"\n".join(
            u"%02i %s" % (number, text)
            for number, text in enumerate(all_lines[first:], start=first + 1))

    if returncode != 0:
        if debug:
            details = (returncode, fn_linenum(), stdfmt(stdout), stdfmt(stderr))
            info = u"returncode was: %s\nscript was:\n%s\nstdout was: %s\nstderr was: %s\n" % details
        else:
            details = (js, stdfmt(stdout), stdfmt(stderr))
            info = u"Javascript expression was: %s\nstdout was: %s\nstderr was: %s" % details
        if returncode == -1:
            info = u"Long-running script killed after {} seconds: {}".format(timeout, info)
        raise JavascriptException(info)

    try:
        decoded = json.loads(stdout)
    except ValueError as err:
        raise_from(JavascriptException(
            u"{}\nscript was:\n{}\nstdout was: '{}'\nstderr was: '{}'\n".format(
                err, fn_linenum(), stdout, stderr)), err)
    return cast(JSON, decoded)