view env/lib/python3.7/site-packages/cwltool/procgenerator.py @ 3:758bc20232e8 draft

"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author shellac
date Thu, 14 May 2020 16:20:52 -0400
parents 26e78fe6e8c4
children
line wrap: on
line source

import copy
from .process import Process, shortname
from .load_tool import load_tool
from schema_salad import validate
from schema_salad.sourceline import indent
from .errors import WorkflowException
from .context import RuntimeContext, LoadingContext
from typing import (Any, Callable, Dict, Generator, Iterable, List,
                    Mapping, MutableMapping, MutableSequence,
                    Optional, Tuple, Union, cast)
from .loghandler import _logger
from typing_extensions import Text  # pylint: disable=unused-import

class ProcessGeneratorJob(object):
    def __init__(self, procgenerator):
        # type: (ProcessGenerator) -> None
        self.procgenerator = procgenerator
        self.jobout = None         # type: Optional[Dict[Text, Any]]
        self.processStatus = None  # type: Optional[Text]

    def receive_output(self, jobout, processStatus):
        # type: (Dict[Text, Any], Text) -> None
        self.jobout = jobout
        self.processStatus = processStatus

    def job(self,
            job_order,         # type: Mapping[Text, Any]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        # FIXME: Declare base type for what Generator yields

        try:
            for tool in self.procgenerator.embedded_tool.job(
                    job_order,
                    self.receive_output,
                    runtimeContext):
                yield tool

            while self.processStatus is None:
                yield None

            if self.processStatus != "success":
                output_callbacks(self.jobout, self.processStatus)
                return

            if self.jobout is None:
                raise WorkflowException("jobout should not be None")

            created_tool, runinputs = self.procgenerator.result(job_order, self.jobout, runtimeContext)

            for tool in created_tool.job(
                    runinputs,
                    output_callbacks,
                    runtimeContext):
                yield tool

        except WorkflowException:
            raise
        except Exception as exc:
            _logger.exception("Unexpected exception")
            raise WorkflowException(Text(exc))


class ProcessGenerator(Process):
    def __init__(self,
                 toolpath_object,      # type: MutableMapping[Text, Any]
                 loadingContext        # type: LoadingContext
    ):  # type: (...) -> None
        super(ProcessGenerator, self).__init__(
            toolpath_object, loadingContext)
        self.loadingContext = loadingContext  # type: LoadingContext
        try:
            if isinstance(toolpath_object["run"], MutableMapping):
                self.embedded_tool = loadingContext.construct_tool_object(
                    toolpath_object["run"], loadingContext)  # type: Process
            else:
                loadingContext.metadata = {}
                self.embedded_tool = load_tool(
                    toolpath_object["run"], loadingContext)
        except validate.ValidationException as vexc:
            if loadingContext.debug:
                _logger.exception("Validation exception")
            raise WorkflowException(
                u"Tool definition %s failed validation:\n%s" %
                (toolpath_object["run"], indent(str(vexc))))

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        # FIXME: Declare base type for what Generator yields
        return ProcessGeneratorJob(self).job(job_order, output_callbacks, runtimeContext)

    def result(self,
            job_order,      # type: Mapping[Text, Any]
            jobout,         # type: Mapping[Text, Any]
            runtimeContext  # type: RuntimeContext
    ):  # type: (...) -> Tuple[Process, MutableMapping[Text, Any]]
        try:
            loadingContext = self.loadingContext.copy()
            loadingContext.metadata = {}
            embedded_tool = load_tool(
                jobout["runProcess"]["location"], loadingContext)
        except validate.ValidationException as vexc:
            if runtimeContext.debug:
                _logger.exception("Validation exception")
            raise WorkflowException(
                u"Tool definition %s failed validation:\n%s" %
                (jobout["runProcess"], indent(str(vexc))))

        if "runInputs" in jobout:
            runinputs = cast(MutableMapping[Text, Any], jobout["runInputs"])
        else:
            runinputs = cast(MutableMapping[Text, Any], copy.deepcopy(job_order))
            for i in self.embedded_tool.tool["inputs"]:
                if shortname(i["id"]) in runinputs:
                    del runinputs[shortname(i["id"])]
            if "id" in runinputs:
                del runinputs["id"]

        return embedded_tool, runinputs