Mercurial > repos > iuc > ncbi_eutils_elink
diff eutils.py @ 3:55b3067762f9 draft
"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ncbi_entrez_eutils commit dae34e5e182b4cceb808d7353080f14aa9a78ca9"
author | iuc |
---|---|
date | Wed, 23 Sep 2020 09:50:37 +0000 |
parents | 1ed9a99b9b1f |
children | c75419361087 |
line wrap: on
line diff
--- a/eutils.py Wed Mar 11 04:02:10 2020 -0400 +++ b/eutils.py Wed Sep 23 09:50:37 2020 +0000 @@ -12,6 +12,7 @@ def __init__(self, history_file=None, user_email=None, admin_email=None): self.using_history = False + self.using_parsedids = False if user_email is not None and admin_email is not None: Entrez.email = ';'.join((admin_email, user_email)) @@ -29,18 +30,69 @@ if history_file is not None: with open(history_file, 'r') as handle: data = json.loads(handle.read()) - self.query_key = data['QueryKey'] - self.webenv = data['WebEnv'] - self.using_history = True + # esearch + if 'QueryKey' in data: + self.query_key = data['QueryKey'] + self.webenv = data['WebEnv'] + self.query_keys = [] + self.query_keys += [data['QueryKey']] + self.using_history = True + elif 'query_key' in data: + self.query_key = data['query_key'] + self.webenv = data['WebEnv'] + self.query_keys = [] + self.query_keys += [data['query_key']] + self.using_history = True + elif 'esearchresult' in data: + self.query_key = data['esearchresult']['querykey'] + self.webenv = data['esearchresult']['webenv'] + self.query_keys = [] + self.query_keys += [data['esearchresult']['querykey']] + self.using_history = True + # elink + elif 'linksets' in data: + # elink for cmd=neighbor_history + if 'linksetdbhistories' in data['linksets'][0]: + self.webenv = data['linksets'][0]['webenv'] + self.query_key = data['linksets'][0]['linksetdbhistories'][0]['querykey'] + self.using_history = True + # elink for cmd=neighbor|neighbor_score + elif 'linksetdbs' in data['linksets'][0]: + self.using_parsedids = True + # elink for neighbor + if isinstance(data['linksets'][0]['linksetdbs'][0]['links'][0], str): + self.idstr = ','.join(data['linksets'][0]['linksetdbs'][0]['links']) + # elink for neighbor_score + else: + self.idstr = ','.join(map(lambda x: x['id'], data['linksets'][0]['linksetdbs'][0]['links'])) + if 'linksetdbhistories' in data['linksets'][0]: + self.webenv = data['linksets'][0]['webenv'] + self.query_keys = [] + for query in data['linksets'][0]['linksetdbhistories']: + if 'querykey' in query: + self.query_keys += [query['querykey']] + else: + print("No match") + print(data) def get_history(self): - if not self.using_history: - return {} - else: + if self.using_history: return { 'query_key': self.query_key, 'WebEnv': self.webenv, } + elif self.using_parsedids: + return { + 'id': self.idstr, + } + else: + return {} + + def get_histories(self): + histories = [] + for key in self.query_keys: + histories += [{'WebEnv': self.webenv, 'query_key': key}] + return histories def post(self, database, **payload): return json.dumps(Entrez.read(Entrez.epost(database, **payload)), indent=4) @@ -50,8 +102,10 @@ if 'id' in payload: summary = self.id_summary(db, payload['id']) + elif 'WebEnv' not in payload or 'query_key' not in payload: + summary = self.history_summary(db) else: - summary = self.history_summary(db) + summary = payload count = len(summary) payload['retmax'] = BATCH_SIZE @@ -87,15 +141,90 @@ def link(self, **payload): return Entrez.elink(**payload).read() - def extract_history(self, xml_data): - parsed_data = Entrez.read(StringIO.StringIO(xml_data)) + def extract_history_from_xml_file(self, xml_file): history = {} - for key in ('QueryKey', 'WebEnv'): - if key in parsed_data: - history[key] = parsed_data[key] + with open(xml_file, 'r') as handle: + xml_str = handle.read() + history = self.extract_history_from_xml(xml_str) + return history + + def extract_history_from_xml(self, xml_str): + try: + parsed_data = Entrez.read(StringIO(xml_str)) + history = {} + gotit = 0 + + # New code doesn't work for esearch input to elink - Parsing esearch output (reading an xml history) does not work as an elink input payload, which needs 'QueryKey'. Notably, if parsing elink output as input to elink, conversion of xml 'QueryKey' to 'query_key' is needed for some reason. Also Notably, efetch returned results using the 'QueryKey' key + # For esearch xml history results + if 'QueryKey' in parsed_data: + history['query_key'] = parsed_data['QueryKey'] + gotit += 1 + if 'WebEnv' in parsed_data: + history['WebEnv'] = parsed_data['WebEnv'] + gotit += 1 + # For elink xml history results + if gotit < 2: + if 'LinkSetDbHistory' in parsed_data[0]: + if 'QueryKey' in parsed_data[0]['LinkSetDbHistory'][0]: + history['query_key'] = parsed_data[0]['LinkSetDbHistory'][0]['QueryKey'] + gotit += 1 + if 'WebEnv' in parsed_data[0]: + history['WebEnv'] = parsed_data[0]['WebEnv'] + gotit += 1 + if gotit < 2: + raise Exception("Could not find WebEnv in xml response") + except Exception as e: + print("Error parsing...") + print(xml_str) + raise(e) return history + def extract_histories_from_xml_file(self, xml_file): + histories = [] + with open(xml_file, 'r') as handle: + xml_str = handle.read() + histories = self.extract_histories_from_xml(xml_str) + return histories + + def extract_histories_from_xml(self, xml_str): + try: + parsed_data = Entrez.read(StringIO(xml_str)) + histories = [] + gotit = 0 + + # New code doesn't work for esearch input to elink - Parsing esearch output (reading an xml history) does not work as an elink input payload, which needs 'QueryKey'. Notably, if parsing elink output as input to elink, conversion of xml 'QueryKey' to 'query_key' is needed for some reason. Also Notably, efetch returned results using the 'QueryKey' key + # For esearch xml history results + if 'QueryKey' in parsed_data: + tmp_hist = {} + tmp_hist['query_key'] = parsed_data['QueryKey'] + gotit += 1 + if 'WebEnv' in parsed_data: + tmp_hist['WebEnv'] = parsed_data['WebEnv'] + gotit += 1 + if gotit == 2: + histories += [tmp_hist] + # For elink xml history results + else: + gotenv = 0 + if 'LinkSetDbHistory' in parsed_data[0]: + for query in parsed_data[0]['LinkSetDbHistory']: + tmp_hist = {} + if 'WebEnv' in parsed_data[0]: + tmp_hist['WebEnv'] = parsed_data[0]['WebEnv'] + if 'QueryKey' in query: + tmp_hist['query_key'] = query['QueryKey'] + histories += [tmp_hist] + gotit += 1 + if gotit == 0 and gotenv == 0: + raise Exception("Could not find WebEnv in xml response") + except Exception as e: + print("Error parsing...") + print(xml_str) + raise(e) + + return histories + def search(self, **payload): return Entrez.esearch(**payload).read() @@ -109,7 +238,90 @@ return Entrez.ecitmatch(**kwargs).read() @classmethod - def parse_ids(cls, id_list, id, history_file): + def jsonstring2jsondata(cls, json_str): + json_handle = StringIO(json_str) + json_data = json.loads(json_handle.read()) + return json_data + + @classmethod + def jsonfile2UIlist(cls, json_file): + merged_ids = [] + with open(json_file, 'r') as handle: + json_data = json.loads(handle.read()) + for id in cls.jsondata2UIlist(json_data): + merged_ids += [id] + return merged_ids + + @classmethod + def jsondata2UIlist(cls, json_data): + merged_ids = [] + + # Always prioritize the result links as opposed to the search links + # elink - retrieves linked IDs for cmd=neighbor|neighbor_score only + if 'linksets' in json_data: + for lnk in json_data['linksets'][0]['linksetdbs']: + if 'links' in lnk: + for id in lnk['links']: + # elink for neighbor + if isinstance(id, str): + merged_ids.append(id) + # elink for neighbor_score + else: + merged_ids.append(id['id']) + # esearch + elif 'esearchresult' in json_data: + for id in json_data['esearchresult']['idlist']: + merged_ids += [id] + + return merged_ids + + @classmethod + def xmlfile2UIlist(cls, xml_file): + merged_ids = [] + with open(xml_file, 'r') as handle: + xml_data = Entrez.read(handle) + for id in cls.xmldata2UIlist(xml_data): + merged_ids += [id] + return merged_ids + + @classmethod + def xmlstring2UIlist(cls, xml_str): + merged_ids = [] + xml_data = Entrez.read(StringIO(xml_str)) + for id in cls.xmldata2UIlist(xml_data): + merged_ids += [id] + return merged_ids + + @classmethod + def xmldata2UIlist(cls, xml_data): + merged_ids = [] + + try: + # Always prioritize the result links as opposed to the search links + # elink - retrieves linked IDs for cmd=neighbor|neighbor_score only + if 'LinkSetDb' in xml_data[0]: + for lnk in xml_data[0]['LinkSetDb'][0]['Link']: + # elink for neighbor + if isinstance(lnk, str): + merged_ids.append(lnk) + # elink for neighbor_score + else: + merged_ids.append(lnk['Id']) + # esearch + elif 'IdList' in xml_data: + for id in xml_data['IdList']: + merged_ids += [id] + # If it was not elink output, we will end up here + except Exception: + # esearch + if 'IdList' in xml_data: + for id in xml_data['IdList']: + merged_ids += [id] + + return merged_ids + + @classmethod + def parse_ids(cls, id_list, id, history_file, xml_file, json_file): """Parse IDs passed on --cli or in a file passed to the cli """ merged_ids = [] @@ -122,8 +334,21 @@ with open(id_list, 'r') as handle: merged_ids += [x.strip() for x in handle.readlines()] - # Exception hanlded here for uniformity - if len(merged_ids) == 0 and history_file is None: - raise Exception("Must provide history file or IDs") + if xml_file is not None: + tmp_ids = cls.xmlfile2UIlist(xml_file) + for id in tmp_ids: + merged_ids += [id] + + if json_file is not None: + tmp_ids = cls.jsonfile2UIlist(json_file) + for id in tmp_ids: + merged_ids += [id] return merged_ids + + @classmethod + def getVersion(cls): + """Return the biopython version + """ + import Bio + return Bio.__version__