diff env/lib/python3.7/site-packages/cwltool/executors.py @ 2:6af9afd405e9 draft

"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author shellac
date Thu, 14 May 2020 14:56:58 -0400
parents 26e78fe6e8c4
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/cwltool/executors.py	Thu May 14 14:56:58 2020 -0400
@@ -0,0 +1,362 @@
+# -*- coding: utf-8 -*-
+""" Single and multi-threaded executors."""
+import datetime
+import os
+import tempfile
+import threading
+import logging
+from threading import Lock
+from abc import ABCMeta, abstractmethod
+from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
+
+import psutil
+from six import string_types, with_metaclass
+from typing_extensions import Text  # pylint: disable=unused-import
+from future.utils import raise_from
+from schema_salad.validate import ValidationException
+
+from .builder import Builder  # pylint: disable=unused-import
+from .context import (RuntimeContext,  # pylint: disable=unused-import
+                      getdefault)
+from .errors import WorkflowException
+from .job import JobBase  # pylint: disable=unused-import
+from .loghandler import _logger
+from .mutation import MutationManager
+from .process import Process  # pylint: disable=unused-import
+from .process import cleanIntermediate, relocateOutputs
+from .provenance import ProvenanceProfile
+from .utils import DEFAULT_TMP_PREFIX
+from .workflow import Workflow, WorkflowJob, WorkflowJobStep
+from .command_line_tool import CallbackJob
+
+TMPDIR_LOCK = Lock()
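+# Shared lock handed to job.run() by MultithreadedJobExecutor._runner (below);
+# as the name suggests, it lets concurrent job threads serialize
+# temporary-directory setup.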
+
+
+class JobExecutor(with_metaclass(ABCMeta, object)):
+    """Abstract base job executor."""
+
+    def __init__(self):
+        # type: (...) -> None
+        """Initialize."""
+        self.final_output = []  # type: List[Union[Dict[Text, Any], List[Dict[Text, Any]]]]
+        self.final_status = []  # type: List[Text]
+        self.output_dirs = set()  # type: Set[Text]
+
+    def __call__(self, *args, **kwargs):  # type: (*Any, **Any) -> Any
+        return self.execute(*args, **kwargs)
+
+    def output_callback(self, out, process_status):  # type: (Dict[Text, Any], Text) -> None
+        """Collect the final status and outputs."""
+        self.final_status.append(process_status)
+        self.final_output.append(out)
+
+    @abstractmethod
+    def run_jobs(self,
+                 process,           # type: Process
+                 job_order_object,  # type: Dict[Text, Any]
+                 logger,            # type: logging.Logger
+                 runtime_context    # type: RuntimeContext
+                ):  # type: (...) -> None
+        """Execute the jobs for the given Process."""
+
+    def execute(self,
+                process,           # type: Process
+                job_order_object,  # type: Dict[Text, Any]
+                runtime_context,   # type: RuntimeContext
+                logger=_logger,    # type: logging.Logger
+               ):  # type: (...) -> Tuple[Optional[Union[Dict[Text, Any], List[Dict[Text, Any]]]], Text]
+        """Execute the process."""
+        if not runtime_context.basedir:
+            raise WorkflowException("Must provide 'basedir' in runtimeContext")
+
+        finaloutdir = None  # type: Optional[Text]
+        original_outdir = runtime_context.outdir
+        if isinstance(original_outdir, string_types):
+            finaloutdir = os.path.abspath(original_outdir)
+        runtime_context = runtime_context.copy()
+        outdir = tempfile.mkdtemp(
+            prefix=getdefault(runtime_context.tmp_outdir_prefix, DEFAULT_TMP_PREFIX))
+        self.output_dirs.add(outdir)
+        runtime_context.outdir = outdir
+        runtime_context.mutation_manager = MutationManager()
+        runtime_context.toplevel = True
+        runtime_context.workflow_eval_lock = threading.Condition(threading.RLock())
+
+        job_reqs = None
+        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
+            if process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
+                raise WorkflowException(
+                    "`cwl:requirements` in the input object is not part of CWL "
+                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
+                    "can set the cwlVersion to v1.1")
+            job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
+        elif ("cwl:defaults" in process.metadata
+              and "https://w3id.org/cwl/cwl#requirements"
+              in process.metadata["cwl:defaults"]):
+            if process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
+                raise WorkflowException(
+                    "`cwl:requirements` in the input object is not part of CWL "
+                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
+                    "can set the cwlVersion to v1.1")
+            job_reqs = process.metadata["cwl:defaults"]["https://w3id.org/cwl/cwl#requirements"]
+        if job_reqs is not None:
+            for req in job_reqs:
+                process.requirements.append(req)
+
+        self.run_jobs(process, job_order_object, logger, runtime_context)
+
+        if self.final_output and self.final_output[0] is not None and finaloutdir is not None:
+            self.final_output[0] = relocateOutputs(
+                self.final_output[0], finaloutdir, self.output_dirs,
+                runtime_context.move_outputs, runtime_context.make_fs_access(""),
+                getdefault(runtime_context.compute_checksum, True),
+                path_mapper=runtime_context.path_mapper)
+
+        if runtime_context.rm_tmpdir:
+            if runtime_context.cachedir is None:
+                output_dirs = self.output_dirs  # type: Iterable[Any]
+            else:
+                output_dirs = filter(lambda x: not x.startswith(
+                    runtime_context.cachedir), self.output_dirs)
+            cleanIntermediate(output_dirs)
+
+        if self.final_output and self.final_status:
+            if (runtime_context.research_obj is not None
+                    and isinstance(process, (JobBase, Process, WorkflowJobStep,
+                                             WorkflowJob))
+                    and process.parent_wf):
+                process_run_id = None
+                name = "primary"
+                process.parent_wf.generate_output_prov(self.final_output[0],
+                                                       process_run_id, name)
+                process.parent_wf.document.wasEndedBy(
+                    process.parent_wf.workflow_run_uri, None, process.parent_wf.engine_uuid,
+                    datetime.datetime.now())
+                process.parent_wf.finalize_prov_profile(name=None)
+            return (self.final_output[0], self.final_status[0])
+        return (None, "permanentFail")
+
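+# Illustrative usage sketch (not part of this module): executor instances are
+# callable and delegate to execute(), which returns the final output object
+# together with a status string such as "success" or "permanentFail":
+#
+#     executor = SingleJobExecutor()
+#     (output, status) = executor(process, job_order_object, runtime_context)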
+
+class SingleJobExecutor(JobExecutor):
+    """Default single-threaded CWL reference executor."""
+
+    def run_jobs(self,
+                 process,           # type: Process
+                 job_order_object,  # type: Dict[Text, Any]
+                 logger,            # type: logging.Logger
+                 runtime_context    # type: RuntimeContext
+                ):  # type: (...) -> None
+
+        process_run_id = None  # type: Optional[str]
+
+        # define provenance profile for single commandline tool
+        if (not isinstance(process, Workflow)
+                and runtime_context.research_obj is not None):
+            process.provenance_object = ProvenanceProfile(
+                runtime_context.research_obj,
+                full_name=runtime_context.cwl_full_name,
+                host_provenance=False,
+                user_provenance=False,
+                orcid=runtime_context.orcid,
+                # single tool execution, so RO UUID = wf UUID = tool UUID
+                run_uuid=runtime_context.research_obj.ro_uuid,
+                fsaccess=runtime_context.make_fs_access(''))
+            process.parent_wf = process.provenance_object
+        jobiter = process.job(job_order_object, self.output_callback,
+                              runtime_context)
+
+        try:
+            for job in jobiter:
+                if job is not None:
+                    if runtime_context.builder is not None:
+                        job.builder = runtime_context.builder
+                    if job.outdir is not None:
+                        self.output_dirs.add(job.outdir)
+                    if runtime_context.research_obj is not None:
+                        if not isinstance(process, Workflow):
+                            prov_obj = process.provenance_object
+                        else:
+                            prov_obj = job.prov_obj
+                        if prov_obj:
+                            runtime_context.prov_obj = prov_obj
+                            prov_obj.fsaccess = runtime_context.make_fs_access('')
+                            prov_obj.evaluate(
+                                process, job, job_order_object,
+                                runtime_context.research_obj)
+                            process_run_id = prov_obj.record_process_start(
+                                process, job)
+                            runtime_context = runtime_context.copy()
+                        runtime_context.process_run_id = process_run_id
+                    job.run(runtime_context)
+                else:
+                    logger.error("Workflow cannot make any more progress.")
+                    break
+        except (ValidationException, WorkflowException):  # pylint: disable=try-except-raise
+            raise
+        except Exception as err:
+            logger.exception("Got workflow error")
+            raise_from(WorkflowException(Text(err)), err)
+
+
+class MultithreadedJobExecutor(JobExecutor):
+    """
+    Experimental multi-threaded CWL executor.
+
+    Does simple resource accounting: it will not start a job unless
+    enough cores and RAM are available, but it makes no attempt to
+    optimize usage.
+    """
+
+    def __init__(self):  # type: () -> None
+        """Initialize."""
+        super(MultithreadedJobExecutor, self).__init__()
+        self.threads = set()  # type: Set[threading.Thread]
+        self.exceptions = []  # type: List[WorkflowException]
+        self.pending_jobs = []  # type: List[Union[JobBase, WorkflowJob]]
+        self.pending_jobs_lock = threading.Lock()
+
+        self.max_ram = int(psutil.virtual_memory().available / 2**20)
+        self.max_cores = psutil.cpu_count()
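+        # Note: psutil.cpu_count() can return None when the count cannot be
+        # determined; the resource checks in run_job assume an int here.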
+        self.allocated_ram = 0
+        self.allocated_cores = 0
+
+    def select_resources(self, request, runtime_context):  # pylint: disable=unused-argument
+        # type: (Dict[str, int], RuntimeContext) -> Dict[str, int]
+        """Naïve check for available cpu cores and memory."""
+        result = {}  # type: Dict[str, int]
+        maxrsc = {
+            "cores": self.max_cores,
+            "ram": self.max_ram
+        }
+        for rsc in ("cores", "ram"):
+            if request[rsc+"Min"] > maxrsc[rsc]:
+                raise WorkflowException(
+                    "Requested at least %d %s but only %d available" %
+                    (request[rsc+"Min"], rsc, maxrsc[rsc]))
+            if request[rsc+"Max"] < maxrsc[rsc]:
+                result[rsc] = request[rsc+"Max"]
+            else:
+                result[rsc] = maxrsc[rsc]
+
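+        # Worked example (hypothetical values): with max_cores=8 and
+        # max_ram=16384, a request of {"coresMin": 1, "coresMax": 4,
+        # "ramMin": 1024, "ramMax": 32768} yields {"cores": 4, "ram": 16384}:
+        # each resource is capped at min(requested Max, host maximum), and a
+        # Min above the host maximum raises WorkflowException.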
+        return result
+
+    def _runner(self, job, runtime_context, tmpdir_lock):
+        # type: (Union[JobBase, WorkflowJob, CallbackJob], RuntimeContext, threading.Lock) -> None
+        """Job running thread."""
+        try:
+            _logger.debug(
+                "job: %s, runtime_context: %s, tmpdir_lock: %s",
+                job, runtime_context, tmpdir_lock)
+            job.run(runtime_context, tmpdir_lock)
+        except WorkflowException as err:
+            _logger.exception("Got workflow error")
+            self.exceptions.append(err)
+        except Exception as err:  # pylint: disable=broad-except
+            _logger.exception("Got workflow error")
+            self.exceptions.append(WorkflowException(Text(err)))
+        finally:
+            if runtime_context.workflow_eval_lock:
+                with runtime_context.workflow_eval_lock:
+                    self.threads.remove(threading.current_thread())
+                    if isinstance(job, JobBase):
+                        self.allocated_ram -= job.builder.resources["ram"]
+                        self.allocated_cores -= job.builder.resources["cores"]
+                    runtime_context.workflow_eval_lock.notify_all()
+
+    def run_job(self,
+                job,             # type: Union[JobBase, WorkflowJob, None]
+                runtime_context  # type: RuntimeContext
+               ):  # type: (...) -> None
+        """Execute a single Job in a seperate thread."""
+        if job is not None:
+            with self.pending_jobs_lock:
+                self.pending_jobs.append(job)
+
+        with self.pending_jobs_lock:
+            n = 0
+            while n < len(self.pending_jobs):
+                job = self.pending_jobs[n]
+                if isinstance(job, JobBase):
+                    if (job.builder.resources["ram"] > self.max_ram
+                            or job.builder.resources["cores"] > self.max_cores):
+                        _logger.error(
+                            'Job "%s" cannot be run, requests more resources (%s) '
+                            'than available on this host (max ram %d, max cores %d).',
+                            job.name, job.builder.resources,
+                            self.max_ram,
+                            self.max_cores)
+                        self.pending_jobs.remove(job)
+                        return
+
+                    if (self.allocated_ram + job.builder.resources["ram"]
+                            > self.max_ram
+                            or self.allocated_cores + job.builder.resources["cores"]
+                            > self.max_cores):
+                        _logger.debug(
+                            'Job "%s" cannot run yet, resources (%s) are not '
+                            'available (already allocated ram is %d, allocated '
+                            'cores is %d, max ram %d, max cores %d).',
+                            job.name, job.builder.resources,
+                            self.allocated_ram,
+                            self.allocated_cores,
+                            self.max_ram,
+                            self.max_cores)
+                        n += 1
+                        continue
+
+                thread = threading.Thread(
+                    target=self._runner, args=(job, runtime_context, TMPDIR_LOCK))
+                thread.daemon = True
+                self.threads.add(thread)
+                if isinstance(job, JobBase):
+                    self.allocated_ram += job.builder.resources["ram"]
+                    self.allocated_cores += job.builder.resources["cores"]
+                thread.start()
+                self.pending_jobs.remove(job)
+
+    def wait_for_next_completion(self, runtime_context):
+        # type: (RuntimeContext) -> None
+        """Wait for jobs to finish."""
+        if runtime_context.workflow_eval_lock is not None:
+            runtime_context.workflow_eval_lock.wait()
+        if self.exceptions:
+            raise self.exceptions[0]
+
+    def run_jobs(self,
+                 process,           # type: Process
+                 job_order_object,  # type: Dict[Text, Any]
+                 logger,            # type: logging.Logger
+                 runtime_context    # type: RuntimeContext
+                ):  # type: (...) -> None
+
+        jobiter = process.job(job_order_object, self.output_callback,
+                              runtime_context)
+
+        if runtime_context.workflow_eval_lock is None:
+            raise WorkflowException(
+                "runtimeContext.workflow_eval_lock must not be None")
+
+        runtime_context.workflow_eval_lock.acquire()
+        for job in jobiter:
+            if job is not None:
+                if isinstance(job, JobBase):
+                    job.builder = runtime_context.builder or job.builder
+                    if job.outdir is not None:
+                        self.output_dirs.add(job.outdir)
+
+            self.run_job(job, runtime_context)
+
+            if job is None:
+                if self.threads:
+                    self.wait_for_next_completion(runtime_context)
+                else:
+                    logger.error("Workflow cannot make any more progress.")
+                    break
+
+        self.run_job(None, runtime_context)
+        while self.threads:
+            self.wait_for_next_completion(runtime_context)
+            self.run_job(None, runtime_context)
+
+        runtime_context.workflow_eval_lock.release()