comparison eutils.py @ 0:68cd8d564e0a draft

planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ncbi_entrez_eutils commit 15bcc5104c577b4b9c761f2854fc686c07ffa9db
author iuc
date Thu, 07 Jul 2016 02:39:21 -0400
parents
children 732a52c18758
comparison
equal deleted inserted replaced
-1:000000000000 0:68cd8d564e0a
1 import os
2 import json
3 import StringIO
4 from Bio import Entrez
# Set the module-level ``tool`` attribute of Bio.Entrez for this wrapper
# (presumably the tool name reported to NCBI with each request -- confirm
# against the Biopython documentation).
Entrez.tool = "GalaxyEutils_1_0"
# Chunk size used by Client.fetch: passed as ``retmax`` and used as the step
# between successive ``retstart`` offsets.
BATCH_SIZE = 200
7
8
class Client(object):
    """Thin convenience wrapper around the ``Bio.Entrez`` functions.

    The constructor configures ``Entrez.email`` and optionally loads a
    previously saved history (``QueryKey`` / ``WebEnv``) from a JSON file.
    The remaining methods forward their keyword arguments to the matching
    ``Entrez.e*`` call and return either the raw response text or the
    structure produced by ``Entrez.read``.
    """

    def __init__(self, history_file=None, user_email=None, admin_email=None):
        """Configure the contact email and optionally load a history file.

        Args:
            history_file: Path to a JSON file containing ``QueryKey`` and
                ``WebEnv`` entries, or None to work without history.
            user_email: Email address of the requesting user, or None.
            admin_email: Email address of the administrator, or None.

        Raises:
            Exception: If no email was supplied and the
                ``NCBI_EUTILS_CONTACT`` environment variable is unset.
            KeyError: If the history file lacks ``QueryKey`` or ``WebEnv``.
        """
        self.using_history = False
        # Defined up front so the attributes always exist, even when no
        # history file is supplied.
        self.query_key = None
        self.webenv = None

        if user_email is not None and admin_email is not None:
            Entrez.email = ';'.join((admin_email, user_email))
        elif user_email is not None:
            Entrez.email = user_email
        elif admin_email is not None:
            Entrez.email = admin_email
        else:
            # Last resort: contact address configured in the environment.
            Entrez.email = os.environ.get('NCBI_EUTILS_CONTACT', None)

        if Entrez.email is None:
            raise Exception("Cannot continue without an email; please set "
                            "administrator email in NCBI_EUTILS_CONTACT")

        if history_file is not None:
            with open(history_file, 'r') as handle:
                data = json.load(handle)
            self.query_key = data['QueryKey']
            self.webenv = data['WebEnv']
            self.using_history = True

    def get_history(self):
        """Return the loaded history as keyword arguments.

        Returns:
            A dict with ``query_key`` and ``WebEnv`` when a history file was
            loaded, otherwise an empty dict.
        """
        if not self.using_history:
            return {}
        return {
            'query_key': self.query_key,
            'WebEnv': self.webenv,
        }

    def post(self, database, **payload):
        """Call ``Entrez.epost`` and return the parsed reply as JSON text.

        Args:
            database: Database name passed positionally to ``Entrez.epost``.
            **payload: Extra keyword arguments forwarded to ``Entrez.epost``.

        Returns:
            The ``Entrez.read`` result serialised with ``json.dumps``
            (``indent=4``).
        """
        return json.dumps(Entrez.read(Entrez.epost(database, **payload)), indent=4)

    def fetch(self, db, ftype=None, **payload):
        """Download records in chunks of ``BATCH_SIZE`` into ``downloads/``.

        The number of records is taken from the length of an esummary
        result: by ID when ``payload`` contains ``id``, otherwise from the
        loaded history. One file named
        ``EFetch Results Chunk <offset>.<ftype>`` is written per chunk.

        Args:
            db: Database name passed to ``Entrez.esummary``/``Entrez.efetch``.
            ftype: Value interpolated verbatim as the file extension (the
                default None therefore yields ``.None``).
            **payload: Keyword arguments forwarded to ``Entrez.efetch``;
                ``retmax`` and ``retstart`` are overwritten here.

        Raises:
            Exception: If no ``id`` is given and no history is loaded.
            OSError: If the ``downloads`` directory cannot be created.
        """
        # os.makedirs fails when the directory already exists (e.g. on a
        # second call); only re-raise when it is genuinely unavailable.
        try:
            os.makedirs("downloads")
        except OSError:
            if not os.path.isdir("downloads"):
                raise

        if 'id' in payload:
            summary = self.id_summary(db, payload['id'])
        else:
            summary = self.history_summary(db)

        count = len(summary)
        payload['retmax'] = BATCH_SIZE

        # This may be bad. I'm not sure yet. I think it will be ... but UGH.
        for i in range(0, count, BATCH_SIZE):
            payload['retstart'] = i
            file_path = os.path.join('downloads', 'EFetch Results Chunk %s.%s' % (i, ftype))
            # Read the response before opening the output file so that a
            # failed request does not leave an empty chunk file behind, and
            # always release the response handle.
            response = Entrez.efetch(db, **payload)
            try:
                chunk = response.read()
            finally:
                response.close()
            with open(file_path, 'w') as handle:
                handle.write(chunk)

    def id_summary(self, db, id_list):
        """Return the parsed ``Entrez.esummary`` result for explicit IDs.

        Args:
            db: Database name.
            id_list: Value passed as the ``id`` argument of esummary.
        """
        payload = {
            'db': db,
            'id': id_list,
        }
        return Entrez.read(Entrez.esummary(**payload))

    def history_summary(self, db):
        """Return the parsed ``Entrez.esummary`` result for the loaded history.

        Args:
            db: Database name.

        Raises:
            Exception: If no history file was loaded.
        """
        if not self.using_history:
            raise Exception("History must be available for this method")

        payload = {
            'db': db,
            'query_key': self.query_key,
            'WebEnv': self.webenv,
        }
        return Entrez.read(Entrez.esummary(**payload))

    def summary(self, **payload):
        """Return the raw response of ``Entrez.esummary(**payload)``."""
        return Entrez.esummary(**payload).read()

    def link(self, **payload):
        """Return the raw response of ``Entrez.elink(**payload)``."""
        return Entrez.elink(**payload).read()

    def extract_history(self, xml_data):
        """Extract history identifiers from an XML response.

        Args:
            xml_data: XML text, parsed with ``Entrez.read``.

        Returns:
            A dict holding whichever of ``QueryKey`` and ``WebEnv`` are
            present in the parsed data (possibly empty).
        """
        parsed_data = Entrez.read(StringIO.StringIO(xml_data))
        history = {}
        for key in ('QueryKey', 'WebEnv'):
            if key in parsed_data:
                history[key] = parsed_data[key]

        return history

    def search(self, **payload):
        """Return the raw response of ``Entrez.esearch(**payload)``."""
        return Entrez.esearch(**payload).read()

    def info(self, **kwargs):
        """Return the raw response of ``Entrez.einfo(**kwargs)``."""
        return Entrez.einfo(**kwargs).read()

    def gquery(self, **kwargs):
        """Return the raw response of ``Entrez.egquery(**kwargs)``."""
        return Entrez.egquery(**kwargs).read()

    def citmatch(self, **kwargs):
        """Return the raw response of ``Entrez.ecitmatch(**kwargs)``."""
        return Entrez.ecitmatch(**kwargs).read()

    @staticmethod
    def _clean_ids(candidates):
        """Strip surrounding whitespace from each item and drop empty ones."""
        stripped = (candidate.strip() for candidate in candidates)
        return [candidate for candidate in stripped if candidate]

    @classmethod
    def parse_ids(cls, id_list, id, history_file):
        """Parse IDs passed on --cli or in a file passed to the cli

        IDs from ``id`` come first, followed by the IDs read from the
        ``id_list`` file. Blank entries from either source are discarded.

        Args:
            id_list: Path to a file with one ID per line, or None.
            id: String of IDs separated by commas, newlines or ``__cn__``
                tokens (presumably an escaped newline -- confirm with the
                caller), or None.
            history_file: Path to a history file, or None; only used to
                decide whether an empty ID list is acceptable.

        Returns:
            The merged list of ID strings.

        Raises:
            Exception: If no IDs were found and no history file was given.
        """
        merged_ids = []
        if id is not None:
            normalised = id.replace('__cn__', ',').replace('\n', ',')
            merged_ids.extend(cls._clean_ids(normalised.split(',')))

        if id_list is not None:
            with open(id_list, 'r') as handle:
                # Iterating the handle yields lines; blank lines are dropped
                # so they do not become empty IDs.
                merged_ids.extend(cls._clean_ids(handle))

        # Exception handled here for uniformity
        if not merged_ids and history_file is None:
            raise Exception("Must provide history file or IDs")

        return merged_ids