view env/lib/python3.7/site-packages/cwltool/expression.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
line wrap: on
line source

"""Parse CWL expressions."""
from __future__ import absolute_import

import copy
import re
from typing import (Any, Dict, List, Mapping, MutableMapping, MutableSequence, Optional,
                    Union)

import six
from six import string_types, u
from future.utils import raise_from
from typing_extensions import Text  # pylint: disable=unused-import
# move to a regular typing import when Python 3.3-3.6 is no longer supported

from .sandboxjs import default_timeout, execjs, JavascriptException
from .errors import WorkflowException
from .utils import bytes2str_in_dicts, docker_windows_path_adjust, json_dumps


def jshead(engine_config, rootvars):
    # type: (List[Text], Dict[Text, Any]) -> Text

    # make sure all the byte strings are converted
    # to str in `rootvars` dict.

    return u"\n".join(
        engine_config + [u"var {} = {};".format(k, json_dumps(v, indent=4))
                         for k, v in rootvars.items()])


# decode all raw strings to unicode
seg_symbol = r"""\w+"""
seg_single = r"""\['([^']|\\')+'\]"""
seg_double = r"""\["([^"]|\\")+"\]"""
seg_index = r"""\[[0-9]+\]"""
segments = r"(\.%s|%s|%s|%s)" % (seg_symbol, seg_single, seg_double, seg_index)
segment_re = re.compile(u(segments), flags=re.UNICODE)
param_str = r"\((%s)%s*\)$" % (seg_symbol, segments)
param_re = re.compile(u(param_str), flags=re.UNICODE)

JSON = Union[Dict[Any, Any], List[Any], Text, int, float, bool, None]


class SubstitutionError(Exception):
    pass


def scanner(scan):  # type: (Text) -> List[int]
    DEFAULT = 0
    DOLLAR = 1
    PAREN = 2
    BRACE = 3
    SINGLE_QUOTE = 4
    DOUBLE_QUOTE = 5
    BACKSLASH = 6

    i = 0
    stack = [DEFAULT]
    start = 0
    while i < len(scan):
        state = stack[-1]
        c = scan[i]

        if state == DEFAULT:
            if c == '$':
                stack.append(DOLLAR)
            elif c == '\\':
                stack.append(BACKSLASH)
        elif state == BACKSLASH:
            stack.pop()
            if stack[-1] == DEFAULT:
                return [i - 1, i + 1]
        elif state == DOLLAR:
            if c == '(':
                start = i - 1
                stack.append(PAREN)
            elif c == '{':
                start = i - 1
                stack.append(BRACE)
            else:
                stack.pop()
        elif state == PAREN:
            if c == '(':
                stack.append(PAREN)
            elif c == ')':
                stack.pop()
                if stack[-1] == DOLLAR:
                    return [start, i + 1]
            elif c == "'":
                stack.append(SINGLE_QUOTE)
            elif c == '"':
                stack.append(DOUBLE_QUOTE)
        elif state == BRACE:
            if c == '{':
                stack.append(BRACE)
            elif c == '}':
                stack.pop()
                if stack[-1] == DOLLAR:
                    return [start, i + 1]
            elif c == "'":
                stack.append(SINGLE_QUOTE)
            elif c == '"':
                stack.append(DOUBLE_QUOTE)
        elif state == SINGLE_QUOTE:
            if c == "'":
                stack.pop()
            elif c == '\\':
                stack.append(BACKSLASH)
        elif state == DOUBLE_QUOTE:
            if c == '"':
                stack.pop()
            elif c == '\\':
                stack.append(BACKSLASH)
        i += 1

    if len(stack) > 1:
        raise SubstitutionError(
            "Substitution error, unfinished block starting at position {}: {}".format(start, scan[start:]))
    else:
        return []


def next_seg(parsed_string, remaining_string, current_value):  # type: (Text, Text, JSON) -> JSON
    if remaining_string:
        m = segment_re.match(remaining_string)
        if not m:
            return current_value
        next_segment_str = m.group(0)

        key = None  # type: Optional[Union[Text, int]]
        if next_segment_str[0] == '.':
            key = next_segment_str[1:]
        elif next_segment_str[1] in ("'", '"'):
            key = next_segment_str[2:-2].replace("\\'", "'").replace('\\"', '"')

        if key is not None:
            if isinstance(current_value, MutableSequence) and key == "length" and not remaining_string[m.end(0):]:
                return len(current_value)
            if not isinstance(current_value, MutableMapping):
                raise WorkflowException("%s is a %s, cannot index on string '%s'" % (parsed_string, type(current_value).__name__, key))
            if key not in current_value:
                raise WorkflowException("%s does not contain key '%s'" % (parsed_string, key))
        else:
            try:
                key = int(next_segment_str[1:-1])
            except ValueError as v:
                raise_from(WorkflowException(u(str(v))), v)
            if not isinstance(current_value, MutableSequence):
                raise WorkflowException("%s is a %s, cannot index on int '%s'" % (parsed_string, type(current_value).__name__, key))
            if key >= len(current_value):
                raise WorkflowException("%s list index %i out of range" % (parsed_string, key))

        if isinstance(current_value, Mapping):
            try:
                return next_seg(parsed_string + remaining_string, remaining_string[m.end(0):], current_value[key])
            except KeyError:
                raise WorkflowException("%s doesn't have property %s" % (parsed_string, key))
        elif isinstance(current_value, list) and isinstance(key, int):
            try:
                return next_seg(parsed_string + remaining_string, remaining_string[m.end(0):], current_value[key])
            except KeyError:
                raise WorkflowException("%s doesn't have property %s" % (parsed_string, key))
        else:
            raise WorkflowException("%s doesn't have property %s" % (parsed_string, key))
    else:
        return current_value


def evaluator(ex,                       # type: Text
              jslib,                    # type: Text
              obj,                      # type: Dict[Text, Any]
              timeout,                  # type: float
              fullJS=False,             # type: bool
              force_docker_pull=False,  # type: bool
              debug=False,              # type: bool
              js_console=False          # type: bool
             ):
    # type: (...) -> JSON
    match = param_re.match(ex)

    expression_parse_exception = None
    expression_parse_succeeded = False

    if match is not None:
        first_symbol = match.group(1)
        first_symbol_end = match.end(1)

        if first_symbol_end + 1 == len(ex) and first_symbol == "null":
            return None
        try:
            if obj.get(first_symbol) is None:
                raise WorkflowException("%s is not defined" % first_symbol)

            return next_seg(first_symbol, ex[first_symbol_end:-1], obj[first_symbol])
        except WorkflowException as werr:
            expression_parse_exception = werr
        else:
            expression_parse_succeeded = True

    if fullJS and not expression_parse_succeeded:
        return execjs(
            ex, jslib, timeout, force_docker_pull=force_docker_pull,
            debug=debug, js_console=js_console)
    else:
        if expression_parse_exception is not None:
            raise JavascriptException(
                "Syntax error in parameter reference '%s': %s. This could be "
                "due to using Javascript code without specifying "
                "InlineJavascriptRequirement." % \
                    (ex[1:-1], expression_parse_exception))
        else:
            raise JavascriptException(
                "Syntax error in parameter reference '%s'. This could be due "
                "to using Javascript code without specifying "
                "InlineJavascriptRequirement." % ex)


def interpolate(scan,                     # type: Text
                rootvars,                 # type: Dict[Text, Any]
                timeout=default_timeout,  # type: float
                fullJS=False,             # type: bool
                jslib="",                 # type: Text
                force_docker_pull=False,  # type: bool
                debug=False,              # type: bool
                js_console=False,         # type: bool
                strip_whitespace=True     # type: bool
               ):  # type: (...) -> JSON
    if strip_whitespace:
        scan = scan.strip()
    parts = []
    w = scanner(scan)
    while w:
        parts.append(scan[0:w[0]])

        if scan[w[0]] == '$':
            e = evaluator(scan[w[0] + 1:w[1]], jslib, rootvars, timeout,
                          fullJS=fullJS, force_docker_pull=force_docker_pull,
                          debug=debug, js_console=js_console)
            if w[0] == 0 and w[1] == len(scan) and len(parts) <= 1:
                return e
            leaf = json_dumps(e, sort_keys=True)
            if leaf[0] == '"':
                leaf = leaf[1:-1]
            parts.append(leaf)
        elif scan[w[0]] == '\\':
            e = scan[w[1] - 1]
            parts.append(e)

        scan = scan[w[1]:]
        w = scanner(scan)
    parts.append(scan)
    return ''.join(parts)

def needs_parsing(snippet):  # type: (Any) -> bool
    return isinstance(snippet, string_types) \
        and ("$(" in snippet or "${" in snippet)

def do_eval(ex,                       # type: Union[Text, Dict[Text, Text]]
            jobinput,                 # type: Dict[Text, JSON]
            requirements,             # type: List[Dict[Text, Any]]
            outdir,                   # type: Optional[Text]
            tmpdir,                   # type: Optional[Text]
            resources,                # type: Dict[str, int]
            context=None,             # type: Any
            timeout=default_timeout,  # type: float
            force_docker_pull=False,  # type: bool
            debug=False,              # type: bool
            js_console=False,         # type: bool
            strip_whitespace=True     # type: bool
           ):  # type: (...) -> Any

    runtime = copy.deepcopy(resources)  # type: Dict[str, Any]
    runtime["tmpdir"] = docker_windows_path_adjust(tmpdir) if tmpdir else None
    runtime["outdir"] = docker_windows_path_adjust(outdir) if outdir else None

    rootvars = {
        u"inputs": jobinput,
        u"self": context,
        u"runtime": runtime}

    # TODO: need to make sure the `rootvars dict`
    # contains no bytes type in the first place.
    if six.PY3:
        rootvars = bytes2str_in_dicts(rootvars)  # type: ignore

    if isinstance(ex, string_types) and needs_parsing(ex):
        fullJS = False
        jslib = u""
        for r in reversed(requirements):
            if r["class"] == "InlineJavascriptRequirement":
                fullJS = True
                jslib = jshead(r.get("expressionLib", []), rootvars)
                break

        try:
            return interpolate(ex,
                               rootvars,
                               timeout=timeout,
                               fullJS=fullJS,
                               jslib=jslib,
                               force_docker_pull=force_docker_pull,
                               debug=debug,
                               js_console=js_console,
                               strip_whitespace=strip_whitespace)

        except Exception as e:
            raise_from(WorkflowException("Expression evaluation error:\n%s" % Text(e)), e)
    else:
        return ex