comparison: uniprot.py @ 9:468c71dac78a (draft)
planemo upload for repository https://github.com/bgruening/galaxytools/tree/master/tools/uniprot_rest_interface commit da476148d1c609f5c26e880a3e593f0fa71ff2f6
author: bgruening
date: Wed, 22 May 2024 21:18:15 +0000
parents: af5eccf83605
children: 95fb5712344f
8:af5eccf83605 | 9:468c71dac78a |
1 #!/usr/bin/env python | |
2 """ | |
3 uniprot python interface | |
4 to access the uniprot database | |
5 | |
6 Based on work from Jan Rudolph: https://github.com/jdrudolph/uniprot | |
7 available services: | |
8 map | |
9 retrieve | |
10 | |
11 rewritten using inspiration from: https://findwork.dev/blog/advanced-usage-python-requests-timeouts-retries-hooks/ | |
12 """ | |
13 import argparse | 1 import argparse |
2 import json | |
3 import re | |
14 import sys | 4 import sys |
5 import time | |
6 import zlib | |
7 from urllib.parse import ( | |
8 parse_qs, | |
9 urlencode, | |
10 urlparse, | |
11 ) | |
12 from xml.etree import ElementTree | |
15 | 13 |
16 import requests | 14 import requests |
17 from requests.adapters import HTTPAdapter | 15 from requests.adapters import ( |
18 from requests.packages.urllib3.util.retry import Retry | 16 HTTPAdapter, |
19 | 17 Retry, |
20 | |
21 DEFAULT_TIMEOUT = 5 # seconds | |
22 URL = 'https://legacy.uniprot.org/' | |
23 | |
24 retry_strategy = Retry( | |
25 total=5, | |
26 backoff_factor=2, | |
27 status_forcelist=[429, 500, 502, 503, 504], | |
28 allowed_methods=["HEAD", "GET", "OPTIONS", "POST"] | |
29 ) | 18 ) |
30 | 19 |
31 | 20 |
32 class TimeoutHTTPAdapter(HTTPAdapter): | 21 POLLING_INTERVAL = 3 |
33 def __init__(self, *args, **kwargs): | 22 API_URL = "https://rest.uniprot.org" |
34 self.timeout = DEFAULT_TIMEOUT | 23 |
35 if "timeout" in kwargs: | 24 |
36 self.timeout = kwargs["timeout"] | 25 retries = Retry(total=5, backoff_factor=0.25, status_forcelist=[500, 502, 503, 504]) |
37 del kwargs["timeout"] | 26 session = requests.Session() |
38 super().__init__(*args, **kwargs) | 27 session.mount("https://", HTTPAdapter(max_retries=retries)) |
39 | 28 |
40 def send(self, request, **kwargs): | 29 |
41 timeout = kwargs.get("timeout") | 30 def check_response(response): |
42 if timeout is None: | 31 try: |
43 kwargs["timeout"] = self.timeout | 32 response.raise_for_status() |
44 return super().send(request, **kwargs) | 33 except requests.HTTPError: |
45 | 34 print(response.json()) |
46 | 35 raise |
47 def _map(query, f, t, format='tab', chunk_size=100): | 36 |
48 """ _map is not meant for use with the python interface, use `map` instead | 37 |
49 """ | 38 def submit_id_mapping(from_db, to_db, ids): |
50 tool = 'uploadlists/' | 39 print(f"{from_db} {to_db}") |
51 data = {'format': format, 'from': f, 'to': t} | 40 request = requests.post( |
52 | 41 f"{API_URL}/idmapping/run", |
53 req = [] | 42 data={"from": from_db, "to": to_db, "ids": ",".join(ids)}, |
54 for i in range(0, len(query), chunk_size): | 43 ) |
55 q = query[i:i + chunk_size] | 44 check_response(request) |
56 req.append(dict([("url", URL + tool), | 45 return request.json()["jobId"] |
57 ('data', data), | 46 |
58 ("files", {'file': ' '.join(q)})])) | 47 |
59 return req | 48 def get_next_link(headers): |
60 response = requests.post(URL + tool, data=data) | 49 re_next_link = re.compile(r'<(.+)>; rel="next"') |
61 response.raise_for_status() | 50 if "Link" in headers: |
62 page = response.text | 51 match = re_next_link.match(headers["Link"]) |
63 if "The service is temporarily unavailable" in page: | 52 if match: |
64 exit("The UNIPROT service is temporarily unavailable. Please try again later.") | 53 return match.group(1) |
65 return page | 54 |
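For orientation: get_next_link extracts the next-page URL from the Link response header that the REST API sets on paginated results. A header of roughly this shape is what the regex matches (URL, job id and cursor are illustrative placeholders, not taken from a real response):

    # hypothetical Link header; get_next_link returns the URL between the angle brackets
    headers = {"Link": '<https://rest.uniprot.org/idmapping/results/abc123?cursor=xyz&size=500>; rel="next"'}
    next_url = get_next_link(headers)
    # next_url == "https://rest.uniprot.org/idmapping/results/abc123?cursor=xyz&size=500"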
66 | 55 |
67 | 56 def check_id_mapping_results_ready(job_id): |
68 if __name__ == '__main__': | 57 while True: |
69 parser = argparse.ArgumentParser(description='retrieve uniprot mapping') | 58 request = session.get(f"{API_URL}/idmapping/status/{job_id}") |
70 subparsers = parser.add_subparsers(dest='tool') | 59 check_response(request) |
71 | 60 j = request.json() |
72 mapping = subparsers.add_parser('map') | 61 if "jobStatus" in j: |
73 mapping.add_argument('f', help='from') | 62 if j["jobStatus"] == "RUNNING": |
74 mapping.add_argument('t', help='to') | 63 print(f"Retrying in {POLLING_INTERVAL}s") |
75 mapping.add_argument('inp', nargs='?', type=argparse.FileType('r'), | 64 time.sleep(POLLING_INTERVAL) |
76 default=sys.stdin, help='input file (default: stdin)') | 65 else: |
77 mapping.add_argument('out', nargs='?', type=argparse.FileType('w'), | 66 raise Exception(j["jobStatus"]) |
78 default=sys.stdout, help='output file (default: stdout)') | 67 else: |
79 mapping.add_argument('--format', default='tab', help='output format') | 68 return bool(j["results"] or j["failedIds"]) |
80 | 69 |
81 retrieve = subparsers.add_parser('retrieve') | 70 |
82 retrieve.add_argument('inp', metavar='in', nargs='?', type=argparse.FileType('r'), | 71 def get_batch(batch_response, file_format, compressed): |
83 default=sys.stdin, help='input file (default: stdin)') | 72 batch_url = get_next_link(batch_response.headers) |
84 retrieve.add_argument('out', nargs='?', type=argparse.FileType('w'), | 73 while batch_url: |
85 default=sys.stdout, help='output file (default: stdout)') | 74 batch_response = session.get(batch_url) |
86 retrieve.add_argument('-f', '--format', help='specify output format', default='txt') | 75 batch_response.raise_for_status() |
76 yield decode_results(batch_response, file_format, compressed) | |
77 batch_url = get_next_link(batch_response.headers) | |
78 | |
79 | |
80 def combine_batches(all_results, batch_results, file_format): | |
81 if file_format == "json": | |
82 for key in ("results", "failedIds"): | |
83 if key in batch_results and batch_results[key]: | |
84 all_results[key] += batch_results[key] | |
85 elif file_format == "tsv": | |
86 return all_results + batch_results[1:] | |
87 else: | |
88 return all_results + batch_results | |
89 return all_results | |
90 | |
91 | |
92 def get_id_mapping_results_link(job_id): | |
93 url = f"{API_URL}/idmapping/details/{job_id}" | |
94 request = session.get(url) | |
95 check_response(request) | |
96 return request.json()["redirectURL"] | |
97 | |
98 | |
99 def decode_results(response, file_format, compressed): | |
100 if compressed: | |
101 decompressed = zlib.decompress(response.content, 16 + zlib.MAX_WBITS) | |
102 if file_format == "json": | |
103 j = json.loads(decompressed.decode("utf-8")) | |
104 return j | |
105 elif file_format == "tsv": | |
106 return [line for line in decompressed.decode("utf-8").split("\n") if line] | |
107 elif file_format == "xlsx": | |
108 return [decompressed] | |
109 elif file_format == "xml": | |
110 return [decompressed.decode("utf-8")] | |
111 else: | |
112 return decompressed.decode("utf-8") | |
113 elif file_format == "json": | |
114 return response.json() | |
115 elif file_format == "tsv": | |
116 return [line for line in response.text.split("\n") if line] | |
117 elif file_format == "xlsx": | |
118 return [response.content] | |
119 elif file_format == "xml": | |
120 return [response.text] | |
121 return response.text | |
122 | |
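decode_results normalizes each payload into something combine_batches can concatenate: json stays a dict, tsv becomes a list of non-empty lines (header included, which is why combine_batches drops batch_results[1:]), xlsx and xml become single-element lists, and any other format is returned as plain text. A small sketch, with illustrative values:

    # hypothetical tsv response body: a header line plus one mapping per line
    lines = decode_results(request, "tsv", compressed=False)
    # -> ["From\tTo", "P05067\tCHEMBL2487"]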
123 | |
124 def get_xml_namespace(element): | |
125 m = re.match(r"\{(.*)\}", element.tag) | |
126 return m.groups()[0] if m else "" | |
127 | |
128 | |
129 def merge_xml_results(xml_results): | |
130 merged_root = ElementTree.fromstring(xml_results[0]) | |
131 for result in xml_results[1:]: | |
132 root = ElementTree.fromstring(result) | |
133 for child in root.findall("{http://uniprot.org/uniprot}entry"): | |
134 merged_root.insert(-1, child) | |
135 ElementTree.register_namespace("", get_xml_namespace(merged_root[0])) | |
136 return ElementTree.tostring(merged_root, encoding="utf-8", xml_declaration=True) | |
137 | |
138 | |
139 def print_progress_batches(batch_index, size, total): | |
140 n_fetched = min((batch_index + 1) * size, total) | |
141 print(f"Fetched: {n_fetched} / {total}") | |
142 | |
143 | |
144 def get_id_mapping_results_search(url): | |
145 parsed = urlparse(url) | |
146 query = parse_qs(parsed.query) | |
147 file_format = query["format"][0] if "format" in query else "json" | |
148 if "size" in query: | |
149 size = int(query["size"][0]) | |
150 else: | |
151 size = 500 | |
152 query["size"] = size | |
153 compressed = ( | |
154 query["compressed"][0].lower() == "true" if "compressed" in query else False | |
155 ) | |
156 parsed = parsed._replace(query=urlencode(query, doseq=True)) | |
157 url = parsed.geturl() | |
158 request = session.get(url) | |
159 check_response(request) | |
160 results = decode_results(request, file_format, compressed) | |
161 total = int(request.headers["x-total-results"]) | |
162 print_progress_batches(0, size, total) | |
163 for i, batch in enumerate(get_batch(request, file_format, compressed), 1): | |
164 results = combine_batches(results, batch, file_format) | |
165 print_progress_batches(i, size, total) | |
166 if file_format == "xml": | |
167 return merge_xml_results(results) | |
168 return results | |
169 | |
170 | |
171 # print(results) | |
172 # {'results': [{'from': 'P05067', 'to': 'CHEMBL2487'}], 'failedIds': ['P12345']} | |
173 | |
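Taken together, these helpers implement the UniProt REST ID-mapping job flow: submit a job, poll its status, resolve the redirect URL of the finished job, then page through the results. A minimal sketch of driving them directly from Python (the accessions and target database are illustrative):

    # hypothetical inputs; any source accessions and a valid target database work
    job_id = submit_id_mapping(from_db="UniProtKB_AC-ID", to_db="ChEMBL", ids=["P05067", "P12345"])
    if check_id_mapping_results_ready(job_id):
        link = get_id_mapping_results_link(job_id)
        results = get_id_mapping_results_search(f"{link}?format=tsv")
        print("\n".join(results))  # tsv results come back as a list of lines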
174 if __name__ == "__main__": | |
175 parser = argparse.ArgumentParser(description="retrieve uniprot mapping") | |
176 subparsers = parser.add_subparsers(dest="tool") | |
177 | |
178 mapping = subparsers.add_parser("map") | |
179 mapping.add_argument("f", help="from") | |
180 mapping.add_argument("t", help="to") | |
181 mapping.add_argument( | |
182 "inp", | |
183 nargs="?", | |
184 type=argparse.FileType("r"), | |
185 default=sys.stdin, | |
186 help="input file (default: stdin)", | |
187 ) | |
188 mapping.add_argument( | |
189 "out", | |
190 nargs="?", | |
191 type=argparse.FileType("w"), | |
192 default=sys.stdout, | |
193 help="output file (default: stdout)", | |
194 ) | |
195 mapping.add_argument("--format", default="tab", help="output format") | |
196 | |
197 retrieve = subparsers.add_parser("retrieve") | |
198 retrieve.add_argument( | |
199 "inp", | |
200 metavar="in", | |
201 nargs="?", | |
202 type=argparse.FileType("r"), | |
203 default=sys.stdin, | |
204 help="input file (default: stdin)", | |
205 ) | |
206 retrieve.add_argument( | |
207 "out", | |
208 nargs="?", | |
209 type=argparse.FileType("w"), | |
210 default=sys.stdout, | |
211 help="output file (default: stdout)", | |
212 ) | |
213 retrieve.add_argument("-f", "--format", help="specify output format", default="txt") | |
214 mapping = subparsers.add_parser("menu") | |
87 | 215 |
88 args = parser.parse_args() | 216 args = parser.parse_args() |
217 | |
218 # code for auto generating the from - to conditional | |
219 if args.tool == "menu": | |
220 from lxml import etree | |
221 | |
222 request = session.get("https://rest.uniprot.org/configure/idmapping/fields") | |
223 check_response(request) | |
224 fields = request.json() | |
225 | |
226 tos = dict() | |
227 from_cond = etree.Element("conditional", name="from_cond") | |
228 from_select = etree.SubElement( | |
229 from_cond, "param", name="from", type="select", label="Source database:" | |
230 ) | |
231 | |
232 rules = dict() | |
233 for rule in fields["rules"]: | |
234 rules[rule["ruleId"]] = rule["tos"] | |
235 | |
236 for group in fields["groups"]: | |
237 group_name = group["groupName"] | |
238 group_name = group_name.replace("databases", "DBs") | |
239 for item in group["items"]: | |
240 if item["to"]: | |
241 tos[item["name"]] = f"{group_name} - {item['displayName']}" | |
242 | |
243 for group in fields["groups"]: | |
244 group_name = group["groupName"] | |
245 group_name = group_name.replace("databases", "DBs") | |
246 for item in group["items"]: | |
247 if not item["from"]: | |
248 continue | |
249 option = etree.SubElement(from_select, "option", value=item["name"]) | |
250 option.text = f"{group_name} - {item['displayName']}" | |
251 when = etree.SubElement(from_cond, "when", value=item["name"]) | |
252 | |
253 to_select = etree.SubElement( | |
254 when, "param", name="to", type="select", label="Target database:" | |
255 ) | |
256 ruleId = item["ruleId"] | |
257 for to in rules[ruleId]: | |
258 option = etree.SubElement(to_select, "option", value=to) | |
259 option.text = tos[to] | |
260 etree.indent(from_cond, space=" ") | |
261 print(etree.tostring(from_cond, pretty_print=True, encoding="unicode")) | |
262 sys.exit(0) | |
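The menu branch is a development helper: it pulls the idmapping field configuration from the REST API and prints a Galaxy <conditional> block pairing every source database with its allowed targets, ready to paste into the tool XML. The generated snippet has roughly this shape (the database names shown are illustrative):

    <conditional name="from_cond">
      <param name="from" type="select" label="Source database:">
        <option value="UniProtKB_AC-ID">UniProtKB - UniProtKB AC/ID</option>
      </param>
      <when value="UniProtKB_AC-ID">
        <param name="to" type="select" label="Target database:">
          <option value="ChEMBL">Chemistry DBs - ChEMBL</option>
        </param>
      </when>
    </conditional>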
89 | 263 |
90 # get the IDs from the file as sorted list | 264 # get the IDs from the file as sorted list |
91 # (sorted is convenient for testing) | 265 # (sorted is convenient for testing) |
92 query = set() | 266 query = set() |
93 for line in args.inp: | 267 for line in args.inp: |
94 query.add(line.strip()) | 268 query.add(line.strip()) |
95 query = sorted(query) | 269 query = sorted(query) |
96 | 270 |
97 if args.tool == 'map': | 271 if args.tool == "map": |
98 pload = _map(query, args.f, args.t, chunk_size=100) | 272 job_id = submit_id_mapping(from_db=args.f, to_db=args.t, ids=query) |
99 elif args.tool == 'retrieve': | 273 elif args.tool == "retrieve": |
100 pload = _map(query, 'ACC+ID', 'ACC', args.format, chunk_size=100) | 274 job_id = submit_id_mapping( |
101 | 275 from_db="UniProtKB_AC-ID", to_db="UniProtKB", ids=query |
102 adapter = TimeoutHTTPAdapter(max_retries=retry_strategy) | 276 ) |
103 http = requests.Session() | 277 |
104 http.mount("https://", adapter) | 278 if check_id_mapping_results_ready(job_id): |
105 for i, p in enumerate(pload): | 279 link = get_id_mapping_results_link(job_id) |
106 response = http.post(**p) | 280 link = f"{link}?format={args.format}" |
107 args.out.write(response.text) | 281 print(link) |
108 http.close() | 282 results = get_id_mapping_results_search(link) |
283 | |
284 if not isinstance(results, str): | |
285 results = "\n".join(results) | |
286 args.out.write(f"{results}\n") |
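For completeness, typical invocations of the rewritten script as wired up by the argparse definitions above (file names and the ChEMBL target are placeholders; the input file holds one ID per line):

    python uniprot.py map UniProtKB_AC-ID ChEMBL ids.txt mapped.tsv --format tsv
    python uniprot.py retrieve ids.txt entries.txt --format txt
    python uniprot.py menu    # prints the generated Galaxy <conditional> XML to stdout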