Mercurial > repos > iuc > ncbi_eutils_efetch
view eutils.py @ 2:0977ec0f3ba8 draft
"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ncbi_entrez_eutils commit db33413a06c275efd4cc8e771c37facf543a2cfa"
author | iuc |
---|---|
date | Wed, 11 Mar 2020 04:03:36 -0400 |
parents | 0fc65a60436f |
children | c09fcbe4b16a |
line wrap: on
line source
import json
import os
from io import BytesIO, StringIO

from Bio import Entrez

# Tool identifier set on Bio.Entrez for every request made through this module.
Entrez.tool = "GalaxyEutils_1_0"
# Number of records requested per EFetch call; used both as ``retmax`` and as
# the ``retstart`` step in ``Client.fetch``.
BATCH_SIZE = 200


def _read_and_close(handle):
    """Return the full contents of ``handle`` and always close it afterwards."""
    try:
        return handle.read()
    finally:
        handle.close()


class Client(object):
    """Thin wrapper around the ``Bio.Entrez`` E-utilities functions.

    Constructing a client sets ``Entrez.email`` and, when a history file is
    supplied, loads the ``QueryKey`` / ``WebEnv`` pair stored in it so that
    later calls can refer to that history.
    """

    def __init__(self, history_file=None, user_email=None, admin_email=None):
        """Configure the contact email and optionally load a history file.

        :param history_file: path to a JSON file containing ``QueryKey`` and
            ``WebEnv`` keys, or ``None`` to work without history.
        :param user_email: email of the user, or ``None``.
        :param admin_email: email of the administrator, or ``None``.
        :raises Exception: if no email is given and the
            ``NCBI_EUTILS_CONTACT`` environment variable is not set.
        """
        self.using_history = False

        if user_email is not None and admin_email is not None:
            # Both known: report both, administrator first.
            Entrez.email = ';'.join((admin_email, user_email))
        elif user_email is not None:
            Entrez.email = user_email
        elif admin_email is not None:
            Entrez.email = admin_email
        else:
            Entrez.email = os.environ.get('NCBI_EUTILS_CONTACT', None)

        if Entrez.email is None:
            raise Exception("Cannot continue without an email; please set "
                            "administrator email in NCBI_EUTILS_CONTACT")

        if history_file is not None:
            with open(history_file, 'r') as handle:
                data = json.load(handle)
            self.query_key = data['QueryKey']
            self.webenv = data['WebEnv']
            self.using_history = True

    def get_history(self):
        """Return the loaded history as a payload dict, or ``{}`` without one."""
        if not self.using_history:
            return {}
        return {
            'query_key': self.query_key,
            'WebEnv': self.webenv,
        }

    def post(self, database, **payload):
        """Run EPost and return the parsed response as indented JSON text."""
        return json.dumps(Entrez.read(Entrez.epost(database, **payload)), indent=4)

    def fetch(self, db, ftype=None, **payload):
        """Download EFetch results in batches into the ``downloads`` directory.

        The number of records is taken from an ESummary call (by ID when
        ``payload`` contains ``id``, otherwise via the loaded history). One
        file named ``EFetch Results Chunk <retstart>.<ftype>`` is written per
        batch of ``BATCH_SIZE`` records.

        :param db: database name passed to ESummary and EFetch.
        :param ftype: value used as the file extension of each chunk.
        :param payload: extra keyword arguments forwarded to ``Entrez.efetch``.
        :raises Exception: if no ``id`` is given and no history is loaded.
        """
        # Tolerate a pre-existing output directory (e.g. a second fetch call)
        # instead of failing with "File exists".
        if not os.path.isdir("downloads"):
            os.makedirs("downloads")

        if 'id' in payload:
            summary = self.id_summary(db, payload['id'])
        else:
            summary = self.history_summary(db)

        count = len(summary)

        payload['retmax'] = BATCH_SIZE

        # This may be bad. I'm not sure yet. I think it will be ... but UGH.
        for i in range(0, count, BATCH_SIZE):
            payload['retstart'] = i
            file_path = os.path.join('downloads', 'EFetch Results Chunk %s.%s' % (i, ftype))
            data = _read_and_close(Entrez.efetch(db, **payload))
            # The handle may yield text or bytes; pick the matching file mode
            # so that writing never fails with a TypeError.
            mode = 'wb' if isinstance(data, bytes) else 'w'
            with open(file_path, mode) as handle:
                handle.write(data)

    def id_summary(self, db, id_list):
        """Return the parsed ESummary result for ``id_list`` in ``db``."""
        payload = {
            'db': db,
            'id': id_list,
        }
        return Entrez.read(Entrez.esummary(**payload))

    def history_summary(self, db):
        """Return the parsed ESummary result for the loaded history.

        :raises Exception: if the client was created without a history file.
        """
        if not self.using_history:
            raise Exception("History must be available for this method")

        payload = {
            'db': db,
            'query_key': self.query_key,
            'WebEnv': self.webenv,
        }
        return Entrez.read(Entrez.esummary(**payload))

    def summary(self, **payload):
        """Run ESummary and return the raw response body."""
        return _read_and_close(Entrez.esummary(**payload))

    def link(self, **payload):
        """Run ELink and return the raw response body."""
        return _read_and_close(Entrez.elink(**payload))

    def extract_history(self, xml_data):
        """Return the ``QueryKey`` / ``WebEnv`` entries found in ``xml_data``.

        :param xml_data: an Entrez XML response, as text or bytes.
        :returns: dict holding whichever of ``QueryKey`` and ``WebEnv`` are
            present in the parsed response (possibly empty).
        """
        # ``StringIO`` here is the class imported from ``io``; the former
        # ``StringIO.StringIO(...)`` spelling raised AttributeError.
        if isinstance(xml_data, bytes):
            handle = BytesIO(xml_data)
        else:
            # NOTE(review): some Biopython releases may only accept binary
            # handles in Entrez.read -- confirm against the pinned version.
            handle = StringIO(xml_data)
        parsed_data = Entrez.read(handle)

        history = {}
        for key in ('QueryKey', 'WebEnv'):
            if key in parsed_data:
                history[key] = parsed_data[key]
        return history

    def search(self, **payload):
        """Run ESearch and return the raw response body."""
        return _read_and_close(Entrez.esearch(**payload))

    def info(self, **kwargs):
        """Run EInfo and return the raw response body."""
        return _read_and_close(Entrez.einfo(**kwargs))

    def gquery(self, **kwargs):
        """Run EGQuery and return the raw response body."""
        return _read_and_close(Entrez.egquery(**kwargs))

    def citmatch(self, **kwargs):
        """Run ECitMatch and return the raw response body."""
        return _read_and_close(Entrez.ecitmatch(**kwargs))

    @classmethod
    def parse_ids(cls, id_list, id, history_file):
        """Parse IDs passed on --cli or in a file passed to the cli

        :param id_list: path to a file with one ID per line, or ``None``.
        :param id: string of IDs separated by commas, newlines or the literal
            token ``__cn__``, or ``None``.
        :param history_file: history file path; only checked for ``None`` to
            decide whether an empty ID list is acceptable.
        :returns: list of non-empty, whitespace-stripped IDs, inline IDs first.
        :raises Exception: if no IDs were found and ``history_file`` is None.
        """
        merged_ids = []

        if id is not None:
            # NOTE(review): '__cn__' is presumably an escaped newline coming
            # from the caller -- confirm.
            for pid in id.replace('__cn__', ',').replace('\n', ',').split(','):
                pid = pid.strip()
                if pid:
                    merged_ids.append(pid)

        if id_list is not None:
            with open(id_list, 'r') as handle:
                for line in handle:
                    line = line.strip()
                    # Blank lines would otherwise become empty IDs and mask
                    # the "no IDs" check below.
                    if line:
                        merged_ids.append(line)

        # Exception handled here for uniformity
        if not merged_ids and history_file is None:
            raise Exception("Must provide history file or IDs")

        return merged_ids