data_manager/pangolin_data_dm.py @ 0:33158d21324d draft

planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/data_managers/data_manager_pangolin_data commit 902cce06e30ffe8ccba5dc0c3b704eb39fb4c611

author | iuc
---|---
date | Wed, 20 Jul 2022 21:02:43 +0000
parents | (none)
children | (none)
#!/usr/bin/env python

import argparse
import datetime
import json
import operator
import pathlib
import shutil
import subprocess
import sys
import tempfile
from io import StringIO
from typing import Generator, TextIO

import requests


def parse_date(d: str) -> datetime.datetime:
    # Parses the publication date from the GitHub API or user input into a datetime object.
    date = None
    try:
        date = datetime.datetime.strptime(d, "%Y-%m-%dT%H:%M:%SZ")
    except ValueError:
        date = datetime.datetime.strptime(d, "%Y-%m-%d")
    return date

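# Example (illustrative): both parse_date("2022-07-20T21:02:43Z") (the GitHub
# API format) and parse_date("2022-07-20") (user input) return a
# datetime.datetime for 20 July 2022.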

def get_model_list(package: str) -> Generator[dict, None, None]:
    """
    Yield the non-prerelease releases of a cov-lineages package, paging
    through the GitHub releases API until an empty page is returned.
    """
    page_num = 0
    while True:
        url = f"https://api.github.com/repos/cov-lineages/{package}/releases"
        page_num += 1
        response = requests.get(url + f"?page={page_num}")
        if response.status_code == 200:
            release_list_chunk = json.loads(response.text)
            if not release_list_chunk:
                # past the last page of results
                return
            for e in release_list_chunk:
                if e["prerelease"]:
                    continue
                yield dict(
                    tag_name=e["tag_name"],
                    name=e["name"],
                    date=parse_date(e["published_at"]),
                    tarball_url=e["tarball_url"],
                )
        else:
            response.raise_for_status()

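# Each yielded dict has this shape (illustrative values):
#   {"tag_name": "v1.12", "name": "pangolin-data release v1.12",
#    "date": datetime.datetime(2022, 7, 20, 21, 2, 43),
#    "tarball_url": "https://api.github.com/repos/cov-lineages/..."}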

def download_and_unpack(
    dependency: str, release: str, output_directory: str
) -> pathlib.Path:
    """
    pip-install the given release of a cov-lineages package from GitHub and
    move it to output_directory/<package>/<release>, returning that path.
    """
    url = f"git+https://github.com/cov-lineages/{dependency}.git@{release}"
    dependency_package_name = dependency.replace("-", "_")
    output_path = pathlib.Path(output_directory) / dependency_package_name / release
    with tempfile.TemporaryDirectory() as tmpdir:
        pip_command = [
            sys.executable,
            "-m",
            "pip",
            "install",
            "--upgrade",
            "--target",
            tmpdir,
            url,
        ]
        # output is saved in tmpdir/dependency, final output needs to be
        # in output_directory/dependency/release
        subprocess.run(pip_command, check=True)
        shutil.move(
            str(pathlib.Path(tmpdir) / dependency_package_name), str(output_path)
        )
    return output_path

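# Illustrative call (hypothetical paths):
#   download_and_unpack("pangolin-data", "v1.12", "/data/tables")
# installs the package's files under /data/tables/pangolin_data/v1.12 and
# returns that path.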

def fetch_compatibility_info(
    package_name: str,
    url: str = "https://raw.githubusercontent.com/cov-lineages/pangolin/master/pangolin/data/data_compatibility.csv",
) -> dict[str, str]:
    response = requests.get(url)
    if response.status_code == 200:
        compatibility = read_compatibility_info(StringIO(response.text), package_name)
        return compatibility
    else:
        return {}


def read_compatibility_info(
    input_file: TextIO, package_name: str
) -> dict[str, str]:
    compatibility = {}
    for line in input_file:
        fields = line.strip().split(",")
        if fields[0] != package_name:
            continue
        if package_name == "constellations":
            compatibility[fields[1]] = fields[3]
        else:
            # for pangolin-data and pangolin-assignment
            compatibility[fields[1]] = fields[2]
    return compatibility

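# Illustrative compatibility CSV row and result: given a line such as
#   pangolin-data,1.12,4.1.2,
# read_compatibility_info(fh, "pangolin-data") maps "1.12" -> "4.1.2" (the
# minimum pangolin version); for "constellations" the fourth column (minimum
# scorpio version) is used instead. Column meanings are assumed from the
# field indexing above, not confirmed against the upstream CSV.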

def comma_split(args: str) -> list[str]:
    return args.split(",")


def git_lfs_install():
    """
    'git-lfs install' must be run after installing git-lfs and before cloning a repo
    that uses Git LFS. Code taken from pangolin repo.
    """
    try:
        subprocess.run(
            ["git-lfs", "install"],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except subprocess.CalledProcessError as e:
        stderr = e.stderr.decode("utf-8")
        sys.stderr.write(f"Error: {e}:\n{stderr}\n")
        sys.exit(-1)


if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument("--latest", default=False, action="store_true")
    parser.add_argument("--version_compatibility_file", type=argparse.FileType())
    parser.add_argument("--versions", type=comma_split)
    parser.add_argument("--overwrite", default=False, action="store_true")
    parser.add_argument("--known_revisions", type=comma_split)
    parser.add_argument("datatable_name")
    parser.add_argument("datatable_cache_filename")
    parser.add_argument("galaxy_config")
    args = parser.parse_args()

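    # Illustrative invocation (hypothetical file names):
    #   pangolin_data_dm.py --latest pangolin_data cache.json galaxy.json
    # fetches the newest compatible pangolin-data release and records it in
    # the pangolin_data data table.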

    if args.datatable_name == "pangolin_data":
        package_name = "pangolin-data"
        min_version_key = "min_pangolin_version"
    elif args.datatable_name == "pangolin_constellations":
        package_name = "constellations"
        min_version_key = "min_scorpio_version"
    elif args.datatable_name == "pangolin_assignment":
        package_name = "pangolin-assignment"
        min_version_key = "min_pangolin_version"
        git_lfs_install()
    else:
        sys.exit(f"Unknown data table {args.datatable_name}")

    with open(args.galaxy_config) as fh:
        config = json.load(fh)

    output_directory = config.get("output_data", [{}])[0].get("extra_files_path", None)

    try:
        with open(args.datatable_cache_filename) as fh:
            data_manager_dict = json.load(fh)
    except IOError:
        # on the first run this file doesn't exist
        data_manager_dict = {}

    if "data_tables" in data_manager_dict:
        if args.datatable_name not in data_manager_dict["data_tables"]:
            # got a data_tables entry, probably from a previous run of this script,
            # but no entry for this specific data table
            data_manager_dict["data_tables"][args.datatable_name] = []
    else:
        # got no entry for data tables, start from scratch
        data_manager_dict = {"data_tables": {args.datatable_name: []}}

    # --known_revisions is populated from the Galaxy `pangolin_data` data table by the wrapper
    if args.known_revisions is not None:
        existing_release_tags = set(args.known_revisions)
    else:
        existing_release_tags = set()
    if args.latest:
        compatibility = fetch_compatibility_info(package_name)
        for latest_release in get_model_list(package_name):
            # choose the first release for which we have compatibility info
            version = latest_release["tag_name"].lstrip("v.")
            if version in compatibility:
                latest_release[min_version_key] = compatibility[version]
                break
        if latest_release["tag_name"] in existing_release_tags:
            releases = []
        else:
            releases = [latest_release]
    else:
        compatibility = read_compatibility_info(
            args.version_compatibility_file, package_name
        )
        downloadable_releases = get_model_list(package_name)
        releases_wanted = set(args.versions) - set(
            [tag.lstrip("v.") for tag in existing_release_tags]
        )
        releases = []
        for release in downloadable_releases:
            version = release["tag_name"].lstrip("v.")
            if version in releases_wanted:
                if version in compatibility:
                    # only add the releases for which we have compatibility info
                    release[min_version_key] = compatibility[version]
                    releases.append(release)
                releases_wanted.remove(version)
                if not releases_wanted:
                    # we've found all the releases we want
                    break
        if releases_wanted:
            missing_releases = " ".join(releases_wanted)
            sys.exit(
                f"Some of the requested releases ({missing_releases}) are not available."
            )

    for release in releases:
        fname = download_and_unpack(package_name, release["tag_name"], output_directory)
        if fname is not None:
            data_manager_dict["data_tables"][args.datatable_name].append(
                {
                    "value": release["tag_name"],
                    "description": release["name"],
                    min_version_key: release[min_version_key],
                    "date": release["date"].isoformat(),  # ISO 8601 is easily sortable
                    # download_and_unpack already returns the full path under
                    # output_directory, so no further joining is needed
                    "path": str(fname),
                }
            )
    data_manager_dict["data_tables"][args.datatable_name].sort(
        key=operator.itemgetter("value"), reverse=True
    )
    with open(args.datatable_cache_filename, "w") as fh:
        json.dump(data_manager_dict, fh, indent=2, sort_keys=True)
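
# Illustrative data table entry written to the cache file (hypothetical values):
#   {"value": "v1.12", "description": "pangolin-data release v1.12",
#    "min_pangolin_version": "4.1.2", "date": "2022-07-20T21:02:43",
#    "path": "/data/tables/pangolin_data/v1.12"}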