view env/lib/python3.7/site-packages/cwltool/sandboxjs.py @ 4:79f47841a781 draft

"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author shellac
date Thu, 14 May 2020 16:47:39 -0400
parents 26e78fe6e8c4
children
line wrap: on
line source

"""Evaluate CWL Javascript Expressions in a sandbox."""
from __future__ import absolute_import

import errno
import json
import os
import re
import select
import sys
import threading
from io import BytesIO
from typing import cast, Any, Dict, List, Optional, Tuple, Union

import six
from future.utils import raise_from
from pkg_resources import resource_stream
from typing_extensions import Text  # pylint: disable=unused-import
# move to a regular typing import when Python 3.3-3.6 is no longer supported

from schema_salad.utils import json_dumps

from .loghandler import _logger
from .utils import onWindows, processes_to_kill, subprocess
try:
    import queue  # type: ignore
except ImportError:
    import Queue as queue  # type: ignore


class JavascriptException(Exception):
    """Error raised when a Javascript engine is unavailable or an expression fails."""


# Type alias for any value that can be produced by decoding JSON text.
JSON = Union[Dict[Text, Any], List[Any], Text, int, float, bool, None]

# Thread-local storage; exec_js_process keeps its per-thread cache of node
# processes in ``localdata.procs``.
localdata = threading.local()

# Default timeout for exec_js_process, in seconds (it is handed to threading.Timer).
default_timeout = 20
# Set to True by new_js_proc once the node:slim Docker image is known to be
# available, so the image lookup/pull is not repeated on later calls.
have_node_slim = False
# minimum acceptable version of nodejs engine
minimum_node_version_str = '0.10.26'

def check_js_threshold_version(working_alias):
    # type: (str) -> bool
    """
    Check whether the nodeJS engine on the system meets the allowed minimum version.

    Runs ``<working_alias> -v`` and compares the reported version with
    ``minimum_node_version_str``. Only the leading numeric dotted part of the
    reported version is compared, so builds with a suffix such as
    ``v12.0.0-rc.1`` or ``v0.11.0-pre`` are handled instead of raising
    ValueError; the suffix itself is ignored.

    Returns True when the reported version is at least the minimum, and False
    when it is lower or when no version number can be recognised in the output.

    Errors from launching the executable (subprocess.CalledProcessError,
    OSError) are propagated to the caller.

    https://github.com/nodejs/node/blob/master/CHANGELOG.md#nodejs-changelog
    """
    # parse nodejs version into a list of ints: 'v4.2.6\n' -> [4, 2, 6]
    current_version_str = subprocess.check_output(
        [working_alias, "-v"]).decode('utf-8')

    # Accept an optional leading 'v' followed by dot-separated integers and
    # stop at the first character that is not part of that numeric prefix.
    match = re.match(r'v?(\d+(?:\.\d+)*)', current_version_str.strip())
    if match is None:
        # Unrecognisable output: treat the engine as not meeting the minimum.
        return False

    current_version = [int(v) for v in match.group(1).split('.')]
    minimum_node_version = [int(v) for v in minimum_node_version_str.split('.')]

    return current_version >= minimum_node_version


def new_js_proc(js_text, force_docker_pull=False):
    # type: (Text, bool) -> subprocess.Popen
    """
    Start a Node.js process evaluating js_text and return it.

    The aliases "nodejs" and "node" are probed in turn. The first alias that
    runs a trivial script correctly is selected; it is used only if
    check_js_threshold_version accepts its version. Otherwise the function
    falls back to running the ``node:slim`` Docker image.

    js_text is the script handed to ``node --eval``.
    force_docker_pull forces a ``docker pull`` of node:slim even when the image
    is already listed locally; it is only consulted on the Docker fallback and
    only while the module-level ``have_node_slim`` flag is still False.

    Returns the started subprocess.Popen with stdin, stdout and stderr piped.
    The process is also appended to ``processes_to_kill``.

    Raises JavascriptException when neither a local engine nor Docker could be
    used: one message when no local engine was found at all, another when a
    local engine exists but is older than ``minimum_node_version_str``.
    OSError from Docker other than ENOENT is re-raised.
    """
    global have_node_slim

    trynodes = ("nodejs", "node")
    nodejs = None
    # True once some alias ran the probe script successfully, regardless of
    # whether its version turned out to be acceptable.
    local_node_found = False

    for n in trynodes:
        try:
            probe = subprocess.check_output(
                [n, "--eval", "process.stdout.write('t')"]).decode('utf-8')
            if probe != "t":
                continue
            local_node_found = True
            # Check the version before spawning, so that a too-old engine is
            # never started and then abandoned in favour of Docker.
            if check_js_threshold_version(n):
                nodejs = subprocess.Popen([n, "--eval", js_text],
                                          stdin=subprocess.PIPE,
                                          stdout=subprocess.PIPE,
                                          stderr=subprocess.PIPE)
                processes_to_kill.append(nodejs)
            break
        except (subprocess.CalledProcessError, OSError):
            # This alias is missing or misbehaving; try the next one.
            pass

    if nodejs is None:
        # No usable local engine: fall back to the node:slim Docker image.
        try:
            nodeimg = "node:slim"

            if not have_node_slim:
                dockerimgs = subprocess.check_output(
                    ["docker", "images", "-q", nodeimg]).decode('utf-8')
                # if output is an empty string
                if (len(dockerimgs.split("\n")) <= 1) or force_docker_pull:
                    # pull node:slim docker container
                    nodejsimg = subprocess.check_output(
                        ["docker", "pull", nodeimg]).decode('utf-8')
                    _logger.info("Pulled Docker image %s %s", nodeimg, nodejsimg)
                have_node_slim = True
            nodejs = subprocess.Popen(["docker", "run",
                                       "--attach=STDIN", "--attach=STDOUT", "--attach=STDERR",
                                       "--sig-proxy=true", "--interactive",
                                       "--rm", nodeimg, "node", "--eval", js_text],
                                      stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE)
            processes_to_kill.append(nodejs)
        except OSError as e:
            # A missing docker executable just means the fallback is unavailable.
            if e.errno != errno.ENOENT:
                raise
        except subprocess.CalledProcessError:
            pass

    if nodejs is not None:
        return nodejs

    # docker failed, but nodejs is installed on system but the version is below the required version
    if local_node_found:
        # Build one message string: passing two positional arguments would make
        # str(exception) render as a tuple.
        raise JavascriptException(
            u'cwltool requires minimum v{} version of Node.js engine. '
            u'Try updating: https://docs.npmjs.com/getting-started/installing-node'.format(
                minimum_node_version_str))

    # docker failed and nodejs not on system
    raise JavascriptException(
        u"cwltool requires Node.js engine to evaluate and validate "
        u"Javascript expressions, but couldn't find it.  Tried {}, "
        u"docker run node:slim".format(u", ".join(trynodes)))

# Sentinel that exec_js_process waits for at the end of both the stdout and
# the stderr data read from the node process before it stops reading.
PROCESS_FINISHED_STR = "r1cepzbhUTxtykz5XTC4\n"

def exec_js_process(js_text,                  # type: Text
                    timeout=default_timeout,  # type: float
                    js_console=False,         # type: bool
                    context=None,             # type: Optional[Text]
                    force_docker_pull=False,  # type: bool
                   ):
    # type: (...) -> Tuple[int, Text, Text]
    """
    Send js_text to a node engine process and collect what it writes back.

    A node process running one of the bundled engine scripts is looked up in
    the thread-local cache ``localdata.procs`` (keyed by engine file name, or
    by ``(engine file name, context)`` when a context is given). A new process
    is created through new_js_proc when nothing is cached, when the cached
    process has exited, or when onWindows() is true.

    The JSON-encoded js_text (preceded by the JSON-encoded context when a new
    process was created with a context) is written to the process's stdin, and
    stdout/stderr are read until both end with PROCESS_FINISHED_STR or until
    the timeout timer has fired and killed the process.

    js_text: the Javascript source to send to the engine.
    timeout: seconds before the node process is killed.
    js_console: select the engine script with console support; cannot be
        combined with context.
    context: optional text sent once to a newly created process before the
        first expression.
    force_docker_pull: forwarded to new_js_proc.

    Returns ``(returncode, stdout, stderr)``. returncode is -1 when the process
    has exited with a non-zero status and the timeout handler ran, the
    process's own return code when it exited non-zero otherwise, and 0 in all
    other cases.

    Raises NotImplementedError when js_console is true and context is given.
    """

    # Lazily create the per-thread process cache.
    if not hasattr(localdata, "procs"):
        localdata.procs = {}

    if js_console and context is not None:
        raise NotImplementedError("js_console=True and context not implemented")

    # Pick the engine script resource that matches the requested mode.
    if js_console:
        js_engine = 'cwlNodeEngineJSConsole.js'
        _logger.warning(
            "Running with support for javascript console in expressions (DO NOT USE IN PRODUCTION)")
    elif context is not None:
        js_engine = "cwlNodeEngineWithContext.js"
    else:
        js_engine = 'cwlNodeEngine.js'

    created_new_process = False

    # Processes with a context are cached separately for each context value.
    if context is not None:
        nodejs = localdata.procs.get((js_engine, context))
    else:
        nodejs = localdata.procs.get(js_engine)

    # Start a fresh process when none is cached, the cached one has exited,
    # or on Windows (where a process is never reused).
    if nodejs is None \
            or nodejs.poll() is not None \
            or onWindows():
        res = resource_stream(__name__, js_engine)
        js_engine_code = res.read().decode('utf-8')

        created_new_process = True

        new_proc = new_js_proc(js_engine_code, force_docker_pull=force_docker_pull)

        if context is None:
            localdata.procs[js_engine] = new_proc
            nodejs = new_proc
        else:
            localdata.procs[(js_engine, context)] = new_proc
            nodejs = new_proc

    # Non-empty once the timeout handler has run; a list is used so the nested
    # function can record the event without rebinding a local name.
    killed = []

    def terminate():  # type: () -> None
        """Kill the node process if it exceeds timeout limit."""
        try:
            killed.append(True)
            nodejs.kill()
        except OSError:
            pass

    # Watchdog: kills the node process if the exchange takes longer than timeout.
    timer = threading.Timer(timeout, terminate)
    timer.daemon = True
    timer.start()

    # The context is only sent when the process has just been created; the
    # expression itself is always sent, one JSON document per line.
    stdin_text = u""
    if created_new_process and context is not None:
        stdin_text = json_dumps(context) + "\n"
    stdin_text += json_dumps(js_text) + "\n"

    stdin_buf = BytesIO(stdin_text.encode('utf-8'))
    stdout_buf = BytesIO()
    stderr_buf = BytesIO()

    rselect = [nodejs.stdout, nodejs.stderr]  # type: List[BytesIO]
    wselect = [nodejs.stdin]  # type: List[BytesIO]


    def process_finished():  # type: () -> bool
        # Done only when BOTH collected streams end with the sentinel.
        return stdout_buf.getvalue().decode('utf-8').endswith(PROCESS_FINISHED_STR) and \
            stderr_buf.getvalue().decode('utf-8').endswith(PROCESS_FINISHED_STR)

    # On windows system standard input/output are not handled properly by select module
    # (modules like  pywin32, msvcrt, gevent don't work either)
    if sys.platform == 'win32':
        READ_BYTES_SIZE = 512

        # creating queue for reading from a thread to queue
        input_queue = queue.Queue()
        output_queue = queue.Queue()
        error_queue = queue.Queue()

        # To tell threads that output has ended and threads can safely exit
        # (the locks are held here and used as flags: a reader thread keeps
        # looping while its non-blocking acquire fails, and stops once the
        # main loop releases the lock).
        no_more_output = threading.Lock()
        no_more_output.acquire()
        no_more_error = threading.Lock()
        no_more_error.acquire()

        # put constructed command to input queue which then will be passed to nodejs's stdin
        def put_input(input_queue):
            while True:
                buf = stdin_buf.read(READ_BYTES_SIZE)
                if buf:
                    input_queue.put(buf)
                else:
                    break

        # get the output from nodejs's stdout and continue till output ends
        def get_output(output_queue):
            while not no_more_output.acquire(False):
                buf = os.read(nodejs.stdout.fileno(), READ_BYTES_SIZE)
                if buf:
                    output_queue.put(buf)

        # get the output from nodejs's stderr and continue till error output ends
        def get_error(error_queue):
            while not no_more_error.acquire(False):
                buf = os.read(nodejs.stderr.fileno(), READ_BYTES_SIZE)
                if buf:
                    error_queue.put(buf)

        # Threads managing nodejs.stdin, nodejs.stdout and nodejs.stderr respectively
        input_thread = threading.Thread(target=put_input, args=(input_queue,))
        input_thread.daemon = True
        input_thread.start()
        output_thread = threading.Thread(target=get_output, args=(output_queue,))
        output_thread.daemon = True
        output_thread.start()
        error_thread = threading.Thread(target=get_error, args=(error_queue,))
        error_thread.daemon = True
        error_thread.start()

        finished = False

        # Shuttle data between the queues and the process/buffers until the
        # sentinel has been seen on both streams or the watchdog has fired.
        while not finished and timer.is_alive():
            try:
                if nodejs.stdin in wselect:
                    if not input_queue.empty():
                        os.write(nodejs.stdin.fileno(), input_queue.get())
                    elif not input_thread.is_alive():
                        # All input has been queued and written; stop polling stdin.
                        wselect = []
                if nodejs.stdout in rselect:
                    if not output_queue.empty():
                        stdout_buf.write(output_queue.get())

                if nodejs.stderr in rselect:
                    if not error_queue.empty():
                        stderr_buf.write(error_queue.get())

                if process_finished() and error_queue.empty() and output_queue.empty():
                    finished = True
                    # Let the reader threads leave their loops.
                    no_more_output.release()
                    no_more_error.release()
            except OSError:
                break

    else:
        # Elsewhere, multiplex the three pipes with select() in this thread.
        while not process_finished() and timer.is_alive():
            rready, wready, _ = select.select(rselect, wselect, [])
            try:
                if nodejs.stdin in wready:
                    buf = stdin_buf.read(select.PIPE_BUF)
                    if buf:
                        os.write(nodejs.stdin.fileno(), buf)
                for pipes in ((nodejs.stdout, stdout_buf), (nodejs.stderr, stderr_buf)):
                    if pipes[0] in rready:
                        buf = os.read(pipes[0].fileno(), select.PIPE_BUF)
                        if buf:
                            pipes[1].write(buf)
            except OSError:
                break
    timer.cancel()

    stdin_buf.close()
    # Drop the trailing sentinel plus one further byte from each stream
    # (presumably a newline the engine writes before the sentinel -- verify
    # against the engine scripts).
    # NOTE(review): the slice is applied unconditionally, so the tail of the
    # output is also cut off when the sentinel never arrived (e.g. after a
    # timeout) -- confirm this is intended.
    stdoutdata = stdout_buf.getvalue()[:-len(PROCESS_FINISHED_STR) - 1]
    stderrdata = stderr_buf.getvalue()[:-len(PROCESS_FINISHED_STR) - 1]

    nodejs.poll()

    # A process that is still running (poll() is None) or exited with 0 counts
    # as success; otherwise distinguish a watchdog kill from a genuine failure.
    if nodejs.poll() not in (None, 0):
        if killed:
            returncode = -1
        else:
            returncode = nodejs.returncode
    else:
        returncode = 0
        # On windows currently a new instance of nodejs process is used due to
        # problem with blocking on read operation on windows
        if onWindows():
            nodejs.kill()

    return returncode, stdoutdata.decode('utf-8'), stderrdata.decode('utf-8')

def code_fragment_to_js(jscript, jslib=""):
    # type: (Text, Text) -> Text
    """
    Wrap a Javascript fragment in a strict-mode, immediately invoked function.

    A string fragment longer than one character that starts with '{' is used
    as the function body as-is; anything else is treated as an expression and
    wrapped in a ``return`` statement. jslib is emitted ahead of the function.
    """
    is_function_body = (
        isinstance(jscript, six.string_types)
        and len(jscript) > 1
        and jscript.startswith('{')
    )
    inner_js = jscript if is_function_body else "{return (%s);}" % jscript

    return u"\"use strict\";\n{}\n(function(){})()".format(jslib, inner_js)

def execjs(js,                       # type: Text
           jslib,                    # type: Text
           timeout,                  # type: float
           force_docker_pull=False,  # type: bool
           debug=False,              # type: bool
           js_console=False          # type: bool
          ):  # type: (...) -> JSON
    """
    Evaluate a Javascript fragment and return its JSON-decoded result.

    js is wrapped together with jslib by code_fragment_to_js and run through
    exec_js_process with the given timeout. With js_console set, console lines
    found in stderr are logged. A JavascriptException is raised when the
    process reports a non-zero return code (with a dedicated message for the
    timeout code -1) or when stdout is not valid JSON; debug selects the more
    detailed error text that includes the numbered script.
    """
    fn = code_fragment_to_js(js, jslib)

    returncode, stdout, stderr = exec_js_process(
        fn, timeout, js_console=js_console,
        force_docker_pull=force_docker_pull)

    if js_console and stderr is not None:
        console_lines = re.findall(
            r'^[[](?:log|err)[]].*$', stderr, flags=re.MULTILINE)
        separator = "----------------------------------------"
        _logger.info("Javascript console output:")
        _logger.info(separator)
        _logger.info('\n'.join(console_lines))
        _logger.info(separator)

    def stdfmt(data):  # type: (Text) -> Text
        # Multi-line output starts on its own line, trimmed of outer whitespace.
        return "\n" + data.strip() if "\n" in data else data

    def fn_linenum():  # type: () -> Text
        # Number the script lines, showing at most the last 99 of them.
        all_lines = fn.splitlines()
        shown = all_lines[-99:]
        first_number = len(all_lines) - len(shown) + 1
        return u"\n".join(
            u"%02i %s" % (number, text)
            for number, text in enumerate(shown, first_number))

    if returncode != 0:
        if debug:
            info = u"returncode was: %s\nscript was:\n%s\nstdout was: %s\nstderr was: %s\n" % (
                returncode, fn_linenum(), stdfmt(stdout), stdfmt(stderr))
        else:
            info = u"Javascript expression was: %s\nstdout was: %s\nstderr was: %s" % (
                js, stdfmt(stdout), stdfmt(stderr))

        if returncode == -1:
            # -1 is the code exec_js_process reports for a timed-out script.
            info = u"Long-running script killed after {} seconds: {}".format(
                timeout, info)
        raise JavascriptException(info)

    try:
        return cast(JSON, json.loads(stdout))
    except ValueError as err:
        raise_from(JavascriptException(
            u"{}\nscript was:\n{}\nstdout was: '{}'\nstderr was: '{}'\n".format(
                err, fn_linenum(), stdout, stderr)), err)