comparison data_manager/pangolin_data_dm.py @ 0:33158d21324d draft default tip

planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/data_managers/data_manager_pangolin_data commit 902cce06e30ffe8ccba5dc0c3b704eb39fb4c611
author iuc
date Wed, 20 Jul 2022 21:02:43 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:33158d21324d
1 #!/usr/bin/env python
2
3 import argparse
4 import datetime
5 import json
6 import operator
7 import pathlib
8 import shutil
9 import subprocess
10 import sys
11 import tempfile
12 from io import StringIO
13 from typing import Generator, TextIO
14
15 import requests
16
17
def parse_date(d: str) -> datetime.datetime:
    """Convert a publication date string into a datetime object.

    Two layouts are accepted: the full timestamp form ``YYYY-MM-DDTHH:MM:SSZ``
    (as found in GitHub API responses) and the plain ``YYYY-MM-DD`` form
    (as typed by a user). A string matching neither raises ValueError.
    """
    strptime = datetime.datetime.strptime
    try:
        return strptime(d, "%Y-%m-%dT%H:%M:%SZ")
    except ValueError:
        # not a full timestamp, so try the date-only layout
        return strptime(d, "%Y-%m-%d")
26
27
def get_model_list(package: str) -> Generator[dict, None, None]:
    """Yield the published (non-prerelease) releases of a cov-lineages repository.

    The GitHub releases API is read page by page, starting at page 1, until
    an empty page is returned.

    Each yielded dict has the keys ``tag_name``, ``name``, ``date`` (a
    datetime parsed from ``published_at``) and ``tarball_url``.

    Raises requests.HTTPError when a page request does not return status 200.
    """
    url = f"https://api.github.com/repos/cov-lineages/{package}/releases"
    page_num = 0
    while True:
        page_num += 1
        # a timeout keeps an unresponsive server from hanging the job forever
        response = requests.get(url + f"?page={page_num}", timeout=60)
        # raises for 4xx / 5xx responses
        response.raise_for_status()
        if response.status_code != 200:
            # A non-error status other than 200 carries no release list;
            # continuing would request further pages without end.
            raise requests.HTTPError(
                f"Unexpected status {response.status_code} from {response.url}",
                response=response,
            )
        release_list_chunk = json.loads(response.text)
        if not release_list_chunk:
            # past the last page of results
            return
        for e in release_list_chunk:
            if e["prerelease"]:
                continue
            yield dict(
                tag_name=e["tag_name"],
                name=e["name"],
                date=parse_date(e["published_at"]),
                tarball_url=e["tarball_url"],
            )
50
51
def download_and_unpack(
    dependency: str, release: str, output_directory: str
) -> pathlib.Path:
    """Install one tagged release of a cov-lineages package and return its location.

    The release is pip-installed from the GitHub repository into a temporary
    directory. The installed package directory (the dependency name with
    dashes replaced by underscores) is then moved to
    ``output_directory/<package>/<release>``, and that path is returned.

    Raises subprocess.CalledProcessError if the pip command fails.
    """
    package_dir_name = dependency.replace("-", "_")
    destination = pathlib.Path(output_directory).joinpath(package_dir_name, release)
    source_spec = f"git+https://github.com/cov-lineages/{dependency}.git@{release}"
    with tempfile.TemporaryDirectory() as staging_dir:
        # pip places the package in staging_dir/<package>; the final location
        # has to be output_directory/<package>/<release>
        subprocess.run(
            [
                sys.executable,
                "-m",
                "pip",
                "install",
                "--upgrade",
                "--target",
                staging_dir,
                source_spec,
            ],
            check=True,
        )
        installed_package = pathlib.Path(staging_dir, package_dir_name)
        shutil.move(str(installed_package), str(destination))
    return destination
76
77
def fetch_compatibility_info(
    package_name: str,
    url: str = "https://raw.githubusercontent.com/cov-lineages/pangolin/master/pangolin/data/data_compatibility.csv",
) -> dict[str, str]:
    """Download the compatibility table and extract the entries for one package.

    Returns a dict mapping each data version of ``package_name`` to the
    minimum tool version listed for it, as parsed by
    read_compatibility_info. If the download does not return status 200,
    an empty dict is returned instead of raising.
    """
    # a timeout keeps an unresponsive server from hanging the job forever
    response = requests.get(url, timeout=60)
    if response.status_code != 200:
        # best effort: report "no compatibility information" to the caller
        return {}
    return read_compatibility_info(StringIO(response.text), package_name)
88
89
def read_compatibility_info(
    input_file: TextIO, package_name: str
) -> dict[str, str]:
    """Parse comma-separated compatibility lines for one package.

    Only lines whose first field equals ``package_name`` are used. The second
    field is the data version. The minimum tool version is read from the
    fourth field for "constellations" and from the third field for every
    other package (pangolin-data and pangolin-assignment).

    Lines that match the package but are too short to contain the needed
    field are skipped.

    Returns a dict mapping data version to minimum tool version.
    """
    # the column holding the minimum version depends only on the package
    version_column = 3 if package_name == "constellations" else 2
    compatibility = {}
    for line in input_file:
        fields = line.strip().split(",")
        if fields[0] != package_name:
            continue
        if len(fields) <= version_column:
            # truncated row: no minimum version to record
            continue
        compatibility[fields[1]] = fields[version_column]
    return compatibility
104
105
def comma_split(args: str) -> list[str]:
    """Split a comma-separated string into a list of items.

    Whitespace around each item is removed and empty items (from an empty
    string, doubled commas or a trailing comma) are dropped, so that
    "a, b," yields ["a", "b"].
    """
    return [item.strip() for item in args.split(",") if item.strip()]
108
109
def git_lfs_install():
    """
    'git-lfs install' must be run after installing git-lfs and before cloning a repo
    that uses Git LFS. Code taken from pangolin repo.

    On failure (the command exits non-zero, or the git-lfs executable cannot
    be found) an error message is written to stderr and the process exits
    with status -1.
    """
    try:
        subprocess.run(
            ["git-lfs", "install"],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except FileNotFoundError as e:
        # git-lfs is not on the PATH: report it like any other failure
        # instead of letting a traceback escape
        sys.stderr.write(f"Error: {e}\n")
        sys.exit(-1)
    except subprocess.CalledProcessError as e:
        # errors="replace" so undecodable output cannot mask the real error
        stderr = e.stderr.decode("utf-8", errors="replace")
        sys.stderr.write(f"Error: {e}:\n{stderr}\n")
        sys.exit(-1)
126
127
if __name__ == "__main__":
    # Command line entry point: work out which releases of the selected
    # package still need to be downloaded, download them, and record them in
    # the data table JSON file.

    parser = argparse.ArgumentParser()
    parser.add_argument("--latest", default=False, action="store_true")
    parser.add_argument("--version_compatibility_file", type=argparse.FileType())
    parser.add_argument("--versions", type=comma_split)
    parser.add_argument("--overwrite", default=False, action="store_true")
    parser.add_argument("--known_revisions", type=comma_split)
    parser.add_argument("datatable_name")
    parser.add_argument("datatable_cache_filename")
    parser.add_argument("galaxy_config")
    args = parser.parse_args()

    if not args.latest and (
        args.versions is None or args.version_compatibility_file is None
    ):
        # without these the explicit-version mode below cannot run
        parser.error(
            "either --latest or both --versions and "
            "--version_compatibility_file must be given"
        )

    if args.datatable_name == "pangolin_data":
        package_name = "pangolin-data"
        min_version_key = "min_pangolin_version"
    elif args.datatable_name == "pangolin_constellations":
        package_name = "constellations"
        min_version_key = "min_scorpio_version"
    elif args.datatable_name == "pangolin_assignment":
        package_name = "pangolin-assignment"
        min_version_key = "min_pangolin_version"
        git_lfs_install()
    else:
        sys.exit(f"Unknown data table {args.datatable_name}")

    with open(args.galaxy_config) as fh:
        config = json.load(fh)

    output_directory = config.get("output_data", [{}])[0].get("extra_files_path", None)

    try:
        with open(args.datatable_cache_filename) as fh:
            data_manager_dict = json.load(fh)
    except IOError:
        # on the first run this file doesn't exist
        data_manager_dict = {}

    if "data_tables" in data_manager_dict:
        if args.datatable_name not in data_manager_dict["data_tables"]:
            # got a data_tables entry, probably from a previous run of this script,
            # but no entry for this specific data table
            data_manager_dict["data_tables"][args.datatable_name] = []
    else:
        # got no entry for data tables, start from scratch
        data_manager_dict = {"data_tables": {args.datatable_name: []}}

    # known-revisions is populated from the Galaxy `pangolin_data` data table by the wrapper
    if args.known_revisions is not None:
        existing_release_tags = set(args.known_revisions)
    else:
        existing_release_tags = set()

    if args.latest:
        compatibility = fetch_compatibility_info(package_name)
        for latest_release in get_model_list(package_name):
            # choose the first release for which we have compatibility info
            version = latest_release["tag_name"].lstrip("v.")
            if version in compatibility:
                latest_release[min_version_key] = compatibility[version]
                break
        else:
            # The loop ran out of releases without finding a usable one
            # (or there were no releases at all), so there is nothing that
            # can be recorded in the data table.
            sys.exit(
                f"No release of {package_name} with compatibility information is available."
            )
        if latest_release["tag_name"] in existing_release_tags:
            releases = []
        else:
            releases = [latest_release]
    else:
        compatibility = read_compatibility_info(
            args.version_compatibility_file, package_name
        )
        downloadable_releases = get_model_list(package_name)
        releases_wanted = set(args.versions) - {
            tag.lstrip("v.") for tag in existing_release_tags
        }
        releases = []
        for release in downloadable_releases:
            version = release["tag_name"].lstrip("v.")
            if version in releases_wanted:
                if version in compatibility:
                    # only add the releases for which we have compatibility info
                    release[min_version_key] = compatibility[version]
                    releases.append(release)
                    releases_wanted.remove(version)
                    if not releases_wanted:
                        # we've found all the releases we want
                        break
        if releases_wanted:
            missing_releases = " ".join(releases_wanted)
            sys.exit(
                f"Some of the requested releases ({missing_releases}) are not available."
            )

    if releases and output_directory is None:
        # only fatal when something actually has to be stored
        sys.exit("No extra_files_path found in the Galaxy config, cannot store the data.")

    for release in releases:
        fname = download_and_unpack(package_name, release["tag_name"], output_directory)
        if fname is not None:
            data_manager_dict["data_tables"][args.datatable_name].append(
                {
                    "value": release["tag_name"],
                    "description": release["name"],
                    min_version_key: release[min_version_key],
                    "date": release["date"].isoformat(),  # ISO 8601 is easily sortable
                    # fname already starts with output_directory; joining it
                    # again would duplicate the prefix for relative paths
                    "path": str(fname),
                }
            )
    data_manager_dict["data_tables"][args.datatable_name].sort(
        key=operator.itemgetter("value"), reverse=True
    )
    with open(args.datatable_cache_filename, "w") as fh:
        json.dump(data_manager_dict, fh, indent=2, sort_keys=True)