Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/sandboxjs.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 """Evaluate CWL Javascript Expressions in a sandbox.""" | |
2 | |
3 import errno | |
4 import json | |
5 import os | |
6 import queue | |
7 import re | |
8 import select | |
9 import subprocess # nosec | |
10 import sys | |
11 import threading | |
12 from io import BytesIO | |
13 from typing import List, Optional, Tuple, cast | |
14 | |
15 from pkg_resources import resource_stream | |
16 from schema_salad.utils import json_dumps | |
17 | |
18 from .loghandler import _logger | |
19 from .utils import CWLOutputType, onWindows, processes_to_kill | |
20 | |
21 | |
class JavascriptException(Exception):
    """Raised when a Javascript expression cannot be set up or evaluated."""
24 | |
25 | |
# Oldest Node.js release accepted by check_js_threshold_version().
minimum_node_version_str = "0.10.26"

# Default value of the ``timeout`` parameter of exec_js_process(), in seconds.
default_timeout = 20

# Flipped to True by new_js_proc() once the "node:slim" Docker image has been
# looked up (and pulled if necessary), so later calls skip that step.
have_node_slim = False

# Thread-local holder; exec_js_process() keeps its running engine processes
# in ``localdata.procs``.
localdata = threading.local()
32 | |
33 | |
def check_js_threshold_version(working_alias: str) -> bool:
    """
    Check whether a Node.js engine meets the minimum supported version.

    Runs ``<working_alias> -v`` and compares the version it prints with
    ``minimum_node_version_str``.

    :param working_alias: name or path of the Node.js executable to query
    :returns: True if the reported version is at least the minimum; False if
              it is older or if no version number can be recognised
    :raises subprocess.CalledProcessError: if the command exits non-zero
    :raises OSError: if the executable cannot be started

    https://github.com/nodejs/node/blob/master/CHANGELOG.md#nodejs-changelog
    """
    current_version_str = subprocess.check_output(  # nosec
        [working_alias, "-v"], universal_newlines=True
    )

    # parse nodejs version into an int list: 'v4.2.6\n' -> [4, 2, 6]
    # Only the leading dotted-number part is used, so suffixes such as
    # '-pre' or '-nightly2021...' no longer make int() raise ValueError.
    match = re.match(r"v?(\d+(?:\.\d+)*)", current_version_str.strip())
    if match is None:
        return False

    current_version = [int(v) for v in match.group(1).split(".")]
    minimum_node_version = [int(v) for v in minimum_node_version_str.split(".")]

    return current_version >= minimum_node_version
51 | |
52 | |
def new_js_proc(
    js_text: str, force_docker_pull: bool = False
) -> "subprocess.Popen[str]":
    """
    Start a Node.js process that evaluates ``js_text`` and return it.

    A local ``nodejs`` / ``node`` executable is preferred. If none works, or
    the first working one is older than ``minimum_node_version_str``, the
    ``node:slim`` Docker image is used instead. Every process started here is
    appended to ``processes_to_kill``.

    :param js_text: Javascript source handed to ``node --eval``
    :param force_docker_pull: pull ``node:slim`` even if ``docker images``
                              already lists it
    :returns: the running process, with stdin/stdout/stderr as text pipes
    :raises JavascriptException: if no usable Node.js engine is available
    :raises OSError: if starting Docker fails for a reason other than the
                     ``docker`` executable being missing
    """
    global have_node_slim

    nodejs = None  # type: Optional[subprocess.Popen[str]]
    # True once some local alias ran successfully, whatever its version.
    local_node_found = False
    trynodes = ("nodejs", "node")
    for n in trynodes:
        try:
            if (
                subprocess.check_output(  # nosec
                    [n, "--eval", "process.stdout.write('t')"], universal_newlines=True
                )
                != "t"
            ):
                continue
            local_node_found = True
            # Check the version *before* spawning the engine, so that a too-old
            # node is never left running while we fall back to Docker.
            if check_js_threshold_version(n):
                nodejs = subprocess.Popen(  # nosec
                    [n, "--eval", js_text],
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    universal_newlines=True,
                )
                processes_to_kill.append(nodejs)
            # The first working alias decides, even when it is too old.
            break
        except (subprocess.CalledProcessError, OSError):
            pass

    if nodejs is None:
        nodeimg = "node:slim"
        try:
            if not have_node_slim:
                dockerimgs = subprocess.check_output(  # nosec
                    ["docker", "images", "-q", nodeimg], universal_newlines=True
                )
                # if output is an empty string
                if (len(dockerimgs.split("\n")) <= 1) or force_docker_pull:
                    # pull node:slim docker container
                    nodejsimg = subprocess.check_output(  # nosec
                        ["docker", "pull", nodeimg], universal_newlines=True
                    )
                    _logger.info("Pulled Docker image %s %s", nodeimg, nodejsimg)
                have_node_slim = True
            nodejs = subprocess.Popen(  # nosec
                [
                    "docker",
                    "run",
                    "--attach=STDIN",
                    "--attach=STDOUT",
                    "--attach=STDERR",
                    "--sig-proxy=true",
                    "--interactive",
                    "--rm",
                    nodeimg,
                    "node",
                    "--eval",
                    js_text,
                ],
                universal_newlines=True,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            processes_to_kill.append(nodejs)
        except OSError as e:
            # A missing docker executable just means "no fallback available".
            if e.errno != errno.ENOENT:
                raise
        except subprocess.CalledProcessError:
            pass

    if nodejs is None:
        if local_node_found:
            # docker failed, and the nodejs installed on the system is below
            # the required version
            raise JavascriptException(
                "cwltool requires minimum v{} version of Node.js engine. "
                "Try updating: "
                "https://docs.npmjs.com/getting-started/installing-node".format(
                    minimum_node_version_str
                )
            )
        # docker failed and nodejs not on system
        raise JavascriptException(
            "cwltool requires Node.js engine to evaluate and validate "
            "Javascript expressions, but couldn't find it.  Tried {}, "
            "docker run node:slim".format(", ".join(trynodes))
        )

    return nodejs
147 | |
148 | |
# exec_js_process() treats a stream as complete once it ends with this marker
# (presumably printed by the cwlNodeEngine*.js scripts, which are not visible
# from this file).
PROCESS_FINISHED_STR = "r1cepzbhUTxtykz5XTC4\n"


def exec_js_process(
    js_text: str,
    timeout: float = default_timeout,
    js_console: bool = False,
    context: Optional[str] = None,
    force_docker_pull: bool = False,
) -> Tuple[int, str, str]:
    """
    Evaluate ``js_text`` in a (possibly reused) Node.js engine process.

    One engine process per engine script (and per ``context``, when given) is
    kept in thread-local storage and reused while it is still running; on
    Windows a fresh process is started for every call.

    :param js_text: Javascript source; sent JSON-encoded on one stdin line
    :param timeout: seconds after which the engine process is killed
    :param js_console: use the engine variant with console support
    :param context: optional context, sent as the first stdin line to a
                    freshly started engine
    :param force_docker_pull: forwarded to ``new_js_proc``
    :returns: ``(returncode, stdout, stderr)``; ``returncode`` is ``-1`` when
              the engine was killed by the timeout, ``0`` on success, and the
              engine's exit status otherwise
    :raises NotImplementedError: if ``js_console`` and ``context`` are
                                 combined
    """
    if not hasattr(localdata, "procs"):
        localdata.procs = {}

    if js_console and context is not None:
        raise NotImplementedError("js_console=True and context not implemented")

    if js_console:
        js_engine = "cwlNodeEngineJSConsole.js"
        _logger.warning(
            "Running with support for javascript console in expressions (DO NOT USE IN PRODUCTION)"
        )
    elif context is not None:
        js_engine = "cwlNodeEngineWithContext.js"
    else:
        js_engine = "cwlNodeEngine.js"

    proc_key = js_engine if context is None else (js_engine, context)
    nodejs = localdata.procs.get(proc_key)
    created_new_process = False

    if nodejs is None or nodejs.poll() is not None or onWindows():
        # Close the resource handle once the engine source has been read.
        with resource_stream(__name__, js_engine) as res:
            js_engine_code = res.read().decode("utf-8")

        created_new_process = True
        nodejs = new_js_proc(js_engine_code, force_docker_pull=force_docker_pull)
        localdata.procs[proc_key] = nodejs

    killed = []

    def terminate() -> None:
        """Kill the node process if it exceeds timeout limit."""
        try:
            killed.append(True)
            nodejs.kill()
        except OSError:
            pass

    stdin_text = ""
    if created_new_process and context is not None:
        stdin_text = json_dumps(context) + "\n"
    stdin_text += json_dumps(js_text) + "\n"

    stdin_buf = BytesIO(stdin_text.encode("utf-8"))
    stdout_buf = BytesIO()
    stderr_buf = BytesIO()

    rselect = [nodejs.stdout, nodejs.stderr]  # type: List[BytesIO]
    wselect = [nodejs.stdin]  # type: List[BytesIO]

    marker = PROCESS_FINISHED_STR.encode("utf-8")

    def process_finished() -> bool:
        # Compare raw bytes: decoding a partially received buffer can fail
        # when a read boundary splits a multi-byte UTF-8 character.
        return stdout_buf.getvalue().endswith(
            marker
        ) and stderr_buf.getvalue().endswith(marker)

    def strip_marker(data: bytes) -> bytes:
        # Drop the marker and the newline before it, but only if the marker
        # is really there; otherwise (timeout, crash) keep everything so the
        # end of the error output is not lost.
        if data.endswith(marker):
            return data[: -len(marker) - 1]
        return data

    timer = threading.Timer(timeout, terminate)
    timer.daemon = True
    timer.start()

    try:
        # On windows system standard input/output are not handled properly by select module
        # (modules like pywin32, msvcrt, gevent don't work either)
        if sys.platform == "win32":
            READ_BYTES_SIZE = 512

            # creating queue for reading from a thread to queue
            input_queue = queue.Queue()
            output_queue = queue.Queue()
            error_queue = queue.Queue()

            # To tell threads that output has ended and threads can safely exit
            no_more_output = threading.Lock()
            no_more_output.acquire()
            no_more_error = threading.Lock()
            no_more_error.acquire()

            # put constructed command to input queue which then will be passed to nodejs's stdin
            def put_input(input_queue):
                while True:
                    buf = stdin_buf.read(READ_BYTES_SIZE)
                    if buf:
                        input_queue.put(buf)
                    else:
                        break

            # get the output from nodejs's stdout and continue till output ends
            def get_output(output_queue):
                while not no_more_output.acquire(False):
                    buf = os.read(nodejs.stdout.fileno(), READ_BYTES_SIZE)
                    if buf:
                        output_queue.put(buf)

            # get the output from nodejs's stderr and continue till error output ends
            def get_error(error_queue):
                while not no_more_error.acquire(False):
                    buf = os.read(nodejs.stderr.fileno(), READ_BYTES_SIZE)
                    if buf:
                        error_queue.put(buf)

            # Threads managing nodejs.stdin, nodejs.stdout and nodejs.stderr respectively
            input_thread = threading.Thread(target=put_input, args=(input_queue,))
            input_thread.daemon = True
            input_thread.start()
            output_thread = threading.Thread(target=get_output, args=(output_queue,))
            output_thread.daemon = True
            output_thread.start()
            error_thread = threading.Thread(target=get_error, args=(error_queue,))
            error_thread.daemon = True
            error_thread.start()

            finished = False

            while not finished and timer.is_alive():
                try:
                    if nodejs.stdin in wselect:
                        if not input_queue.empty():
                            os.write(nodejs.stdin.fileno(), input_queue.get())
                        elif not input_thread.is_alive():
                            wselect = []
                    if nodejs.stdout in rselect:
                        if not output_queue.empty():
                            stdout_buf.write(output_queue.get())

                    if nodejs.stderr in rselect:
                        if not error_queue.empty():
                            stderr_buf.write(error_queue.get())

                    if (
                        process_finished()
                        and error_queue.empty()
                        and output_queue.empty()
                    ):
                        finished = True
                        no_more_output.release()
                        no_more_error.release()
                except OSError:
                    break

        else:
            # Stop as soon as both output pipes hit EOF (engine died) instead
            # of spinning until the timeout fires.
            while rselect and not process_finished() and timer.is_alive():
                # The select timeout lets the loop re-check the timer even if
                # the engine produces no output at all.
                rready, wready, _ = select.select(rselect, wselect, [], 0.5)
                try:
                    if nodejs.stdin in wready:
                        buf = stdin_buf.read(select.PIPE_BUF)
                        if buf:
                            os.write(nodejs.stdin.fileno(), buf)
                        else:
                            # All input sent: an always-writable stdin would
                            # otherwise make select() return immediately.
                            wselect = []
                    for pipe, sink in (
                        (nodejs.stdout, stdout_buf),
                        (nodejs.stderr, stderr_buf),
                    ):
                        if pipe in rready:
                            buf = os.read(pipe.fileno(), select.PIPE_BUF)
                            if buf:
                                sink.write(buf)
                            else:
                                # EOF on this pipe.
                                rselect.remove(pipe)
                except OSError:
                    break
    finally:
        # Never leave the kill timer armed, even if something above raised.
        timer.cancel()

    completed = process_finished()
    if not completed:
        # The engine was killed or died; give it a moment to be reaped so
        # that poll() below reports its real exit status.
        try:
            nodejs.wait(timeout=1)
        except subprocess.TimeoutExpired:
            pass

    stdin_buf.close()
    stdoutdata = strip_marker(stdout_buf.getvalue())
    stderrdata = strip_marker(stderr_buf.getvalue())

    exit_status = nodejs.poll()

    if killed and not completed:
        # Timed out before the result arrived, whatever poll() says.
        returncode = -1
    elif exit_status not in (None, 0):
        returncode = -1 if killed else exit_status
    else:
        returncode = 0
        # On windows currently a new instance of nodejs process is used due to
        # problem with blocking on read operation on windows
        if onWindows():
            nodejs.kill()

    # errors="replace": output cut short by a kill may end in the middle of a
    # multi-byte character; complete output is unaffected.
    return (
        returncode,
        stdoutdata.decode("utf-8", errors="replace"),
        stderrdata.decode("utf-8", errors="replace"),
    )
339 | |
340 | |
def code_fragment_to_js(jscript: str, jslib: str = "") -> str:
    """
    Wrap a Javascript fragment in a strict-mode, immediately invoked function.

    A fragment longer than one character that starts with ``{`` is used as
    the function body as-is; anything else is treated as an expression and
    wrapped in ``{return (...);}``. ``jslib`` is placed before the function.
    """
    is_function_body = (
        isinstance(jscript, str) and len(jscript) > 1 and jscript.startswith("{")
    )
    inner_js = jscript if is_function_body else "{return (%s);}" % jscript
    return f'"use strict";\n{jslib}\n(function(){inner_js})()'
348 | |
349 | |
def execjs(
    js: str,
    jslib: str,
    timeout: float,
    force_docker_pull: bool = False,
    debug: bool = False,
    js_console: bool = False,
) -> CWLOutputType:
    """
    Evaluate a Javascript expression and return its JSON-decoded result.

    The expression is wrapped with ``code_fragment_to_js`` and run through
    ``exec_js_process``.

    :param js: the Javascript expression or function body
    :param jslib: Javascript source placed before the expression
    :param timeout: seconds allowed for the evaluation
    :param force_docker_pull: forwarded to ``exec_js_process``
    :param debug: include the numbered script and return code in error text
    :param js_console: log ``[log]`` / ``[err]`` lines found on stderr
    :raises JavascriptException: on a non-zero return code (``-1`` is
        reported as a killed long-running script) or when stdout is not
        valid JSON
    """
    fn = code_fragment_to_js(js, jslib)
    returncode, stdout, stderr = exec_js_process(
        fn, timeout, js_console=js_console, force_docker_pull=force_docker_pull
    )

    if js_console and stderr is not None:
        _logger.info("Javascript console output:")
        _logger.info("----------------------------------------")
        console_lines = re.findall(
            r"^[[](?:log|err)[]].*$", stderr, flags=re.MULTILINE
        )
        _logger.info("\n".join(console_lines))
        _logger.info("----------------------------------------")

    def stdfmt(data: str) -> str:
        # Multi-line output starts on its own line, trimmed.
        return "\n" + data.strip() if "\n" in data else data

    def fn_linenum() -> str:
        # Number the script lines, showing at most the last 99 of them.
        script_lines = fn.splitlines()
        skipped = max(len(script_lines) - 99, 0)
        return "\n".join(
            "%02i %s" % (number, text)
            for number, text in enumerate(script_lines[skipped:], skipped + 1)
        )

    if returncode == 0:
        try:
            return cast(CWLOutputType, json.loads(stdout))
        except ValueError as err:
            raise JavascriptException(
                "{}\nscript was:\n{}\nstdout was: '{}'\nstderr was: '{}'\n".format(
                    err, fn_linenum(), stdout, stderr
                )
            ) from err

    if debug:
        info = (
            "returncode was: %s\nscript was:\n%s\nstdout was: %s\nstderr was: %s\n"
            % (returncode, fn_linenum(), stdfmt(stdout), stdfmt(stderr))
        )
    else:
        info = "Javascript expression was: {}\nstdout was: {}\nstderr was: {}".format(
            js,
            stdfmt(stdout),
            stdfmt(stderr),
        )

    if returncode == -1:
        raise JavascriptException(
            f"Long-running script killed after {timeout} seconds: {info}"
        )
    raise JavascriptException(info)