comparison env/lib/python3.7/site-packages/cwltool/sandboxjs.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
comparison
equal deleted inserted replaced
4:79f47841a781 5:9b1c78e6ba9c
1 """Evaluate CWL Javascript Expressions in a sandbox."""
2 from __future__ import absolute_import
3
4 import errno
5 import json
6 import os
7 import re
8 import select
9 import sys
10 import threading
11 from io import BytesIO
12 from typing import cast, Any, Dict, List, Optional, Tuple, Union
13
14 import six
15 from future.utils import raise_from
16 from pkg_resources import resource_stream
17 from typing_extensions import Text # pylint: disable=unused-import
18 # move to a regular typing import when Python 3.3-3.6 is no longer supported
19
20 from schema_salad.utils import json_dumps
21
22 from .loghandler import _logger
23 from .utils import onWindows, processes_to_kill, subprocess
24 try:
25 import queue # type: ignore
26 except ImportError:
27 import Queue as queue # type: ignore
28
29
class JavascriptException(Exception):
    """Signal that the Javascript engine could not be started or an expression failed."""
32
33
# Any value that can result from parsing a JSON document.
JSON = Union[Dict[Text, Any], List[Any], Text, int, float, bool, None]

# Per-thread storage; exec_js_process keeps its engine processes in here.
localdata = threading.local()

# Default number of seconds an expression may run before its engine is killed.
default_timeout = 20
# Set to True by new_js_proc once the node:slim Docker image has been checked for / pulled.
have_node_slim = False
# minimum acceptable version of nodejs engine
minimum_node_version_str = "0.10.26"
42
def check_js_threshold_version(working_alias):
    # type: (str) -> bool
    """
    Check whether the Node.js engine on the system meets the allowed minimum version.

    Runs ``<working_alias> -v`` and compares the version number it prints
    with ``minimum_node_version_str``.

    https://github.com/nodejs/node/blob/master/CHANGELOG.md#nodejs-changelog

    :param working_alias: name (or path) of the Node.js executable to query
    :return: True if the reported version is at least the minimum version;
             False if it is lower or no version number could be found in
             the output
    :raises subprocess.CalledProcessError: if the executable exits non-zero
    :raises OSError: if the executable cannot be started
    """
    current_version_str = subprocess.check_output(
        [working_alias, "-v"]).decode('utf-8')

    # Take only the leading dotted number so that suffixed builds such as
    # 'v13.0.0-rc.1' do not make int() raise: 'v4.2.6\n' -> '4.2.6'
    found = re.search(r'\d+(?:\.\d+)*', current_version_str)
    if found is None:
        # Unrecognisable output: treat the engine as not meeting the minimum.
        return False

    # parse versions into int lists: '4.2.6' -> [4, 2, 6]
    current_version = [int(v) for v in found.group(0).split('.')]
    minimum_node_version = [int(v) for v in minimum_node_version_str.split('.')]

    return current_version >= minimum_node_version
58
59
def new_js_proc(js_text, force_docker_pull=False):
    # type: (Text, bool) -> subprocess.Popen
    """
    Start a Node.js process that evaluates ``js_text`` and return it.

    The executables "nodejs" and "node" are probed in that order; the first
    one that works and satisfies ``check_js_threshold_version`` is used.
    If no suitable local engine is found, ``docker run node:slim`` is tried
    instead (pulling the image first when it is missing or when
    ``force_docker_pull`` is set and the image has not been checked yet in
    this interpreter).

    The started process has stdin, stdout and stderr connected to pipes and
    is registered in ``processes_to_kill``.

    :param js_text: Javascript source passed to ``node --eval``
    :param force_docker_pull: pull node:slim even if an image is present
    :return: the running engine process
    :raises JavascriptException: if neither a local engine nor Docker could
        be used, or the only local engine found is older than
        ``minimum_node_version_str`` and Docker is unavailable
    """
    global have_node_slim

    nodejs = None
    # True when a working local engine was found but it is too old.
    outdated_local_node = False
    trynodes = ("nodejs", "node")
    for n in trynodes:
        try:
            if subprocess.check_output(
                    [n, "--eval", "process.stdout.write('t')"]).decode('utf-8') != "t":
                continue
            # Check the version *before* spawning the engine so that an
            # outdated node is never left running when we fall back to Docker.
            if not check_js_threshold_version(n):
                outdated_local_node = True
                break
            nodejs = subprocess.Popen([n, "--eval", js_text],
                                      stdin=subprocess.PIPE,
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE)
            processes_to_kill.append(nodejs)
            break
        except (subprocess.CalledProcessError, OSError):
            pass

    if nodejs is None:
        try:
            nodeimg = "node:slim"

            if not have_node_slim:
                dockerimgs = subprocess.check_output(
                    ["docker", "images", "-q", nodeimg]).decode('utf-8')
                # if output is an empty string
                if (len(dockerimgs.split("\n")) <= 1) or force_docker_pull:
                    # pull node:slim docker container
                    nodejsimg = subprocess.check_output(
                        ["docker", "pull", nodeimg]).decode('utf-8')
                    _logger.info("Pulled Docker image %s %s", nodeimg, nodejsimg)
                have_node_slim = True
            nodejs = subprocess.Popen(["docker", "run",
                                       "--attach=STDIN", "--attach=STDOUT", "--attach=STDERR",
                                       "--sig-proxy=true", "--interactive",
                                       "--rm", nodeimg, "node", "--eval", js_text],
                                      stdin=subprocess.PIPE,
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE)
            processes_to_kill.append(nodejs)
        except OSError as e:
            # A missing docker executable just means this fallback is unavailable.
            if e.errno != errno.ENOENT:
                raise
        except subprocess.CalledProcessError:
            pass

    if nodejs is None:
        if outdated_local_node:
            # docker failed, but nodejs is installed on system but the
            # version is below the required version
            raise JavascriptException(
                u'cwltool requires minimum v{} version of Node.js engine. '
                u'Try updating: '
                u'https://docs.npmjs.com/getting-started/installing-node'.format(
                    minimum_node_version_str))
        # docker failed and nodejs not on system
        raise JavascriptException(
            u"cwltool requires Node.js engine to evaluate and validate "
            u"Javascript expressions, but couldn't find it.  Tried {}, "
            u"docker run node:slim".format(u", ".join(trynodes)))

    return nodejs
124
# Marker that exec_js_process waits for at the end of both the engine's
# stdout and stderr before it treats an evaluation as complete.
PROCESS_FINISHED_STR = "r1cepzbhUTxtykz5XTC4\n"
126
def exec_js_process(js_text,                  # type: Text
                    timeout=default_timeout,  # type: float
                    js_console=False,         # type: bool
                    context=None,             # type: Optional[Text]
                    force_docker_pull=False,  # type: bool
                   ):
    # type: (...) -> Tuple[int, Text, Text]
    """
    Evaluate ``js_text`` in a Node.js engine process and collect its output.

    Engine processes are cached per thread in ``localdata.procs``, keyed by
    the engine script name (plus ``context`` when one is given).  A new
    process is started when none is cached, the cached one has exited, or
    when running on Windows.

    ``js_text`` is JSON-encoded and written to the engine's stdin as one
    line; for a newly created process with a ``context`` the JSON-encoded
    context is written first.  Output is read until both stdout and stderr
    end with ``PROCESS_FINISHED_STR`` or until ``timeout`` seconds have
    passed, at which point the process is killed.

    :param js_text: Javascript source to evaluate
    :param timeout: seconds to wait before killing the engine
    :param js_console: use the engine variant with console support
    :param context: optional context; selects the "WithContext" engine
    :param force_docker_pull: forwarded to ``new_js_proc``
    :return: ``(returncode, stdout, stderr)``.  ``returncode`` is -1 when
        the process exited non-zero after the timeout kill was triggered,
        the process's own exit code when that is non-zero, and 0 otherwise.
    :raises NotImplementedError: if both ``js_console`` and ``context``
        are given
    """
    if not hasattr(localdata, "procs"):
        localdata.procs = {}

    if js_console and context is not None:
        raise NotImplementedError("js_console=True and context not implemented")

    if js_console:
        js_engine = 'cwlNodeEngineJSConsole.js'
        _logger.warning(
            "Running with support for javascript console in expressions (DO NOT USE IN PRODUCTION)")
    elif context is not None:
        js_engine = "cwlNodeEngineWithContext.js"
    else:
        js_engine = 'cwlNodeEngine.js'

    created_new_process = False

    if context is not None:
        nodejs = localdata.procs.get((js_engine, context))
    else:
        nodejs = localdata.procs.get(js_engine)

    if nodejs is None \
            or nodejs.poll() is not None \
            or onWindows():
        res = resource_stream(__name__, js_engine)
        js_engine_code = res.read().decode('utf-8')

        created_new_process = True

        new_proc = new_js_proc(js_engine_code, force_docker_pull=force_docker_pull)

        if context is None:
            localdata.procs[js_engine] = new_proc
        else:
            localdata.procs[(js_engine, context)] = new_proc
        nodejs = new_proc

    # Non-empty once the timeout handler has run.
    killed = []

    def terminate():  # type: () -> None
        """Kill the node process if it exceeds timeout limit."""
        try:
            killed.append(True)
            nodejs.kill()
        except OSError:
            pass

    timer = threading.Timer(timeout, terminate)
    timer.daemon = True
    timer.start()

    stdin_text = u""
    if created_new_process and context is not None:
        stdin_text = json_dumps(context) + "\n"
    stdin_text += json_dumps(js_text) + "\n"

    stdin_buf = BytesIO(stdin_text.encode('utf-8'))
    stdout_buf = BytesIO()
    stderr_buf = BytesIO()

    rselect = [nodejs.stdout, nodejs.stderr]  # type: List[BytesIO]
    wselect = [nodejs.stdin]  # type: List[BytesIO]

    finished_marker = PROCESS_FINISHED_STR.encode('utf-8')

    def process_finished():  # type: () -> bool
        # Compare raw bytes: decoding a partially received buffer could fail
        # when a read ends in the middle of a multi-byte character.
        return stdout_buf.getvalue().endswith(finished_marker) and \
            stderr_buf.getvalue().endswith(finished_marker)

    # On windows system standard input/output are not handled properly by select module
    # (modules like pywin32, msvcrt, gevent don't work either)
    if sys.platform == 'win32':
        READ_BYTES_SIZE = 512

        # creating queue for reading from a thread to queue
        input_queue = queue.Queue()
        output_queue = queue.Queue()
        error_queue = queue.Queue()

        # To tell threads that output has ended and threads can safely exit
        no_more_output = threading.Lock()
        no_more_output.acquire()
        no_more_error = threading.Lock()
        no_more_error.acquire()

        # put constructed command to input queue which then will be passed to nodejs's stdin
        def put_input(input_queue):
            while True:
                buf = stdin_buf.read(READ_BYTES_SIZE)
                if buf:
                    input_queue.put(buf)
                else:
                    break

        # get the output from nodejs's stdout and continue till output ends
        def get_output(output_queue):
            while not no_more_output.acquire(False):
                buf = os.read(nodejs.stdout.fileno(), READ_BYTES_SIZE)
                if buf:
                    output_queue.put(buf)

        # get the output from nodejs's stderr and continue till error output ends
        def get_error(error_queue):
            while not no_more_error.acquire(False):
                buf = os.read(nodejs.stderr.fileno(), READ_BYTES_SIZE)
                if buf:
                    error_queue.put(buf)

        # Threads managing nodejs.stdin, nodejs.stdout and nodejs.stderr respectively
        input_thread = threading.Thread(target=put_input, args=(input_queue,))
        input_thread.daemon = True
        input_thread.start()
        output_thread = threading.Thread(target=get_output, args=(output_queue,))
        output_thread.daemon = True
        output_thread.start()
        error_thread = threading.Thread(target=get_error, args=(error_queue,))
        error_thread.daemon = True
        error_thread.start()

        finished = False

        while not finished and timer.is_alive():
            try:
                if nodejs.stdin in wselect:
                    if not input_queue.empty():
                        os.write(nodejs.stdin.fileno(), input_queue.get())
                    elif not input_thread.is_alive():
                        wselect = []
                if nodejs.stdout in rselect:
                    if not output_queue.empty():
                        stdout_buf.write(output_queue.get())

                if nodejs.stderr in rselect:
                    if not error_queue.empty():
                        stderr_buf.write(error_queue.get())

                if process_finished() and error_queue.empty() and output_queue.empty():
                    finished = True
                    no_more_output.release()
                    no_more_error.release()
            except OSError:
                break

    else:
        # Upper bound on one select() wait so the timer is re-checked
        # regularly even if no pipe becomes ready.
        select_wait = 0.5
        while not process_finished() and timer.is_alive():
            rready, wready, _ = select.select(rselect, wselect, [], select_wait)
            try:
                if nodejs.stdin in wready:
                    buf = stdin_buf.read(select.PIPE_BUF)
                    if buf:
                        os.write(nodejs.stdin.fileno(), buf)
                    else:
                        # All input has been delivered.  Stop polling stdin
                        # for writability: it would otherwise be reported
                        # ready every time and turn this loop into a
                        # busy-wait while the engine is computing.
                        wselect = []
                for pipes in ((nodejs.stdout, stdout_buf), (nodejs.stderr, stderr_buf)):
                    if pipes[0] in rready:
                        buf = os.read(pipes[0].fileno(), select.PIPE_BUF)
                        if buf:
                            pipes[1].write(buf)
            except OSError:
                break
    timer.cancel()

    stdin_buf.close()

    def strip_marker(data):  # type: (bytes) -> bytes
        # Drop the end marker and the one character preceding it, but only
        # when the marker is really there; after a timeout or crash the
        # partial output is kept intact for the error report.
        if data.endswith(finished_marker):
            return data[:-len(finished_marker) - 1]
        return data

    stdoutdata = strip_marker(stdout_buf.getvalue())
    stderrdata = strip_marker(stderr_buf.getvalue())

    exit_status = nodejs.poll()

    if exit_status not in (None, 0):
        if killed:
            returncode = -1
        else:
            returncode = exit_status
    else:
        returncode = 0
        # On windows currently a new instance of nodejs process is used due to
        # problem with blocking on read operation on windows
        if onWindows():
            nodejs.kill()

    return returncode, stdoutdata.decode('utf-8'), stderrdata.decode('utf-8')
315
def code_fragment_to_js(jscript, jslib=""):
    # type: (Text, Text) -> Text
    """
    Wrap a Javascript fragment in a strict-mode, immediately invoked function.

    A string fragment longer than one character that starts with ``{`` is
    used as the function body as-is; anything else is treated as an
    expression and wrapped as ``{return (<fragment>);}``.  ``jslib`` is
    placed between the "use strict" directive and the function.
    """
    is_function_body = (isinstance(jscript, six.string_types)
                        and len(jscript) > 1
                        and jscript[0] == '{')
    body = jscript if is_function_body else "{return (%s);}" % jscript

    return u"\"use strict\";\n{}\n(function(){})()".format(jslib, body)
325
def execjs(js,                       # type: Text
           jslib,                    # type: Text
           timeout,                  # type: float
           force_docker_pull=False,  # type: bool
           debug=False,              # type: bool
           js_console=False          # type: bool
          ):  # type: (...) -> JSON
    """
    Evaluate a Javascript fragment and return its JSON-decoded result.

    The fragment is wrapped with ``code_fragment_to_js`` (together with
    ``jslib``), run through ``exec_js_process`` and the engine's stdout is
    parsed as JSON.

    :param js: Javascript expression or function body
    :param jslib: Javascript placed ahead of the wrapped fragment
    :param timeout: seconds allowed before the engine is killed
    :param force_docker_pull: forwarded to ``exec_js_process``
    :param debug: include return code and numbered script in error messages
    :param js_console: run with console support and log console output
    :raises JavascriptException: if the engine reports a non-zero return
        code (including the -1 used for timeouts) or stdout is not JSON
    """
    script = code_fragment_to_js(js, jslib)

    returncode, stdout, stderr = exec_js_process(
        script, timeout, js_console=js_console,
        force_docker_pull=force_docker_pull)

    if js_console and stderr is not None:
        # Only lines tagged [log] or [err] are console output.
        console_lines = re.findall(
            r'^[[](?:log|err)[]].*$', stderr, flags=re.MULTILINE)
        _logger.info("Javascript console output:")
        _logger.info("----------------------------------------")
        _logger.info('\n'.join(console_lines))
        _logger.info("----------------------------------------")

    def stdfmt(data):  # type: (Text) -> Text
        """Start multi-line output on a line of its own."""
        return "\n" + data.strip() if "\n" in data else data

    def fn_linenum():  # type: () -> Text
        """Return the script with line numbers, limited to its last 99 lines."""
        maxlines = 99
        all_lines = script.splitlines()
        skipped = max(len(all_lines) - maxlines, 0)
        shown = all_lines[-maxlines:]
        return u"\n".join(
            u"%02i %s" % (number, text)
            for number, text in enumerate(shown, skipped + 1))

    if returncode != 0:
        if debug:
            info = u"returncode was: %s\nscript was:\n%s\nstdout was: %s\nstderr was: %s\n" % (
                returncode, fn_linenum(), stdfmt(stdout), stdfmt(stderr))
        else:
            info = u"Javascript expression was: %s\nstdout was: %s\nstderr was: %s" % (
                js, stdfmt(stdout), stdfmt(stderr))

        # -1 is what exec_js_process reports for a process killed on timeout.
        if returncode == -1:
            raise JavascriptException(
                u"Long-running script killed after {} seconds: {}".format(
                    timeout, info))
        raise JavascriptException(info)

    try:
        return cast(JSON, json.loads(stdout))
    except ValueError as err:
        raise_from(JavascriptException(
            u"{}\nscript was:\n{}\nstdout was: '{}'\nstderr was: '{}'\n".format(
                err, fn_linenum(), stdout, stderr)), err)