diff env/lib/python3.7/site-packages/cwltool/factory.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/cwltool/factory.py	Sat May 02 07:14:21 2020 -0400
@@ -0,0 +1,64 @@
+from __future__ import absolute_import
+
+import os
+from typing import Callable as tCallable  # pylint: disable=unused-import
+from typing import Any, Dict, Optional, Tuple, Union
+
+from typing_extensions import Text  # pylint: disable=unused-import
+# move to a regular typing import when Python 3.3-3.6 is no longer supported
+
+from . import load_tool
+from .context import LoadingContext, RuntimeContext
+from .executors import SingleJobExecutor
+from .process import Process
+
+
+class WorkflowStatus(Exception):
+    def __init__(self, out, status):
+        # type: (Dict[Text,Any], Text) -> None
+        """Signaling exception for the status of a Workflow."""
+        super(WorkflowStatus, self).__init__("Completed %s" % status)
+        self.out = out
+        self.status = status
+
+
+class Callable(object):
+    def __init__(self, t, factory):  # type: (Process, Factory) -> None
+        """Initialize."""
+        self.t = t
+        self.factory = factory
+
+    def __call__(self, **kwargs):
+        # type: (**Any) -> Union[Text, Dict[Text, Text]]
+        runtime_context = self.factory.runtime_context.copy()
+        runtime_context.basedir = os.getcwd()
+        out, status = self.factory.executor(self.t, kwargs, runtime_context)
+        if status != "success":
+            raise WorkflowStatus(out, status)
+        else:
+            return out
+
+class Factory(object):
+    def __init__(self,
+                 executor=None,         # type: Optional[tCallable[...,Tuple[Dict[Text,Any], Text]]]
+                 loading_context=None,  # type: Optional[LoadingContext]
+                 runtime_context=None   # type: Optional[RuntimeContext]
+                ):  # type: (...) -> None
+        """Easy way to load a CWL document for execution."""
+        if executor is None:
+            executor = SingleJobExecutor()
+        self.executor = executor
+        self.loading_context = loading_context
+        if loading_context is None:
+            self.loading_context = LoadingContext()
+        if runtime_context is None:
+            self.runtime_context = RuntimeContext()
+        else:
+            self.runtime_context = runtime_context
+
+    def make(self, cwl):  # type: (Union[Text, Dict[Text, Any]]) -> Callable
+        """Instantiate a CWL object from a CWl document."""
+        load = load_tool.load_tool(cwl, self.loading_context)
+        if isinstance(load, int):
+            raise Exception("Error loading tool")
+        return Callable(load, self)