comparison env/lib/python3.7/site-packages/cwltool/factory.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 from __future__ import absolute_import
2
3 import os
4 from typing import Callable as tCallable # pylint: disable=unused-import
5 from typing import Any, Dict, Optional, Tuple, Union
6
7 from typing_extensions import Text # pylint: disable=unused-import
8 # move to a regular typing import when Python 3.3-3.6 is no longer supported
9
10 from . import load_tool
11 from .context import LoadingContext, RuntimeContext
12 from .executors import SingleJobExecutor
13 from .process import Process
14
15
class WorkflowStatus(Exception):
    """Signaling exception for the status of a Workflow.

    The output object and the status value given to the constructor are
    kept on the instance as ``out`` and ``status`` so that code catching
    the exception can inspect both.
    """

    def __init__(self, out, status):
        # type: (Dict[Text,Any], Text) -> None
        """Store ``out`` and ``status`` and build the exception message.

        :param out: output object, exposed afterwards as ``self.out``
        :param status: status value, exposed as ``self.status`` and
            interpolated into the message ``"Completed <status>"``
        """
        # Wrap ``status`` in a 1-tuple: with a bare right-hand operand the
        # ``%`` operator unpacks a tuple-valued status as several format
        # arguments and raises TypeError instead of building this exception.
        super(WorkflowStatus, self).__init__("Completed %s" % (status,))
        self.out = out
        self.status = status
23
24
class Callable(object):
    """Callable object pairing a loaded process with the Factory that made it."""

    def __init__(self, t, factory):  # type: (Process, Factory) -> None
        """Remember the process ``t`` and the owning ``factory``."""
        self.factory = factory
        self.t = t

    def __call__(self, **kwargs):
        # type: (**Any) -> Union[Text, Dict[Text, Text]]
        """Execute the stored process through the factory's executor.

        The keyword arguments are handed to the executor as a single dict,
        together with a copy of the factory's runtime context whose
        ``basedir`` is set to the current working directory.

        :returns: the executor's output when it reports ``"success"``
        :raises WorkflowStatus: for any other status; the exception carries
            both the output and the status
        """
        # The executor receives a copy with ``basedir`` set; the factory's
        # own context object is not assigned to here.
        job_context = self.factory.runtime_context.copy()
        job_context.basedir = os.getcwd()
        result, outcome = self.factory.executor(self.t, kwargs, job_context)
        if outcome == "success":
            return result
        raise WorkflowStatus(result, outcome)
40
class Factory(object):
    """Easy way to load a CWL document for execution."""

    def __init__(self,
                 executor=None,  # type: Optional[tCallable[...,Tuple[Dict[Text,Any], Text]]]
                 loading_context=None,  # type: Optional[LoadingContext]
                 runtime_context=None  # type: Optional[RuntimeContext]
                 ):  # type: (...) -> None
        """Set up the executor and the loading/runtime contexts.

        Each argument left as ``None`` is replaced by a freshly constructed
        default: ``SingleJobExecutor()``, ``LoadingContext()`` and
        ``RuntimeContext()`` respectively. Any other value, including a
        falsy one, is stored as given.
        """
        self.executor = SingleJobExecutor() if executor is None else executor
        self.loading_context = (
            LoadingContext() if loading_context is None else loading_context
        )
        self.runtime_context = (
            RuntimeContext() if runtime_context is None else runtime_context
        )

    def make(self, cwl):  # type: (Union[Text, Dict[Text, Any]]) -> Callable
        """Instantiate a CWL object from a CWL document.

        :param cwl: passed unchanged to ``load_tool.load_tool`` along with
            this factory's loading context
        :returns: a ``Callable`` wrapping the loaded object and this factory
        :raises Exception: when ``load_tool.load_tool`` returns an ``int``,
            which is treated here as a failed load
        """
        loaded = load_tool.load_tool(cwl, self.loading_context)
        if not isinstance(loaded, int):
            return Callable(loaded, self)
        raise Exception("Error loading tool")