view env/lib/python3.7/site-packages/planemo/galaxy/workflows.py @ 2:6af9afd405e9 draft

"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author shellac
date Thu, 14 May 2020 14:56:58 -0400
parents 26e78fe6e8c4
children
line wrap: on
line source

"""Utilities for Galaxy workflows."""
import json
import os
import tempfile
from collections import namedtuple

import yaml
from bioblend.galaxy.client import Client
from ephemeris import generate_tool_list_from_ga_workflow_files
from ephemeris import shed_tools
from gxformat2.converter import python_to_workflow
from gxformat2.interface import BioBlendImporterGalaxyInterface
from gxformat2.interface import ImporterGalaxyInterface

from planemo.io import warn

# Text used by install_shed_repos both for the warning it emits and for the
# exception it raises when repository installation reports errored repositories.
FAILED_REPOSITORIES_MESSAGE = "Failed to install one or more repositories."


def load_shed_repos(runnable):
    """Return the tool shed repository entries needed by a workflow runnable.

    Runnables whose type is not ``galaxy_workflow`` yield an empty list.
    For ``.ga`` workflows the list is generated from the workflow file via
    ephemeris; for every other workflow file the YAML document's top-level
    ``tools`` entry is returned (an empty list when it is absent).
    """
    if runnable.type.name != "galaxy_workflow":
        return []

    path = runnable.path
    if path.endswith(".ga"):
        # generate_tool_list_from_workflow is handed an output path to write
        # its result to. Use a private temporary file instead of a fixed
        # "tools.yml" so nothing in the caller's working directory is created
        # or overwritten, and remove the file once it has been read back.
        handle, tool_list_path = tempfile.mkstemp(prefix="planemo_tools_", suffix=".yml")
        os.close(handle)
        try:
            generate_tool_list_from_ga_workflow_files.generate_tool_list_from_workflow(
                [path], "Tools from workflows", tool_list_path
            )
            with open(tool_list_path, "r") as f:
                tools = yaml.safe_load(f)["tools"]
        finally:
            os.remove(tool_list_path)

    else:
        # It'd be better to just infer this from the tool shed ID somehow than
        # require explicit annotation like this... I think?
        with open(path, "r") as f:
            workflow = yaml.safe_load(f)

        tools = workflow.get("tools", [])

    return tools


def install_shed_repos(runnable, admin_gi, ignore_dependency_problems):
    """Install the tool shed repositories a runnable requires.

    Nothing is done when ``load_shed_repos`` reports no repositories.
    Otherwise the repositories are installed through an ephemeris
    ``InstallRepositoryManager`` built on ``admin_gi``. If any repository
    errored, a warning is emitted when ``ignore_dependency_problems`` is
    truthy and an ``Exception`` is raised when it is not.
    """
    repositories = load_shed_repos(runnable)
    if not repositories:
        return

    manager = shed_tools.InstallRepositoryManager(admin_gi)
    results = manager.install_repositories(repositories)
    if not results.errored_repositories:
        return

    if not ignore_dependency_problems:
        raise Exception(FAILED_REPOSITORIES_MESSAGE)
    warn(FAILED_REPOSITORIES_MESSAGE)


def import_workflow(path, admin_gi, user_gi, from_path=False):
    """Import a workflow path to specified Galaxy instance.

    With ``from_path`` left falsy the workflow file is loaded locally into a
    dictionary and imported through a ``BioBlendImporterGalaxyInterface``.
    With ``from_path`` set, only the absolute path is posted to the workflows
    endpoint of ``user_gi``. The result of the respective import call is
    returned.
    """
    if from_path:
        # TODO: Update bioblend to allow from_path.
        absolute_path = os.path.abspath(path)
        request_body = {"from_path": absolute_path}
        endpoint = user_gi._make_url(user_gi.workflows)
        return Client._post(user_gi.workflows, request_body, url=endpoint)

    importer = BioBlendImporterGalaxyInterface(admin_gi=admin_gi, user_gi=user_gi)
    workflow_dict = _raw_dict(path, importer)
    return importer.import_workflow(workflow_dict)


def _raw_dict(path, importer=None):
    """Load the workflow stored at ``path`` as a dictionary.

    Files ending in ``.ga`` are parsed as JSON and returned unchanged. Any
    other file is parsed as YAML and converted with ``python_to_workflow``,
    which receives ``importer`` and the absolute directory containing the
    file. When ``importer`` is ``None`` a ``DummyImporterGalaxyInterface``
    is used.
    """
    if path.endswith(".ga"):
        with open(path, "r") as handle:
            return json.load(handle)

    importer = DummyImporterGalaxyInterface() if importer is None else importer
    workflow_directory = os.path.abspath(os.path.dirname(path))
    with open(path, "r") as handle:
        parsed_yaml = yaml.safe_load(handle)
        return python_to_workflow(parsed_yaml, importer, workflow_directory)


def find_tool_ids(path):
    """Return the ``tool_id`` of every step of the workflow at ``path``.

    One entry is produced per step; a step without a ``tool_id`` key
    contributes ``None``.
    """
    steps = _raw_dict(path)["steps"]
    return [step.get("tool_id") for step in steps.values()]


# One declared workflow output, as built by describe_outputs: the integer
# order index of the producing step, that step's output name, and the label
# recorded for the output.
WorkflowOutput = namedtuple("WorkflowOutput", ["order_index", "output_name", "label"])


def describe_outputs(path):
    """Return a list of :class:`WorkflowOutput` objects for target workflow.

    Every entry of each step's ``workflow_outputs`` list becomes one
    :class:`WorkflowOutput`; steps without that key contribute nothing.
    """
    described = []
    for index_key, step in _raw_dict(path)["steps"].items():
        described.extend(
            WorkflowOutput(int(index_key), entry["output_name"], entry["label"])
            for entry in step.get("workflow_outputs", [])
        )
    return described


class DummyImporterGalaxyInterface(ImporterGalaxyInterface):
    """Importer that never imports anything.

    ``_raw_dict`` falls back to this class when it is called without an
    importer.
    """

    def import_workflow(self, workflow, **kwds):
        """Ignore ``workflow`` and any keyword arguments; return ``None``."""


# Names exported on a star-import of this module. The other helpers defined
# here (load_shed_repos, install_shed_repos, find_tool_ids) are not listed.
__all__ = (
    "import_workflow",
    "describe_outputs",
)