view env/lib/python3.7/site-packages/cwltool/executors.py @ 3:758bc20232e8 draft

"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author shellac
date Thu, 14 May 2020 16:20:52 -0400
parents 26e78fe6e8c4
children
line wrap: on
line source

# -*- coding: utf-8 -*-
""" Single and multi-threaded executors."""
import datetime
import os
import tempfile
import threading
import logging
from threading import Lock
from abc import ABCMeta, abstractmethod
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union

import psutil
from six import string_types, with_metaclass
from typing_extensions import Text  # pylint: disable=unused-import
from future.utils import raise_from
from schema_salad.validate import ValidationException

from .builder import Builder  # pylint: disable=unused-import
from .context import (RuntimeContext,  # pylint: disable=unused-import
                      getdefault)
from .errors import WorkflowException
from .job import JobBase  # pylint: disable=unused-import
from .loghandler import _logger
from .mutation import MutationManager
from .process import Process  # pylint: disable=unused-import
from .process import cleanIntermediate, relocateOutputs
from .provenance import ProvenanceProfile
from .utils import DEFAULT_TMP_PREFIX
from .workflow import Workflow, WorkflowJob, WorkflowJobStep
from .command_line_tool import CallbackJob

# Single module-level lock shared by every worker thread:
# MultithreadedJobExecutor hands it to each job's run() call as the second
# positional argument.
# NOTE(review): presumably it serialises temporary-directory setup inside
# job.run() -- confirm against the job implementation.
TMPDIR_LOCK = Lock()


class JobExecutor(with_metaclass(ABCMeta, object)):
    """Abstract base job executor.

    Concrete executors implement :meth:`run_jobs`; :meth:`execute` wraps it
    with output-directory setup, injection of extra requirements, relocation
    of the final outputs, removal of temporary directories and provenance
    finalisation.
    """

    def __init__(self):
        # type: () -> None
        """Create the empty collectors filled in while a process runs."""
        # Every directory that may hold outputs (temporary and per-job).
        self.output_dirs = set()  # type: Set[Text]
        # Statuses and outputs are appended pairwise by output_callback().
        self.final_status = []  # type: List[Text]
        self.final_output = []  # type: List[Union[Dict[Text, Any], List[Dict[Text, Any]]]]

    def __call__(self, *args, **kwargs):  # type: (*Any, **Any) -> Any
        """Make the executor callable; simply forwards to execute()."""
        return self.execute(*args, **kwargs)

    def output_callback(self, out, process_status):  # type: (Dict[Text, Any], Text) -> None
        """Record the status and the output object reported by a process."""
        self.final_status.append(process_status)
        self.final_output.append(out)

    @abstractmethod
    def run_jobs(self,
                 process,           # type: Process
                 job_order_object,  # type: Dict[Text, Any]
                 logger,            # type: logging.Logger
                 runtime_context     # type: RuntimeContext
                ):  # type: (...) -> None
        """Execute the jobs for the given Process."""

    def execute(self,
                process,           # type: Process
                job_order_object,  # type: Dict[Text, Any]
                runtime_context,   # type: RuntimeContext
                logger=_logger,    # type: logging.Logger
               ):  # type: (...) -> Tuple[Optional[Union[Dict[Text, Any], List[Dict[Text, Any]]]], Text]
        """Run the process and return ``(output, status)``.

        Works on a copy of ``runtime_context`` whose ``outdir`` is a fresh
        temporary directory. Returns the first collected output and status,
        or ``(None, "permanentFail")`` when nothing was collected.

        Raises WorkflowException when ``basedir`` is unset, or when
        ``cwl:requirements`` is supplied for a document whose original
        cwlVersion is v1.0.
        """
        if not runtime_context.basedir:
            raise WorkflowException("Must provide 'basedir' in runtimeContext")

        # Remember where the caller wants the results before outdir is
        # replaced by a temporary directory below.
        requested_outdir = runtime_context.outdir
        finaloutdir = (os.path.abspath(requested_outdir)
                       if isinstance(requested_outdir, string_types)
                       else None)  # type: Optional[Text]

        runtime_context = runtime_context.copy()
        tmp_prefix = getdefault(runtime_context.tmp_outdir_prefix,
                                DEFAULT_TMP_PREFIX)
        outdir = tempfile.mkdtemp(prefix=tmp_prefix)
        self.output_dirs.add(outdir)
        runtime_context.outdir = outdir
        runtime_context.mutation_manager = MutationManager()
        runtime_context.toplevel = True
        runtime_context.workflow_eval_lock = threading.Condition(threading.RLock())

        # Extra requirements may come from the job order itself or, failing
        # that, from the "cwl:defaults" section of the process metadata.
        reqs_key = "https://w3id.org/cwl/cwl#requirements"
        reqs_holder = None
        if reqs_key in job_order_object:
            reqs_holder = job_order_object
        elif "cwl:defaults" in process.metadata \
                and reqs_key in process.metadata["cwl:defaults"]:
            reqs_holder = process.metadata["cwl:defaults"]

        if reqs_holder is not None:
            if process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1")
            extra_reqs = reqs_holder[reqs_key]
            if extra_reqs is not None:
                for requirement in extra_reqs:
                    process.requirements.append(requirement)

        self.run_jobs(process, job_order_object, logger, runtime_context)

        # Move/copy the primary output into the directory the caller asked for.
        have_output = bool(self.final_output) and self.final_output[0] is not None
        if have_output and finaloutdir is not None:
            self.final_output[0] = relocateOutputs(
                self.final_output[0], finaloutdir, self.output_dirs,
                runtime_context.move_outputs, runtime_context.make_fs_access(""),
                getdefault(runtime_context.compute_checksum, True),
                path_mapper=runtime_context.path_mapper)

        if runtime_context.rm_tmpdir:
            if runtime_context.cachedir is None:
                removable = self.output_dirs  # type: Iterable[Any]
            else:
                # Directories under the cache directory are kept.
                removable = (
                    directory for directory in self.output_dirs
                    if not directory.startswith(runtime_context.cachedir))
            cleanIntermediate(removable)

        if not (self.final_output and self.final_status):
            return (None, "permanentFail")

        wants_provenance = (
            runtime_context.research_obj is not None
            and isinstance(process, (JobBase, Process, WorkflowJobStep,
                                     WorkflowJob))
            and process.parent_wf)
        if wants_provenance:
            parent_wf = process.parent_wf
            # No process run id is associated with the primary output.
            parent_wf.generate_output_prov(self.final_output[0], None, "primary")
            parent_wf.document.wasEndedBy(
                parent_wf.workflow_run_uri, None, parent_wf.engine_uuid,
                datetime.datetime.now())
            parent_wf.finalize_prov_profile(name=None)
        return (self.final_output[0], self.final_status[0])


class SingleJobExecutor(JobExecutor):
    """Default single-threaded CWL reference executor."""

    def run_jobs(self,
                 process,           # type: Process
                 job_order_object,  # type: Dict[Text, Any]
                 logger,            # type: logging.Logger
                 runtime_context    # type: RuntimeContext
                ):  # type: (...) -> None
        """Run every job yielded by ``process.job`` one after another.

        ValidationException and WorkflowException propagate unchanged; any
        other exception is logged and re-raised as a WorkflowException
        chained to the original error.
        """
        is_workflow = isinstance(process, Workflow)
        process_run_id = None  # type: Optional[str]

        # A lone command line tool has no enclosing workflow, so it gets its
        # own provenance profile when provenance capture is enabled.
        if not is_workflow and runtime_context.research_obj is not None:
            process.provenance_object = ProvenanceProfile(
                runtime_context.research_obj,
                full_name=runtime_context.cwl_full_name,
                host_provenance=False,
                user_provenance=False,
                orcid=runtime_context.orcid,
                # single tool execution, so RO UUID = wf UUID = tool UUID
                run_uuid=runtime_context.research_obj.ro_uuid,
                fsaccess=runtime_context.make_fs_access(''))
            process.parent_wf = process.provenance_object

        jobiter = process.job(job_order_object, self.output_callback,
                              runtime_context)

        try:
            for job in jobiter:
                if job is None:
                    # The generator yields None when nothing is runnable.
                    logger.error("Workflow cannot make any more progress.")
                    break

                if runtime_context.builder is not None:
                    job.builder = runtime_context.builder
                if job.outdir is not None:
                    self.output_dirs.add(job.outdir)

                if runtime_context.research_obj is not None:
                    prov_obj = (job.prov_obj if is_workflow
                                else process.provenance_object)
                    if prov_obj:
                        runtime_context.prov_obj = prov_obj
                        prov_obj.fsaccess = runtime_context.make_fs_access('')
                        prov_obj.evaluate(
                            process, job, job_order_object,
                            runtime_context.research_obj)
                        process_run_id = prov_obj.record_process_start(
                            process, job)
                        # The run id below is stored on a fresh copy.
                        runtime_context = runtime_context.copy()
                    runtime_context.process_run_id = process_run_id

                job.run(runtime_context)
        except (ValidationException, WorkflowException):  # pylint: disable=try-except-raise
            raise
        except Exception as err:
            logger.exception("Got workflow error")
            raise_from(WorkflowException(Text(err)), err)


class MultithreadedJobExecutor(JobExecutor):
    """
    Experimental multi-threaded CWL executor.

    Does simple resource accounting, will not start a job unless it
    has cores / ram available, but does not make any attempt to
    optimize usage.
    """

    def __init__(self):  # type: () -> None
        """Initialize the thread bookkeeping and the host resource limits."""
        super(MultithreadedJobExecutor, self).__init__()
        self.threads = set()  # type: Set[threading.Thread]
        self.exceptions = []  # type: List[WorkflowException]
        self.pending_jobs = []  # type: List[Union[JobBase, WorkflowJob]]
        self.pending_jobs_lock = threading.Lock()

        # Available memory divided by 2**20 (bytes -> MiB per psutil docs).
        self.max_ram = int(psutil.virtual_memory().available / 2**20)
        # psutil.cpu_count() returns None when the count cannot be
        # determined; fall back to one core so comparisons stay valid.
        self.max_cores = psutil.cpu_count() or 1
        self.allocated_ram = 0
        self.allocated_cores = 0

    def select_resources(self, request, runtime_context):  # pylint: disable=unused-argument
        # type: (Dict[str, int], RuntimeContext) -> Dict[str, int]
        """Naïve check for available cpu cores and memory.

        ``request`` must contain ``coresMin``, ``coresMax``, ``ramMin`` and
        ``ramMax``.  Returns a dict with ``cores`` and ``ram``, each being the
        requested maximum capped at this host's limit.

        Raises WorkflowException when a requested minimum exceeds the limit.
        """
        result = {}  # type: Dict[str, int]
        maxrsc = {
            "cores": self.max_cores,
            "ram": self.max_ram
        }
        for rsc in ("cores", "ram"):
            if request[rsc+"Min"] > maxrsc[rsc]:
                raise WorkflowException(
                    "Requested at least %d %s but only %d available" %
                    (request[rsc+"Min"], rsc, maxrsc[rsc]))
            if request[rsc+"Max"] < maxrsc[rsc]:
                result[rsc] = request[rsc+"Max"]
            else:
                result[rsc] = maxrsc[rsc]

        return result

    def _runner(self, job, runtime_context, TMPDIR_LOCK):
        # type: (Union[JobBase, WorkflowJob, CallbackJob], RuntimeContext, threading.Lock) -> None
        """Job running thread.

        Runs ``job`` and records any failure in ``self.exceptions`` instead
        of letting it escape the thread.  On exit it unregisters the current
        thread, gives back the resources a JobBase had reserved and wakes up
        whoever waits on ``workflow_eval_lock``.
        """
        try:
            # Lazy %-style arguments: the message is only built when debug
            # logging is enabled.
            _logger.debug("job: %s, runtime_context: %s, TMPDIR_LOCK: %s",
                          job, runtime_context, TMPDIR_LOCK)
            job.run(runtime_context, TMPDIR_LOCK)
        except WorkflowException as err:
            _logger.exception("Got workflow error")
            self.exceptions.append(err)
        except Exception as err:  # pylint: disable=broad-except
            _logger.exception("Got workflow error")
            self.exceptions.append(WorkflowException(Text(err)))
        finally:
            if runtime_context.workflow_eval_lock is not None:
                with runtime_context.workflow_eval_lock:
                    self.threads.remove(threading.current_thread())
                    if isinstance(job, JobBase):
                        self.allocated_ram -= job.builder.resources["ram"]
                        self.allocated_cores -= job.builder.resources["cores"]
                    # notify_all() is the non-deprecated spelling of notifyAll().
                    runtime_context.workflow_eval_lock.notify_all()

    def run_job(self,
                job,             # type: Union[JobBase, WorkflowJob, None]
                runtime_context  # type: RuntimeContext
               ):  # type: (...) -> None
        """Execute a single Job in a separate thread.

        ``job`` (when not None) is appended to the pending list, then the
        whole pending list is scanned in order and every entry whose
        resources fit is started in its own daemon thread.  Calling with
        ``None`` only performs the scan.
        """
        if job is not None:
            with self.pending_jobs_lock:
                self.pending_jobs.append(job)

        with self.pending_jobs_lock:
            n = 0
            while n < len(self.pending_jobs):
                job = self.pending_jobs[n]
                if isinstance(job, JobBase):
                    resources = job.builder.resources
                    ram = resources["ram"]
                    cores = resources["cores"]

                    if ram > self.max_ram or cores > self.max_cores:
                        # The argument count must match the placeholders,
                        # otherwise logging reports a formatting error
                        # instead of the message.
                        _logger.error(
                            'Job "%s" cannot be run, requests more resources (%s) '
                            'than available on this host (max ram %d, max cores %d',
                            job.name, resources,
                            self.max_ram,
                            self.max_cores)
                        # Drop the job that can never run but keep scanning,
                        # so later pending jobs that do fit are not stranded.
                        # Index n now refers to the next entry.
                        del self.pending_jobs[n]
                        continue

                    if ((self.allocated_ram + ram) > self.max_ram
                            or (self.allocated_cores + cores) > self.max_cores):
                        _logger.debug(
                            'Job "%s" cannot run yet, resources (%s) are not '
                            'available (already allocated ram is %d, allocated cores is %d, '
                            'max ram %d, max cores %d',
                            job.name, resources,
                            self.allocated_ram,
                            self.allocated_cores,
                            self.max_ram,
                            self.max_cores)
                        # Leave it pending and look at the next entry.
                        n += 1
                        continue

                thread = threading.Thread(
                    target=self._runner,
                    args=(job, runtime_context, TMPDIR_LOCK))
                thread.daemon = True
                self.threads.add(thread)
                if isinstance(job, JobBase):
                    self.allocated_ram += job.builder.resources["ram"]
                    self.allocated_cores += job.builder.resources["cores"]
                thread.start()
                # Remove exactly the entry just started; n stays put because
                # the following entries shift down by one.
                del self.pending_jobs[n]

    def wait_for_next_completion(self, runtime_context):
        # type: (RuntimeContext) -> None
        """Wait for jobs to finish.

        Blocks on ``workflow_eval_lock`` (which the caller must already
        hold, as Condition.wait() requires) until a worker notifies, then
        raises the first recorded job exception, if any.
        """
        if runtime_context.workflow_eval_lock is not None:
            runtime_context.workflow_eval_lock.wait()
        if self.exceptions:
            raise self.exceptions[0]

    def run_jobs(self,
                 process,           # type: Process
                 job_order_object,  # type: Dict[Text, Any]
                 logger,            # type: logging.Logger
                 runtime_context    # type: RuntimeContext
                ):  # type: (...) -> None
        """Schedule every job yielded by ``process.job`` and wait for them.

        Raises WorkflowException when ``runtime_context.workflow_eval_lock``
        is None, and re-raises the first exception recorded by a worker.
        """
        jobiter = process.job(job_order_object, self.output_callback,
                              runtime_context)

        if runtime_context.workflow_eval_lock is None:
            raise WorkflowException(
                "runtimeContext.workflow_eval_lock must not be None")

        # Hold the lock for the whole scheduling loop; the ``with`` block
        # guarantees it is released even when a job failure is re-raised, so
        # worker threads are not left blocked in their clean-up.
        with runtime_context.workflow_eval_lock:
            for job in jobiter:
                if isinstance(job, JobBase):
                    job.builder = runtime_context.builder or job.builder
                    if job.outdir is not None:
                        self.output_dirs.add(job.outdir)

                self.run_job(job, runtime_context)

                if job is None:
                    # Nothing new is runnable: wait for a running job to
                    # finish, or give up if none is running.
                    if self.threads:
                        self.wait_for_next_completion(runtime_context)
                    else:
                        logger.error("Workflow cannot make any more progress.")
                        break

            # Drain: keep starting pending jobs as running ones complete.
            self.run_job(None, runtime_context)
            while self.threads:
                self.wait_for_next_completion(runtime_context)
                self.run_job(None, runtime_context)