"""Module provides generic interface to running Galaxy tools and workflows."""

import json
import os
import tempfile
import time

import bioblend
import requests
import yaml
from bioblend.galaxy.client import Client
from bioblend.util import attach_file
from galaxy.tool_util.cwl.util import (
    DirectoryUploadTarget,
    FileUploadTarget,
    galactic_job_json,
    invocation_to_output,
    output_properties,
    output_to_cwl_json,
    path_or_uri_to_uri,
    tool_response_to_output,
)
from galaxy.tool_util.parser import get_tool_source
from galaxy.util import (
    safe_makedirs,
    unicodify,
)

from planemo.galaxy.api import summarize_history
from planemo.io import wait_on
from planemo.runnable import (
    ErrorRunResponse,
    get_outputs,
    RunnableType,
    SuccessfulRunResponse,
)

DEFAULT_HISTORY_NAME = "CWL Target History"
ERR_NO_SUCH_TOOL = ("Failed to find tool with ID [%s] in Galaxy - cannot execute job. "
                    "You may need to enable verbose logging and determine why the tool did not load. [%s]")


def execute(ctx, config, runnable, job_path, **kwds):
    """Execute a Galaxy activity."""
    try:
        return _execute(ctx, config, runnable, job_path, **kwds)
    except Exception as e:
        return ErrorRunResponse(unicodify(e))


def _verified_tool_id(runnable, user_gi):
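    """Return the runnable's tool ID after verifying Galaxy has the tool loaded."""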
    tool_id = _tool_id(runnable.path)
    try:
        user_gi.tools.show_tool(tool_id)
    except Exception as e:
        raise Exception(ERR_NO_SUCH_TOOL % (tool_id, e))
    return tool_id


def _inputs_representation(runnable):
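    """Return the inputs representation ("cwl" or "galaxy") for the runnable type."""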
    if runnable.type == RunnableType.cwl_tool:
        inputs_representation = "cwl"
    else:
        inputs_representation = "galaxy"
    return inputs_representation


def log_contents_str(config):
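    """Return the engine's log contents, or a placeholder when unavailable."""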
    if hasattr(config, "log_contents"):
        return config.log_contents
    else:
        return "No log for this engine type."


def _execute(ctx, config, runnable, job_path, **kwds):
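    """Stage inputs, run the tool or workflow, and build a run response object."""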
    user_gi = config.user_gi
    admin_gi = config.gi

    history_id = _history_id(user_gi, **kwds)

    job_dict, _ = stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds)

    if runnable.type in [RunnableType.galaxy_tool, RunnableType.cwl_tool]:
        response_class = GalaxyToolRunResponse
        tool_id = _verified_tool_id(runnable, user_gi)
        inputs_representation = _inputs_representation(runnable)
        run_tool_payload = dict(
            history_id=history_id,
            tool_id=tool_id,
            inputs=job_dict,
            inputs_representation=inputs_representation,
        )
        ctx.vlog("Post to Galaxy tool API with payload [%s]" % run_tool_payload)
        tool_run_response = user_gi.tools._post(run_tool_payload)

        job = tool_run_response["jobs"][0]
        job_id = job["id"]
        try:
            final_state = _wait_for_job(user_gi, job_id)
        except Exception:
            summarize_history(ctx, user_gi, history_id)
            raise
        if final_state != "ok":
            msg = "Failed to run CWL tool job final job state is [%s]." % final_state
            summarize_history(ctx, user_gi, history_id)
            with open("errored_galaxy.log", "w") as f:
                f.write(log_contents_str(config))
            raise Exception(msg)

        ctx.vlog("Final job state was ok, fetching details for job [%s]" % job_id)
        job_info = admin_gi.jobs.show_job(job_id)
        response_kwds = {
            'job_info': job_info,
            'api_run_response': tool_run_response,
        }
        if ctx.verbose:
            summarize_history(ctx, user_gi, history_id)
    elif runnable.type in [RunnableType.galaxy_workflow, RunnableType.cwl_workflow]:
        response_class = GalaxyWorkflowRunResponse
        workflow_id = config.workflow_id(runnable.path)
        ctx.vlog("Found Galaxy workflow ID [%s] for path [%s]" % (workflow_id, runnable.path))
        # TODO: update bioblend to allow inputs_by.
        # invocation = user_gi.workflows.invoke_workflow(
        #    workflow_id,
        #    history_id=history_id,
        #    inputs=job_dict,
        # )
        payload = dict(
            workflow_id=workflow_id,
            history_id=history_id,
            inputs=job_dict,
            inputs_by="name",
            allow_tool_state_corrections=True,
        )
        invocations_url = "%s/%s/invocations" % (
            user_gi._make_url(user_gi.workflows),
            workflow_id,
        )
        invocation = Client._post(user_gi.workflows, payload, url=invocations_url)
        invocation_id = invocation["id"]
        ctx.vlog("Waiting for invocation [%s]" % invocation_id)
        polling_backoff = kwds.get("polling_backoff", 0)
        try:
            final_invocation_state = _wait_for_invocation(ctx, user_gi, history_id, workflow_id, invocation_id, polling_backoff)
        except Exception:
            ctx.vlog("Problem waiting on invocation...")
            summarize_history(ctx, user_gi, history_id)
            raise
        ctx.vlog("Final invocation state is [%s]" % final_invocation_state)
        final_state = _wait_for_history(ctx, user_gi, history_id, polling_backoff)
        if final_state != "ok":
            msg = "Failed to run workflow final history state is [%s]." % final_state
            summarize_history(ctx, user_gi, history_id)
            with open("errored_galaxy.log", "w") as f:
                f.write(log_contents_str(config))
            raise Exception(msg)
        ctx.vlog("Final history state is 'ok'")
        response_kwds = {
            'workflow_id': workflow_id,
            'invocation_id': invocation_id,
        }
    else:
        raise NotImplementedError()

    run_response = response_class(
        ctx=ctx,
        runnable=runnable,
        user_gi=user_gi,
        history_id=history_id,
        log=log_contents_str(config),
        **response_kwds
    )
    output_directory = kwds.get("output_directory", None)
    ctx.vlog("collecting outputs from run...")
    run_response.collect_outputs(ctx, output_directory)
    ctx.vlog("collecting outputs complete")
    return run_response


def stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds):
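    """Upload the job's inputs into ``history_id`` and return ``(job_dict, datasets)``."""
    # ``files_attached`` is a mutable one-element list so the nested closures
    # below can record that a local file was attached, which forces the upload
    # POST to be sent as multipart form data.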
    files_attached = [False]

    def upload_func(upload_target):
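        """Upload one job input (file, directory tarball, or literal) and return the tool response."""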

        def _attach_file(upload_payload, uri, index=0):
            uri = path_or_uri_to_uri(uri)
            is_path = uri.startswith("file://")
            if not is_path or config.use_path_paste:
                upload_payload["inputs"]["files_%d|url_paste" % index] = uri
            else:
                files_attached[0] = True
                path = uri[len("file://"):]
                upload_payload["files_%d|file_data" % index] = attach_file(path)

        if isinstance(upload_target, FileUploadTarget):
            file_path = upload_target.path
            upload_payload = user_gi.tools._upload_payload(
                history_id,
                file_type=upload_target.properties.get('filetype', None) or "auto",
            )
            name = os.path.basename(file_path)
            upload_payload["inputs"]["files_0|auto_decompress"] = False
            upload_payload["inputs"]["auto_decompress"] = False
            _attach_file(upload_payload, file_path)
            upload_payload["inputs"]["files_0|NAME"] = name
            if upload_target.secondary_files:
                _attach_file(upload_payload, upload_target.secondary_files, index=1)
                upload_payload["inputs"]["files_1|type"] = "upload_dataset"
                upload_payload["inputs"]["files_1|auto_decompress"] = True
                upload_payload["inputs"]["file_count"] = "2"
                upload_payload["inputs"]["force_composite"] = "True"

            ctx.vlog("upload_payload is %s" % upload_payload)
            return user_gi.tools._post(upload_payload, files_attached=files_attached[0])
        elif isinstance(upload_target, DirectoryUploadTarget):
            tar_path = upload_target.tar_path

            upload_payload = user_gi.tools._upload_payload(
                history_id,
                file_type="tar",
            )
            upload_payload["inputs"]["files_0|auto_decompress"] = False
            _attach_file(upload_payload, tar_path)
            tar_upload_response = user_gi.tools._post(upload_payload, files_attached=files_attached[0])
            convert_response = user_gi.tools.run_tool(
                tool_id="CONVERTER_tar_to_directory",
                tool_inputs={"input1": {"src": "hda", "id": tar_upload_response["outputs"][0]["id"]}},
                history_id=history_id,
            )
            assert "outputs" in convert_response, convert_response
            return convert_response
        else:
            content = json.dumps(upload_target.object)
            return user_gi.tools.paste_content(
                content,
                history_id,
                file_type="expression.json",
            )

    def create_collection_func(element_identifiers, collection_type):
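        """Create a dataset collection in the staging history from ``element_identifiers``."""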
        payload = {
            "name": "dataset collection",
            "instance_type": "history",
            "history_id": history_id,
            "element_identifiers": element_identifiers,
            "collection_type": collection_type,
            "fields": None if collection_type != "record" else "auto",
        }
        dataset_collections_url = user_gi.url + "/dataset_collections"
        dataset_collection = Client._post(user_gi.histories, payload, url=dataset_collections_url)
        return dataset_collection

    with open(job_path, "r") as f:
        job = yaml.safe_load(f)

    # TODO: Figure out what "." should be here instead.
    job_dir = os.path.dirname(job_path)
    job_dict, datasets = galactic_job_json(
        job,
        job_dir,
        upload_func,
        create_collection_func,
        tool_or_workflow="tool" if runnable.type in [RunnableType.cwl_tool, RunnableType.galaxy_tool] else "workflow",
    )

    if datasets:
        final_state = _wait_for_history(ctx, user_gi, history_id)

        for (dataset, path) in datasets:
            dataset_details = user_gi.histories.show_dataset(
                history_id,
                dataset["id"],
            )
            ctx.vlog("Uploaded dataset for path [%s] with metadata [%s]" % (path, dataset_details))
    else:
        # Mark uploads as ok because nothing to do.
        final_state = "ok"

    ctx.vlog("final state is %s" % final_state)
    if final_state != "ok":
        msg = "Failed to run job final job state is [%s]." % final_state
        summarize_history(ctx, user_gi, history_id)
        with open("errored_galaxy.log", "w") as f:
            f.write(log_contents_str(config))
        raise Exception(msg)

    return job_dict, datasets


class GalaxyBaseRunResponse(SuccessfulRunResponse):
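    """Base run response implementing output collection from a Galaxy history."""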

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
    ):
        self._ctx = ctx
        self._runnable = runnable
        self._user_gi = user_gi
        self._history_id = history_id
        self._log = log

        self._job_info = None

        self._outputs_dict = None

    def to_galaxy_output(self, output):
        """Convert runnable output to a GalaxyOutput object.

        Subclasses for workflow and tool execution override this.
        """
        raise NotImplementedError()

    def _get_extra_files(self, dataset_details):
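        """Fetch the list of a dataset's extra files via the history contents API."""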
        extra_files_url = "%s/%s/contents/%s/extra_files" % (
            self._user_gi._make_url(self._user_gi.histories), self._history_id, dataset_details["id"]
        )
        extra_files = Client._get(self._user_gi.jobs, url=extra_files_url)
        return extra_files

    def _get_metadata(self, history_content_type, content_id):
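        """Show details for a dataset or dataset collection in the target history."""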
        if history_content_type == "dataset":
            return self._user_gi.histories.show_dataset(
                self._history_id,
                content_id,
            )
        elif history_content_type == "dataset_collection":
            return self._user_gi.histories.show_dataset_collection(
                self._history_id,
                content_id,
            )
        else:
            raise Exception("Unknown history content type encountered [%s]" % history_content_type)

    def collect_outputs(self, ctx, output_directory):
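        """Download each runnable output and populate ``outputs_dict`` keyed by output ID."""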
        assert self._outputs_dict is None, "collect_outputs pre-condition violated"

        outputs_dict = {}
        if not output_directory:
            # TODO: rather than creating a directory just use
            # Galaxy paths if they are available in this
            # configuration.
            output_directory = tempfile.mkdtemp()

        def get_dataset(dataset_details, filename=None):
            parent_basename = dataset_details.get("cwl_file_name")
            if not parent_basename:
                parent_basename = dataset_details.get("name")
            file_ext = dataset_details["file_ext"]
            if file_ext == "directory":
                # TODO: rename output_directory to outputs_directory because we can have output directories
                # and this is confusing...
                the_output_directory = os.path.join(output_directory, parent_basename)
                safe_makedirs(the_output_directory)
                destination = self.download_output_to(dataset_details, the_output_directory, filename=filename)
            else:
                destination = self.download_output_to(dataset_details, output_directory, filename=filename)
            if filename is None:
                basename = parent_basename
            else:
                basename = os.path.basename(filename)

            return {"path": destination, "basename": basename}

        ctx.vlog("collecting outputs to directory %s" % output_directory)
        for runnable_output in get_outputs(self._runnable):
            output_id = runnable_output.get_id()
            if not output_id:
                ctx.vlog("Workflow output identified without an ID (label), skipping")
                continue
            output_dict_value = None
            if self._runnable.type in [RunnableType.cwl_workflow, RunnableType.cwl_tool]:
                galaxy_output = self.to_galaxy_output(runnable_output)
                cwl_output = output_to_cwl_json(
                    galaxy_output,
                    self._get_metadata,
                    get_dataset,
                    self._get_extra_files,
                    pseduo_location=True,  # (sic) matches the upstream keyword spelling
                )
                output_dict_value = cwl_output
            else:
                # TODO: deprecate this route for finding workflow outputs,
                # it is a brittle and bad approach...
                output_dataset_id = self.output_dataset_id(runnable_output)
                dataset = self._get_metadata("dataset", output_dataset_id)
                dataset_dict = get_dataset(dataset)
                ctx.vlog("populated destination [%s]" % dataset_dict["path"])

                if dataset["file_ext"] == "expression.json":
                    with open(dataset_dict["path"], "r") as f:
                        output_dict_value = json.load(f)
                else:
                    output_dict_value = output_properties(**dataset_dict)

            outputs_dict[output_id] = output_dict_value

        self._outputs_dict = outputs_dict
        ctx.vlog("collected outputs [%s]" % self._outputs_dict)

    @property
    def log(self):
        return self._log

    @property
    def job_info(self):
        if self._job_info is not None:
            return dict(
                stdout=self._job_info["stdout"],
                stderr=self._job_info["stderr"],
                command_line=self._job_info["command_line"],
            )
        return None

    @property
    def outputs_dict(self):
        return self._outputs_dict

    def download_output_to(self, dataset_details, output_directory, filename=None):
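        """Download a dataset into ``output_directory`` and return the destination path."""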
        if filename is None:
            local_filename = dataset_details.get("cwl_file_name") or dataset_details.get("name")
        else:
            local_filename = filename
        destination = os.path.join(output_directory, local_filename)
        self._history_content_download(
            self._history_id,
            dataset_details["id"],
            to_path=destination,
            filename=filename,
        )
        return destination

    def _history_content_download(self, history_id, dataset_id, to_path, filename=None):
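        """Stream a history dataset's contents to ``to_path`` via the display endpoint."""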
        user_gi = self._user_gi
        url = user_gi.url + "/histories/%s/contents/%s/display" % (history_id, dataset_id)

        data = {}
        if filename:
            data["filename"] = filename

        r = requests.get(url, params=data, verify=user_gi.verify, stream=True, timeout=user_gi.timeout)
        r.raise_for_status()

        with open(to_path, 'wb') as fp:
            for chunk in r.iter_content(chunk_size=bioblend.CHUNK_SIZE):
                if chunk:
                    fp.write(chunk)


class GalaxyToolRunResponse(GalaxyBaseRunResponse):
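    """Run response for a single Galaxy tool execution."""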

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
        job_info,
        api_run_response,
    ):
        super(GalaxyToolRunResponse, self).__init__(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log,
        )
        self._job_info = job_info
        self.api_run_response = api_run_response

    def is_collection(self, output):
        # TODO: Make this more rigorous - search both outputs and output
        # collections - throw an exception if not found in either place instead
        # of just assuming all non-datasets are collections.
        return self.output_dataset_id(output) is None

    def to_galaxy_output(self, runnable_output):
        output_id = runnable_output.get_id()
        return tool_response_to_output(self.api_run_response, self._history_id, output_id)

    def output_dataset_id(self, output):
        outputs = self.api_run_response["outputs"]
        output_id = output.get_id()
        output_dataset_id = None
        self._ctx.vlog("Looking for id [%s] in outputs [%s]" % (output_id, outputs))
        for output in outputs:
            if output["output_name"] == output_id:
                output_dataset_id = output["id"]

        return output_dataset_id


class GalaxyWorkflowRunResponse(GalaxyBaseRunResponse):
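    """Run response for a Galaxy workflow invocation."""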

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
        workflow_id,
        invocation_id,
    ):
        super(GalaxyWorkflowRunResponse, self).__init__(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log,
        )
        self._workflow_id = workflow_id
        self._invocation_id = invocation_id

    def to_galaxy_output(self, runnable_output):
        output_id = runnable_output.get_id()
        self._ctx.vlog("checking for output in invocation [%s]" % self._invocation)
        return invocation_to_output(self._invocation, self._history_id, output_id)

    def output_dataset_id(self, output):
        invocation = self._invocation
        if "outputs" in invocation:
            # Use newer workflow outputs API.

            output_name = output.get_id()
            if output_name in invocation["outputs"]:
                return invocation["outputs"][output.get_id()]["id"]
            else:
                raise Exception("Failed to find output [%s] in invocation outputs [%s]" % (output_name, invocation["outputs"]))
        else:
            # Assume the output knows its order_index and such - older line of
            # development not worth pursuing.
            workflow_output = output.workflow_output
            order_index = workflow_output.order_index

            invocation_steps = invocation["steps"]
            output_steps = [s for s in invocation_steps if s["order_index"] == order_index]
            assert len(output_steps) == 1, "Expected exactly one invocation step with order_index [%s], behavior undefined." % order_index
            output_step = output_steps[0]
            job_id = output_step["job_id"]
            assert job_id, "Output doesn't define a job_id, behavior undefined."
            job_info = self._user_gi.jobs.show_job(job_id, full_details=True)
            job_outputs = job_info["outputs"]
            output_name = workflow_output.output_name
            assert output_name in job_outputs, "No output [%s] found for output job." % output_name
            job_output = job_outputs[output_name]
            assert "id" in job_output, "Job output [%s] does not contain 'id'." % job_output
            return job_output["id"]

    @property
    def _invocation(self):
        invocation = self._user_gi.workflows.show_invocation(
            self._workflow_id,
            self._invocation_id,
        )
        return invocation


def _tool_id(tool_path):
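    """Load the tool source at ``tool_path`` and return its parsed tool ID."""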
    tool_source = get_tool_source(tool_path)
    return tool_source.parse_id()


def _history_id(gi, **kwds):
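    """Return the supplied history ID, or create a new history and return its ID."""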
    history_id = kwds.get("history_id", None)
    if history_id is None:
        history_name = kwds.get("history_name", DEFAULT_HISTORY_NAME)
        history_id = gi.histories.create_history(history_name)["id"]
    return history_id


def _wait_for_invocation(ctx, gi, history_id, workflow_id, invocation_id, polling_backoff=0):
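    """Wait for the invocation to reach a terminal state and return that state.

    Raises if any job in the history errors or is deleted while waiting.
    """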

    def state_func():
        if _retry_on_timeouts(ctx, gi, lambda gi: has_jobs_in_states(gi, history_id, ["error", "deleted", "deleted_new"])):
            raise Exception("Problem running workflow, one or more jobs failed.")

        return _retry_on_timeouts(ctx, gi, lambda gi: gi.workflows.show_invocation(workflow_id, invocation_id))

    return _wait_on_state(state_func, polling_backoff)


def _retry_on_timeouts(ctx, gi, f):
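    """Call ``f(gi)`` with a 60 second timeout, retrying on apparent timeouts.

    A call that fails after more than 45 seconds is treated as a timeout and
    retried, up to 5 attempts in total.
    """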
    gi.timeout = 60
    try_count = 5
    try:
        for try_num in range(try_count):
            start_time = time.time()
            try:
                return f(gi)
            except Exception:
                end_time = time.time()
                if end_time - start_time > 45 and (try_num + 1) < try_count:
                    ctx.vlog("Galaxy seems to have timedout, retrying to fetch status.")
                    continue
                else:
                    raise
    finally:
        gi.timeout = None


def has_jobs_in_states(gi, history_id, states):
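    """Return True if any job in the given history is in one of ``states``."""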
    params = {"history_id": history_id}
    jobs_url = gi._make_url(gi.jobs)
    jobs = Client._get(gi.jobs, params=params, url=jobs_url)

    target_jobs = [j for j in jobs if j["state"] in states]

    return len(target_jobs) > 0


def _wait_for_history(ctx, gi, history_id, polling_backoff=0):
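    """Wait for the history to reach a terminal state and return that state."""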

    def has_active_jobs(gi):
        if has_jobs_in_states(gi, history_id, ["new", "upload", "waiting", "queued", "running"]):
            return True
        else:
            return None

    timeout = 60 * 60 * 24
    wait_on(lambda: _retry_on_timeouts(ctx, gi, has_active_jobs), "active jobs", timeout, polling_backoff)

    def state_func():
        return _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))

    return _wait_on_state(state_func, polling_backoff)


def _wait_for_job(gi, job_id):
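    """Block until the job reaches a terminal state and return that state."""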
    def state_func():
        return gi.jobs.show_job(job_id, full_details=True)

    return _wait_on_state(state_func)


def _wait_on_state(state_func, polling_backoff=0):
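    """Poll ``state_func`` until it reports a terminal state and return it.

    The states "running", "queued", "new" and "ready" are treated as
    non-terminal; polling times out after 24 hours.
    """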

    def get_state():
        response = state_func()
        state = response["state"]
        if str(state) not in ["running", "queued", "new", "ready"]:
            return state
        else:
            return None
    timeout = 60 * 60 * 24
    final_state = wait_on(get_state, "state", timeout, polling_backoff)
    return final_state


__all__ = (
    "execute",
)