Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/cwltool/factory.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author | shellac |
---|---|
date | Mon, 01 Jun 2020 08:59:25 -0400 |
parents | 79f47841a781 |
children |
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/cwltool/factory.py Thu May 14 16:47:39 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,64 +0,0 @@ -from __future__ import absolute_import - -import os -from typing import Callable as tCallable # pylint: disable=unused-import -from typing import Any, Dict, Optional, Tuple, Union - -from typing_extensions import Text # pylint: disable=unused-import -# move to a regular typing import when Python 3.3-3.6 is no longer supported - -from . import load_tool -from .context import LoadingContext, RuntimeContext -from .executors import SingleJobExecutor -from .process import Process - - -class WorkflowStatus(Exception): - def __init__(self, out, status): - # type: (Dict[Text,Any], Text) -> None - """Signaling exception for the status of a Workflow.""" - super(WorkflowStatus, self).__init__("Completed %s" % status) - self.out = out - self.status = status - - -class Callable(object): - def __init__(self, t, factory): # type: (Process, Factory) -> None - """Initialize.""" - self.t = t - self.factory = factory - - def __call__(self, **kwargs): - # type: (**Any) -> Union[Text, Dict[Text, Text]] - runtime_context = self.factory.runtime_context.copy() - runtime_context.basedir = os.getcwd() - out, status = self.factory.executor(self.t, kwargs, runtime_context) - if status != "success": - raise WorkflowStatus(out, status) - else: - return out - -class Factory(object): - def __init__(self, - executor=None, # type: Optional[tCallable[...,Tuple[Dict[Text,Any], Text]]] - loading_context=None, # type: Optional[LoadingContext] - runtime_context=None # type: Optional[RuntimeContext] - ): # type: (...) -> None - """Easy way to load a CWL document for execution.""" - if executor is None: - executor = SingleJobExecutor() - self.executor = executor - self.loading_context = loading_context - if loading_context is None: - self.loading_context = LoadingContext() - if runtime_context is None: - self.runtime_context = RuntimeContext() - else: - self.runtime_context = runtime_context - - def make(self, cwl): # type: (Union[Text, Dict[Text, Any]]) -> Callable - """Instantiate a CWL object from a CWl document.""" - load = load_tool.load_tool(cwl, self.loading_context) - if isinstance(load, int): - raise Exception("Error loading tool") - return Callable(load, self)