diff data_manager/pangolin_data_dm.py @ 0:33158d21324d draft
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/data_managers/data_manager_pangolin_data commit 902cce06e30ffe8ccba5dc0c3b704eb39fb4c611
author    iuc
date      Wed, 20 Jul 2022 21:02:43 +0000
parents   (none)
children  (none)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/data_manager/pangolin_data_dm.py	Wed Jul 20 21:02:43 2022 +0000
@@ -0,0 +1,234 @@
+#!/usr/bin/env python
+
+import argparse
+import datetime
+import json
+import operator
+import pathlib
+import shutil
+import subprocess
+import sys
+import tempfile
+from io import StringIO
+from typing import Generator, TextIO
+
+import requests
+
+
+def parse_date(d: str) -> datetime.datetime:
+    # Parses the publication date from the GitHub API or user input into a
+    # datetime object.
+    try:
+        date = datetime.datetime.strptime(d, "%Y-%m-%dT%H:%M:%SZ")
+    except ValueError:
+        date = datetime.datetime.strptime(d, "%Y-%m-%d")
+    return date
+
+
+def get_model_list(package: str) -> Generator[dict, None, None]:
+    # Walks the paginated GitHub releases API for the package, yielding each
+    # non-prerelease release (newest first, as GitHub returns them).
+    page_num = 0
+    while True:
+        url = f"https://api.github.com/repos/cov-lineages/{package}/releases"
+        page_num += 1
+        response = requests.get(url + f"?page={page_num}")
+        if response.status_code == 200:
+            release_list_chunk = json.loads(response.text)
+            if not release_list_chunk:
+                # past the last page of results
+                return
+            for e in release_list_chunk:
+                if e["prerelease"]:
+                    continue
+                yield dict(
+                    tag_name=e["tag_name"],
+                    name=e["name"],
+                    date=parse_date(e["published_at"]),
+                    tarball_url=e["tarball_url"],
+                )
+        else:
+            response.raise_for_status()
+
+
+def download_and_unpack(
+    dependency: str, release: str, output_directory: str
+) -> pathlib.Path:
+    url = f"git+https://github.com/cov-lineages/{dependency}.git@{release}"
+    dependency_package_name = dependency.replace("-", "_")
+    output_path = pathlib.Path(output_directory) / dependency_package_name / release
+    with tempfile.TemporaryDirectory() as tmpdir:
+        pip_command = [
+            sys.executable,
+            "-m",
+            "pip",
+            "install",
+            "--upgrade",
+            "--target",
+            tmpdir,
+            url,
+        ]
+        # output is saved in tmpdir/dependency, final output needs to be
+        # in output_directory/dependency/release
+        subprocess.run(pip_command, check=True)
+        shutil.move(
+            str(pathlib.Path(tmpdir) / dependency_package_name), str(output_path)
+        )
+    return output_path
+
+
+def fetch_compatibility_info(
+    package_name: str,
+    url: str = "https://raw.githubusercontent.com/cov-lineages/pangolin/master/pangolin/data/data_compatibility.csv",
+) -> dict[str, str]:
+    response = requests.get(url)
+    if response.status_code == 200:
+        compatibility = read_compatibility_info(StringIO(response.text), package_name)
+        return compatibility
+    else:
+        return {}
+
+
+def read_compatibility_info(
+    input_file: TextIO, package_name: str
+) -> dict[str, str]:
+    # Maps each data release version to the minimum tool version that can use it.
+    compatibility = {}
+    for line in input_file:
+        fields = line.strip().split(",")
+        if fields[0] != package_name:
+            continue
+        if package_name == "constellations":
+            compatibility[fields[1]] = fields[3]
+        else:
+            # for pangolin-data and pangolin-assignment
+            compatibility[fields[1]] = fields[2]
+    return compatibility
+
+
+def comma_split(args: str) -> list[str]:
+    return args.split(",")
+
+
+def git_lfs_install():
+    """
+    'git-lfs install' must be run after installing git-lfs and before cloning
+    a repo that uses Git LFS. Code taken from the pangolin repo.
+    """
+    try:
+        subprocess.run(
+            ["git-lfs", "install"],
+            check=True,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+        )
+    except subprocess.CalledProcessError as e:
+        stderr = e.stderr.decode("utf-8")
+        sys.stderr.write(f"Error: {e}:\n{stderr}\n")
+        sys.exit(-1)
+
+
+if __name__ == "__main__":
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--latest", default=False, action="store_true")
+    parser.add_argument("--version_compatibility_file", type=argparse.FileType())
+    parser.add_argument("--versions", type=comma_split)
+    parser.add_argument("--overwrite", default=False, action="store_true")
+    parser.add_argument("--known_revisions", type=comma_split)
+    parser.add_argument("datatable_name")
+    parser.add_argument("datatable_cache_filename")
+    parser.add_argument("galaxy_config")
+    args = parser.parse_args()
+
+    if args.datatable_name == "pangolin_data":
+        package_name = "pangolin-data"
+        min_version_key = "min_pangolin_version"
+    elif args.datatable_name == "pangolin_constellations":
+        package_name = "constellations"
+        min_version_key = "min_scorpio_version"
+    elif args.datatable_name == "pangolin_assignment":
+        package_name = "pangolin-assignment"
+        min_version_key = "min_pangolin_version"
+        git_lfs_install()
+    else:
+        sys.exit(f"Unknown data table {args.datatable_name}")
+
+    with open(args.galaxy_config) as fh:
+        config = json.load(fh)
+
+    output_directory = config.get("output_data", [{}])[0].get("extra_files_path", None)
+
+    try:
+        with open(args.datatable_cache_filename) as fh:
+            data_manager_dict = json.load(fh)
+    except IOError:
+        # on the first run this file doesn't exist
+        data_manager_dict = {}
+
+    if "data_tables" in data_manager_dict:
+        if args.datatable_name not in data_manager_dict["data_tables"]:
+            # got a data_tables entry, probably from a previous run of this
+            # script, but no entry for this specific data table
+            data_manager_dict["data_tables"][args.datatable_name] = []
+    else:
+        # got no entry for data tables, start from scratch
+        data_manager_dict = {"data_tables": {args.datatable_name: []}}
+
+    # known_revisions is populated from the Galaxy `pangolin_data` data table
+    # by the wrapper
+    if args.known_revisions is not None:
+        existing_release_tags = set(args.known_revisions)
+    else:
+        existing_release_tags = set()
+    if args.latest:
+        compatibility = fetch_compatibility_info(package_name)
+        latest_release = None
+        for release in get_model_list(package_name):
+            # choose the first (i.e. newest) release for which we have
+            # compatibility info
+            version = release["tag_name"].lstrip("v.")
+            if version in compatibility:
+                release[min_version_key] = compatibility[version]
+                latest_release = release
+                break
+        # skip the release if no compatible release was found or it is
+        # already in the data table
+        if (
+            latest_release is None
+            or latest_release["tag_name"] in existing_release_tags
+        ):
+            releases = []
+        else:
+            releases = [latest_release]
+    else:
+        compatibility = read_compatibility_info(
+            args.version_compatibility_file, package_name
+        )
+        downloadable_releases = get_model_list(package_name)
+        releases_wanted = set(args.versions) - {
+            tag.lstrip("v.") for tag in existing_release_tags
+        }
+        releases = []
+        for release in downloadable_releases:
+            version = release["tag_name"].lstrip("v.")
+            if version in releases_wanted:
+                if version in compatibility:
+                    # only add the releases for which we have compatibility info
+                    release[min_version_key] = compatibility[version]
+                    releases.append(release)
+                    releases_wanted.remove(version)
+            if not releases_wanted:
+                # we've found all the releases we want
+                break
+        if releases_wanted:
+            missing_releases = " ".join(releases_wanted)
+            sys.exit(
+                f"Some of the requested releases ({missing_releases}) are not available."
+            )
+
+    for release in releases:
+        fname = download_and_unpack(package_name, release["tag_name"], output_directory)
+        if fname is not None:
+            data_manager_dict["data_tables"][args.datatable_name].append(
+                {
+                    "value": release["tag_name"],
+                    "description": release["name"],
+                    min_version_key: release[min_version_key],
+                    "date": release["date"].isoformat(),  # ISO 8601 is easily sortable
+                    # download_and_unpack already returns the full path under
+                    # output_directory
+                    "path": str(fname),
+                }
+            )
+    data_manager_dict["data_tables"][args.datatable_name].sort(
+        key=operator.itemgetter("value"), reverse=True
+    )
+    with open(args.datatable_cache_filename, "w") as fh:
+        json.dump(data_manager_dict, fh, indent=2, sort_keys=True)
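
To make the compatibility lookup concrete, here is a minimal usage sketch of read_compatibility_info, assuming the function above is in scope. The CSV rows are hypothetical, inferred only from the column indices the parser uses (column 0: package name, column 1: data release version, column 2: minimum pangolin version, column 3: minimum scorpio version); they are not copied from the real data_compatibility.csv.

    from io import StringIO

    # Hypothetical rows in the shape the parser expects.
    sample = StringIO(
        "pangolin-data,1.13,4.1.1,\n"
        "constellations,v0.1.10,,3.0.5\n"
    )

    # pangolin-data (and pangolin-assignment) rows keep column 2, the minimum
    # pangolin version; constellations rows keep column 3 instead.
    print(read_compatibility_info(sample, "pangolin-data"))
    # {'1.13': '4.1.1'}

    sample.seek(0)  # rewind before re-reading for a different package
    print(read_compatibility_info(sample, "constellations"))
    # {'v0.1.10': '3.0.5'}

The returned mapping supplies the min_pangolin_version (or min_scorpio_version) value stored in each data table row, so only releases with a known compatible tool version are ever added to the cache file.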