Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/cwltool/validate_js.py @ 2:6af9afd405e9 draft
"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author | shellac |
---|---|
date | Thu, 14 May 2020 14:56:58 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.7/site-packages/cwltool/validate_js.py Thu May 14 14:56:58 2020 -0400 @@ -0,0 +1,210 @@ +import copy +import itertools +import json +import logging +from collections import namedtuple +from typing import (cast, Any, Dict, List, MutableMapping, MutableSequence, + Optional, Tuple, Union) + +from pkg_resources import resource_stream +from ruamel.yaml.comments import CommentedMap # pylint: disable=unused-import +from six import string_types +from typing_extensions import Text # pylint: disable=unused-import +from schema_salad import avro +from schema_salad.sourceline import SourceLine +from schema_salad.validate import Schema # pylint: disable=unused-import +from schema_salad.validate import validate_ex + +from .errors import WorkflowException +from .expression import scanner as scan_expression +from .expression import SubstitutionError +from .loghandler import _logger +from .sandboxjs import code_fragment_to_js, exec_js_process +from .utils import json_dumps + + +def is_expression(tool, schema): + # type: (Any, Optional[Schema]) -> bool + return isinstance(schema, avro.schema.EnumSchema) \ + and schema.name == "Expression" and isinstance(tool, string_types) + +class SuppressLog(logging.Filter): + def __init__(self, name): # type: (Text) -> None + """Initialize this log suppressor.""" + name = str(name) + super(SuppressLog, self).__init__(name) + + def filter(self, record): # type: (logging.LogRecord) -> bool + return False + + +_logger_validation_warnings = logging.getLogger("cwltool.validation_warnings") +_logger_validation_warnings.addFilter(SuppressLog("cwltool.validation_warnings")) + +def get_expressions(tool, # type: Union[CommentedMap, Text] + schema, # type: Optional[avro.schema.Schema] + source_line=None # type: Optional[SourceLine] + ): # type: (...) -> List[Tuple[Text, Optional[SourceLine]]] + if is_expression(tool, schema): + return [(cast(Text, tool), source_line)] + elif isinstance(schema, avro.schema.UnionSchema): + valid_schema = None + + for possible_schema in schema.schemas: + if is_expression(tool, possible_schema): + return [(cast(Text, tool), source_line)] + elif validate_ex(possible_schema, tool, raise_ex=False, + logger=_logger_validation_warnings): + valid_schema = possible_schema + + return get_expressions(tool, valid_schema, source_line) + elif isinstance(schema, avro.schema.ArraySchema): + if not isinstance(tool, MutableSequence): + return [] + + return list(itertools.chain( + *map(lambda x: get_expressions(x[1], schema.items, SourceLine(tool, x[0])), enumerate(tool)) + )) + + elif isinstance(schema, avro.schema.RecordSchema): + if not isinstance(tool, MutableMapping): + return [] + + expression_nodes = [] + + for schema_field in schema.fields: + if schema_field.name in tool: + expression_nodes.extend(get_expressions( + tool[schema_field.name], + schema_field.type, + SourceLine(tool, schema_field.name) + )) + + return expression_nodes + else: + return [] + + +JSHintJSReturn = namedtuple("jshint_return", ["errors", "globals"]) + +def jshint_js(js_text, # type: Text + globals=None, # type: Optional[List[Text]] + options=None # type: Optional[Dict[Text, Union[List[Text], Text, int]]] + ): # type: (...) -> Tuple[List[Text], List[Text]] + if globals is None: + globals = [] + if options is None: + options = { + "includewarnings": [ + "W117", # <VARIABLE> not defined + "W104", "W119" # using ES6 features + ], + "strict": "implied", + "esversion": 5 + } + + with resource_stream(__name__, "jshint/jshint.js") as file: + # NOTE: we need a global variable for lodash (which jshint depends on) + jshint_functions_text = "var global = this;" + file.read().decode('utf-8') + + with resource_stream(__name__, "jshint/jshint_wrapper.js") as file: + # NOTE: we need to assign to ob, as the expression {validateJS: validateJS} as an expression + # is interpreted as a block with a label `validateJS` + jshint_functions_text += "\n" + file.read().decode('utf-8') + "\nvar ob = {validateJS: validateJS}; ob" + + returncode, stdout, stderr = exec_js_process( + "validateJS(%s)" % json_dumps({ + "code": js_text, + "options": options, + "globals": globals + }), + timeout=30, + context=jshint_functions_text + ) + + def dump_jshint_error(): + # type: () -> None + raise RuntimeError("jshint failed to run succesfully\nreturncode: %d\nstdout: \"%s\"\nstderr: \"%s\"" % ( + returncode, + stdout, + stderr + )) + + if returncode == -1: + _logger.warning("jshint process timed out") + + if returncode != 0: + dump_jshint_error() + + try: + jshint_json = json.loads(stdout) + except ValueError: + dump_jshint_error() + + jshint_errors = [] # type: List[Text] + + js_text_lines = js_text.split("\n") + + for jshint_error_obj in jshint_json.get("errors", []): + text = u"JSHINT: " + js_text_lines[jshint_error_obj["line"] - 1] + "\n" + text += u"JSHINT: " + " " * (jshint_error_obj["character"] - 1) + "^\n" + text += u"JSHINT: %s: %s" % (jshint_error_obj["code"], jshint_error_obj["reason"]) + jshint_errors.append(text) + + return JSHintJSReturn(jshint_errors, jshint_json.get("globals", [])) + + +def print_js_hint_messages(js_hint_messages, source_line): + # type: (List[Text], Optional[SourceLine]) -> None + if source_line is not None: + for js_hint_message in js_hint_messages: + _logger.warning(source_line.makeError(js_hint_message)) + +def validate_js_expressions(tool, # type: CommentedMap + schema, # type: Schema + jshint_options=None # type: Optional[Dict[Text, Union[List[Text], Text, int]]] + ): # type: (...) -> None + + if tool.get("requirements") is None: + return + + requirements = tool["requirements"] + + default_globals = [u"self", u"inputs", u"runtime", u"console"] + + for prop in reversed(requirements): + if prop["class"] == "InlineJavascriptRequirement": + expression_lib = prop.get("expressionLib", []) + break + else: + return + + js_globals = copy.deepcopy(default_globals) + + for i, expression_lib_line in enumerate(expression_lib): + expression_lib_line_errors, expression_lib_line_globals = jshint_js(expression_lib_line, js_globals, jshint_options) + js_globals.extend(expression_lib_line_globals) + print_js_hint_messages(expression_lib_line_errors, SourceLine(expression_lib, i)) + + expressions = get_expressions(tool, schema) + + for expression, source_line in expressions: + unscanned_str = expression.strip() + try: + scan_slice = scan_expression(unscanned_str) + except SubstitutionError as se: + if source_line: + source_line.raise_type = WorkflowException + raise source_line.makeError(str(se)) + else: + raise se + + while scan_slice: + if unscanned_str[scan_slice[0]] == '$': + code_fragment = unscanned_str[scan_slice[0] + 1:scan_slice[1]] + code_fragment_js = code_fragment_to_js(code_fragment, "") + expression_errors, _ = jshint_js(code_fragment_js, js_globals, jshint_options) + print_js_hint_messages(expression_errors, source_line) + + unscanned_str = unscanned_str[scan_slice[1]:] + scan_slice = scan_expression(unscanned_str)