Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/cwltool/context.py @ 2:6af9afd405e9 draft
"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author | shellac |
---|---|
date | Thu, 14 May 2020 14:56:58 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.7/site-packages/cwltool/context.py Thu May 14 14:56:58 2020 -0400 @@ -0,0 +1,148 @@ +"""Shared context objects that replace use of kwargs.""" +import copy +import threading # pylint: disable=unused-import +from typing import (Any, Callable, Dict, Iterable, List, MutableMapping, + Optional) + +from schema_salad import schema +from schema_salad.ref_resolver import (ContextType, # pylint: disable=unused-import + Fetcher, Loader) +from typing_extensions import (TYPE_CHECKING, # pylint: disable=unused-import + Text) +# move to a regular typing import when Python 3.3-3.6 is no longer supported +from .builder import Builder, HasReqsHints +from .mutation import MutationManager +from .pathmapper import PathMapper +from .secrets import SecretStore +from .software_requirements import DependenciesConfiguration +from .stdfsaccess import StdFsAccess +from .utils import DEFAULT_TMP_PREFIX + +if TYPE_CHECKING: + from .process import Process + from .provenance import (ResearchObject, # pylint: disable=unused-import + ProvenanceProfile) + +class ContextBase(object): + def __init__(self, kwargs=None): # type: (Optional[Dict[str, Any]]) -> None + """Initialize.""" + if kwargs: + for k, v in kwargs.items(): + if hasattr(self, k): + setattr(self, k, v) + +def make_tool_notimpl(toolpath_object, # type: MutableMapping[Text, Any] + loadingContext # type: LoadingContext + ): # type: (...) -> Process + raise NotImplementedError() + + +default_make_tool = make_tool_notimpl # type: Callable[[MutableMapping[Text, Any], LoadingContext], Process] + +class LoadingContext(ContextBase): + + def __init__(self, kwargs=None): # type: (Optional[Dict[str, Any]]) -> None + """Initialize the LoadingContext from the kwargs.""" + self.debug = False # type: bool + self.metadata = {} # type: Dict[Text, Any] + self.requirements = None # type: Optional[List[Dict[Text, Any]]] + self.hints = None # type: Optional[List[Dict[Text, Any]]] + self.overrides_list = [] # type: List[Dict[Text, Any]] + self.loader = None # type: Optional[Loader] + self.avsc_names = None # type: Optional[schema.Names] + self.disable_js_validation = False # type: bool + self.js_hint_options_file = None + self.do_validate = True # type: bool + self.enable_dev = False # type: bool + self.strict = True # type: bool + self.resolver = None + self.fetcher_constructor = None + self.construct_tool_object = default_make_tool + self.research_obj = None # type: Optional[ResearchObject] + self.orcid = '' # type: str + self.cwl_full_name = "" # type: str + self.host_provenance = False # type: bool + self.user_provenance = False # type: bool + self.prov_obj = None # type: Optional[ProvenanceProfile] + self.do_update = None # type: Optional[bool] + self.jobdefaults = None # type: Optional[MutableMapping[Text, Any]] + + super(LoadingContext, self).__init__(kwargs) + + def copy(self): + # type: () -> LoadingContext + return copy.copy(self) + +class RuntimeContext(ContextBase): + def __init__(self, kwargs=None): # type: (Optional[Dict[str, Any]]) -> None + """Initializet the RuntimeContext from the kwargs.""" + select_resources_callable = Callable[ # pylint: disable=unused-variable + [Dict[str, int], RuntimeContext], Dict[str, int]] + self.user_space_docker_cmd = "" # type: Text + self.secret_store = None # type: Optional[SecretStore] + self.no_read_only = False # type: bool + self.custom_net = "" # type: Text + self.no_match_user = False # type: bool + self.preserve_environment = "" # type: Optional[Iterable[str]] + self.preserve_entire_environment = False # type: bool + self.use_container = True # type: bool + self.force_docker_pull = False # type: bool + + self.tmp_outdir_prefix = DEFAULT_TMP_PREFIX # type: Text + self.tmpdir_prefix = DEFAULT_TMP_PREFIX # type: Text + self.tmpdir = "" # type: Text + self.rm_tmpdir = True # type: bool + self.pull_image = True # type: bool + self.rm_container = True # type: bool + self.move_outputs = "move" # type: Text + + self.singularity = False # type: bool + self.disable_net = False # type: bool + self.debug = False # type: bool + self.compute_checksum = True # type: bool + self.name = "" # type: Text + self.default_container = "" # type: Text + self.find_default_container = None # type: Optional[Callable[[HasReqsHints], Optional[Text]]] + self.cachedir = None # type: Optional[Text] + self.outdir = None # type: Optional[Text] + self.stagedir = "" # type: Text + self.part_of = "" # type: Text + self.basedir = "" # type: Text + self.toplevel = False # type: bool + self.mutation_manager = None # type: Optional[MutationManager] + self.make_fs_access = StdFsAccess # type: Callable[[Text], StdFsAccess] + self.path_mapper = PathMapper + self.builder = None # type: Optional[Builder] + self.docker_outdir = "" # type: Text + self.docker_tmpdir = "" # type: Text + self.docker_stagedir = "" # type: Text + self.js_console = False # type: bool + self.job_script_provider = None # type: Optional[DependenciesConfiguration] + self.select_resources = None # type: Optional[select_resources_callable] + self.eval_timeout = 20 # type: float + self.postScatterEval = None # type: Optional[Callable[[MutableMapping[Text, Any]], Dict[Text, Any]]] + self.on_error = "stop" # type: Text + self.strict_memory_limit = False # type: bool + + self.cidfile_dir = None + self.cidfile_prefix = None + + self.workflow_eval_lock = None # type: Optional[threading.Condition] + self.research_obj = None # type: Optional[ResearchObject] + self.orcid = '' # type: str + self.cwl_full_name = "" # type: str + self.process_run_id = None # type: Optional[str] + self.prov_obj = None # type: Optional[ProvenanceProfile] + super(RuntimeContext, self).__init__(kwargs) + + + def copy(self): + # type: () -> RuntimeContext + return copy.copy(self) + +def getdefault(val, default): + # type: (Any, Any) -> Any + if val is None: + return default + else: + return val