diff env/lib/python3.7/site-packages/cwltool/factory.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/cwltool/factory.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,64 +0,0 @@
-from __future__ import absolute_import
-
-import os
-from typing import Callable as tCallable  # pylint: disable=unused-import
-from typing import Any, Dict, Optional, Tuple, Union
-
-from typing_extensions import Text  # pylint: disable=unused-import
-# move to a regular typing import when Python 3.3-3.6 is no longer supported
-
-from . import load_tool
-from .context import LoadingContext, RuntimeContext
-from .executors import SingleJobExecutor
-from .process import Process
-
-
-class WorkflowStatus(Exception):
-    def __init__(self, out, status):
-        # type: (Dict[Text,Any], Text) -> None
-        """Signaling exception for the status of a Workflow."""
-        super(WorkflowStatus, self).__init__("Completed %s" % status)
-        self.out = out
-        self.status = status
-
-
-class Callable(object):
-    def __init__(self, t, factory):  # type: (Process, Factory) -> None
-        """Initialize."""
-        self.t = t
-        self.factory = factory
-
-    def __call__(self, **kwargs):
-        # type: (**Any) -> Union[Text, Dict[Text, Text]]
-        runtime_context = self.factory.runtime_context.copy()
-        runtime_context.basedir = os.getcwd()
-        out, status = self.factory.executor(self.t, kwargs, runtime_context)
-        if status != "success":
-            raise WorkflowStatus(out, status)
-        else:
-            return out
-
-class Factory(object):
-    def __init__(self,
-                 executor=None,         # type: Optional[tCallable[...,Tuple[Dict[Text,Any], Text]]]
-                 loading_context=None,  # type: Optional[LoadingContext]
-                 runtime_context=None   # type: Optional[RuntimeContext]
-                ):  # type: (...) -> None
-        """Easy way to load a CWL document for execution."""
-        if executor is None:
-            executor = SingleJobExecutor()
-        self.executor = executor
-        self.loading_context = loading_context
-        if loading_context is None:
-            self.loading_context = LoadingContext()
-        if runtime_context is None:
-            self.runtime_context = RuntimeContext()
-        else:
-            self.runtime_context = runtime_context
-
-    def make(self, cwl):  # type: (Union[Text, Dict[Text, Any]]) -> Callable
-        """Instantiate a CWL object from a CWl document."""
-        load = load_tool.load_tool(cwl, self.loading_context)
-        if isinstance(load, int):
-            raise Exception("Error loading tool")
-        return Callable(load, self)