Mercurial > repos > shellac > sam_consensus_v3
view env/lib/python3.9/site-packages/cwltool/expression.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
"""Parse CWL expressions.""" import copy import json import re from typing import ( Any, Dict, List, Mapping, MutableMapping, MutableSequence, Optional, Tuple, Union, cast, ) from schema_salad.utils import json_dumps from .errors import WorkflowException from .loghandler import _logger from .sandboxjs import JavascriptException, default_timeout, execjs from .utils import ( CWLObjectType, CWLOutputType, bytes2str_in_dicts, docker_windows_path_adjust, ) def jshead(engine_config: List[str], rootvars: CWLObjectType) -> str: # make sure all the byte strings are converted # to str in `rootvars` dict. return "\n".join( engine_config + [ "var {} = {};".format(k, json_dumps(v, indent=4)) for k, v in rootvars.items() ] ) # decode all raw strings to unicode seg_symbol = r"""\w+""" seg_single = r"""\['([^']|\\')+'\]""" seg_double = r"""\["([^"]|\\")+"\]""" seg_index = r"""\[[0-9]+\]""" segments = fr"(\.{seg_symbol}|{seg_single}|{seg_double}|{seg_index})" segment_re = re.compile(segments, flags=re.UNICODE) param_str = fr"\(({seg_symbol}){segments}*\)$" param_re = re.compile(param_str, flags=re.UNICODE) class SubstitutionError(Exception): pass def scanner(scan: str) -> Optional[Tuple[int, int]]: DEFAULT = 0 DOLLAR = 1 PAREN = 2 BRACE = 3 SINGLE_QUOTE = 4 DOUBLE_QUOTE = 5 BACKSLASH = 6 i = 0 stack = [DEFAULT] start = 0 while i < len(scan): state = stack[-1] c = scan[i] if state == DEFAULT: if c == "$": stack.append(DOLLAR) elif c == "\\": stack.append(BACKSLASH) elif state == BACKSLASH: stack.pop() if stack[-1] == DEFAULT: return (i - 1, i + 1) elif state == DOLLAR: if c == "(": start = i - 1 stack.append(PAREN) elif c == "{": start = i - 1 stack.append(BRACE) else: stack.pop() i -= 1 elif state == PAREN: if c == "(": stack.append(PAREN) elif c == ")": stack.pop() if stack[-1] == DOLLAR: return (start, i + 1) elif c == "'": stack.append(SINGLE_QUOTE) elif c == '"': stack.append(DOUBLE_QUOTE) elif state == BRACE: if c == "{": stack.append(BRACE) elif c == "}": stack.pop() if stack[-1] == DOLLAR: return (start, i + 1) elif c == "'": stack.append(SINGLE_QUOTE) elif c == '"': stack.append(DOUBLE_QUOTE) elif state == SINGLE_QUOTE: if c == "'": stack.pop() elif c == "\\": stack.append(BACKSLASH) elif state == DOUBLE_QUOTE: if c == '"': stack.pop() elif c == "\\": stack.append(BACKSLASH) i += 1 if len(stack) > 1 and not (len(stack) == 2 and stack[1] in (BACKSLASH, DOLLAR)): raise SubstitutionError( "Substitution error, unfinished block starting at position {}: '{}' stack was {}".format( start, scan[start:], stack ) ) return None def next_seg( parsed_string: str, remaining_string: str, current_value: CWLOutputType ) -> CWLOutputType: if remaining_string: m = segment_re.match(remaining_string) if not m: return current_value next_segment_str = m.group(0) key = None # type: Optional[Union[str, int]] if next_segment_str[0] == ".": key = next_segment_str[1:] elif next_segment_str[1] in ("'", '"'): key = next_segment_str[2:-2].replace("\\'", "'").replace('\\"', '"') if key is not None: if ( isinstance(current_value, MutableSequence) and key == "length" and not remaining_string[m.end(0) :] ): return len(current_value) if not isinstance(current_value, MutableMapping): raise WorkflowException( "%s is a %s, cannot index on string '%s'" % (parsed_string, type(current_value).__name__, key) ) if key not in current_value: raise WorkflowException(f"{parsed_string} does not contain key '{key}'") else: try: key = int(next_segment_str[1:-1]) except ValueError as v: raise WorkflowException(str(v)) from v if not isinstance(current_value, MutableSequence): raise WorkflowException( "%s is a %s, cannot index on int '%s'" % (parsed_string, type(current_value).__name__, key) ) if key and key >= len(current_value): raise WorkflowException( "%s list index %i out of range" % (parsed_string, key) ) if isinstance(current_value, Mapping): try: return next_seg( parsed_string + remaining_string, remaining_string[m.end(0) :], cast(CWLOutputType, current_value[cast(str, key)]), ) except KeyError: raise WorkflowException(f"{parsed_string} doesn't have property {key}") elif isinstance(current_value, list) and isinstance(key, int): try: return next_seg( parsed_string + remaining_string, remaining_string[m.end(0) :], current_value[key], ) except KeyError: raise WorkflowException(f"{parsed_string} doesn't have property {key}") else: raise WorkflowException(f"{parsed_string} doesn't have property {key}") else: return current_value def evaluator( ex: str, jslib: str, obj: CWLObjectType, timeout: float, fullJS: bool = False, force_docker_pull: bool = False, debug: bool = False, js_console: bool = False, ) -> Optional[CWLOutputType]: match = param_re.match(ex) expression_parse_exception = None expression_parse_succeeded = False if match is not None: first_symbol = match.group(1) first_symbol_end = match.end(1) if first_symbol_end + 1 == len(ex) and first_symbol == "null": return None try: if obj.get(first_symbol) is None: raise WorkflowException("%s is not defined" % first_symbol) return next_seg( first_symbol, ex[first_symbol_end:-1], cast(CWLOutputType, obj[first_symbol]), ) except WorkflowException as werr: expression_parse_exception = werr else: expression_parse_succeeded = True if fullJS and not expression_parse_succeeded: return execjs( ex, jslib, timeout, force_docker_pull=force_docker_pull, debug=debug, js_console=js_console, ) else: if expression_parse_exception is not None: raise JavascriptException( "Syntax error in parameter reference '%s': %s. This could be " "due to using Javascript code without specifying " "InlineJavascriptRequirement." % (ex[1:-1], expression_parse_exception) ) else: raise JavascriptException( "Syntax error in parameter reference '%s'. This could be due " "to using Javascript code without specifying " "InlineJavascriptRequirement." % ex ) def _convert_dumper(string: str) -> str: return "{} + ".format(json.dumps(string)) def interpolate( scan: str, rootvars: CWLObjectType, timeout: float = default_timeout, fullJS: bool = False, jslib: str = "", force_docker_pull: bool = False, debug: bool = False, js_console: bool = False, strip_whitespace: bool = True, escaping_behavior: int = 2, convert_to_expression: bool = False, ) -> Optional[CWLOutputType]: """ Interpolate and evaluate. Note: only call with convert_to_expression=True on CWL Expressions in $() form that need interpolation. """ if strip_whitespace: scan = scan.strip() parts = [] if convert_to_expression: dump = _convert_dumper parts.append("${return ") else: dump = lambda x: x w = scanner(scan) while w: if convert_to_expression: parts.append('"{}" + '.format(scan[0 : w[0]])) else: parts.append(scan[0 : w[0]]) if scan[w[0]] == "$": if not convert_to_expression: e = evaluator( scan[w[0] + 1 : w[1]], jslib, rootvars, timeout, fullJS=fullJS, force_docker_pull=force_docker_pull, debug=debug, js_console=js_console, ) if w[0] == 0 and w[1] == len(scan) and len(parts) <= 1: return e leaf = json_dumps(e, sort_keys=True) if leaf[0] == '"': leaf = json.loads(leaf) parts.append(leaf) else: parts.append( "function(){var item =" + scan[w[0] : w[1]][2:-1] + '; if (typeof(item) === "string"){ return item; } else { return JSON.stringify(item); }}() + ' ) elif scan[w[0]] == "\\": if escaping_behavior == 1: # Old behavior. Just skip the next character. e = scan[w[1] - 1] parts.append(dump(e)) elif escaping_behavior == 2: # Backslash quoting requires a three character lookahead. e = scan[w[0] : w[1] + 1] if e in ("\\$(", "\\${"): # Suppress start of a parameter reference, drop the # backslash. parts.append(dump(e[1:])) w = (w[0], w[1] + 1) elif e[1] == "\\": # Double backslash, becomes a single backslash parts.append(dump("\\")) else: # Some other text, add it as-is (including the # backslash) and resume scanning. parts.append(dump(e[:2])) else: raise Exception("Unknown escaping behavior %s" % escaping_behavior) scan = scan[w[1] :] w = scanner(scan) if convert_to_expression: parts.append(f'"{scan}"') parts.append(";}") else: parts.append(scan) return "".join(parts) def needs_parsing(snippet: Any) -> bool: return isinstance(snippet, str) and ("$(" in snippet or "${" in snippet) def do_eval( ex: Optional[CWLOutputType], jobinput: CWLObjectType, requirements: List[CWLObjectType], outdir: Optional[str], tmpdir: Optional[str], resources: Dict[str, Union[float, int, str]], context: Optional[CWLOutputType] = None, timeout: float = default_timeout, force_docker_pull: bool = False, debug: bool = False, js_console: bool = False, strip_whitespace: bool = True, cwlVersion: str = "", ) -> Optional[CWLOutputType]: runtime = cast(MutableMapping[str, Union[int, str, None]], copy.deepcopy(resources)) runtime["tmpdir"] = docker_windows_path_adjust(tmpdir) if tmpdir else None runtime["outdir"] = docker_windows_path_adjust(outdir) if outdir else None rootvars = cast( CWLObjectType, bytes2str_in_dicts({"inputs": jobinput, "self": context, "runtime": runtime}), ) if isinstance(ex, str) and needs_parsing(ex): fullJS = False jslib = "" for r in reversed(requirements): if r["class"] == "InlineJavascriptRequirement": fullJS = True jslib = jshead(cast(List[str], r.get("expressionLib", [])), rootvars) break try: return interpolate( ex, rootvars, timeout=timeout, fullJS=fullJS, jslib=jslib, force_docker_pull=force_docker_pull, debug=debug, js_console=js_console, strip_whitespace=strip_whitespace, escaping_behavior=1 if cwlVersion in ( "v1.0", "v1.1.0-dev1", "v1.1", "v1.2.0-dev1", "v1.2.0-dev2", "v1.2.0-dev3", ) else 2, ) except Exception as e: _logger.exception(e) raise WorkflowException("Expression evaluation error:\n%s" % str(e)) from e else: return ex