Mercurial > repos > iuc > ncbi_eutils_efetch
diff eutils.py @ 0:71bcf87a7031 draft
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ncbi_entrez_eutils commit 15bcc5104c577b4b9c761f2854fc686c07ffa9db
author | iuc |
---|---|
date | Thu, 07 Jul 2016 02:39:36 -0400 |
parents | |
children | 0fc65a60436f |
line wrap: on
line diff
"""Thin client around ``Bio.Entrez`` used by the Galaxy NCBI E-utilities tools."""
import os
import json
import StringIO

from Bio import Entrez

Entrez.tool = "GalaxyEutils_1_0"
# Number of records requested per efetch call (sent as ``retmax``).
BATCH_SIZE = 200


def _read_and_close(handle):
    """Return everything readable from *handle*, always closing it."""
    try:
        return handle.read()
    finally:
        handle.close()


def _parse_and_close(handle):
    """Parse *handle* with ``Entrez.read``, always closing it."""
    try:
        return Entrez.read(handle)
    finally:
        handle.close()


class Client(object):
    """Wrapper around the ``Bio.Entrez`` E-utility calls.

    Constructing a client sets the module-global ``Entrez.email`` and,
    optionally, loads a stored history (``QueryKey`` / ``WebEnv``) from a
    JSON file so later calls can refer to it.
    """

    def __init__(self, history_file=None, user_email=None, admin_email=None):
        """Configure the contact email and optionally load a history file.

        :param history_file: path to a JSON file containing ``QueryKey`` and
            ``WebEnv`` keys, or ``None`` to work without history.
        :param user_email: email of the requesting user, or ``None``.
        :param admin_email: email of the administrator, or ``None``.
        :raises Exception: if no email is supplied and the
            ``NCBI_EUTILS_CONTACT`` environment variable is not set.
        """
        self.using_history = False
        # Defined up front so the attributes exist even without a history.
        self.query_key = None
        self.webenv = None

        if user_email is not None and admin_email is not None:
            # Both known: report both, administrator first.
            Entrez.email = ';'.join((admin_email, user_email))
        elif user_email is not None:
            Entrez.email = user_email
        elif admin_email is not None:
            Entrez.email = admin_email
        else:
            Entrez.email = os.environ.get('NCBI_EUTILS_CONTACT', None)

        if Entrez.email is None:
            raise Exception("Cannot continue without an email; please set "
                            "administrator email in NCBI_EUTILS_CONTACT")

        if history_file is not None:
            with open(history_file, 'r') as handle:
                data = json.load(handle)
            self.query_key = data['QueryKey']
            self.webenv = data['WebEnv']
            self.using_history = True

    def get_history(self):
        """Return the loaded history as keyword arguments for Entrez calls.

        :returns: ``{'query_key': ..., 'WebEnv': ...}`` when a history file
            was loaded, otherwise an empty dict.
        """
        if not self.using_history:
            return {}
        return {
            'query_key': self.query_key,
            'WebEnv': self.webenv,
        }

    def post(self, database, **payload):
        """Run epost and return the parsed response as a JSON string.

        :param database: database name passed to ``Entrez.epost``.
        :param payload: extra keyword arguments forwarded to ``Entrez.epost``.
        :returns: the parsed result serialised with ``json.dumps(indent=4)``.
        """
        return json.dumps(
            _parse_and_close(Entrez.epost(database, **payload)), indent=4)

    def fetch(self, db, ftype=None, **payload):
        """Download efetch results in chunks into the ``downloads`` directory.

        A summary request is made first (by ID when ``id`` is in *payload*,
        otherwise from the loaded history); only its length is used, to
        decide how many batches of ``BATCH_SIZE`` to request. Each batch is
        written to ``downloads/EFetch Results Chunk <retstart>.<ftype>``.

        :param db: database name passed to the Entrez calls.
        :param ftype: extension used in the chunk file names. It is
            interpolated as-is, so the default ``None`` yields ``.None``.
        :param payload: extra keyword arguments forwarded to
            ``Entrez.efetch``; ``retmax`` and ``retstart`` are overwritten.
        :raises Exception: if no ``id`` is given and no history was loaded.
        """
        # A bare makedirs() raises OSError when the directory already exists
        # (e.g. on a second call), so only create it when missing.
        if not os.path.isdir("downloads"):
            os.makedirs("downloads")

        if 'id' in payload:
            summary = self.id_summary(db, payload['id'])
        else:
            summary = self.history_summary(db)

        count = len(summary)
        payload['retmax'] = BATCH_SIZE

        # NOTE: one efetch request is issued per batch. The original author
        # flagged this paging approach as possibly problematic; revisit if
        # large result sets misbehave.
        for start in range(0, count, BATCH_SIZE):
            payload['retstart'] = start
            file_path = os.path.join(
                'downloads', 'EFetch Results Chunk %s.%s' % (start, ftype))
            # Fetch before opening the output file so a failed request does
            # not leave an empty chunk file behind.
            chunk = _read_and_close(Entrez.efetch(db, **payload))
            with open(file_path, 'w') as handle:
                handle.write(chunk)

    def id_summary(self, db, id_list):
        """Return the parsed esummary result for the given IDs.

        :param db: database name.
        :param id_list: value passed as the ``id`` argument of esummary.
        """
        payload = {
            'db': db,
            'id': id_list,
        }
        return _parse_and_close(Entrez.esummary(**payload))

    def history_summary(self, db):
        """Return the parsed esummary result for the loaded history.

        :param db: database name.
        :raises Exception: if no history file was loaded.
        """
        if not self.using_history:
            raise Exception("History must be available for this method")

        payload = {
            'db': db,
            'query_key': self.query_key,
            'WebEnv': self.webenv,
        }
        return _parse_and_close(Entrez.esummary(**payload))

    def summary(self, **payload):
        """Return the raw esummary response body."""
        return _read_and_close(Entrez.esummary(**payload))

    def link(self, **payload):
        """Return the raw elink response body."""
        return _read_and_close(Entrez.elink(**payload))

    def extract_history(self, xml_data):
        """Pull ``QueryKey`` / ``WebEnv`` out of an XML response.

        :param xml_data: XML text parseable by ``Entrez.read``.
        :returns: dict containing whichever of ``QueryKey`` and ``WebEnv``
            are present in the parsed data (possibly empty).
        """
        parsed_data = Entrez.read(StringIO.StringIO(xml_data))
        history = {}
        for key in ('QueryKey', 'WebEnv'):
            if key in parsed_data:
                history[key] = parsed_data[key]

        return history

    def search(self, **payload):
        """Return the raw esearch response body."""
        return _read_and_close(Entrez.esearch(**payload))

    def info(self, **kwargs):
        """Return the raw einfo response body."""
        return _read_and_close(Entrez.einfo(**kwargs))

    def gquery(self, **kwargs):
        """Return the raw egquery response body."""
        return _read_and_close(Entrez.egquery(**kwargs))

    def citmatch(self, **kwargs):
        """Return the raw ecitmatch response body."""
        return _read_and_close(Entrez.ecitmatch(**kwargs))

    @classmethod
    def parse_ids(cls, id_list, id, history_file):
        """Parse IDs passed on --cli or in a file passed to the cli.

        :param id_list: path to a file with one ID per line, or ``None``.
        :param id: string of IDs separated by commas, newlines, or the
            ``__cn__`` token (presumably an escaped newline produced by the
            calling tool wrapper -- confirm), or ``None``.
        :param history_file: history file path; only checked for ``None``.
        :returns: list of non-empty, whitespace-stripped IDs; IDs from *id*
            come first, followed by those from the file.
        :raises Exception: if no IDs were found and *history_file* is None.
        """
        merged_ids = []
        if id is not None:
            normalized = id.replace('__cn__', ',').replace('\n', ',')
            for pid in normalized.split(','):
                pid = pid.strip()
                if pid:
                    merged_ids.append(pid)

        if id_list is not None:
            with open(id_list, 'r') as handle:
                for line in handle:
                    line = line.strip()
                    # Skip blank lines so empty IDs are never sent on.
                    if line:
                        merged_ids.append(line)

        # Exception handled here for uniformity
        if len(merged_ids) == 0 and history_file is None:
            raise Exception("Must provide history file or IDs")

        return merged_ids