Mercurial > repos > shellac > guppy_basecaller
view env/lib/python3.7/site-packages/cwltool/expression.py @ 4:79f47841a781 draft
"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author | shellac |
---|---|
date | Thu, 14 May 2020 16:47:39 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line source
"""Parse CWL expressions.""" from __future__ import absolute_import import copy import re from typing import (Any, Dict, List, Mapping, MutableMapping, MutableSequence, Optional, Union) import six from six import string_types, u from future.utils import raise_from from typing_extensions import Text # pylint: disable=unused-import # move to a regular typing import when Python 3.3-3.6 is no longer supported from .sandboxjs import default_timeout, execjs, JavascriptException from .errors import WorkflowException from .utils import bytes2str_in_dicts, docker_windows_path_adjust, json_dumps def jshead(engine_config, rootvars): # type: (List[Text], Dict[Text, Any]) -> Text # make sure all the byte strings are converted # to str in `rootvars` dict. return u"\n".join( engine_config + [u"var {} = {};".format(k, json_dumps(v, indent=4)) for k, v in rootvars.items()]) # decode all raw strings to unicode seg_symbol = r"""\w+""" seg_single = r"""\['([^']|\\')+'\]""" seg_double = r"""\["([^"]|\\")+"\]""" seg_index = r"""\[[0-9]+\]""" segments = r"(\.%s|%s|%s|%s)" % (seg_symbol, seg_single, seg_double, seg_index) segment_re = re.compile(u(segments), flags=re.UNICODE) param_str = r"\((%s)%s*\)$" % (seg_symbol, segments) param_re = re.compile(u(param_str), flags=re.UNICODE) JSON = Union[Dict[Any, Any], List[Any], Text, int, float, bool, None] class SubstitutionError(Exception): pass def scanner(scan): # type: (Text) -> List[int] DEFAULT = 0 DOLLAR = 1 PAREN = 2 BRACE = 3 SINGLE_QUOTE = 4 DOUBLE_QUOTE = 5 BACKSLASH = 6 i = 0 stack = [DEFAULT] start = 0 while i < len(scan): state = stack[-1] c = scan[i] if state == DEFAULT: if c == '$': stack.append(DOLLAR) elif c == '\\': stack.append(BACKSLASH) elif state == BACKSLASH: stack.pop() if stack[-1] == DEFAULT: return [i - 1, i + 1] elif state == DOLLAR: if c == '(': start = i - 1 stack.append(PAREN) elif c == '{': start = i - 1 stack.append(BRACE) else: stack.pop() elif state == PAREN: if c == '(': stack.append(PAREN) elif c == ')': stack.pop() if stack[-1] == DOLLAR: return [start, i + 1] elif c == "'": stack.append(SINGLE_QUOTE) elif c == '"': stack.append(DOUBLE_QUOTE) elif state == BRACE: if c == '{': stack.append(BRACE) elif c == '}': stack.pop() if stack[-1] == DOLLAR: return [start, i + 1] elif c == "'": stack.append(SINGLE_QUOTE) elif c == '"': stack.append(DOUBLE_QUOTE) elif state == SINGLE_QUOTE: if c == "'": stack.pop() elif c == '\\': stack.append(BACKSLASH) elif state == DOUBLE_QUOTE: if c == '"': stack.pop() elif c == '\\': stack.append(BACKSLASH) i += 1 if len(stack) > 1: raise SubstitutionError( "Substitution error, unfinished block starting at position {}: {}".format(start, scan[start:])) else: return [] def next_seg(parsed_string, remaining_string, current_value): # type: (Text, Text, JSON) -> JSON if remaining_string: m = segment_re.match(remaining_string) if not m: return current_value next_segment_str = m.group(0) key = None # type: Optional[Union[Text, int]] if next_segment_str[0] == '.': key = next_segment_str[1:] elif next_segment_str[1] in ("'", '"'): key = next_segment_str[2:-2].replace("\\'", "'").replace('\\"', '"') if key is not None: if isinstance(current_value, MutableSequence) and key == "length" and not remaining_string[m.end(0):]: return len(current_value) if not isinstance(current_value, MutableMapping): raise WorkflowException("%s is a %s, cannot index on string '%s'" % (parsed_string, type(current_value).__name__, key)) if key not in current_value: raise WorkflowException("%s does not contain key '%s'" % (parsed_string, key)) else: try: key = int(next_segment_str[1:-1]) except ValueError as v: raise_from(WorkflowException(u(str(v))), v) if not isinstance(current_value, MutableSequence): raise WorkflowException("%s is a %s, cannot index on int '%s'" % (parsed_string, type(current_value).__name__, key)) if key >= len(current_value): raise WorkflowException("%s list index %i out of range" % (parsed_string, key)) if isinstance(current_value, Mapping): try: return next_seg(parsed_string + remaining_string, remaining_string[m.end(0):], current_value[key]) except KeyError: raise WorkflowException("%s doesn't have property %s" % (parsed_string, key)) elif isinstance(current_value, list) and isinstance(key, int): try: return next_seg(parsed_string + remaining_string, remaining_string[m.end(0):], current_value[key]) except KeyError: raise WorkflowException("%s doesn't have property %s" % (parsed_string, key)) else: raise WorkflowException("%s doesn't have property %s" % (parsed_string, key)) else: return current_value def evaluator(ex, # type: Text jslib, # type: Text obj, # type: Dict[Text, Any] timeout, # type: float fullJS=False, # type: bool force_docker_pull=False, # type: bool debug=False, # type: bool js_console=False # type: bool ): # type: (...) -> JSON match = param_re.match(ex) expression_parse_exception = None expression_parse_succeeded = False if match is not None: first_symbol = match.group(1) first_symbol_end = match.end(1) if first_symbol_end + 1 == len(ex) and first_symbol == "null": return None try: if obj.get(first_symbol) is None: raise WorkflowException("%s is not defined" % first_symbol) return next_seg(first_symbol, ex[first_symbol_end:-1], obj[first_symbol]) except WorkflowException as werr: expression_parse_exception = werr else: expression_parse_succeeded = True if fullJS and not expression_parse_succeeded: return execjs( ex, jslib, timeout, force_docker_pull=force_docker_pull, debug=debug, js_console=js_console) else: if expression_parse_exception is not None: raise JavascriptException( "Syntax error in parameter reference '%s': %s. This could be " "due to using Javascript code without specifying " "InlineJavascriptRequirement." % \ (ex[1:-1], expression_parse_exception)) else: raise JavascriptException( "Syntax error in parameter reference '%s'. This could be due " "to using Javascript code without specifying " "InlineJavascriptRequirement." % ex) def interpolate(scan, # type: Text rootvars, # type: Dict[Text, Any] timeout=default_timeout, # type: float fullJS=False, # type: bool jslib="", # type: Text force_docker_pull=False, # type: bool debug=False, # type: bool js_console=False, # type: bool strip_whitespace=True # type: bool ): # type: (...) -> JSON if strip_whitespace: scan = scan.strip() parts = [] w = scanner(scan) while w: parts.append(scan[0:w[0]]) if scan[w[0]] == '$': e = evaluator(scan[w[0] + 1:w[1]], jslib, rootvars, timeout, fullJS=fullJS, force_docker_pull=force_docker_pull, debug=debug, js_console=js_console) if w[0] == 0 and w[1] == len(scan) and len(parts) <= 1: return e leaf = json_dumps(e, sort_keys=True) if leaf[0] == '"': leaf = leaf[1:-1] parts.append(leaf) elif scan[w[0]] == '\\': e = scan[w[1] - 1] parts.append(e) scan = scan[w[1]:] w = scanner(scan) parts.append(scan) return ''.join(parts) def needs_parsing(snippet): # type: (Any) -> bool return isinstance(snippet, string_types) \ and ("$(" in snippet or "${" in snippet) def do_eval(ex, # type: Union[Text, Dict[Text, Text]] jobinput, # type: Dict[Text, JSON] requirements, # type: List[Dict[Text, Any]] outdir, # type: Optional[Text] tmpdir, # type: Optional[Text] resources, # type: Dict[str, int] context=None, # type: Any timeout=default_timeout, # type: float force_docker_pull=False, # type: bool debug=False, # type: bool js_console=False, # type: bool strip_whitespace=True # type: bool ): # type: (...) -> Any runtime = copy.deepcopy(resources) # type: Dict[str, Any] runtime["tmpdir"] = docker_windows_path_adjust(tmpdir) if tmpdir else None runtime["outdir"] = docker_windows_path_adjust(outdir) if outdir else None rootvars = { u"inputs": jobinput, u"self": context, u"runtime": runtime} # TODO: need to make sure the `rootvars dict` # contains no bytes type in the first place. if six.PY3: rootvars = bytes2str_in_dicts(rootvars) # type: ignore if isinstance(ex, string_types) and needs_parsing(ex): fullJS = False jslib = u"" for r in reversed(requirements): if r["class"] == "InlineJavascriptRequirement": fullJS = True jslib = jshead(r.get("expressionLib", []), rootvars) break try: return interpolate(ex, rootvars, timeout=timeout, fullJS=fullJS, jslib=jslib, force_docker_pull=force_docker_pull, debug=debug, js_console=js_console, strip_whitespace=strip_whitespace) except Exception as e: raise_from(WorkflowException("Expression evaluation error:\n%s" % Text(e)), e) else: return ex