view data_manager/pangolin_data_dm.py @ 0:33158d21324d draft default tip

planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/data_managers/data_manager_pangolin_data commit 902cce06e30ffe8ccba5dc0c3b704eb39fb4c611
author iuc
date Wed, 20 Jul 2022 21:02:43 +0000
parents
children
line wrap: on
line source

#!/usr/bin/env python

import argparse
import datetime
import json
import operator
import pathlib
import shutil
import subprocess
import sys
import tempfile
from io import StringIO
from typing import Generator, TextIO

import requests


def parse_date(d: str) -> datetime.datetime:
    # Parses the publication date from the GitHub API or user input into a datetime object.
    date = None
    try:
        date = datetime.datetime.strptime(d, "%Y-%m-%dT%H:%M:%SZ")
    except ValueError:
        date = datetime.datetime.strptime(d, "%Y-%m-%d")
    return date


def get_model_list(package: str) -> Generator[dict, None, None]:
    page_num = 0
    while True:
        url = f"https://api.github.com/repos/cov-lineages/{package}/releases"
        page_num += 1
        response = requests.get(url + f"?page={page_num}")
        if response.status_code == 200:
            release_list_chunk = json.loads(response.text)
            if not release_list_chunk:
                # past the last page of results
                return
            for e in release_list_chunk:
                if e["prerelease"]:
                    continue
                yield dict(
                    tag_name=e["tag_name"],
                    name=e["name"],
                    date=parse_date(e["published_at"]),
                    tarball_url=e["tarball_url"],
                )
        else:
            response.raise_for_status()


def download_and_unpack(
    dependency: str, release: str, output_directory: str
) -> pathlib.Path:
    url = f"git+https://github.com/cov-lineages/{dependency}.git@{release}"
    dependency_package_name = dependency.replace("-", "_")
    output_path = pathlib.Path(output_directory) / dependency_package_name / release
    with tempfile.TemporaryDirectory() as tmpdir:
        pip_command = [
            sys.executable,
            "-m",
            "pip",
            "install",
            "--upgrade",
            "--target",
            tmpdir,
            url,
        ]
        # output is saved in tmpdir/dependency, final output needs to be
        # in output_directory/dependency/release
        subprocess.run(pip_command, check=True)
        shutil.move(
            str(pathlib.Path(tmpdir) / dependency_package_name), str(output_path)
        )
    return output_path


def fetch_compatibility_info(
    package_name: str,
    url: str = "https://raw.githubusercontent.com/cov-lineages/pangolin/master/pangolin/data/data_compatibility.csv",
) -> list[dict[str, str]]:
    response = requests.get(url)
    if response.status_code == 200:
        compatibility = read_compatibility_info(StringIO(response.text), package_name)
        return compatibility
    else:
        return {}


def read_compatibility_info(
    input_file: TextIO, package_name: str
) -> list[dict[str, str]]:
    compatibility = {}
    for line in input_file:
        fields = line.strip().split(",")
        if fields[0] != package_name:
            continue
        if package_name == "constellations":
            compatibility[fields[1]] = fields[3]
        else:
            # for pangolin-data and pangolin-assignment
            compatibility[fields[1]] = fields[2]
    return compatibility


def comma_split(args: str) -> list[str]:
    return args.split(",")


def git_lfs_install():
    """
    'git-lfs install' must be run after installing git-lfs and before cloning a repo
    that uses Git LFS. Code taken from pangolin repo.
    """
    try:
        subprocess.run(
            ["git-lfs", "install"],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except subprocess.CalledProcessError as e:
        stderr = e.stderr.decode("utf-8")
        sys.stderr.write(f"Error: {e}:\n{stderr}\n")
        sys.exit(-1)


if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument("--latest", default=False, action="store_true")
    parser.add_argument("--version_compatibility_file", type=argparse.FileType())
    parser.add_argument("--versions", type=comma_split)
    parser.add_argument("--overwrite", default=False, action="store_true")
    parser.add_argument("--known_revisions", type=comma_split)
    parser.add_argument("datatable_name")
    parser.add_argument("datatable_cache_filename")
    parser.add_argument("galaxy_config")
    args = parser.parse_args()

    if args.datatable_name == "pangolin_data":
        package_name = "pangolin-data"
        min_version_key = "min_pangolin_version"
    elif args.datatable_name == "pangolin_constellations":
        package_name = "constellations"
        min_version_key = "min_scorpio_version"
    elif args.datatable_name == "pangolin_assignment":
        package_name = "pangolin-assignment"
        min_version_key = "min_pangolin_version"
        git_lfs_install()
    else:
        sys.exit(f"Unknown data table {args.datatable_name}")

    with open(args.galaxy_config) as fh:
        config = json.load(fh)

    output_directory = config.get("output_data", [{}])[0].get("extra_files_path", None)

    try:
        with open(args.datatable_cache_filename) as fh:
            data_manager_dict = json.load(fh)
    except IOError:
        # on the first run this file doesn't exist
        data_manager_dict = {}

    if "data_tables" in data_manager_dict:
        if args.datatable_name not in data_manager_dict["data_tables"]:
            # got a data_tables entry, probably from a previous run of this script,
            # but no entry for this specific data table
            data_manager_dict["data_tables"][args.datatable_name] = []
    else:
        # got no entry for data tables, start from scratch
        data_manager_dict = {"data_tables": {args.datatable_name: []}}

    # known-revisions is populated from the Galaxy `pangolin_data` data table by the wrapper
    if args.known_revisions is not None:
        existing_release_tags = set(args.known_revisions)
    else:
        existing_release_tags = set()
    if args.latest:
        compatibility = fetch_compatibility_info(package_name)
        for latest_release in get_model_list(package_name):
            # choose the first release for which we have compatibility info
            version = latest_release["tag_name"].lstrip("v.")
            if version in compatibility:
                latest_release[min_version_key] = compatibility[version]
                break
        if latest_release["tag_name"] in existing_release_tags:
            releases = []
        else:
            releases = [latest_release]
    else:
        compatibility = read_compatibility_info(
            args.version_compatibility_file, package_name
        )
        downloadable_releases = get_model_list(package_name)
        releases_wanted = set(args.versions) - set(
            [tag.lstrip("v.") for tag in existing_release_tags]
        )
        releases = []
        for release in downloadable_releases:
            version = release["tag_name"].lstrip("v.")
            if version in releases_wanted:
                if version in compatibility:
                    # only add the releases for which we have compatibility info
                    release[min_version_key] = compatibility[version]
                    releases.append(release)
                    releases_wanted.remove(version)
                    if not releases_wanted:
                        # we've found all the releases we want
                        break
        if releases_wanted:
            missing_releases = " ".join(releases_wanted)
            sys.exit(
                f"Some of the requested releases ({missing_releases}) are not available."
            )

    for release in releases:
        fname = download_and_unpack(package_name, release["tag_name"], output_directory)
        if fname is not None:
            data_manager_dict["data_tables"][args.datatable_name].append(
                {
                    "value": release["tag_name"],
                    "description": release["name"],
                    min_version_key: release[min_version_key],
                    "date": release["date"].isoformat(),  # ISO 8601 is easily sortable
                    "path": str(output_directory / fname),
                }
            )
    data_manager_dict["data_tables"][args.datatable_name].sort(
        key=operator.itemgetter("value"), reverse=True
    )
    with open(args.datatable_cache_filename, "w") as fh:
        json.dump(data_manager_dict, fh, indent=2, sort_keys=True)