view planemo/lib/python3.7/site-packages/cwltool/procgenerator.py @ 0:d30785e31577 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:18:57 -0400
parents
children
line wrap: on
line source

import copy
from .process import Process, shortname
from .load_tool import load_tool
from schema_salad import validate
from schema_salad.sourceline import indent
from .errors import WorkflowException
from .context import RuntimeContext, LoadingContext
from typing import (Any, Callable, Dict, Generator, Iterable, List,
                    Mapping, MutableMapping, MutableSequence,
                    Optional, Tuple, Union, cast)
from .loghandler import _logger
from typing_extensions import Text  # pylint: disable=unused-import

class ProcessGeneratorJob(object):
    """Drive a process generator through its two stages.

    Stage one runs the generator's embedded tool and captures its result via
    ``receive_output``.  Stage two asks the generator to turn that result
    into a process and runs it, reporting to the caller's callback.
    """

    def __init__(self, procgenerator):
        # type: (ProcessGenerator) -> None
        """Bind to ``procgenerator``; no output has been received yet."""
        self.procgenerator = procgenerator
        # Both remain None until receive_output() is invoked.
        self.jobout = None         # type: Optional[Dict[Text, Any]]
        self.processStatus = None  # type: Optional[Text]

    def receive_output(self, jobout, processStatus):
        # type: (Dict[Text, Any], Text) -> None
        """Store the embedded tool's output object and its final status."""
        self.jobout, self.processStatus = jobout, processStatus

    def job(self,
            job_order,         # type: Mapping[Text, Any]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        """Yield the runnable items of both stages, in order.

        Everything yielded by the embedded tool's ``job`` is passed through
        first; ``None`` is then yielded repeatedly until a status has been
        recorded by ``receive_output``.  On any status other than
        ``"success"`` the captured output and status are forwarded to
        ``output_callbacks`` and the generator finishes.  On success the
        process returned by ``procgenerator.result`` is run with
        ``output_callbacks`` as its callback and its items are yielded.

        Raises ``WorkflowException`` if a successful run recorded no output;
        any other non-``WorkflowException`` error is logged and re-raised as
        a ``WorkflowException`` carrying the original message.
        """
        # FIXME: Declare base type for what Generator yields
        try:
            generator_stage = self.procgenerator.embedded_tool.job(
                job_order, self.receive_output, runtimeContext)
            for runnable in generator_stage:
                yield runnable

            # Keep handing control back until receive_output() has fired.
            while self.processStatus is None:
                yield None

            if self.processStatus == "success":
                if self.jobout is None:
                    raise WorkflowException("jobout should not be None")

                created_tool, runinputs = self.procgenerator.result(
                    job_order, self.jobout, runtimeContext)

                generated_stage = created_tool.job(
                    runinputs, output_callbacks, runtimeContext)
                for runnable in generated_stage:
                    yield runnable
            else:
                # The generator tool itself failed: report and stop.
                output_callbacks(self.jobout, self.processStatus)

        except WorkflowException:
            # Already the right type; let it propagate unchanged.
            raise
        except Exception as exc:
            _logger.exception("Unexpected exception")
            raise WorkflowException(Text(exc))


class ProcessGenerator(Process):
    """Process whose embedded tool produces another process to run.

    The tool referenced by ``run`` is executed first.  Its output object
    names a process document under ``runProcess`` and may supply the inputs
    for that process under ``runInputs``.
    """

    def __init__(self,
                 toolpath_object,      # type: MutableMapping[Text, Any]
                 loadingContext        # type: LoadingContext
    ):  # type: (...) -> None
        """Load the embedded generator tool named by ``toolpath_object["run"]``.

        An inline mapping is built with
        ``loadingContext.construct_tool_object``; anything else is handed to
        ``load_tool``.

        Raises ``WorkflowException`` if the embedded tool fails validation.
        """
        super(ProcessGenerator, self).__init__(
            toolpath_object, loadingContext)
        self.loadingContext = loadingContext  # type: LoadingContext
        try:
            if isinstance(toolpath_object["run"], MutableMapping):
                self.embedded_tool = loadingContext.construct_tool_object(
                    toolpath_object["run"], loadingContext)  # type: Process
            else:
                # Clear metadata on a private copy, the same way result()
                # does, so the caller's context object is not modified.
                embedded_context = loadingContext.copy()
                embedded_context.metadata = {}
                self.embedded_tool = load_tool(
                    toolpath_object["run"], embedded_context)
        except validate.ValidationException as vexc:
            if loadingContext.debug:
                _logger.exception("Validation exception")
            raise WorkflowException(
                u"Tool definition %s failed validation:\n%s" %
                (toolpath_object["run"], indent(str(vexc))))

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        """Return the generator of a fresh ``ProcessGeneratorJob`` for this process."""
        # FIXME: Declare base type for what Generator yields
        return ProcessGeneratorJob(self).job(job_order, output_callbacks, runtimeContext)

    def result(self,
            job_order,      # type: Mapping[Text, Any]
            jobout,         # type: Mapping[Text, Any]
            runtimeContext  # type: RuntimeContext
    ):  # type: (...) -> Tuple[Process, MutableMapping[Text, Any]]
        """Load the generated process and work out the inputs to run it with.

        ``jobout`` is the output object of the embedded tool.  The process is
        loaded from ``jobout["runProcess"]["location"]``.  Its inputs are
        ``jobout["runInputs"]`` when present; otherwise a deep copy of
        ``job_order`` with the embedded tool's own inputs and the ``"id"``
        key removed.

        Returns the loaded process and its input object.

        Raises ``WorkflowException`` if ``jobout`` has no usable
        ``runProcess`` location or if the generated process fails validation.
        """
        try:
            process_location = jobout["runProcess"]["location"]
        except (KeyError, TypeError):
            # Report a malformed generator output explicitly instead of
            # letting a bare KeyError/TypeError escape.
            raise WorkflowException(
                u"Process generator output must contain 'runProcess' "
                u"with a 'location'")

        try:
            loadingContext = self.loadingContext.copy()
            loadingContext.metadata = {}
            embedded_tool = load_tool(process_location, loadingContext)
        except validate.ValidationException as vexc:
            if runtimeContext.debug:
                _logger.exception("Validation exception")
            raise WorkflowException(
                u"Tool definition %s failed validation:\n%s" %
                (jobout["runProcess"], indent(str(vexc))))

        if "runInputs" in jobout:
            runinputs = cast(MutableMapping[Text, Any], jobout["runInputs"])
        else:
            runinputs = cast(MutableMapping[Text, Any], copy.deepcopy(job_order))
            # Drop the inputs that belong to the generator tool itself.
            for generator_input in self.embedded_tool.tool["inputs"]:
                runinputs.pop(shortname(generator_input["id"]), None)
            runinputs.pop("id", None)

        return embedded_tool, runinputs