data_manager/pangolin_data_dm.py @ 0:33158d21324d draft
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/data_managers/data_manager_pangolin_data commit 902cce06e30ffe8ccba5dc0c3b704eb39fb4c611
| author | iuc |
|---|---|
| date | Wed, 20 Jul 2022 21:02:43 +0000 |
| parents | |
| children | |
```python
#!/usr/bin/env python

import argparse
import datetime
import json
import operator
import pathlib
import shutil
import subprocess
import sys
import tempfile
from io import StringIO
from typing import Generator, TextIO

import requests


def parse_date(d: str) -> datetime.datetime:
    # Parse the publication date from the GitHub API or user input into a datetime object.
    try:
        date = datetime.datetime.strptime(d, "%Y-%m-%dT%H:%M:%SZ")
    except ValueError:
        date = datetime.datetime.strptime(d, "%Y-%m-%d")
    return date
```
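The fallback in `parse_date` covers the two shapes the script actually sees: GitHub's `published_at` timestamps and plain `YYYY-MM-DD` strings from user input. A quick illustration with invented values, assuming `parse_date` from the chunk above is in scope:

```python
from datetime import datetime

# GitHub API timestamp vs. plain date -- both parse to a datetime:
assert parse_date("2022-07-20T21:02:43Z") == datetime(2022, 7, 20, 21, 2, 43)
assert parse_date("2022-07-20") == datetime(2022, 7, 20)
```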
```python
def get_model_list(package: str) -> Generator[dict, None, None]:
    page_num = 0
    while True:
        url = f"https://api.github.com/repos/cov-lineages/{package}/releases"
        page_num += 1
        response = requests.get(url + f"?page={page_num}")
        if response.status_code == 200:
            release_list_chunk = json.loads(response.text)
            if not release_list_chunk:
                # past the last page of results
                return
            for e in release_list_chunk:
                if e["prerelease"]:
                    continue
                yield dict(
                    tag_name=e["tag_name"],
                    name=e["name"],
                    date=parse_date(e["published_at"]),
                    tarball_url=e["tarball_url"],
                )
        else:
            response.raise_for_status()
```
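The GitHub releases API is paginated, so `get_model_list` keeps requesting `?page=N` until an empty page comes back, skipping prereleases along the way. Each yielded record trims the API response down to the four fields the rest of the script uses; a sketch of one such record, with invented values:

```python
import datetime

# Shape of one item yielded by get_model_list("pangolin-data") -- values invented:
release = {
    "tag_name": "v1.12",
    "name": "pangolin-data release v1.12",
    "date": datetime.datetime(2022, 7, 20, 21, 2, 43),
    "tarball_url": "https://api.github.com/repos/cov-lineages/pangolin-data/tarball/v1.12",
}
```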
```python
def download_and_unpack(
    dependency: str, release: str, output_directory: str
) -> pathlib.Path:
    url = f"git+https://github.com/cov-lineages/{dependency}.git@{release}"
    dependency_package_name = dependency.replace("-", "_")
    output_path = pathlib.Path(output_directory) / dependency_package_name / release
    with tempfile.TemporaryDirectory() as tmpdir:
        pip_command = [
            sys.executable,
            "-m",
            "pip",
            "install",
            "--upgrade",
            "--target",
            tmpdir,
            url,
        ]
        # output is saved in tmpdir/dependency, final output needs to be
        # in output_directory/dependency/release
        subprocess.run(pip_command, check=True)
        shutil.move(
            str(pathlib.Path(tmpdir) / dependency_package_name), str(output_path)
        )
    return output_path
```
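`pip install --target` places the package directory inside the temporary directory, and the `shutil.move` then relocates it so that every release ends up in its own versioned directory under `output_directory`. With invented arguments, the effect looks like this:

```python
# Hypothetical call -- the release tag and paths are illustrative only:
path = download_and_unpack("pangolin-data", "v1.12", "/data/extra_files")
print(path)  # /data/extra_files/pangolin_data/v1.12
```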
```python
def fetch_compatibility_info(
    package_name: str,
    url: str = "https://raw.githubusercontent.com/cov-lineages/pangolin/master/pangolin/data/data_compatibility.csv",
) -> dict[str, str]:
    response = requests.get(url)
    if response.status_code == 200:
        compatibility = read_compatibility_info(StringIO(response.text), package_name)
        return compatibility
    else:
        return {}


def read_compatibility_info(
    input_file: TextIO, package_name: str
) -> dict[str, str]:
    compatibility = {}
    for line in input_file:
        fields = line.strip().split(",")
        if fields[0] != package_name:
            continue
        if package_name == "constellations":
            compatibility[fields[1]] = fields[3]
        else:
            # for pangolin-data and pangolin-assignment
            compatibility[fields[1]] = fields[2]
    return compatibility
```
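Both helpers return a mapping from a release version to the minimum tool version it is compatible with: for `constellations` the minimum scorpio version sits in the fourth CSV column, otherwise the minimum pangolin version sits in the third. The rows below are a made-up sketch of the shape the parser expects, not the real `data_compatibility.csv` contents, assuming `read_compatibility_info` from the chunk above is in scope:

```python
from io import StringIO

# Invented rows: package name, release, min pangolin version, min scorpio version
sample = StringIO(
    "pangolin-data,1.12,4.1.1,\n"
    "constellations,v0.1.10,,0.3.17\n"
)
print(read_compatibility_info(sample, "pangolin-data"))  # {'1.12': '4.1.1'}
```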
```python
def comma_split(args: str) -> list[str]:
    return args.split(",")


def git_lfs_install():
    """
    'git-lfs install' must be run after installing git-lfs and before cloning a repo
    that uses Git LFS. Code taken from the pangolin repo.
    """
    try:
        subprocess.run(
            ["git-lfs", "install"],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except subprocess.CalledProcessError as e:
        stderr = e.stderr.decode("utf-8")
        sys.stderr.write(f"Error: {e}:\n{stderr}\n")
        sys.exit(-1)


if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument("--latest", default=False, action="store_true")
    parser.add_argument("--version_compatibility_file", type=argparse.FileType())
    parser.add_argument("--versions", type=comma_split)
    parser.add_argument("--overwrite", default=False, action="store_true")
    parser.add_argument("--known_revisions", type=comma_split)
    parser.add_argument("datatable_name")
    parser.add_argument("datatable_cache_filename")
    parser.add_argument("galaxy_config")
    args = parser.parse_args()

    if args.datatable_name == "pangolin_data":
        package_name = "pangolin-data"
        min_version_key = "min_pangolin_version"
    elif args.datatable_name == "pangolin_constellations":
        package_name = "constellations"
        min_version_key = "min_scorpio_version"
    elif args.datatable_name == "pangolin_assignment":
        package_name = "pangolin-assignment"
        min_version_key = "min_pangolin_version"
        # this repo is stored with Git LFS, so set that up before downloading
        git_lfs_install()
    else:
        sys.exit(f"Unknown data table {args.datatable_name}")

    with open(args.galaxy_config) as fh:
        config = json.load(fh)

    output_directory = config.get("output_data", [{}])[0].get("extra_files_path", None)
```
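Galaxy invokes a data manager with a JSON config file; the only field this script reads from it is the `extra_files_path` of the first output dataset, which becomes the directory the models are unpacked into. A trimmed, invented example of what `galaxy_config` might contain:

```python
# Illustrative only -- a real Galaxy job config carries many more fields:
config = {
    "output_data": [
        {"extra_files_path": "/galaxy-data/jobs/000/123/dataset_456_files"}
    ]
}
```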
```python
    try:
        with open(args.datatable_cache_filename) as fh:
            data_manager_dict = json.load(fh)
    except IOError:
        # on the first run this file doesn't exist
        data_manager_dict = {}

    if "data_tables" in data_manager_dict:
        if args.datatable_name not in data_manager_dict["data_tables"]:
            # got a data_tables entry, probably from a previous run of this script,
            # but no entry for this specific data table
            data_manager_dict["data_tables"][args.datatable_name] = []
    else:
        # got no entry for data tables, start from scratch
        data_manager_dict = {"data_tables": {args.datatable_name: []}}

    # known_revisions is populated from the Galaxy `pangolin_data` data table by the wrapper
    if args.known_revisions is not None:
        existing_release_tags = set(args.known_revisions)
    else:
        existing_release_tags = set()
    if args.latest:
        compatibility = fetch_compatibility_info(package_name)
        releases = []
        # choose the first (i.e. newest) release for which we have compatibility
        # info, skipping it if Galaxy already knows about this revision
        for latest_release in get_model_list(package_name):
            version = latest_release["tag_name"].lstrip("v.")
            if version in compatibility:
                latest_release[min_version_key] = compatibility[version]
                if latest_release["tag_name"] not in existing_release_tags:
                    releases = [latest_release]
                break
    else:
        compatibility = read_compatibility_info(
            args.version_compatibility_file, package_name
        )
        downloadable_releases = get_model_list(package_name)
        releases_wanted = set(args.versions) - set(
            [tag.lstrip("v.") for tag in existing_release_tags]
        )
        releases = []
        for release in downloadable_releases:
            version = release["tag_name"].lstrip("v.")
            if version in releases_wanted:
                if version in compatibility:
                    # only add the releases for which we have compatibility info
                    release[min_version_key] = compatibility[version]
                    releases.append(release)
                    releases_wanted.remove(version)
                    if not releases_wanted:
                        # we've found all the releases we want
                        break
        if releases_wanted:
            missing_releases = " ".join(releases_wanted)
            sys.exit(
                f"Some of the requested releases ({missing_releases}) are not available."
            )

    for release in releases:
        fname = download_and_unpack(package_name, release["tag_name"], output_directory)
        if fname is not None:
            data_manager_dict["data_tables"][args.datatable_name].append(
                {
                    "value": release["tag_name"],
                    "description": release["name"],
                    min_version_key: release[min_version_key],
                    "date": release["date"].isoformat(),  # ISO 8601 is easily sortable
                    # download_and_unpack already rooted this path in output_directory
                    "path": str(fname),
                }
            )
    data_manager_dict["data_tables"][args.datatable_name].sort(
        key=operator.itemgetter("value"), reverse=True
    )
    with open(args.datatable_cache_filename, "w") as fh:
        json.dump(data_manager_dict, fh, indent=2, sort_keys=True)
```
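After a successful run the cache file holds one entry per installed release, and Galaxy loads it into the named data table. A sketch of what `datatable_cache_filename` could contain for `pangolin_data` after installing a single release, with invented values:

```python
# Shape of the written cache JSON -- values invented:
cache = {
    "data_tables": {
        "pangolin_data": [
            {
                "value": "v1.12",
                "description": "pangolin-data release v1.12",
                "min_pangolin_version": "4.1.1",
                "date": "2022-07-20T21:02:43",
                "path": "/data/extra_files/pangolin_data/v1.12",
            }
        ]
    }
}
```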
