diff env/lib/python3.7/site-packages/planemo/galaxy/activity.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
--- a/env/lib/python3.7/site-packages/planemo/galaxy/activity.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,656 +0,0 @@
-"""Module provides generic interface to running Galaxy tools and workflows."""
-
-import json
-import os
-import tempfile
-import time
-
-import bioblend
-import requests
-import yaml
-from bioblend.galaxy.client import Client
-from bioblend.util import attach_file
-from galaxy.tool_util.cwl.util import (
-    DirectoryUploadTarget,
-    FileUploadTarget,
-    galactic_job_json,
-    invocation_to_output,
-    output_properties,
-    output_to_cwl_json,
-    path_or_uri_to_uri,
-    tool_response_to_output,
-)
-from galaxy.tool_util.parser import get_tool_source
-from galaxy.util import (
-    safe_makedirs,
-    unicodify,
-)
-
-from planemo.galaxy.api import summarize_history
-from planemo.io import wait_on
-from planemo.runnable import (
-    ErrorRunResponse,
-    get_outputs,
-    RunnableType,
-    SuccessfulRunResponse,
-)
-
-DEFAULT_HISTORY_NAME = "CWL Target History"
-ERR_NO_SUCH_TOOL = ("Failed to find tool with ID [%s] in Galaxy - cannot execute job. "
-                    "You may need to enable verbose logging and determine why the tool did not load. [%s]")
-
-
-def execute(ctx, config, runnable, job_path, **kwds):
-    """Execute a Galaxy activity."""
-    try:
-        return _execute(ctx, config, runnable, job_path, **kwds)
-    except Exception as e:
-        return ErrorRunResponse(unicodify(e))
-
-
-def _verified_tool_id(runnable, user_gi):
-    tool_id = _tool_id(runnable.path)
-    try:
-        user_gi.tools.show_tool(tool_id)
-    except Exception as e:
-        raise Exception(ERR_NO_SUCH_TOOL % (tool_id, e))
-    return tool_id
-
-
-def _inputs_representation(runnable):
-    if runnable.type == RunnableType.cwl_tool:
-        inputs_representation = "cwl"
-    else:
-        inputs_representation = "galaxy"
-    return inputs_representation
-
-
-def log_contents_str(config):
-    if hasattr(config, "log_contents"):
-        return config.log_contents
-    else:
-        return "No log for this engine type."
-
-
-def _execute(ctx, config, runnable, job_path, **kwds):
-    user_gi = config.user_gi
-    admin_gi = config.gi
-
-    history_id = _history_id(user_gi, **kwds)
-
-    job_dict, _ = stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds)
-
-    if runnable.type in [RunnableType.galaxy_tool, RunnableType.cwl_tool]:
-        response_class = GalaxyToolRunResponse
-        tool_id = _verified_tool_id(runnable, user_gi)
-        inputs_representation = _inputs_representation(runnable)
-        run_tool_payload = dict(
-            history_id=history_id,
-            tool_id=tool_id,
-            inputs=job_dict,
-            inputs_representation=inputs_representation,
-        )
-        ctx.vlog("Post to Galaxy tool API with payload [%s]" % run_tool_payload)
-        tool_run_response = user_gi.tools._post(run_tool_payload)
-
-        job = tool_run_response["jobs"][0]
-        job_id = job["id"]
-        try:
-            final_state = _wait_for_job(user_gi, job_id)
-        except Exception:
-            summarize_history(ctx, user_gi, history_id)
-            raise
-        if final_state != "ok":
-            msg = "Failed to run CWL tool job final job state is [%s]." % final_state
-            summarize_history(ctx, user_gi, history_id)
-            with open("errored_galaxy.log", "w") as f:
-                f.write(log_contents_str(config))
-            raise Exception(msg)
-
-        ctx.vlog("Final job state was ok, fetching details for job [%s]" % job_id)
-        job_info = admin_gi.jobs.show_job(job_id)
-        response_kwds = {
-            'job_info': job_info,
-            'api_run_response': tool_run_response,
-        }
-        if ctx.verbose:
-            summarize_history(ctx, user_gi, history_id)
-    elif runnable.type in [RunnableType.galaxy_workflow, RunnableType.cwl_workflow]:
-        response_class = GalaxyWorkflowRunResponse
-        workflow_id = config.workflow_id(runnable.path)
-        ctx.vlog("Found Galaxy workflow ID [%s] for path [%s]" % (workflow_id, runnable.path))
-        # TODO: update bioblend to allow inputs_by.
-        # invocation = user_gi.workflows.invoke_workflow(
-        #    workflow_id,
-        #    history_id=history_id,
-        #    inputs=job_dict,
-        # )
-        payload = dict(
-            workflow_id=workflow_id,
-            history_id=history_id,
-            inputs=job_dict,
-            inputs_by="name",
-            allow_tool_state_corrections=True,
-        )
-        invocations_url = "%s/%s/invocations" % (
-            user_gi._make_url(user_gi.workflows),
-            workflow_id,
-        )
-        invocation = Client._post(user_gi.workflows, payload, url=invocations_url)
-        invocation_id = invocation["id"]
-        ctx.vlog("Waiting for invocation [%s]" % invocation_id)
-        polling_backoff = kwds.get("polling_backoff", 0)
-        try:
-            final_invocation_state = _wait_for_invocation(ctx, user_gi, history_id, workflow_id, invocation_id, polling_backoff)
-        except Exception:
-            ctx.vlog("Problem waiting on invocation...")
-            summarize_history(ctx, user_gi, history_id)
-            raise
-        ctx.vlog("Final invocation state is [%s]" % final_invocation_state)
-        final_state = _wait_for_history(ctx, user_gi, history_id, polling_backoff)
-        if final_state != "ok":
-            msg = "Failed to run workflow final history state is [%s]." % final_state
-            summarize_history(ctx, user_gi, history_id)
-            with open("errored_galaxy.log", "w") as f:
-                f.write(log_contents_str(config))
-            raise Exception(msg)
-        ctx.vlog("Final history state is 'ok'")
-        response_kwds = {
-            'workflow_id': workflow_id,
-            'invocation_id': invocation_id,
-        }
-    else:
-        raise NotImplementedError()
-
-    run_response = response_class(
-        ctx=ctx,
-        runnable=runnable,
-        user_gi=user_gi,
-        history_id=history_id,
-        log=log_contents_str(config),
-        **response_kwds
-    )
-    output_directory = kwds.get("output_directory", None)
-    ctx.vlog("collecting outputs from run...")
-    run_response.collect_outputs(ctx, output_directory)
-    ctx.vlog("collecting outputs complete")
-    return run_response
-
-
-def stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds):
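-    # A one-element list (rather than a plain bool) so the nested _attach_file
-    # closure below can flip the flag without needing 'nonlocal'.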
-    files_attached = [False]
-
-    def upload_func(upload_target):
-
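-        # Non-file URIs, and local file:// URIs when use_path_paste is enabled,
-        # are passed to Galaxy via url_paste; everything else is attached to the
-        # upload request as multipart file data.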
-        def _attach_file(upload_payload, uri, index=0):
-            uri = path_or_uri_to_uri(uri)
-            is_path = uri.startswith("file://")
-            if not is_path or config.use_path_paste:
-                upload_payload["inputs"]["files_%d|url_paste" % index] = uri
-            else:
-                files_attached[0] = True
-                path = uri[len("file://"):]
-                upload_payload["files_%d|file_data" % index] = attach_file(path)
-
-        if isinstance(upload_target, FileUploadTarget):
-            file_path = upload_target.path
-            upload_payload = user_gi.tools._upload_payload(
-                history_id,
-                file_type=upload_target.properties.get('filetype', None) or "auto",
-            )
-            name = os.path.basename(file_path)
-            upload_payload["inputs"]["files_0|auto_decompress"] = False
-            upload_payload["inputs"]["auto_decompress"] = False
-            _attach_file(upload_payload, file_path)
-            upload_payload["inputs"]["files_0|NAME"] = name
-            if upload_target.secondary_files:
-                _attach_file(upload_payload, upload_target.secondary_files, index=1)
-                upload_payload["inputs"]["files_1|type"] = "upload_dataset"
-                upload_payload["inputs"]["files_1|auto_decompress"] = True
-                upload_payload["inputs"]["file_count"] = "2"
-                upload_payload["inputs"]["force_composite"] = "True"
-
-            ctx.vlog("upload_payload is %s" % upload_payload)
-            return user_gi.tools._post(upload_payload, files_attached=files_attached[0])
-        elif isinstance(upload_target, DirectoryUploadTarget):
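-            # Directories are staged as a tar archive, then unpacked server-side
-            # with the CONVERTER_tar_to_directory tool below.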
-            tar_path = upload_target.tar_path
-
-            upload_payload = user_gi.tools._upload_payload(
-                history_id,
-                file_type="tar",
-            )
-            upload_payload["inputs"]["files_0|auto_decompress"] = False
-            _attach_file(upload_payload, tar_path)
-            tar_upload_response = user_gi.tools._post(upload_payload, files_attached=files_attached[0])
-            convert_response = user_gi.tools.run_tool(
-                tool_id="CONVERTER_tar_to_directory",
-                tool_inputs={"input1": {"src": "hda", "id": tar_upload_response["outputs"][0]["id"]}},
-                history_id=history_id,
-            )
-            assert "outputs" in convert_response, convert_response
-            return convert_response
-        else:
-            content = json.dumps(upload_target.object)
-            return user_gi.tools.paste_content(
-                content,
-                history_id,
-                file_type="expression.json",
-            )
-
-    def create_collection_func(element_identifiers, collection_type):
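-        # Create a dataset collection in the target history via the
-        # dataset_collections API; 'fields' is only set ("auto") for record
-        # collections.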
-        payload = {
-            "name": "dataset collection",
-            "instance_type": "history",
-            "history_id": history_id,
-            "element_identifiers": element_identifiers,
-            "collection_type": collection_type,
-            "fields": None if collection_type != "record" else "auto",
-        }
-        dataset_collections_url = user_gi.url + "/dataset_collections"
-        dataset_collection = Client._post(user_gi.histories, payload, url=dataset_collections_url)
-        return dataset_collection
-
-    with open(job_path, "r") as f:
-        job = yaml.safe_load(f)
-
-    # Figure out what "." should be here instead.
-    job_dir = os.path.dirname(job_path)
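-    # galactic_job_json walks the job document, staging files and collections
-    # through the callbacks above, and returns a Galaxy-ready inputs dict plus
-    # the datasets it created.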
-    job_dict, datasets = galactic_job_json(
-        job,
-        job_dir,
-        upload_func,
-        create_collection_func,
-        tool_or_workflow="tool" if runnable.type in [RunnableType.cwl_tool, RunnableType.galaxy_tool] else "workflow",
-    )
-
-    if datasets:
-        final_state = _wait_for_history(ctx, user_gi, history_id)
-
-        for (dataset, path) in datasets:
-            dataset_details = user_gi.histories.show_dataset(
-                history_id,
-                dataset["id"],
-            )
-            ctx.vlog("Uploaded dataset for path [%s] with metadata [%s]" % (path, dataset_details))
-    else:
-        # Mark uploads as ok because there is nothing to wait on.
-        final_state = "ok"
-
-    ctx.vlog("final state is %s" % final_state)
-    if final_state != "ok":
-        msg = "Failed to run job final job state is [%s]." % final_state
-        summarize_history(ctx, user_gi, history_id)
-        with open("errored_galaxy.log", "w") as f:
-            f.write(log_contents_str(config))
-        raise Exception(msg)
-
-    return job_dict, datasets
-
-
-class GalaxyBaseRunResponse(SuccessfulRunResponse):
-
-    def __init__(
-        self,
-        ctx,
-        runnable,
-        user_gi,
-        history_id,
-        log,
-    ):
-        self._ctx = ctx
-        self._runnable = runnable
-        self._user_gi = user_gi
-        self._history_id = history_id
-        self._log = log
-
-        self._job_info = None
-
-        self._outputs_dict = None
-
-    def to_galaxy_output(self, output):
-        """Convert runnable output to a GalaxyOutput object.
-
-        Subclasses for workflow and tool execution override this.
-        """
-        raise NotImplementedError()
-
-    def _get_extra_files(self, dataset_details):
-        extra_files_url = "%s/%s/contents/%s/extra_files" % (
-            self._user_gi._make_url(self._user_gi.histories), self._history_id, dataset_details["id"]
-        )
-        extra_files = Client._get(self._user_gi.jobs, url=extra_files_url)
-        return extra_files
-
-    def _get_metadata(self, history_content_type, content_id):
-        if history_content_type == "dataset":
-            return self._user_gi.histories.show_dataset(
-                self._history_id,
-                content_id,
-            )
-        elif history_content_type == "dataset_collection":
-            return self._user_gi.histories.show_dataset_collection(
-                self._history_id,
-                content_id,
-            )
-        else:
-            raise Exception("Unknown history content type encountered [%s]" % history_content_type)
-
-    def collect_outputs(self, ctx, output_directory):
-        assert self._outputs_dict is None, "collect_outputs pre-condition violated"
-
-        outputs_dict = {}
-        if not output_directory:
-            # TODO: rather than creating a directory just use
-            # Galaxy paths if they are available in this
-            # configuration.
-            output_directory = tempfile.mkdtemp()
-
-        def get_dataset(dataset_details, filename=None):
-            parent_basename = dataset_details.get("cwl_file_name")
-            if not parent_basename:
-                parent_basename = dataset_details.get("name")
-            file_ext = dataset_details["file_ext"]
-            if file_ext == "directory":
-                # TODO: rename output_directory to outputs_directory because we can have output directories
-                # and this is confusing...
-                the_output_directory = os.path.join(output_directory, parent_basename)
-                safe_makedirs(the_output_directory)
-                destination = self.download_output_to(dataset_details, the_output_directory, filename=filename)
-            else:
-                destination = self.download_output_to(dataset_details, output_directory, filename=filename)
-            if filename is None:
-                basename = parent_basename
-            else:
-                basename = os.path.basename(filename)
-
-            return {"path": destination, "basename": basename}
-
-        ctx.vlog("collecting outputs to directory %s" % output_directory)
-        for runnable_output in get_outputs(self._runnable):
-            output_id = runnable_output.get_id()
-            if not output_id:
-                ctx.vlog("Workflow output identified without an ID (label), skipping")
-                continue
-            output_dict_value = None
-            if self._runnable.type in [RunnableType.cwl_workflow, RunnableType.cwl_tool]:
-                galaxy_output = self.to_galaxy_output(runnable_output)
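-                # NB: 'pseduo_location' below matches the keyword spelling used
-                # by galaxy.tool_util.cwl.util.output_to_cwl_json.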
-                cwl_output = output_to_cwl_json(
-                    galaxy_output,
-                    self._get_metadata,
-                    get_dataset,
-                    self._get_extra_files,
-                    pseduo_location=True,
-                )
-                output_dict_value = cwl_output
-            else:
-                # TODO: deprecate this route for finding workflow outputs,
-                # it is a brittle and bad approach...
-                output_dataset_id = self.output_dataset_id(runnable_output)
-                dataset = self._get_metadata("dataset", output_dataset_id)
-                dataset_dict = get_dataset(dataset)
-                ctx.vlog("populated destination [%s]" % dataset_dict["path"])
-
-                if dataset["file_ext"] == "expression.json":
-                    with open(dataset_dict["path"], "r") as f:
-                        output_dict_value = json.load(f)
-                else:
-                    output_dict_value = output_properties(**dataset_dict)
-
-            outputs_dict[output_id] = output_dict_value
-
-        self._outputs_dict = outputs_dict
-        ctx.vlog("collected outputs [%s]" % self._outputs_dict)
-
-    @property
-    def log(self):
-        return self._log
-
-    @property
-    def job_info(self):
-        if self._job_info is not None:
-            return dict(
-                stdout=self._job_info["stdout"],
-                stderr=self._job_info["stderr"],
-                command_line=self._job_info["command_line"],
-            )
-        return None
-
-    @property
-    def outputs_dict(self):
-        return self._outputs_dict
-
-    def download_output_to(self, dataset_details, output_directory, filename=None):
-        if filename is None:
-            local_filename = dataset_details.get("cwl_file_name") or dataset_details.get("name")
-        else:
-            local_filename = filename
-        destination = os.path.join(output_directory, local_filename)
-        self._history_content_download(
-            self._history_id,
-            dataset_details["id"],
-            to_path=destination,
-            filename=filename,
-        )
-        return destination
-
-    def _history_content_download(self, history_id, dataset_id, to_path, filename=None):
-        user_gi = self._user_gi
-        url = user_gi.url + "/histories/%s/contents/%s/display" % (history_id, dataset_id)
-
-        data = {}
-        if filename:
-            data["filename"] = filename
-
-        r = requests.get(url, params=data, verify=user_gi.verify, stream=True, timeout=user_gi.timeout)
-        r.raise_for_status()
-
-        with open(to_path, 'wb') as fp:
-            for chunk in r.iter_content(chunk_size=bioblend.CHUNK_SIZE):
-                if chunk:
-                    fp.write(chunk)
-
-
-class GalaxyToolRunResponse(GalaxyBaseRunResponse):
-
-    def __init__(
-        self,
-        ctx,
-        runnable,
-        user_gi,
-        history_id,
-        log,
-        job_info,
-        api_run_response,
-    ):
-        super(GalaxyToolRunResponse, self).__init__(
-            ctx=ctx,
-            runnable=runnable,
-            user_gi=user_gi,
-            history_id=history_id,
-            log=log,
-        )
-        self._job_info = job_info
-        self.api_run_response = api_run_response
-
-    def is_collection(self, output):
-        # TODO: Make this more rigorous - search both output and output
-        # collections - throw an exception if not found in either place instead
-        # of just assuming all non-datasets are collections.
-        return self.output_dataset_id(output) is None
-
-    def to_galaxy_output(self, runnable_output):
-        output_id = runnable_output.get_id()
-        return tool_response_to_output(self.api_run_response, self._history_id, output_id)
-
-    def output_dataset_id(self, output):
-        outputs = self.api_run_response["outputs"]
-        output_id = output.get_id()
-        output_dataset_id = None
-        self._ctx.vlog("Looking for id [%s] in outputs [%s]" % (output_id, outputs))
-        for output in outputs:
-            if output["output_name"] == output_id:
-                output_dataset_id = output["id"]
-
-        return output_dataset_id
-
-
-class GalaxyWorkflowRunResponse(GalaxyBaseRunResponse):
-
-    def __init__(
-        self,
-        ctx,
-        runnable,
-        user_gi,
-        history_id,
-        log,
-        workflow_id,
-        invocation_id,
-    ):
-        super(GalaxyWorkflowRunResponse, self).__init__(
-            ctx=ctx,
-            runnable=runnable,
-            user_gi=user_gi,
-            history_id=history_id,
-            log=log,
-        )
-        self._workflow_id = workflow_id
-        self._invocation_id = invocation_id
-
-    def to_galaxy_output(self, runnable_output):
-        output_id = runnable_output.get_id()
-        self._ctx.vlog("checking for output in invocation [%s]" % self._invocation)
-        return invocation_to_output(self._invocation, self._history_id, output_id)
-
-    def output_dataset_id(self, output):
-        invocation = self._invocation
-        if "outputs" in invocation:
-            # Use newer workflow outputs API.
-
-            output_name = output.get_id()
-            if output_name in invocation["outputs"]:
-                return invocation["outputs"][output.get_id()]["id"]
-            else:
-                raise Exception("Failed to find output [%s] in invocation outputs [%s]" % (output_name, invocation["outputs"]))
-        else:
-            # Assume the output knows its order_index and such - older line of
-            # development not worth pursuing.
-            workflow_output = output.workflow_output
-            order_index = workflow_output.order_index
-
-            invocation_steps = invocation["steps"]
-            output_steps = [s for s in invocation_steps if s["order_index"] == order_index]
-            assert len(output_steps) == 1, "Expected exactly one step matching output order_index, behavior undefined."
-            output_step = output_steps[0]
-            job_id = output_step["job_id"]
-            assert job_id, "Output doesn't define a job_id, behavior undefined."
-            job_info = self._user_gi.jobs.show_job(job_id, full_details=True)
-            job_outputs = job_info["outputs"]
-            output_name = workflow_output.output_name
-            assert output_name in job_outputs, "No output [%s] found for output job." % output_name
-            job_output = job_outputs[output_name]
-            assert "id" in job_output, "Job output [%s] does not contain 'id'." % job_output
-            return job_output["id"]
-
-    @property
-    def _invocation(self):
-        invocation = self._user_gi.workflows.show_invocation(
-            self._workflow_id,
-            self._invocation_id,
-        )
-        return invocation
-
-
-def _tool_id(tool_path):
-    tool_source = get_tool_source(tool_path)
-    return tool_source.parse_id()
-
-
-def _history_id(gi, **kwds):
-    history_id = kwds.get("history_id", None)
-    if history_id is None:
-        history_name = kwds.get("history_name", DEFAULT_HISTORY_NAME)
-        history_id = gi.histories.create_history(history_name)["id"]
-    return history_id
-
-
-def _wait_for_invocation(ctx, gi, history_id, workflow_id, invocation_id, polling_backoff=0):
-
-    def state_func():
-        if _retry_on_timeouts(ctx, gi, lambda gi: has_jobs_in_states(gi, history_id, ["error", "deleted", "deleted_new"])):
-            raise Exception("Problem running workflow, one or more jobs failed.")
-
-        return _retry_on_timeouts(ctx, gi, lambda gi: gi.workflows.show_invocation(workflow_id, invocation_id))
-
-    return _wait_on_state(state_func, polling_backoff)
-
-
-def _retry_on_timeouts(ctx, gi, f):
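-    # Temporarily tighten the bioblend client timeout to 60s; calls that fail
-    # after more than 45s are assumed to have timed out and are retried, up to
-    # five attempts in total.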
-    gi.timeout = 60
-    try_count = 5
-    try:
-        for try_num in range(try_count):
-            start_time = time.time()
-            try:
-                return f(gi)
-            except Exception:
-                end_time = time.time()
-                if end_time - start_time > 45 and (try_num + 1) < try_count:
-                    ctx.vlog("Galaxy seems to have timedout, retrying to fetch status.")
-                    continue
-                else:
-                    raise
-    finally:
-        gi.timeout = None
-
-
-def has_jobs_in_states(gi, history_id, states):
-    params = {"history_id": history_id}
-    jobs_url = gi._make_url(gi.jobs)
-    jobs = Client._get(gi.jobs, params=params, url=jobs_url)
-
-    target_jobs = [j for j in jobs if j["state"] in states]
-
-    return len(target_jobs) > 0
-
-
-def _wait_for_history(ctx, gi, history_id, polling_backoff=0):
-
-    def has_active_jobs(gi):
-        if has_jobs_in_states(gi, history_id, ["new", "upload", "waiting", "queued", "running"]):
-            return True
-        else:
-            return None
-
-    timeout = 60 * 60 * 24
-    wait_on(lambda: _retry_on_timeouts(ctx, gi, has_active_jobs), "active jobs", timeout, polling_backoff)
-
-    def state_func():
-        return _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))
-
-    return _wait_on_state(state_func, polling_backoff)
-
-
-def _wait_for_job(gi, job_id):
-    def state_func():
-        return gi.jobs.show_job(job_id, full_details=True)
-
-    return _wait_on_state(state_func)
-
-
-def _wait_on_state(state_func, polling_backoff=0):
-
-    def get_state():
-        response = state_func()
-        state = response["state"]
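-        # Only these states are considered non-terminal; returning None keeps
-        # wait_on polling, anything else is returned as the final state.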
-        if str(state) not in ["running", "queued", "new", "ready"]:
-            return state
-        else:
-            return None
-    timeout = 60 * 60 * 24
-    final_state = wait_on(get_state, "state", timeout, polling_backoff)
-    return final_state
-
-
-__all__ = (
-    "execute",
-)