comparison uniprot.py @ 9:468c71dac78a draft

planemo upload for repository https://github.com/bgruening/galaxytools/tree/master/tools/uniprot_rest_interface commit da476148d1c609f5c26e880a3e593f0fa71ff2f6
author bgruening
date Wed, 22 May 2024 21:18:15 +0000
parents af5eccf83605
children 95fb5712344f
comparison
equal deleted inserted replaced
8:af5eccf83605 9:468c71dac78a
1 #!/usr/bin/env python
2 """
3 uniprot python interface
4 to access the uniprot database
5
6 Based on work from Jan Rudolph: https://github.com/jdrudolph/uniprot
7 available services:
8 map
9 retrieve
10
11 rewritten using inspiration from: https://findwork.dev/blog/advanced-usage-python-requests-timeouts-retries-hooks/
12 """
13 import argparse 1 import argparse
2 import json
3 import re
14 import sys 4 import sys
5 import time
6 import zlib
7 from urllib.parse import (
8 parse_qs,
9 urlencode,
10 urlparse,
11 )
12 from xml.etree import ElementTree
15 13
16 import requests 14 import requests
17 from requests.adapters import HTTPAdapter 15 from requests.adapters import (
18 from requests.packages.urllib3.util.retry import Retry 16 HTTPAdapter,
19 17 Retry,
20
21 DEFAULT_TIMEOUT = 5 # seconds
22 URL = 'https://legacy.uniprot.org/'
23
24 retry_strategy = Retry(
25 total=5,
26 backoff_factor=2,
27 status_forcelist=[429, 500, 502, 503, 504],
28 allowed_methods=["HEAD", "GET", "OPTIONS", "POST"]
29 ) 18 )
30 19
31 20
32 class TimeoutHTTPAdapter(HTTPAdapter): 21 POLLING_INTERVAL = 3
33 def __init__(self, *args, **kwargs): 22 API_URL = "https://rest.uniprot.org"
34 self.timeout = DEFAULT_TIMEOUT 23
35 if "timeout" in kwargs: 24
36 self.timeout = kwargs["timeout"] 25 retries = Retry(total=5, backoff_factor=0.25, status_forcelist=[500, 502, 503, 504])
37 del kwargs["timeout"] 26 session = requests.Session()
38 super().__init__(*args, **kwargs) 27 session.mount("https://", HTTPAdapter(max_retries=retries))
39 28
40 def send(self, request, **kwargs): 29
41 timeout = kwargs.get("timeout") 30 def check_response(response):
42 if timeout is None: 31 try:
43 kwargs["timeout"] = self.timeout 32 response.raise_for_status()
44 return super().send(request, **kwargs) 33 except requests.HTTPError:
45 34 print(response.json())
46 35 raise
47 def _map(query, f, t, format='tab', chunk_size=100): 36
48 """ _map is not meant for use with the python interface, use `map` instead 37
49 """ 38 def submit_id_mapping(from_db, to_db, ids):
50 tool = 'uploadlists/' 39 print(f"{from_db} {to_db}")
51 data = {'format': format, 'from': f, 'to': t} 40 request = requests.post(
52 41 f"{API_URL}/idmapping/run",
53 req = [] 42 data={"from": from_db, "to": to_db, "ids": ",".join(ids)},
54 for i in range(0, len(query), chunk_size): 43 )
55 q = query[i:i + chunk_size] 44 check_response(request)
56 req.append(dict([("url", URL + tool), 45 return request.json()["jobId"]
57 ('data', data), 46
58 ("files", {'file': ' '.join(q)})])) 47
59 return req 48 def get_next_link(headers):
60 response = requests.post(URL + tool, data=data) 49 re_next_link = re.compile(r'<(.+)>; rel="next"')
61 response.raise_for_status() 50 if "Link" in headers:
62 page = response.text 51 match = re_next_link.match(headers["Link"])
63 if "The service is temporarily unavailable" in page: 52 if match:
64 exit("The UNIPROT service is temporarily unavailable. Please try again later.") 53 return match.group(1)
65 return page 54
66 55
67 56 def check_id_mapping_results_ready(job_id):
68 if __name__ == '__main__': 57 while True:
69 parser = argparse.ArgumentParser(description='retrieve uniprot mapping') 58 request = session.get(f"{API_URL}/idmapping/status/{job_id}")
70 subparsers = parser.add_subparsers(dest='tool') 59 check_response(request)
71 60 j = request.json()
72 mapping = subparsers.add_parser('map') 61 if "jobStatus" in j:
73 mapping.add_argument('f', help='from') 62 if j["jobStatus"] == "RUNNING":
74 mapping.add_argument('t', help='to') 63 print(f"Retrying in {POLLING_INTERVAL}s")
75 mapping.add_argument('inp', nargs='?', type=argparse.FileType('r'), 64 time.sleep(POLLING_INTERVAL)
76 default=sys.stdin, help='input file (default: stdin)') 65 else:
77 mapping.add_argument('out', nargs='?', type=argparse.FileType('w'), 66 raise Exception(j["jobStatus"])
78 default=sys.stdout, help='output file (default: stdout)') 67 else:
79 mapping.add_argument('--format', default='tab', help='output format') 68 return bool(j["results"] or j["failedIds"])
80 69
81 retrieve = subparsers.add_parser('retrieve') 70
82 retrieve.add_argument('inp', metavar='in', nargs='?', type=argparse.FileType('r'), 71 def get_batch(batch_response, file_format, compressed):
83 default=sys.stdin, help='input file (default: stdin)') 72 batch_url = get_next_link(batch_response.headers)
84 retrieve.add_argument('out', nargs='?', type=argparse.FileType('w'), 73 while batch_url:
85 default=sys.stdout, help='output file (default: stdout)') 74 batch_response = session.get(batch_url)
86 retrieve.add_argument('-f', '--format', help='specify output format', default='txt') 75 batch_response.raise_for_status()
76 yield decode_results(batch_response, file_format, compressed)
77 batch_url = get_next_link(batch_response.headers)
78
79
def combine_batches(all_results, batch_results, file_format):
    """Merge one decoded result page into the results accumulated so far.

    Args:
        all_results: Results gathered from the pages already processed.
            A dict for ``"json"``; otherwise a value that supports ``+``
            with ``batch_results`` (a list or a string).
        batch_results: The decoded content of the next page, in the same
            shape as ``all_results``.
        file_format: Format name that selects the merge strategy.

    Returns:
        The merged results. For ``"json"`` this is ``all_results`` itself,
        updated in place; for every other format it is a new object.
    """
    if file_format == "json":
        for key in ("results", "failedIds"):
            page_items = batch_results.get(key)
            if page_items:
                # The accumulated dict may not have this key yet (e.g. the
                # first page had no "failedIds"); create the list on demand
                # instead of failing with KeyError.
                all_results.setdefault(key, []).extend(page_items)
        return all_results
    if file_format == "tsv":
        # Drop the first row of every follow-up page (presumably the
        # repeated column header -- confirm against the API output).
        return all_results + batch_results[1:]
    return all_results + batch_results
90
91
def get_id_mapping_results_link(job_id):
    """Look up the URL from which the results of a mapping job can be fetched.

    Queries the ``idmapping/details`` endpoint for ``job_id`` through the
    shared session and returns the ``redirectURL`` field of the JSON reply.
    HTTP errors are reported and re-raised by ``check_response``.
    """
    response = session.get(f"{API_URL}/idmapping/details/{job_id}")
    check_response(response)
    details = response.json()
    return details["redirectURL"]
97
98
def decode_results(response, file_format, compressed):
    """Convert one HTTP response page into the shape used for merging pages.

    Args:
        response: Response object; ``content``, ``text`` and ``json()`` are
            the only members used.
        file_format: Format name requested from the API.
        compressed: When true, ``response.content`` is gzip-decompressed and
            decoded as UTF-8 instead of relying on ``text`` / ``json()``.

    Returns:
        * ``"json"``  -> the parsed JSON document
        * ``"tsv"``   -> list of the non-empty lines
        * ``"xlsx"``  -> one-element list holding the raw bytes
        * ``"xml"``   -> one-element list holding the document text
        * otherwise   -> the text itself
    """
    if compressed:
        # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
        raw = zlib.decompress(response.content, 16 + zlib.MAX_WBITS)
        if file_format == "xlsx":
            return [raw]
        text = raw.decode("utf-8")
        if file_format == "json":
            return json.loads(text)
    else:
        if file_format == "xlsx":
            return [response.content]
        if file_format == "json":
            return response.json()
        text = response.text

    # From here on both paths hold plain text, so the per-format handling
    # is written once instead of once per path.
    if file_format == "tsv":
        return [line for line in text.split("\n") if line]
    if file_format == "xml":
        return [text]
    return text
122
123
def get_xml_namespace(element):
    """Return the namespace URI embedded in ``element.tag``.

    ElementTree spells qualified tags as ``{uri}local``; the text between
    the braces is returned, or an empty string when the tag does not start
    with a braced prefix.
    """
    found = re.match(r"\{(.*)\}", element.tag)
    if found is None:
        return ""
    return found.group(1)
127
128
def merge_xml_results(xml_results):
    """Fold several XML result documents into a single serialized document.

    The first document becomes the base tree. From every later document
    the ``entry`` children in the UniProt namespace are moved into the base
    tree. The namespace of the base tree's first child is registered as the
    default namespace before serializing.

    Returns:
        The merged document as UTF-8 encoded bytes, including the XML
        declaration.
    """
    entry_tag = "{http://uniprot.org/uniprot}entry"
    merged_root = ElementTree.fromstring(xml_results[0])
    later_roots = (ElementTree.fromstring(page) for page in xml_results[1:])
    for page_root in later_roots:
        for entry in page_root.findall(entry_tag):
            # Index -1 inserts *before* the current last child, so whatever
            # element ends the base document stays last.
            merged_root.insert(-1, entry)
    default_namespace = get_xml_namespace(merged_root[0])
    ElementTree.register_namespace("", default_namespace)
    return ElementTree.tostring(merged_root, encoding="utf-8", xml_declaration=True)
137
138
def print_progress_batches(batch_index, size, total):
    """Print how many results have been fetched after page ``batch_index``.

    The count is ``(batch_index + 1) * size`` capped at ``total``, so the
    final (possibly partial) page never reports more than ``total``.
    """
    upper_bound = (batch_index + 1) * size
    n_fetched = total if upper_bound > total else upper_bound
    print(f"Fetched: {n_fetched} / {total}")
142
143
def get_id_mapping_results_search(url):
    """Download every page of a results URL and return the combined result.

    The query string of ``url`` controls the behaviour:

    * ``format``      -- result format, ``json`` when absent
    * ``size``        -- page size; ``500`` is added to the query when absent
    * ``compressed``  -- ``true`` (any case) enables gzip decoding

    The first page is fetched directly; further pages are pulled through
    ``get_batch`` and folded in with ``combine_batches``. Progress is
    printed after every page, using the ``x-total-results`` response header
    as the total.

    Returns:
        The combined results in the shape produced by ``decode_results``;
        for ``xml`` the pages are merged by ``merge_xml_results``.
    """
    parsed = urlparse(url)
    query = parse_qs(parsed.query)

    file_format = query.get("format", ["json"])[0]
    if "size" not in query:
        size = 500
        query["size"] = size
    else:
        size = int(query["size"][0])
    compressed = "compressed" in query and query["compressed"][0].lower() == "true"

    # Rebuild the URL so that an added default page size is actually sent.
    url = parsed._replace(query=urlencode(query, doseq=True)).geturl()

    request = session.get(url)
    check_response(request)
    results = decode_results(request, file_format, compressed)
    total = int(request.headers["x-total-results"])
    print_progress_batches(0, size, total)

    # Page 0 is already in hand, so follow-up pages are numbered from 1.
    page_number = 1
    for batch in get_batch(request, file_format, compressed):
        results = combine_batches(results, batch, file_format)
        print_progress_batches(page_number, size, total)
        page_number += 1

    if file_format == "xml":
        return merge_xml_results(results)
    return results
169
170
171 # print(results)
172 # {'results': [{'from': 'P05067', 'to': 'CHEMBL2487'}], 'failedIds': ['P12345']}
173
def format_results(results):
    """Serialize fetched results for writing to the output file.

    Args:
        results: Value returned by ``get_id_mapping_results_search``: a
            ``str``, a ``dict`` (JSON), ``bytes`` (merged XML), or a list
            of ``str`` or ``bytes`` parts.

    Returns:
        ``str`` for textual output, or ``bytes`` when the parts themselves
        are binary and must be written unmodified.
    """
    if isinstance(results, str):
        return results
    if isinstance(results, bytes):
        # The merged XML document is serialized as UTF-8 bytes.
        return results.decode("utf-8")
    if isinstance(results, dict):
        # Joining a dict would only emit its keys; dump the whole document.
        return json.dumps(results)
    if results and all(isinstance(part, bytes) for part in results):
        return b"".join(results)
    return "\n".join(results)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="retrieve uniprot mapping")
    # required=True turns a missing sub-command into a normal usage error
    # instead of a failure further down when ``args.inp`` is accessed.
    subparsers = parser.add_subparsers(dest="tool", required=True)

    mapping = subparsers.add_parser("map")
    mapping.add_argument("f", help="from")
    mapping.add_argument("t", help="to")
    mapping.add_argument(
        "inp",
        nargs="?",
        type=argparse.FileType("r"),
        default=sys.stdin,
        help="input file (default: stdin)",
    )
    mapping.add_argument(
        "out",
        nargs="?",
        type=argparse.FileType("w"),
        default=sys.stdout,
        help="output file (default: stdout)",
    )
    # NOTE(review): the decoding helpers in this file special-case "tsv",
    # not "tab" -- confirm that this default is still the intended one.
    mapping.add_argument("--format", default="tab", help="output format")

    retrieve = subparsers.add_parser("retrieve")
    retrieve.add_argument(
        "inp",
        metavar="in",
        nargs="?",
        type=argparse.FileType("r"),
        default=sys.stdin,
        help="input file (default: stdin)",
    )
    retrieve.add_argument(
        "out",
        nargs="?",
        type=argparse.FileType("w"),
        default=sys.stdout,
        help="output file (default: stdout)",
    )
    retrieve.add_argument("-f", "--format", help="specify output format", default="txt")
    subparsers.add_parser("menu")

    args = parser.parse_args()

    # code for auto generating the from - to conditional
    if args.tool == "menu":
        from lxml import etree

        request = session.get(f"{API_URL}/configure/idmapping/fields")
        check_response(request)
        fields = request.json()

        from_cond = etree.Element("conditional", name="from_cond")
        from_select = etree.SubElement(
            from_cond, "param", name="from", type="select", label="Source database:"
        )

        # rule id -> names of the databases that rule allows as a target
        rules = {rule["ruleId"]: rule["tos"] for rule in fields["rules"]}

        # First pass: display label of every database usable as a target.
        tos = {}
        for group in fields["groups"]:
            group_name = group["groupName"].replace("databases", "DBs")
            for item in group["items"]:
                if item["to"]:
                    tos[item["name"]] = f"{group_name} - {item['displayName']}"

        # Second pass: one <option>/<when> pair per source database, each
        # <when> listing the targets its rule permits.
        for group in fields["groups"]:
            group_name = group["groupName"].replace("databases", "DBs")
            for item in group["items"]:
                if not item["from"]:
                    continue
                option = etree.SubElement(from_select, "option", value=item["name"])
                option.text = f"{group_name} - {item['displayName']}"
                when = etree.SubElement(from_cond, "when", value=item["name"])

                to_select = etree.SubElement(
                    when, "param", name="to", type="select", label="Target database:"
                )
                for to in rules[item["ruleId"]]:
                    option = etree.SubElement(to_select, "option", value=to)
                    option.text = tos[to]
        etree.indent(from_cond, space=" ")
        print(etree.tostring(from_cond, pretty_print=True, encoding="unicode"))
        sys.exit(0)

    # get the IDs from the file as sorted list
    # (sorted is convenient for testing); blank lines carry no ID
    query = sorted({line.strip() for line in args.inp if line.strip()})
    if not query:
        sys.exit("No identifiers found in the input.")

    if args.tool == "map":
        job_id = submit_id_mapping(from_db=args.f, to_db=args.t, ids=query)
    else:  # "retrieve" is the only sub-command left at this point
        job_id = submit_id_mapping(
            from_db="UniProtKB_AC-ID", to_db="UniProtKB", ids=query
        )

    # When the job finished without any results nothing is written.
    if check_id_mapping_results_ready(job_id):
        link = get_id_mapping_results_link(job_id)
        # Do not start a second query string if the link already has one.
        separator = "&" if "?" in link else "?"
        link = f"{link}{separator}format={args.format}"
        print(link)
        output = format_results(get_id_mapping_results_search(link))

        if isinstance(output, bytes):
            # Binary payloads bypass the text layer of the output stream.
            args.out.flush()
            args.out.buffer.write(output)
        else:
            args.out.write(f"{output}\n")