Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/cwltool/factory.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:26e78fe6e8c4 |
---|---|
1 from __future__ import absolute_import | |
2 | |
3 import os | |
4 from typing import Callable as tCallable # pylint: disable=unused-import | |
5 from typing import Any, Dict, Optional, Tuple, Union | |
6 | |
7 from typing_extensions import Text # pylint: disable=unused-import | |
8 # move to a regular typing import when Python 3.3-3.6 is no longer supported | |
9 | |
10 from . import load_tool | |
11 from .context import LoadingContext, RuntimeContext | |
12 from .executors import SingleJobExecutor | |
13 from .process import Process | |
14 | |
15 | |
class WorkflowStatus(Exception):
    """Signaling exception for the status of a Workflow.

    The exception message is ``"Completed <status>"``. The output object and
    the raw status string are kept on the instance as ``out`` and ``status``.
    """

    def __init__(self, out, status):
        # type: (Dict[Text,Any], Text) -> None
        """Record the workflow output and its final status.

        :param out: output object produced by the run.
        :param status: final status string, embedded in the message.
        """
        super(WorkflowStatus, self).__init__("Completed %s" % status)
        self.out = out
        self.status = status

    def __reduce__(self):
        # type: () -> Tuple[Any, Tuple[Dict[Text,Any], Text]]
        """Describe how to rebuild this exception when copied or unpickled.

        ``Exception`` only stores the formatted message in ``args``, so the
        default reduction would call ``WorkflowStatus(message)`` and fail with
        a ``TypeError`` because ``status`` is missing. Rebuilding from
        ``(out, status)`` keeps ``copy`` and ``pickle`` round-trips working.
        """
        return (self.__class__, (self.out, self.status))
23 | |
24 | |
class Callable(object):
    """Wrap a loaded process so it can be invoked like a plain function."""

    def __init__(self, t, factory):  # type: (Process, Factory) -> None
        """Remember the process ``t`` and the ``factory`` used to run it."""
        self.factory = factory
        self.t = t

    def __call__(self, **kwargs):
        # type: (**Any) -> Union[Text, Dict[Text, Text]]
        """Run the wrapped process through the factory's executor.

        The keyword arguments are handed to the executor as a dict. The
        executor receives a copy of the factory's runtime context whose
        ``basedir`` is set to the current working directory.

        Returns the executor's output when the reported status is
        ``"success"``; otherwise raises ``WorkflowStatus`` carrying the
        output and the status.
        """
        # Work on a copy so the factory's own runtime context is not modified.
        context = self.factory.runtime_context.copy()
        context.basedir = os.getcwd()
        result = self.factory.executor(self.t, kwargs, context)
        out, status = result
        if status == "success":
            return out
        raise WorkflowStatus(out, status)
40 | |
class Factory(object):
    """Easy way to load a CWL document for execution."""

    def __init__(self,
                 executor=None,  # type: Optional[tCallable[...,Tuple[Dict[Text,Any], Text]]]
                 loading_context=None,  # type: Optional[LoadingContext]
                 runtime_context=None  # type: Optional[RuntimeContext]
                ):  # type: (...) -> None
        """Set up the executor and contexts, creating defaults where omitted.

        Any argument left as ``None`` is replaced by a freshly constructed
        ``SingleJobExecutor``, ``LoadingContext`` or ``RuntimeContext``
        respectively.
        """
        self.executor = SingleJobExecutor() if executor is None else executor
        self.loading_context = (
            LoadingContext() if loading_context is None else loading_context)
        self.runtime_context = (
            RuntimeContext() if runtime_context is None else runtime_context)

    def make(self, cwl):  # type: (Union[Text, Dict[Text, Any]]) -> Callable
        """Instantiate a CWL object from a CWL document.

        Loads ``cwl`` with this factory's loading context and wraps the
        result in a ``Callable`` bound to this factory. Raises ``Exception``
        when the loader hands back an ``int`` instead of a loaded object.
        """
        loaded = load_tool.load_tool(cwl, self.loading_context)
        if not isinstance(loaded, int):
            return Callable(loaded, self)
        raise Exception("Error loading tool")