comparison env/lib/python3.9/site-packages/cwltool/sandboxjs.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Evaluate CWL Javascript Expressions in a sandbox."""
2
3 import errno
4 import json
5 import os
6 import queue
7 import re
8 import select
9 import subprocess # nosec
10 import sys
11 import threading
12 from io import BytesIO
13 from typing import List, Optional, Tuple, cast
14
15 from pkg_resources import resource_stream
16 from schema_salad.utils import json_dumps
17
18 from .loghandler import _logger
19 from .utils import CWLOutputType, onWindows, processes_to_kill
20
21
22 class JavascriptException(Exception):
23 pass
24
25
26 localdata = threading.local()
27
28 default_timeout = 20
29 have_node_slim = False
30 # minimum acceptable version of nodejs engine
31 minimum_node_version_str = "0.10.26"
32
33
34 def check_js_threshold_version(working_alias: str) -> bool:
35 """
36 Check if the nodeJS engine version on the system with the allowed minimum version.
37
38 https://github.com/nodejs/node/blob/master/CHANGELOG.md#nodejs-changelog
39 """
40 # parse nodejs version into int Tuple: 'v4.2.6\n' -> [4, 2, 6]
41 current_version_str = subprocess.check_output( # nosec
42 [working_alias, "-v"], universal_newlines=True
43 )
44
45 current_version = [
46 int(v) for v in current_version_str.strip().strip("v").split(".")
47 ]
48 minimum_node_version = [int(v) for v in minimum_node_version_str.split(".")]
49
50 return current_version >= minimum_node_version
51
52
53 def new_js_proc(js_text: str, force_docker_pull: bool = False):
54 # type: (...) -> subprocess.Popen[str]
55
56 required_node_version, docker = (False,) * 2
57 nodejs = None # type: Optional[subprocess.Popen[str]]
58 trynodes = ("nodejs", "node")
59 for n in trynodes:
60 try:
61 if (
62 subprocess.check_output( # nosec
63 [n, "--eval", "process.stdout.write('t')"], universal_newlines=True
64 )
65 != "t"
66 ):
67 continue
68 else:
69 nodejs = subprocess.Popen( # nosec
70 [n, "--eval", js_text],
71 stdin=subprocess.PIPE,
72 stdout=subprocess.PIPE,
73 stderr=subprocess.PIPE,
74 universal_newlines=True,
75 )
76 processes_to_kill.append(nodejs)
77 required_node_version = check_js_threshold_version(n)
78 break
79 except (subprocess.CalledProcessError, OSError):
80 pass
81
82 if nodejs is None or nodejs is not None and required_node_version is False:
83 try:
84 nodeimg = "node:slim"
85 global have_node_slim
86
87 if not have_node_slim:
88 dockerimgs = subprocess.check_output( # nosec
89 ["docker", "images", "-q", nodeimg], universal_newlines=True
90 )
91 # if output is an empty string
92 if (len(dockerimgs.split("\n")) <= 1) or force_docker_pull:
93 # pull node:slim docker container
94 nodejsimg = subprocess.check_output( # nosec
95 ["docker", "pull", nodeimg], universal_newlines=True
96 )
97 _logger.info("Pulled Docker image %s %s", nodeimg, nodejsimg)
98 have_node_slim = True
99 nodejs = subprocess.Popen( # nosec
100 [
101 "docker",
102 "run",
103 "--attach=STDIN",
104 "--attach=STDOUT",
105 "--attach=STDERR",
106 "--sig-proxy=true",
107 "--interactive",
108 "--rm",
109 nodeimg,
110 "node",
111 "--eval",
112 js_text,
113 ],
114 universal_newlines=True,
115 stdin=subprocess.PIPE,
116 stdout=subprocess.PIPE,
117 stderr=subprocess.PIPE,
118 )
119 processes_to_kill.append(nodejs)
120 docker = True
121 except OSError as e:
122 if e.errno == errno.ENOENT:
123 pass
124 else:
125 raise
126 except subprocess.CalledProcessError:
127 pass
128
129 # docker failed and nodejs not on system
130 if nodejs is None:
131 raise JavascriptException(
132 "cwltool requires Node.js engine to evaluate and validate "
133 "Javascript expressions, but couldn't find it. Tried {}, "
134 "docker run node:slim".format(", ".join(trynodes))
135 )
136
137 # docker failed, but nodejs is installed on system but the version is below the required version
138 if docker is False and required_node_version is False:
139 raise JavascriptException(
140 "cwltool requires minimum v{} version of Node.js engine.".format(
141 minimum_node_version_str
142 ),
143 "Try updating: https://docs.npmjs.com/getting-started/installing-node",
144 )
145
146 return nodejs
147
148
149 PROCESS_FINISHED_STR = "r1cepzbhUTxtykz5XTC4\n"
150
151
152 def exec_js_process(
153 js_text: str,
154 timeout: float = default_timeout,
155 js_console: bool = False,
156 context: Optional[str] = None,
157 force_docker_pull: bool = False,
158 ) -> Tuple[int, str, str]:
159
160 if not hasattr(localdata, "procs"):
161 localdata.procs = {}
162
163 if js_console and context is not None:
164 raise NotImplementedError("js_console=True and context not implemented")
165
166 if js_console:
167 js_engine = "cwlNodeEngineJSConsole.js"
168 _logger.warning(
169 "Running with support for javascript console in expressions (DO NOT USE IN PRODUCTION)"
170 )
171 elif context is not None:
172 js_engine = "cwlNodeEngineWithContext.js"
173 else:
174 js_engine = "cwlNodeEngine.js"
175
176 created_new_process = False
177
178 if context is not None:
179 nodejs = localdata.procs.get((js_engine, context))
180 else:
181 nodejs = localdata.procs.get(js_engine)
182
183 if nodejs is None or nodejs.poll() is not None or onWindows():
184 res = resource_stream(__name__, js_engine)
185 js_engine_code = res.read().decode("utf-8")
186
187 created_new_process = True
188
189 new_proc = new_js_proc(js_engine_code, force_docker_pull=force_docker_pull)
190
191 if context is None:
192 localdata.procs[js_engine] = new_proc
193 nodejs = new_proc
194 else:
195 localdata.procs[(js_engine, context)] = new_proc
196 nodejs = new_proc
197
198 killed = []
199
200 def terminate() -> None:
201 """Kill the node process if it exceeds timeout limit."""
202 try:
203 killed.append(True)
204 nodejs.kill()
205 except OSError:
206 pass
207
208 timer = threading.Timer(timeout, terminate)
209 timer.daemon = True
210 timer.start()
211
212 stdin_text = ""
213 if created_new_process and context is not None:
214 stdin_text = json_dumps(context) + "\n"
215 stdin_text += json_dumps(js_text) + "\n"
216
217 stdin_buf = BytesIO(stdin_text.encode("utf-8"))
218 stdout_buf = BytesIO()
219 stderr_buf = BytesIO()
220
221 rselect = [nodejs.stdout, nodejs.stderr] # type: List[BytesIO]
222 wselect = [nodejs.stdin] # type: List[BytesIO]
223
224 def process_finished() -> bool:
225 return stdout_buf.getvalue().decode("utf-8").endswith(
226 PROCESS_FINISHED_STR
227 ) and stderr_buf.getvalue().decode("utf-8").endswith(PROCESS_FINISHED_STR)
228
229 # On windows system standard input/output are not handled properly by select module
230 # (modules like pywin32, msvcrt, gevent don't work either)
231 if sys.platform == "win32":
232 READ_BYTES_SIZE = 512
233
234 # creating queue for reading from a thread to queue
235 input_queue = queue.Queue()
236 output_queue = queue.Queue()
237 error_queue = queue.Queue()
238
239 # To tell threads that output has ended and threads can safely exit
240 no_more_output = threading.Lock()
241 no_more_output.acquire()
242 no_more_error = threading.Lock()
243 no_more_error.acquire()
244
245 # put constructed command to input queue which then will be passed to nodejs's stdin
246 def put_input(input_queue):
247 while True:
248 buf = stdin_buf.read(READ_BYTES_SIZE)
249 if buf:
250 input_queue.put(buf)
251 else:
252 break
253
254 # get the output from nodejs's stdout and continue till output ends
255 def get_output(output_queue):
256 while not no_more_output.acquire(False):
257 buf = os.read(nodejs.stdout.fileno(), READ_BYTES_SIZE)
258 if buf:
259 output_queue.put(buf)
260
261 # get the output from nodejs's stderr and continue till error output ends
262 def get_error(error_queue):
263 while not no_more_error.acquire(False):
264 buf = os.read(nodejs.stderr.fileno(), READ_BYTES_SIZE)
265 if buf:
266 error_queue.put(buf)
267
268 # Threads managing nodejs.stdin, nodejs.stdout and nodejs.stderr respectively
269 input_thread = threading.Thread(target=put_input, args=(input_queue,))
270 input_thread.daemon = True
271 input_thread.start()
272 output_thread = threading.Thread(target=get_output, args=(output_queue,))
273 output_thread.daemon = True
274 output_thread.start()
275 error_thread = threading.Thread(target=get_error, args=(error_queue,))
276 error_thread.daemon = True
277 error_thread.start()
278
279 finished = False
280
281 while not finished and timer.is_alive():
282 try:
283 if nodejs.stdin in wselect:
284 if not input_queue.empty():
285 os.write(nodejs.stdin.fileno(), input_queue.get())
286 elif not input_thread.is_alive():
287 wselect = []
288 if nodejs.stdout in rselect:
289 if not output_queue.empty():
290 stdout_buf.write(output_queue.get())
291
292 if nodejs.stderr in rselect:
293 if not error_queue.empty():
294 stderr_buf.write(error_queue.get())
295
296 if process_finished() and error_queue.empty() and output_queue.empty():
297 finished = True
298 no_more_output.release()
299 no_more_error.release()
300 except OSError:
301 break
302
303 else:
304 while not process_finished() and timer.is_alive():
305 rready, wready, _ = select.select(rselect, wselect, [])
306 try:
307 if nodejs.stdin in wready:
308 buf = stdin_buf.read(select.PIPE_BUF)
309 if buf:
310 os.write(nodejs.stdin.fileno(), buf)
311 for pipes in ((nodejs.stdout, stdout_buf), (nodejs.stderr, stderr_buf)):
312 if pipes[0] in rready:
313 buf = os.read(pipes[0].fileno(), select.PIPE_BUF)
314 if buf:
315 pipes[1].write(buf)
316 except OSError:
317 break
318 timer.cancel()
319
320 stdin_buf.close()
321 stdoutdata = stdout_buf.getvalue()[: -len(PROCESS_FINISHED_STR) - 1]
322 stderrdata = stderr_buf.getvalue()[: -len(PROCESS_FINISHED_STR) - 1]
323
324 nodejs.poll()
325
326 if nodejs.poll() not in (None, 0):
327 if killed:
328 returncode = -1
329 else:
330 returncode = nodejs.returncode
331 else:
332 returncode = 0
333 # On windows currently a new instance of nodejs process is used due to
334 # problem with blocking on read operation on windows
335 if onWindows():
336 nodejs.kill()
337
338 return returncode, stdoutdata.decode("utf-8"), stderrdata.decode("utf-8")
339
340
341 def code_fragment_to_js(jscript: str, jslib: str = "") -> str:
342 if isinstance(jscript, str) and len(jscript) > 1 and jscript[0] == "{":
343 inner_js = jscript
344 else:
345 inner_js = "{return (%s);}" % jscript
346
347 return f'"use strict";\n{jslib}\n(function(){inner_js})()'
348
349
350 def execjs(
351 js: str,
352 jslib: str,
353 timeout: float,
354 force_docker_pull: bool = False,
355 debug: bool = False,
356 js_console: bool = False,
357 ) -> CWLOutputType:
358
359 fn = code_fragment_to_js(js, jslib)
360
361 returncode, stdout, stderr = exec_js_process(
362 fn, timeout, js_console=js_console, force_docker_pull=force_docker_pull
363 )
364
365 if js_console:
366 if stderr is not None:
367 _logger.info("Javascript console output:")
368 _logger.info("----------------------------------------")
369 _logger.info(
370 "\n".join(
371 re.findall(r"^[[](?:log|err)[]].*$", stderr, flags=re.MULTILINE)
372 )
373 )
374 _logger.info("----------------------------------------")
375
376 def stdfmt(data: str) -> str:
377 if "\n" in data:
378 return "\n" + data.strip()
379 return data
380
381 def fn_linenum() -> str:
382 lines = fn.splitlines()
383 ofs = 0
384 maxlines = 99
385 if len(lines) > maxlines:
386 ofs = len(lines) - maxlines
387 lines = lines[-maxlines:]
388 return "\n".join("%02i %s" % (i + ofs + 1, b) for i, b in enumerate(lines))
389
390 if returncode != 0:
391 if debug:
392 info = (
393 "returncode was: %s\nscript was:\n%s\nstdout was: %s\nstderr was: %s\n"
394 % (returncode, fn_linenum(), stdfmt(stdout), stdfmt(stderr))
395 )
396 else:
397 info = (
398 "Javascript expression was: {}\nstdout was: {}\nstderr was: {}".format(
399 js,
400 stdfmt(stdout),
401 stdfmt(stderr),
402 )
403 )
404
405 if returncode == -1:
406 raise JavascriptException(
407 f"Long-running script killed after {timeout} seconds: {info}"
408 )
409 else:
410 raise JavascriptException(info)
411
412 try:
413 return cast(CWLOutputType, json.loads(stdout))
414 except ValueError as err:
415 raise JavascriptException(
416 "{}\nscript was:\n{}\nstdout was: '{}'\nstderr was: '{}'\n".format(
417 err, fn_linenum(), stdout, stderr
418 )
419 ) from err