Mercurial > repos > shellac > guppy_basecaller
view env/lib/python3.7/site-packages/cwltool/factory.py @ 3:758bc20232e8 draft
"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author | shellac |
---|---|
date | Thu, 14 May 2020 16:20:52 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line source
from __future__ import absolute_import import os from typing import Callable as tCallable # pylint: disable=unused-import from typing import Any, Dict, Optional, Tuple, Union from typing_extensions import Text # pylint: disable=unused-import # move to a regular typing import when Python 3.3-3.6 is no longer supported from . import load_tool from .context import LoadingContext, RuntimeContext from .executors import SingleJobExecutor from .process import Process class WorkflowStatus(Exception): def __init__(self, out, status): # type: (Dict[Text,Any], Text) -> None """Signaling exception for the status of a Workflow.""" super(WorkflowStatus, self).__init__("Completed %s" % status) self.out = out self.status = status class Callable(object): def __init__(self, t, factory): # type: (Process, Factory) -> None """Initialize.""" self.t = t self.factory = factory def __call__(self, **kwargs): # type: (**Any) -> Union[Text, Dict[Text, Text]] runtime_context = self.factory.runtime_context.copy() runtime_context.basedir = os.getcwd() out, status = self.factory.executor(self.t, kwargs, runtime_context) if status != "success": raise WorkflowStatus(out, status) else: return out class Factory(object): def __init__(self, executor=None, # type: Optional[tCallable[...,Tuple[Dict[Text,Any], Text]]] loading_context=None, # type: Optional[LoadingContext] runtime_context=None # type: Optional[RuntimeContext] ): # type: (...) -> None """Easy way to load a CWL document for execution.""" if executor is None: executor = SingleJobExecutor() self.executor = executor self.loading_context = loading_context if loading_context is None: self.loading_context = LoadingContext() if runtime_context is None: self.runtime_context = RuntimeContext() else: self.runtime_context = runtime_context def make(self, cwl): # type: (Union[Text, Dict[Text, Any]]) -> Callable """Instantiate a CWL object from a CWl document.""" load = load_tool.load_tool(cwl, self.loading_context) if isinstance(load, int): raise Exception("Error loading tool") return Callable(load, self)