Mercurial > repos > shellac > sam_consensus_v3
view env/lib/python3.9/site-packages/cwltool/validate_js.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
import copy import itertools import json import logging from collections import namedtuple from typing import ( Any, Dict, List, MutableMapping, MutableSequence, Optional, Tuple, Union, cast, ) from pkg_resources import resource_stream from ruamel.yaml.comments import CommentedMap from schema_salad.avro.schema import ( ArraySchema, EnumSchema, RecordSchema, Schema, UnionSchema, ) from schema_salad.sourceline import SourceLine from schema_salad.utils import json_dumps from schema_salad.validate import validate_ex from .errors import WorkflowException from .expression import SubstitutionError from .expression import scanner as scan_expression from .loghandler import _logger from .sandboxjs import code_fragment_to_js, exec_js_process def is_expression(tool, schema): # type: (Any, Optional[Schema]) -> bool return ( isinstance(schema, EnumSchema) and schema.name == "Expression" and isinstance(tool, str) ) class SuppressLog(logging.Filter): def __init__(self, name): # type: (str) -> None """Initialize this log suppressor.""" name = str(name) super().__init__(name) def filter(self, record): # type: (logging.LogRecord) -> bool return False _logger_validation_warnings = logging.getLogger("cwltool.validation_warnings") _logger_validation_warnings.addFilter(SuppressLog("cwltool.validation_warnings")) def get_expressions( tool: Union[CommentedMap, str], schema: Optional[Union[Schema, ArraySchema]], source_line: Optional[SourceLine] = None, ) -> List[Tuple[str, Optional[SourceLine]]]: if is_expression(tool, schema): return [(cast(str, tool), source_line)] elif isinstance(schema, UnionSchema): valid_schema = None for possible_schema in schema.schemas: if is_expression(tool, possible_schema): return [(cast(str, tool), source_line)] elif validate_ex( possible_schema, tool, raise_ex=False, logger=_logger_validation_warnings, ): valid_schema = possible_schema return get_expressions(tool, valid_schema, source_line) elif isinstance(schema, ArraySchema): if not isinstance(tool, MutableSequence): return [] return list( itertools.chain( *map( lambda x: get_expressions( x[1], schema.items, SourceLine(tool, x[0]) # type: ignore ), enumerate(tool), ) ) ) elif isinstance(schema, RecordSchema): if not isinstance(tool, MutableMapping): return [] expression_nodes = [] for schema_field in schema.fields: if schema_field.name in tool: expression_nodes.extend( get_expressions( tool[schema_field.name], schema_field.type, SourceLine(tool, schema_field.name), ) ) return expression_nodes else: return [] JSHintJSReturn = namedtuple("JSHintJSReturn", ["errors", "globals"]) def jshint_js( js_text: str, globals: Optional[List[str]] = None, options: Optional[Dict[str, Union[List[str], str, int]]] = None, ) -> JSHintJSReturn: if globals is None: globals = [] if options is None: options = { "includewarnings": [ "W117", # <VARIABLE> not defined "W104", "W119", # using ES6 features ], "strict": "implied", "esversion": 5, } with resource_stream(__name__, "jshint/jshint.js") as res: # NOTE: we need a global variable for lodash (which jshint depends on) jshint_functions_text = "var global = this;" + res.read().decode("utf-8") with resource_stream(__name__, "jshint/jshint_wrapper.js") as res2: # NOTE: we need to assign to ob, as the expression {validateJS: validateJS} as an expression # is interpreted as a block with a label `validateJS` jshint_functions_text += ( "\n" + res2.read().decode("utf-8") + "\nvar ob = {validateJS: validateJS}; ob" ) returncode, stdout, stderr = exec_js_process( "validateJS(%s)" % json_dumps({"code": js_text, "options": options, "globals": globals}), timeout=30, context=jshint_functions_text, ) def dump_jshint_error(): # type: () -> None raise RuntimeError( 'jshint failed to run succesfully\nreturncode: %d\nstdout: "%s"\nstderr: "%s"' % (returncode, stdout, stderr) ) if returncode == -1: _logger.warning("jshint process timed out") if returncode != 0: dump_jshint_error() try: jshint_json = json.loads(stdout) except ValueError: dump_jshint_error() jshint_errors = [] # type: List[str] js_text_lines = js_text.split("\n") for jshint_error_obj in jshint_json.get("errors", []): text = "JSHINT: " + js_text_lines[jshint_error_obj["line"] - 1] + "\n" text += "JSHINT: " + " " * (jshint_error_obj["character"] - 1) + "^\n" text += "JSHINT: {}: {}".format( jshint_error_obj["code"], jshint_error_obj["reason"], ) jshint_errors.append(text) return JSHintJSReturn(jshint_errors, jshint_json.get("globals", [])) def print_js_hint_messages( js_hint_messages: List[str], source_line: Optional[SourceLine] ) -> None: if source_line is not None: for js_hint_message in js_hint_messages: _logger.warning(source_line.makeError(js_hint_message)) def validate_js_expressions( tool: CommentedMap, schema: Schema, jshint_options: Optional[Dict[str, Union[List[str], str, int]]] = None, ) -> None: if tool.get("requirements") is None: return requirements = tool["requirements"] default_globals = ["self", "inputs", "runtime", "console"] for prop in reversed(requirements): if prop["class"] == "InlineJavascriptRequirement": expression_lib = prop.get("expressionLib", []) break else: return js_globals = copy.deepcopy(default_globals) for i, expression_lib_line in enumerate(expression_lib): expression_lib_line_errors, expression_lib_line_globals = jshint_js( expression_lib_line, js_globals, jshint_options ) js_globals.extend(expression_lib_line_globals) print_js_hint_messages( expression_lib_line_errors, SourceLine(expression_lib, i) ) expressions = get_expressions(tool, schema) for expression, source_line in expressions: unscanned_str = expression.strip() try: scan_slice = scan_expression(unscanned_str) except SubstitutionError as se: if source_line: source_line.raise_type = WorkflowException raise source_line.makeError(str(se)) else: raise se while scan_slice: if unscanned_str[scan_slice[0]] == "$": code_fragment = unscanned_str[scan_slice[0] + 1 : scan_slice[1]] code_fragment_js = code_fragment_to_js(code_fragment, "") expression_errors, _ = jshint_js( code_fragment_js, js_globals, jshint_options ) print_js_hint_messages(expression_errors, source_line) unscanned_str = unscanned_str[scan_slice[1] :] scan_slice = scan_expression(unscanned_str)