Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/cwltool/expression.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author | shellac |
---|---|
date | Mon, 01 Jun 2020 08:59:25 -0400 |
parents | 79f47841a781 |
children |
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/cwltool/expression.py Thu May 14 16:47:39 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,309 +0,0 @@ -"""Parse CWL expressions.""" -from __future__ import absolute_import - -import copy -import re -from typing import (Any, Dict, List, Mapping, MutableMapping, MutableSequence, Optional, - Union) - -import six -from six import string_types, u -from future.utils import raise_from -from typing_extensions import Text # pylint: disable=unused-import -# move to a regular typing import when Python 3.3-3.6 is no longer supported - -from .sandboxjs import default_timeout, execjs, JavascriptException -from .errors import WorkflowException -from .utils import bytes2str_in_dicts, docker_windows_path_adjust, json_dumps - - -def jshead(engine_config, rootvars): - # type: (List[Text], Dict[Text, Any]) -> Text - - # make sure all the byte strings are converted - # to str in `rootvars` dict. - - return u"\n".join( - engine_config + [u"var {} = {};".format(k, json_dumps(v, indent=4)) - for k, v in rootvars.items()]) - - -# decode all raw strings to unicode -seg_symbol = r"""\w+""" -seg_single = r"""\['([^']|\\')+'\]""" -seg_double = r"""\["([^"]|\\")+"\]""" -seg_index = r"""\[[0-9]+\]""" -segments = r"(\.%s|%s|%s|%s)" % (seg_symbol, seg_single, seg_double, seg_index) -segment_re = re.compile(u(segments), flags=re.UNICODE) -param_str = r"\((%s)%s*\)$" % (seg_symbol, segments) -param_re = re.compile(u(param_str), flags=re.UNICODE) - -JSON = Union[Dict[Any, Any], List[Any], Text, int, float, bool, None] - - -class SubstitutionError(Exception): - pass - - -def scanner(scan): # type: (Text) -> List[int] - DEFAULT = 0 - DOLLAR = 1 - PAREN = 2 - BRACE = 3 - SINGLE_QUOTE = 4 - DOUBLE_QUOTE = 5 - BACKSLASH = 6 - - i = 0 - stack = [DEFAULT] - start = 0 - while i < len(scan): - state = stack[-1] - c = scan[i] - - if state == DEFAULT: - if c == '$': - stack.append(DOLLAR) - elif c == '\\': - stack.append(BACKSLASH) - elif state == BACKSLASH: - stack.pop() - if stack[-1] == DEFAULT: - return [i - 1, i + 1] - elif state == DOLLAR: - if c == '(': - start = i - 1 - stack.append(PAREN) - elif c == '{': - start = i - 1 - stack.append(BRACE) - else: - stack.pop() - elif state == PAREN: - if c == '(': - stack.append(PAREN) - elif c == ')': - stack.pop() - if stack[-1] == DOLLAR: - return [start, i + 1] - elif c == "'": - stack.append(SINGLE_QUOTE) - elif c == '"': - stack.append(DOUBLE_QUOTE) - elif state == BRACE: - if c == '{': - stack.append(BRACE) - elif c == '}': - stack.pop() - if stack[-1] == DOLLAR: - return [start, i + 1] - elif c == "'": - stack.append(SINGLE_QUOTE) - elif c == '"': - stack.append(DOUBLE_QUOTE) - elif state == SINGLE_QUOTE: - if c == "'": - stack.pop() - elif c == '\\': - stack.append(BACKSLASH) - elif state == DOUBLE_QUOTE: - if c == '"': - stack.pop() - elif c == '\\': - stack.append(BACKSLASH) - i += 1 - - if len(stack) > 1: - raise SubstitutionError( - "Substitution error, unfinished block starting at position {}: {}".format(start, scan[start:])) - else: - return [] - - -def next_seg(parsed_string, remaining_string, current_value): # type: (Text, Text, JSON) -> JSON - if remaining_string: - m = segment_re.match(remaining_string) - if not m: - return current_value - next_segment_str = m.group(0) - - key = None # type: Optional[Union[Text, int]] - if next_segment_str[0] == '.': - key = next_segment_str[1:] - elif next_segment_str[1] in ("'", '"'): - key = next_segment_str[2:-2].replace("\\'", "'").replace('\\"', '"') - - if key is not None: - if isinstance(current_value, MutableSequence) and key == "length" and not remaining_string[m.end(0):]: - return len(current_value) - if not isinstance(current_value, MutableMapping): - raise WorkflowException("%s is a %s, cannot index on string '%s'" % (parsed_string, type(current_value).__name__, key)) - if key not in current_value: - raise WorkflowException("%s does not contain key '%s'" % (parsed_string, key)) - else: - try: - key = int(next_segment_str[1:-1]) - except ValueError as v: - raise_from(WorkflowException(u(str(v))), v) - if not isinstance(current_value, MutableSequence): - raise WorkflowException("%s is a %s, cannot index on int '%s'" % (parsed_string, type(current_value).__name__, key)) - if key >= len(current_value): - raise WorkflowException("%s list index %i out of range" % (parsed_string, key)) - - if isinstance(current_value, Mapping): - try: - return next_seg(parsed_string + remaining_string, remaining_string[m.end(0):], current_value[key]) - except KeyError: - raise WorkflowException("%s doesn't have property %s" % (parsed_string, key)) - elif isinstance(current_value, list) and isinstance(key, int): - try: - return next_seg(parsed_string + remaining_string, remaining_string[m.end(0):], current_value[key]) - except KeyError: - raise WorkflowException("%s doesn't have property %s" % (parsed_string, key)) - else: - raise WorkflowException("%s doesn't have property %s" % (parsed_string, key)) - else: - return current_value - - -def evaluator(ex, # type: Text - jslib, # type: Text - obj, # type: Dict[Text, Any] - timeout, # type: float - fullJS=False, # type: bool - force_docker_pull=False, # type: bool - debug=False, # type: bool - js_console=False # type: bool - ): - # type: (...) -> JSON - match = param_re.match(ex) - - expression_parse_exception = None - expression_parse_succeeded = False - - if match is not None: - first_symbol = match.group(1) - first_symbol_end = match.end(1) - - if first_symbol_end + 1 == len(ex) and first_symbol == "null": - return None - try: - if obj.get(first_symbol) is None: - raise WorkflowException("%s is not defined" % first_symbol) - - return next_seg(first_symbol, ex[first_symbol_end:-1], obj[first_symbol]) - except WorkflowException as werr: - expression_parse_exception = werr - else: - expression_parse_succeeded = True - - if fullJS and not expression_parse_succeeded: - return execjs( - ex, jslib, timeout, force_docker_pull=force_docker_pull, - debug=debug, js_console=js_console) - else: - if expression_parse_exception is not None: - raise JavascriptException( - "Syntax error in parameter reference '%s': %s. This could be " - "due to using Javascript code without specifying " - "InlineJavascriptRequirement." % \ - (ex[1:-1], expression_parse_exception)) - else: - raise JavascriptException( - "Syntax error in parameter reference '%s'. This could be due " - "to using Javascript code without specifying " - "InlineJavascriptRequirement." % ex) - - -def interpolate(scan, # type: Text - rootvars, # type: Dict[Text, Any] - timeout=default_timeout, # type: float - fullJS=False, # type: bool - jslib="", # type: Text - force_docker_pull=False, # type: bool - debug=False, # type: bool - js_console=False, # type: bool - strip_whitespace=True # type: bool - ): # type: (...) -> JSON - if strip_whitespace: - scan = scan.strip() - parts = [] - w = scanner(scan) - while w: - parts.append(scan[0:w[0]]) - - if scan[w[0]] == '$': - e = evaluator(scan[w[0] + 1:w[1]], jslib, rootvars, timeout, - fullJS=fullJS, force_docker_pull=force_docker_pull, - debug=debug, js_console=js_console) - if w[0] == 0 and w[1] == len(scan) and len(parts) <= 1: - return e - leaf = json_dumps(e, sort_keys=True) - if leaf[0] == '"': - leaf = leaf[1:-1] - parts.append(leaf) - elif scan[w[0]] == '\\': - e = scan[w[1] - 1] - parts.append(e) - - scan = scan[w[1]:] - w = scanner(scan) - parts.append(scan) - return ''.join(parts) - -def needs_parsing(snippet): # type: (Any) -> bool - return isinstance(snippet, string_types) \ - and ("$(" in snippet or "${" in snippet) - -def do_eval(ex, # type: Union[Text, Dict[Text, Text]] - jobinput, # type: Dict[Text, JSON] - requirements, # type: List[Dict[Text, Any]] - outdir, # type: Optional[Text] - tmpdir, # type: Optional[Text] - resources, # type: Dict[str, int] - context=None, # type: Any - timeout=default_timeout, # type: float - force_docker_pull=False, # type: bool - debug=False, # type: bool - js_console=False, # type: bool - strip_whitespace=True # type: bool - ): # type: (...) -> Any - - runtime = copy.deepcopy(resources) # type: Dict[str, Any] - runtime["tmpdir"] = docker_windows_path_adjust(tmpdir) if tmpdir else None - runtime["outdir"] = docker_windows_path_adjust(outdir) if outdir else None - - rootvars = { - u"inputs": jobinput, - u"self": context, - u"runtime": runtime} - - # TODO: need to make sure the `rootvars dict` - # contains no bytes type in the first place. - if six.PY3: - rootvars = bytes2str_in_dicts(rootvars) # type: ignore - - if isinstance(ex, string_types) and needs_parsing(ex): - fullJS = False - jslib = u"" - for r in reversed(requirements): - if r["class"] == "InlineJavascriptRequirement": - fullJS = True - jslib = jshead(r.get("expressionLib", []), rootvars) - break - - try: - return interpolate(ex, - rootvars, - timeout=timeout, - fullJS=fullJS, - jslib=jslib, - force_docker_pull=force_docker_pull, - debug=debug, - js_console=js_console, - strip_whitespace=strip_whitespace) - - except Exception as e: - raise_from(WorkflowException("Expression evaluation error:\n%s" % Text(e)), e) - else: - return ex