Mercurial > repos > iuc > ncbi_eutils_efetch
comparison eutils.py @ 0:71bcf87a7031 draft
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ncbi_entrez_eutils commit 15bcc5104c577b4b9c761f2854fc686c07ffa9db
author | iuc |
---|---|
date | Thu, 07 Jul 2016 02:39:36 -0400 |
parents | |
children | 0fc65a60436f |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:71bcf87a7031 |
---|---|
1 import os | |
2 import json | |
3 import StringIO | |
4 from Bio import Entrez | |
5 Entrez.tool = "GalaxyEutils_1_0" | |
6 BATCH_SIZE = 200 | |
7 | |
8 | |
class Client(object):
    """Thin wrapper around Biopython's ``Bio.Entrez`` E-utilities calls.

    Most methods forward their keyword arguments directly to the matching
    ``Entrez.e*`` function and return the raw response text. A client may
    optionally be bound to a stored history (``QueryKey`` / ``WebEnv`` pair)
    loaded from a JSON file.
    """

    def __init__(self, history_file=None, user_email=None, admin_email=None):
        """Configure the contact email and optionally load a history file.

        :param history_file: path to a JSON file containing ``QueryKey`` and
            ``WebEnv`` keys, or None to work without history.
        :param user_email: email address of the requesting user, or None.
        :param admin_email: email address of the administrator, or None.
        :raises Exception: if no email was supplied and the
            ``NCBI_EUTILS_CONTACT`` environment variable is not set.
        """
        self.using_history = False
        # Always define the history attributes so they can be inspected
        # safely even when no history file was supplied.
        self.query_key = None
        self.webenv = None

        if user_email is not None and admin_email is not None:
            Entrez.email = ';'.join((admin_email, user_email))
        elif user_email is not None:
            Entrez.email = user_email
        elif admin_email is not None:
            Entrez.email = admin_email
        else:
            Entrez.email = os.environ.get('NCBI_EUTILS_CONTACT', None)

        if Entrez.email is None:
            raise Exception("Cannot continue without an email; please set "
                            "administrator email in NCBI_EUTILS_CONTACT")

        if history_file is not None:
            with open(history_file, 'r') as handle:
                data = json.load(handle)
            self.query_key = data['QueryKey']
            self.webenv = data['WebEnv']
            self.using_history = True

    def get_history(self):
        """Return the history parameters as a dict.

        :returns: ``{'query_key': ..., 'WebEnv': ...}`` when a history file
            was loaded, otherwise an empty dict.
        """
        if not self.using_history:
            return {}
        return {
            'query_key': self.query_key,
            'WebEnv': self.webenv,
        }

    def post(self, database, **payload):
        """Call ``Entrez.epost`` and return the parsed result as JSON text."""
        return json.dumps(Entrez.read(Entrez.epost(database, **payload)), indent=4)

    def fetch(self, db, ftype=None, **payload):
        """Download records in batches into the ``downloads`` directory.

        One file named ``EFetch Results Chunk <offset>.<ftype>`` is written
        per batch of ``BATCH_SIZE`` records. The number of batches is derived
        from the length of an ESummary result, looked up by ``id`` when the
        payload contains one and through the stored history otherwise.

        :param db: database name passed to ``Entrez.efetch``.
        :param ftype: file extension used for the chunk file names.
        :param payload: extra keyword arguments forwarded to
            ``Entrez.efetch``; ``retmax`` and ``retstart`` are overwritten.
        """
        # A bare os.makedirs() raises OSError when the directory already
        # exists (e.g. on a second call), so only create it when missing.
        if not os.path.isdir("downloads"):
            os.makedirs("downloads")

        if 'id' in payload:
            summary = self.id_summary(db, payload['id'])
        else:
            summary = self.history_summary(db)

        count = len(summary)
        payload['retmax'] = BATCH_SIZE

        # This may be bad. I'm not sure yet. I think it will be ... but UGH.
        for i in range(0, count, BATCH_SIZE):
            payload['retstart'] = i
            file_path = os.path.join('downloads', 'EFetch Results Chunk %s.%s' % (i, ftype))
            with open(file_path, 'w') as handle:
                handle.write(Entrez.efetch(db, **payload).read())

    def id_summary(self, db, id_list):
        """Return the parsed ESummary result for the given IDs."""
        payload = {
            'db': db,
            'id': id_list,
        }
        return Entrez.read(Entrez.esummary(**payload))

    def history_summary(self, db):
        """Return the parsed ESummary result for the stored history.

        :raises Exception: if the client was created without a history file.
        """
        if not self.using_history:
            raise Exception("History must be available for this method")

        payload = {
            'db': db,
            'query_key': self.query_key,
            'WebEnv': self.webenv,
        }
        return Entrez.read(Entrez.esummary(**payload))

    def summary(self, **payload):
        """Return the raw response of ``Entrez.esummary``."""
        return Entrez.esummary(**payload).read()

    def link(self, **payload):
        """Return the raw response of ``Entrez.elink``."""
        return Entrez.elink(**payload).read()

    def extract_history(self, xml_data):
        """Extract ``QueryKey`` / ``WebEnv`` from an XML response string.

        :param xml_data: XML text that ``Entrez.read`` can parse.
        :returns: dict holding whichever of the two keys were present.
        """
        parsed_data = Entrez.read(StringIO.StringIO(xml_data))
        history = {}
        for key in ('QueryKey', 'WebEnv'):
            if key in parsed_data:
                history[key] = parsed_data[key]

        return history

    def search(self, **payload):
        """Return the raw response of ``Entrez.esearch``."""
        return Entrez.esearch(**payload).read()

    def info(self, **kwargs):
        """Return the raw response of ``Entrez.einfo``."""
        return Entrez.einfo(**kwargs).read()

    def gquery(self, **kwargs):
        """Return the raw response of ``Entrez.egquery``."""
        return Entrez.egquery(**kwargs).read()

    def citmatch(self, **kwargs):
        """Return the raw response of ``Entrez.ecitmatch``."""
        return Entrez.ecitmatch(**kwargs).read()

    @classmethod
    def parse_ids(cls, id_list, id, history_file):
        """Parse IDs passed on --cli or in a file passed to the cli

        :param id_list: path to a file with one ID per line, or None.
        :param id: string of IDs separated by commas, newlines or the
            ``__cn__`` token, or None.
        :param history_file: history file path; only checked for presence.
        :returns: list of non-empty ID strings, command-line IDs first.
        :raises Exception: if no IDs were found and no history file is given.
        """
        merged_ids = []
        if id is not None:
            for pid in id.replace('__cn__', ',').replace('\n', ',').split(','):
                pid = pid.strip()
                if pid:
                    merged_ids.append(pid)

        if id_list is not None:
            with open(id_list, 'r') as handle:
                for line in handle:
                    # Blank lines (e.g. a trailing newline at the end of the
                    # file) must not become empty IDs.
                    line = line.strip()
                    if line:
                        merged_ids.append(line)

        # Exception handled here for uniformity
        if len(merged_ids) == 0 and history_file is None:
            raise Exception("Must provide history file or IDs")

        return merged_ids