Mercurial > repos > iuc > ncbi_eutils_einfo
comparison eutils.py @ 0:32ac58827a97 draft
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ncbi_entrez_eutils commit 15bcc5104c577b4b9c761f2854fc686c07ffa9db
| author | iuc |
|---|---|
| date | Thu, 07 Jul 2016 02:40:06 -0400 |
| parents | |
| children | 5b1dc5936af2 |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:32ac58827a97 |
|---|---|
| 1 import os | |
| 2 import json | |
| 3 import StringIO | |
| 4 from Bio import Entrez | |
# Tool name set on the Bio.Entrez module for every request made through it.
Entrez.tool = "GalaxyEutils_1_0"
# Number of records requested per EFetch call; Client.fetch uses it both as
# ``retmax`` and as the step between successive ``retstart`` offsets.
BATCH_SIZE = 200
| 7 | |
| 8 | |
class Client(object):
    """Small convenience wrapper around the ``Bio.Entrez`` E-utilities calls.

    On construction the contact email is set on ``Bio.Entrez`` and, when a
    history file is supplied, the ``QueryKey``/``WebEnv`` pair stored in it is
    loaded so later calls can refer to that history.
    """

    def __init__(self, history_file=None, user_email=None, admin_email=None):
        """Configure the contact email and optionally load a history file.

        :param history_file: path to a JSON file holding ``QueryKey`` and
            ``WebEnv`` keys, or ``None`` when no history is used.
        :param user_email: email of the requesting user, or ``None``.
        :param admin_email: email of the administrator, or ``None``.
        :raises Exception: if no email is given and the
            ``NCBI_EUTILS_CONTACT`` environment variable is not set.
        """
        self.using_history = False
        # Always define these so attribute access never fails, even when no
        # history file was supplied.
        self.query_key = None
        self.webenv = None

        if user_email is not None and admin_email is not None:
            Entrez.email = ';'.join((admin_email, user_email))
        elif user_email is not None:
            Entrez.email = user_email
        elif admin_email is not None:
            Entrez.email = admin_email
        else:
            Entrez.email = os.environ.get('NCBI_EUTILS_CONTACT', None)

        if Entrez.email is None:
            raise Exception("Cannot continue without an email; please set "
                            "administrator email in NCBI_EUTILS_CONTACT")

        if history_file is not None:
            with open(history_file, 'r') as handle:
                data = json.loads(handle.read())
            self.query_key = data['QueryKey']
            self.webenv = data['WebEnv']
            self.using_history = True

    @staticmethod
    def _ensure_directory(path):
        """Create *path* (and any missing parents) unless it already exists."""
        if not os.path.isdir(path):
            os.makedirs(path)

    @staticmethod
    def _read_and_close(handle):
        """Return everything readable from *handle*, closing it afterwards."""
        try:
            return handle.read()
        finally:
            handle.close()

    def get_history(self):
        """Return the history parameters as keyword arguments for a request.

        :returns: ``{}`` when no history was loaded, otherwise a dict with
            the ``query_key`` and ``WebEnv`` entries.
        """
        if not self.using_history:
            return {}
        return {
            'query_key': self.query_key,
            'WebEnv': self.webenv,
        }

    def post(self, database, **payload):
        """Run EPost against *database* and return the parsed reply as JSON text."""
        return json.dumps(Entrez.read(Entrez.epost(database, **payload)), indent=4)

    def fetch(self, db, ftype=None, **payload):
        """Download records from *db* in batches into the ``downloads`` directory.

        The number of records is taken from an ESummary call: by ID when
        ``id`` is present in *payload*, otherwise from the loaded history.
        Each batch of ``BATCH_SIZE`` records is written to
        ``downloads/EFetch Results Chunk <retstart>.<ftype>``.

        :param db: database name passed to EFetch/ESummary.
        :param ftype: file extension used for the chunk files.
        :param payload: extra keyword arguments forwarded to ``Entrez.efetch``.
        :raises Exception: if no ``id`` is given and no history is available.
        """
        # A plain os.makedirs() raises when the directory already exists,
        # e.g. on a second fetch() call in the same working directory.
        self._ensure_directory("downloads")

        if 'id' in payload:
            summary = self.id_summary(db, payload['id'])
        else:
            summary = self.history_summary(db)

        count = len(summary)
        payload['retmax'] = BATCH_SIZE

        # This may be bad. I'm not sure yet. I think it will be ... but UGH.
        for i in range(0, count, BATCH_SIZE):
            payload['retstart'] = i
            file_path = os.path.join('downloads', 'EFetch Results Chunk %s.%s' % (i, ftype))
            with open(file_path, 'w') as handle:
                handle.write(self._read_and_close(Entrez.efetch(db, **payload)))

    def id_summary(self, db, id_list):
        """Return the parsed ESummary result for the given IDs in *db*."""
        payload = {
            'db': db,
            'id': id_list,
        }
        return Entrez.read(Entrez.esummary(**payload))

    def history_summary(self, db):
        """Return the parsed ESummary result for the loaded history in *db*.

        :raises Exception: if no history file was loaded.
        """
        if not self.using_history:
            raise Exception("History must be available for this method")

        payload = {
            'db': db,
            'query_key': self.query_key,
            'WebEnv': self.webenv,
        }
        return Entrez.read(Entrez.esummary(**payload))

    def summary(self, **payload):
        """Return the raw ESummary response body."""
        return self._read_and_close(Entrez.esummary(**payload))

    def link(self, **payload):
        """Return the raw ELink response body."""
        return self._read_and_close(Entrez.elink(**payload))

    def extract_history(self, xml_data):
        """Pull the ``QueryKey``/``WebEnv`` entries out of an XML response.

        :param xml_data: XML text of an E-utilities response.
        :returns: dict containing whichever of ``QueryKey`` and ``WebEnv``
            are present in the parsed response.
        """
        parsed_data = Entrez.read(StringIO.StringIO(xml_data))
        history = {}
        for key in ('QueryKey', 'WebEnv'):
            if key in parsed_data:
                history[key] = parsed_data[key]

        return history

    def search(self, **payload):
        """Return the raw ESearch response body."""
        return self._read_and_close(Entrez.esearch(**payload))

    def info(self, **kwargs):
        """Return the raw EInfo response body."""
        return self._read_and_close(Entrez.einfo(**kwargs))

    def gquery(self, **kwargs):
        """Return the raw EGQuery response body."""
        return self._read_and_close(Entrez.egquery(**kwargs))

    def citmatch(self, **kwargs):
        """Return the raw ECitMatch response body."""
        return self._read_and_close(Entrez.ecitmatch(**kwargs))

    @classmethod
    def parse_ids(cls, id_list, id, history_file):
        """Parse IDs passed on --cli or in a file passed to the cli

        :param id_list: path to a file with one ID per line, or ``None``.
        :param id: string of IDs separated by commas, newlines or the
            ``__cn__`` token, or ``None``.
        :param history_file: history file path; only checked for presence.
        :returns: list of non-empty, whitespace-stripped IDs, inline IDs
            first, then IDs from the file.
        :raises Exception: if no IDs were found and no history file is given.
        """
        merged_ids = []
        if id is not None:
            for pid in id.replace('__cn__', ',').replace('\n', ',').split(','):
                pid = pid.strip()
                if pid:
                    merged_ids.append(pid)

        if id_list is not None:
            with open(id_list, 'r') as handle:
                # Skip blank lines so they do not become empty-string IDs.
                merged_ids += [line.strip() for line in handle if line.strip()]

        # Exception handled here for uniformity
        if len(merged_ids) == 0 and history_file is None:
            raise Exception("Must provide history file or IDs")

        return merged_ids
