Mercurial > repos > iuc > ncbi_eutils_efetch
view eutils.py @ 10:0ccb2855dd55 draft default tip
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ncbi_entrez_eutils commit 851b88d92fa7740880a5edf407f629bf3f7f7bf0
author | iuc |
---|---|
date | Fri, 04 Oct 2024 14:35:17 +0000 |
parents | 6c4b39080ed7 |
children |
line wrap: on
line source
import json import os from io import StringIO from Bio import Entrez Entrez.tool = "GalaxyEutils_1_0" BATCH_SIZE = 200 class Client(object): def __init__(self, history_file=None, user_email=None, admin_email=None): self.using_history = False self.using_parsedids = False if user_email is not None and admin_email is not None: Entrez.email = ';'.join((admin_email, user_email)) elif user_email is not None: Entrez.email = user_email elif admin_email is not None: Entrez.email = admin_email else: Entrez.email = os.environ.get('NCBI_EUTILS_CONTACT', None) if Entrez.email is None: raise Exception("Cannot continue without an email; please set " "administrator email in NCBI_EUTILS_CONTACT") if history_file is not None: with open(history_file, 'r') as handle: data = json.loads(handle.read()) # esearch if 'QueryKey' in data: self.query_key = data['QueryKey'] self.webenv = data['WebEnv'] self.query_keys = [] self.query_keys += [data['QueryKey']] self.using_history = True elif 'query_key' in data: self.query_key = data['query_key'] self.webenv = data['WebEnv'] self.query_keys = [] self.query_keys += [data['query_key']] self.using_history = True elif 'esearchresult' in data: self.query_key = data['esearchresult']['querykey'] self.webenv = data['esearchresult']['webenv'] self.query_keys = [] self.query_keys += [data['esearchresult']['querykey']] self.using_history = True # elink elif 'linksets' in data: # elink for cmd=neighbor_history if 'linksetdbhistories' in data['linksets'][0]: self.webenv = data['linksets'][0]['webenv'] self.query_key = data['linksets'][0]['linksetdbhistories'][0]['querykey'] self.using_history = True # elink for cmd=neighbor|neighbor_score elif 'linksetdbs' in data['linksets'][0]: self.using_parsedids = True # elink for neighbor if isinstance(data['linksets'][0]['linksetdbs'][0]['links'][0], str): self.idstr = ','.join(data['linksets'][0]['linksetdbs'][0]['links']) # elink for neighbor_score else: self.idstr = ','.join(map(lambda x: x['id'], data['linksets'][0]['linksetdbs'][0]['links'])) if 'linksetdbhistories' in data['linksets'][0]: self.webenv = data['linksets'][0]['webenv'] self.query_keys = [] for query in data['linksets'][0]['linksetdbhistories']: if 'querykey' in query: self.query_keys += [query['querykey']] else: print("No match") print(data) def get_history(self): if self.using_history: return { 'query_key': self.query_key, 'WebEnv': self.webenv, } elif self.using_parsedids: return { 'id': self.idstr, } else: return {} def get_histories(self): histories = [] for key in self.query_keys: histories += [{'WebEnv': self.webenv, 'query_key': key}] return histories def post(self, database, **payload): return json.dumps(Entrez.read(Entrez.epost(database, **payload)), indent=4) def fetch(self, db, ftype=None, **payload): os.makedirs("downloads") if 'id' in payload: summary = self.id_summary(db, payload['id']) elif 'WebEnv' not in payload or 'query_key' not in payload: summary = self.history_summary(db) else: summary = payload count = len(summary) payload['retmax'] = BATCH_SIZE # This may be bad. I'm not sure yet. I think it will be ... but UGH. for i in range(0, count, BATCH_SIZE): payload['retstart'] = i file_path = os.path.join('downloads', 'EFetch Results Chunk %s.%s' % (i, ftype)) with open(file_path, 'w') as handle: handle.write(Entrez.efetch(db, **payload).read()) def id_summary(self, db, id_list): payload = { 'db': db, 'id': id_list, } return Entrez.read(Entrez.esummary(**payload)) def history_summary(self, db): if not self.using_history: raise Exception("History must be available for this method") payload = { 'db': db, 'query_key': self.query_key, 'WebEnv': self.webenv, } return Entrez.read(Entrez.esummary(**payload)) def summary(self, **payload): return Entrez.esummary(**payload).read() def link(self, **payload): return Entrez.elink(**payload).read() def extract_history_from_xml_file(self, xml_file): history = {} with open(xml_file, 'r') as handle: xml_str = handle.read() history = self.extract_history_from_xml(xml_str) return history def extract_history_from_xml(self, xml_str): try: parsed_data = Entrez.read(StringIO(xml_str)) history = {} gotit = 0 # New code doesn't work for esearch input to elink - Parsing esearch output (reading an xml history) does not work as an elink input payload, which needs 'QueryKey'. Notably, if parsing elink output as input to elink, conversion of xml 'QueryKey' to 'query_key' is needed for some reason. Also Notably, efetch returned results using the 'QueryKey' key # For esearch xml history results if 'QueryKey' in parsed_data: history['query_key'] = parsed_data['QueryKey'] gotit += 1 if 'WebEnv' in parsed_data: history['WebEnv'] = parsed_data['WebEnv'] gotit += 1 # For elink xml history results if gotit < 2: if 'LinkSetDbHistory' in parsed_data[0]: if 'QueryKey' in parsed_data[0]['LinkSetDbHistory'][0]: history['query_key'] = parsed_data[0]['LinkSetDbHistory'][0]['QueryKey'] gotit += 1 if 'WebEnv' in parsed_data[0]: history['WebEnv'] = parsed_data[0]['WebEnv'] gotit += 1 if gotit < 2: raise Exception("Could not find WebEnv in xml response") except Exception as e: print("Error parsing...") print(xml_str) raise e return history def extract_histories_from_xml_file(self, xml_file): histories = [] with open(xml_file, 'r') as handle: xml_str = handle.read() histories = self.extract_histories_from_xml(xml_str) return histories def extract_histories_from_xml(self, xml_str): try: parsed_data = Entrez.read(StringIO(xml_str)) histories = [] gotit = 0 # New code doesn't work for esearch input to elink - Parsing esearch output (reading an xml history) does not work as an elink input payload, which needs 'QueryKey'. Notably, if parsing elink output as input to elink, conversion of xml 'QueryKey' to 'query_key' is needed for some reason. Also Notably, efetch returned results using the 'QueryKey' key # For esearch xml history results if 'QueryKey' in parsed_data: tmp_hist = {} tmp_hist['query_key'] = parsed_data['QueryKey'] gotit += 1 if 'WebEnv' in parsed_data: tmp_hist['WebEnv'] = parsed_data['WebEnv'] gotit += 1 if gotit == 2: histories += [tmp_hist] # For elink xml history results else: gotenv = 0 if 'LinkSetDbHistory' in parsed_data[0]: for query in parsed_data[0]['LinkSetDbHistory']: tmp_hist = {} if 'WebEnv' in parsed_data[0]: tmp_hist['WebEnv'] = parsed_data[0]['WebEnv'] if 'QueryKey' in query: tmp_hist['query_key'] = query['QueryKey'] histories += [tmp_hist] gotit += 1 if gotit == 0 and gotenv == 0: raise Exception("Could not find WebEnv in xml response") except Exception as e: print("Error parsing...") print(xml_str) raise e return histories def search(self, **payload): return Entrez.esearch(**payload).read() def info(self, **kwargs): return Entrez.einfo(**kwargs).read() def gquery(self, **kwargs): return Entrez.egquery(**kwargs).read() def citmatch(self, **kwargs): return Entrez.ecitmatch(**kwargs).read() @classmethod def jsonstring2jsondata(cls, json_str): json_handle = StringIO(json_str) json_data = json.loads(json_handle.read()) return json_data @classmethod def jsonfile2UIlist(cls, json_file): merged_ids = [] with open(json_file, 'r') as handle: json_data = json.loads(handle.read()) for id in cls.jsondata2UIlist(json_data): merged_ids += [id] return merged_ids @classmethod def jsondata2UIlist(cls, json_data): merged_ids = [] # Always prioritize the result links as opposed to the search links # elink - retrieves linked IDs for cmd=neighbor|neighbor_score only if 'linksets' in json_data: for lnk in json_data['linksets'][0]['linksetdbs']: if 'links' in lnk: for id in lnk['links']: # elink for neighbor if isinstance(id, str): merged_ids.append(id) # elink for neighbor_score else: merged_ids.append(id['id']) # esearch elif 'esearchresult' in json_data: for id in json_data['esearchresult']['idlist']: merged_ids += [id] return merged_ids @classmethod def xmlfile2UIlist(cls, xml_file): merged_ids = [] with open(xml_file, 'r') as handle: xml_data = Entrez.read(handle) for id in cls.xmldata2UIlist(xml_data): merged_ids += [id] return merged_ids @classmethod def xmlstring2UIlist(cls, xml_str): merged_ids = [] xml_data = Entrez.read(StringIO(xml_str)) for id in cls.xmldata2UIlist(xml_data): merged_ids += [id] return merged_ids @classmethod def xmldata2UIlist(cls, xml_data): merged_ids = [] try: # Always prioritize the result links as opposed to the search links # elink - retrieves linked IDs for cmd=neighbor|neighbor_score only if 'LinkSetDb' in xml_data[0]: for lnk in xml_data[0]['LinkSetDb'][0]['Link']: # elink for neighbor if isinstance(lnk, str): merged_ids.append(lnk) # elink for neighbor_score else: merged_ids.append(lnk['Id']) # esearch elif 'IdList' in xml_data: for id in xml_data['IdList']: merged_ids += [id] # If it was not elink output, we will end up here except Exception: # esearch if 'IdList' in xml_data: for id in xml_data['IdList']: merged_ids += [id] return merged_ids @classmethod def parse_ids(cls, id_list, id, history_file, xml_file, json_file): """Parse IDs passed on --cli or in a file passed to the cli """ merged_ids = [] if id is not None: for pid in id.replace('__cn__', ',').replace('\n', ',').split(','): if pid is not None and len(pid) > 0: merged_ids.append(pid) if id_list is not None: with open(id_list, 'r') as handle: merged_ids += [x.strip() for x in handle.readlines()] if xml_file is not None: tmp_ids = cls.xmlfile2UIlist(xml_file) for id in tmp_ids: merged_ids += [id] if json_file is not None: tmp_ids = cls.jsonfile2UIlist(json_file) for id in tmp_ids: merged_ids += [id] return merged_ids @classmethod def getVersion(cls): """Return the biopython version """ import Bio return Bio.__version__