Mercurial > repos > shellac > guppy_basecaller
view env/lib/python3.7/site-packages/cwltool/validate_js.py @ 3:758bc20232e8 draft
"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author | shellac |
---|---|
date | Thu, 14 May 2020 16:20:52 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line source
import copy import itertools import json import logging from collections import namedtuple from typing import (cast, Any, Dict, List, MutableMapping, MutableSequence, Optional, Tuple, Union) from pkg_resources import resource_stream from ruamel.yaml.comments import CommentedMap # pylint: disable=unused-import from six import string_types from typing_extensions import Text # pylint: disable=unused-import from schema_salad import avro from schema_salad.sourceline import SourceLine from schema_salad.validate import Schema # pylint: disable=unused-import from schema_salad.validate import validate_ex from .errors import WorkflowException from .expression import scanner as scan_expression from .expression import SubstitutionError from .loghandler import _logger from .sandboxjs import code_fragment_to_js, exec_js_process from .utils import json_dumps def is_expression(tool, schema): # type: (Any, Optional[Schema]) -> bool return isinstance(schema, avro.schema.EnumSchema) \ and schema.name == "Expression" and isinstance(tool, string_types) class SuppressLog(logging.Filter): def __init__(self, name): # type: (Text) -> None """Initialize this log suppressor.""" name = str(name) super(SuppressLog, self).__init__(name) def filter(self, record): # type: (logging.LogRecord) -> bool return False _logger_validation_warnings = logging.getLogger("cwltool.validation_warnings") _logger_validation_warnings.addFilter(SuppressLog("cwltool.validation_warnings")) def get_expressions(tool, # type: Union[CommentedMap, Text] schema, # type: Optional[avro.schema.Schema] source_line=None # type: Optional[SourceLine] ): # type: (...) -> List[Tuple[Text, Optional[SourceLine]]] if is_expression(tool, schema): return [(cast(Text, tool), source_line)] elif isinstance(schema, avro.schema.UnionSchema): valid_schema = None for possible_schema in schema.schemas: if is_expression(tool, possible_schema): return [(cast(Text, tool), source_line)] elif validate_ex(possible_schema, tool, raise_ex=False, logger=_logger_validation_warnings): valid_schema = possible_schema return get_expressions(tool, valid_schema, source_line) elif isinstance(schema, avro.schema.ArraySchema): if not isinstance(tool, MutableSequence): return [] return list(itertools.chain( *map(lambda x: get_expressions(x[1], schema.items, SourceLine(tool, x[0])), enumerate(tool)) )) elif isinstance(schema, avro.schema.RecordSchema): if not isinstance(tool, MutableMapping): return [] expression_nodes = [] for schema_field in schema.fields: if schema_field.name in tool: expression_nodes.extend(get_expressions( tool[schema_field.name], schema_field.type, SourceLine(tool, schema_field.name) )) return expression_nodes else: return [] JSHintJSReturn = namedtuple("jshint_return", ["errors", "globals"]) def jshint_js(js_text, # type: Text globals=None, # type: Optional[List[Text]] options=None # type: Optional[Dict[Text, Union[List[Text], Text, int]]] ): # type: (...) -> Tuple[List[Text], List[Text]] if globals is None: globals = [] if options is None: options = { "includewarnings": [ "W117", # <VARIABLE> not defined "W104", "W119" # using ES6 features ], "strict": "implied", "esversion": 5 } with resource_stream(__name__, "jshint/jshint.js") as file: # NOTE: we need a global variable for lodash (which jshint depends on) jshint_functions_text = "var global = this;" + file.read().decode('utf-8') with resource_stream(__name__, "jshint/jshint_wrapper.js") as file: # NOTE: we need to assign to ob, as the expression {validateJS: validateJS} as an expression # is interpreted as a block with a label `validateJS` jshint_functions_text += "\n" + file.read().decode('utf-8') + "\nvar ob = {validateJS: validateJS}; ob" returncode, stdout, stderr = exec_js_process( "validateJS(%s)" % json_dumps({ "code": js_text, "options": options, "globals": globals }), timeout=30, context=jshint_functions_text ) def dump_jshint_error(): # type: () -> None raise RuntimeError("jshint failed to run succesfully\nreturncode: %d\nstdout: \"%s\"\nstderr: \"%s\"" % ( returncode, stdout, stderr )) if returncode == -1: _logger.warning("jshint process timed out") if returncode != 0: dump_jshint_error() try: jshint_json = json.loads(stdout) except ValueError: dump_jshint_error() jshint_errors = [] # type: List[Text] js_text_lines = js_text.split("\n") for jshint_error_obj in jshint_json.get("errors", []): text = u"JSHINT: " + js_text_lines[jshint_error_obj["line"] - 1] + "\n" text += u"JSHINT: " + " " * (jshint_error_obj["character"] - 1) + "^\n" text += u"JSHINT: %s: %s" % (jshint_error_obj["code"], jshint_error_obj["reason"]) jshint_errors.append(text) return JSHintJSReturn(jshint_errors, jshint_json.get("globals", [])) def print_js_hint_messages(js_hint_messages, source_line): # type: (List[Text], Optional[SourceLine]) -> None if source_line is not None: for js_hint_message in js_hint_messages: _logger.warning(source_line.makeError(js_hint_message)) def validate_js_expressions(tool, # type: CommentedMap schema, # type: Schema jshint_options=None # type: Optional[Dict[Text, Union[List[Text], Text, int]]] ): # type: (...) -> None if tool.get("requirements") is None: return requirements = tool["requirements"] default_globals = [u"self", u"inputs", u"runtime", u"console"] for prop in reversed(requirements): if prop["class"] == "InlineJavascriptRequirement": expression_lib = prop.get("expressionLib", []) break else: return js_globals = copy.deepcopy(default_globals) for i, expression_lib_line in enumerate(expression_lib): expression_lib_line_errors, expression_lib_line_globals = jshint_js(expression_lib_line, js_globals, jshint_options) js_globals.extend(expression_lib_line_globals) print_js_hint_messages(expression_lib_line_errors, SourceLine(expression_lib, i)) expressions = get_expressions(tool, schema) for expression, source_line in expressions: unscanned_str = expression.strip() try: scan_slice = scan_expression(unscanned_str) except SubstitutionError as se: if source_line: source_line.raise_type = WorkflowException raise source_line.makeError(str(se)) else: raise se while scan_slice: if unscanned_str[scan_slice[0]] == '$': code_fragment = unscanned_str[scan_slice[0] + 1:scan_slice[1]] code_fragment_js = code_fragment_to_js(code_fragment, "") expression_errors, _ = jshint_js(code_fragment_js, js_globals, jshint_options) print_js_hint_messages(expression_errors, source_line) unscanned_str = unscanned_str[scan_slice[1]:] scan_slice = scan_expression(unscanned_str)