comparison data_manager/data_manager_diamond_database_builder.py @ 2:5558f74bd296 draft

"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/data_managers/data_manager_diamond_database_builder commit 75abf7d4b23ed7ae8abce80609d81b20bc882863"
author iuc
date Sun, 21 Mar 2021 12:07:34 +0000
parents 5a0d0bee4f8d
children
comparison
equal deleted inserted replaced
1:5a0d0bee4f8d 2:5558f74bd296
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 import bz2
3 import gzip
2 import json 4 import json
5 import optparse
6 import os
7 import shutil
8 import subprocess
3 import sys 9 import sys
4 import os 10 import tarfile
5 import tempfile 11 import tempfile
6 import shutil 12 import urllib.error
7 import optparse 13 import urllib.parse
8 import urllib2 14 import urllib.request
9 import subprocess 15 import zipfile
10 from ftplib import FTP 16 from ftplib import FTP
11 import tarfile
12 import zipfile
13 import gzip
14 import bz2
15 17
16 CHUNK_SIZE = 2**20 # 1mb 18 CHUNK_SIZE = 2**20 # 1mb
17 19
18 20
19 def cleanup_before_exit(tmp_dir): 21 def cleanup_before_exit(tmp_dir):
20 if tmp_dir and os.path.exists(tmp_dir): 22 if tmp_dir and os.path.exists(tmp_dir):
21 shutil.rmtree(tmp_dir) 23 shutil.rmtree(tmp_dir)
22
23
24 def stop_err(msg):
25 sys.stderr.write(msg)
26 sys.exit(1)
27 24
28 25
29 def _get_files_in_ftp_path(ftp, path): 26 def _get_files_in_ftp_path(ftp, path):
30 path_contents = [] 27 path_contents = []
31 ftp.retrlines('MLSD %s' % (path), path_contents.append) 28 ftp.retrlines('MLSD %s' % (path), path_contents.append)
52 49
53 def _get_stream_readers_for_bz2(file_obj, tmp_dir): 50 def _get_stream_readers_for_bz2(file_obj, tmp_dir):
54 return [bz2.BZ2File(file_obj.name, 'rb')] 51 return [bz2.BZ2File(file_obj.name, 'rb')]
55 52
56 53
57 def download_from_ncbi(data_manager_dict, params, target_directory, database_id, database_name): 54 def download_from_ncbi(data_manager_dict, params, target_directory,
55 database_id, database_name):
58 NCBI_FTP_SERVER = 'ftp.ncbi.nlm.nih.gov' 56 NCBI_FTP_SERVER = 'ftp.ncbi.nlm.nih.gov'
59 NCBI_DOWNLOAD_PATH = '/blast/db/FASTA/' 57 NCBI_DOWNLOAD_PATH = '/blast/db/FASTA/'
60 COMPRESSED_EXTENSIONS = [('.tar.gz', _get_stream_readers_for_tar), ('.tar.bz2', _get_stream_readers_for_tar), ('.zip', _get_stream_readers_for_zip), ('.gz', _get_stream_readers_for_gzip), ('.bz2', _get_stream_readers_for_bz2)] 58 COMPRESSED_EXTENSIONS = [('.tar.gz', _get_stream_readers_for_tar),
61 59 ('.tar.bz2', _get_stream_readers_for_tar),
62 ncbi_identifier = params['param_dict']['reference_source']['requested_identifier'] 60 ('.zip', _get_stream_readers_for_zip),
61 ('.gz', _get_stream_readers_for_gzip),
62 ('.bz2', _get_stream_readers_for_bz2)]
63
64 ncbi_identifier = params['reference_source']['requested_identifier']
63 ftp = FTP(NCBI_FTP_SERVER) 65 ftp = FTP(NCBI_FTP_SERVER)
64 ftp.login() 66 ftp.login()
65 67
66 path_contents = _get_files_in_ftp_path(ftp, NCBI_DOWNLOAD_PATH) 68 path_contents = _get_files_in_ftp_path(ftp, NCBI_DOWNLOAD_PATH)
67 69
77 raise Exception('Unable to determine filename for NCBI database for %s: %s' % (ncbi_identifier, path_contents)) 79 raise Exception('Unable to determine filename for NCBI database for %s: %s' % (ncbi_identifier, path_contents))
78 80
79 tmp_dir = tempfile.mkdtemp(prefix='tmp-data-manager-ncbi-') 81 tmp_dir = tempfile.mkdtemp(prefix='tmp-data-manager-ncbi-')
80 ncbi_fasta_filename = os.path.join(tmp_dir, "%s%s" % (ncbi_identifier, ext)) 82 ncbi_fasta_filename = os.path.join(tmp_dir, "%s%s" % (ncbi_identifier, ext))
81 83
82 fasta_base_filename = "%s.fa" % database_id 84 # fasta_base_filename = "%s.fa" % database_id
83 fasta_filename = os.path.join(target_directory, fasta_base_filename) 85 # fasta_filename = os.path.join(target_directory, fasta_base_filename)
84 fasta_writer = open(fasta_filename, 'wb+') 86 # fasta_writer = open(fasta_filename, 'wb+')
85 87
86 tmp_extract_dir = os.path.join(tmp_dir, 'extracted_fasta') 88 tmp_extract_dir = os.path.join(tmp_dir, 'extracted_fasta')
87 os.mkdir(tmp_extract_dir) 89 os.mkdir(tmp_extract_dir)
88 90
89 tmp_fasta = open(ncbi_fasta_filename, 'wb+') 91 tmp_fasta = open(ncbi_fasta_filename, 'wb+')
104 cleanup_before_exit(tmp_dir) 106 cleanup_before_exit(tmp_dir)
105 107
106 108
107 def download_from_url(data_manager_dict, params, target_directory, database_id, database_name): 109 def download_from_url(data_manager_dict, params, target_directory, database_id, database_name):
108 # TODO: we should automatically do decompression here 110 # TODO: we should automatically do decompression here
109 urls = filter(bool, map(lambda x: x.strip(), params['param_dict']['reference_source']['user_url'].split('\n'))) 111 urls = list(filter(bool, [x.strip() for x in params['reference_source']['user_url'].split('\n')]))
110 fasta_reader = [urllib2.urlopen(url) for url in urls] 112 fasta_reader = [urllib.request.urlopen(url) for url in urls]
111 113
112 data_table_entry = _stream_fasta_to_file(fasta_reader, target_directory, database_id, database_name, params) 114 data_table_entry = _stream_fasta_to_file(fasta_reader, target_directory, database_id, database_name, params)
113 _add_data_table_entry(data_manager_dict, data_table_entry) 115 _add_data_table_entry(data_manager_dict, data_table_entry)
114 116
115 117
116 def download_from_history(data_manager_dict, params, target_directory, database_id, database_name): 118 def download_from_history(data_manager_dict, params, target_directory, database_id, database_name):
117 # TODO: allow multiple FASTA input files 119 # TODO: allow multiple FASTA input files
118 input_filename = params['param_dict']['reference_source']['input_fasta'] 120 input_filename = params['reference_source']['input_fasta']
119 if isinstance(input_filename, list): 121 if isinstance(input_filename, list):
120 fasta_reader = [open(filename, 'rb') for filename in input_filename] 122 fasta_reader = [open(filename, 'rb') for filename in input_filename]
121 else: 123 else:
122 fasta_reader = open(input_filename) 124 fasta_reader = open(input_filename, 'rb')
123 125
124 data_table_entry = _stream_fasta_to_file(fasta_reader, target_directory, database_id, database_name, params) 126 data_table_entry = _stream_fasta_to_file(fasta_reader, target_directory, database_id, database_name, params)
125 _add_data_table_entry(data_manager_dict, data_table_entry) 127 _add_data_table_entry(data_manager_dict, data_table_entry)
126 128
127 129
128 def copy_from_directory(data_manager_dict, params, target_directory, database_id, database_name): 130 def copy_from_directory(data_manager_dict, params, target_directory, database_id, database_name):
129 input_filename = params['param_dict']['reference_source']['fasta_filename'] 131 input_filename = params['reference_source']['fasta_filename']
130 create_symlink = params['param_dict']['reference_source']['create_symlink'] == 'create_symlink' 132 create_symlink = params['reference_source']['create_symlink'] == 'create_symlink'
131 if create_symlink: 133 if create_symlink:
132 data_table_entry = _create_symlink(input_filename, target_directory, database_id, database_name) 134 data_table_entry = _create_symlink(input_filename, target_directory, database_id, database_name)
133 else: 135 else:
134 if isinstance(input_filename, list): 136 if isinstance(input_filename, list):
135 fasta_reader = [open(filename, 'rb') for filename in input_filename] 137 fasta_reader = [open(filename, 'rb') for filename in input_filename]
144 data_manager_dict['data_tables']['diamond_database'] = data_manager_dict['data_tables'].get('diamond_database', []) 146 data_manager_dict['data_tables']['diamond_database'] = data_manager_dict['data_tables'].get('diamond_database', [])
145 data_manager_dict['data_tables']['diamond_database'].append(data_table_entry) 147 data_manager_dict['data_tables']['diamond_database'].append(data_table_entry)
146 return data_manager_dict 148 return data_manager_dict
147 149
148 150
149 def _stream_fasta_to_file(fasta_stream, target_directory, database_id, database_name, params, close_stream=True): 151 def _stream_fasta_to_file(fasta_stream, target_directory, database_id,
152 database_name, params, close_stream=True):
150 fasta_base_filename = "%s.fa" % database_id 153 fasta_base_filename = "%s.fa" % database_id
151 fasta_filename = os.path.join(target_directory, fasta_base_filename) 154 fasta_filename = os.path.join(target_directory, fasta_base_filename)
152 155
153 temp_fasta = tempfile.NamedTemporaryFile(delete=False, suffix=".fasta") 156 temp_fasta = tempfile.NamedTemporaryFile(delete=False, suffix=".fasta")
154 temp_fasta.close() 157 temp_fasta.close()
155 fasta_writer = open(temp_fasta.name, 'wb+') 158 fasta_writer = open(temp_fasta.name, 'wb+')
156 159
157 if isinstance(fasta_stream, list) and len(fasta_stream) == 1: 160 if not isinstance(fasta_stream, list):
158 fasta_stream = fasta_stream[0] 161 fasta_stream = [fasta_stream]
159 162
160 if isinstance(fasta_stream, list): 163 last_char = None
161 last_char = None 164 for fh in fasta_stream:
162 for fh in fasta_stream: 165 if last_char not in [None, '\n', '\r']:
163 if last_char not in [None, '\n', '\r']: 166 fasta_writer.write('\n')
164 fasta_writer.write('\n')
165 while True:
166 data = fh.read(CHUNK_SIZE)
167 if data:
168 fasta_writer.write(data)
169 last_char = data[-1]
170 else:
171 break
172 if close_stream:
173 fh.close()
174 else:
175 while True: 167 while True:
176 data = fasta_stream.read(CHUNK_SIZE) 168 data = fh.read(CHUNK_SIZE)
177 if data: 169 if data:
178 fasta_writer.write(data) 170 fasta_writer.write(data)
171 last_char = data[-1]
179 else: 172 else:
180 break 173 break
181 if close_stream: 174 if close_stream:
182 fasta_stream.close() 175 fh.close()
183 176
184 fasta_writer.close() 177 fasta_writer.close()
185 178
186 args = ['diamond', 'makedb', '--in', temp_fasta.name, '--db', fasta_filename] 179 args = ['diamond', 'makedb',
180 '--in', temp_fasta.name,
181 '--db', fasta_filename]
182 if params['tax_cond']['tax_select'] == "history":
183 for i in ["taxonmap", "taxonnodes", "taxonnames"]:
184 args.extend(['--' + i, params['tax_cond'][i]])
185 elif params['tax_cond']['tax_select'] == "ncbi":
186 if os.path.isfile(os.path.join(params['tax_cond']['ncbi_tax'], 'prot.accession2taxid.FULL.gz')):
187 args.extend(['--taxonmap',
188 os.path.join(params['tax_cond']['ncbi_tax'], 'prot.accession2taxid.FULL.gz')])
189 elif os.path.isfile(os.path.join(params['tax_cond']['ncbi_tax'], 'prot.accession2taxid.FULL')):
190 args.extend(['--taxonmap',
191 os.path.join(params['tax_cond']['ncbi_tax'], 'prot.accession2taxid.FULL')])
192 elif os.path.isfile(os.path.join(params['tax_cond']['ncbi_tax'], 'prot.accession2taxid.gz')):
193 args.extend(['--taxonmap',
194 os.path.join(params['tax_cond']['ncbi_tax'], 'prot.accession2taxid.gz')])
195 elif os.path.isfile(os.path.join(params['tax_cond']['ncbi_tax'], 'prot.accession2taxid')):
196 args.extend(['--taxonmap',
197 os.path.join(params['tax_cond']['ncbi_tax'], 'prot.accession2taxid')])
198 else:
199 raise Exception('Unable to find prot.accession2taxid file in %s' % (params['tax_cond']['ncbi_tax']))
200
201 args.extend(['--taxonnodes',
202 os.path.join(params['tax_cond']['ncbi_tax'], 'nodes.dmp')])
203 args.extend(['--taxonnames',
204 os.path.join(params['tax_cond']['ncbi_tax'], 'names.dmp')])
187 205
188 tmp_stderr = tempfile.NamedTemporaryFile(prefix="tmp-data-manager-diamond-database-builder-stderr") 206 tmp_stderr = tempfile.NamedTemporaryFile(prefix="tmp-data-manager-diamond-database-builder-stderr")
189 proc = subprocess.Popen(args=args, shell=False, cwd=target_directory, stderr=tmp_stderr.fileno()) 207 proc = subprocess.Popen(args=args, shell=False, cwd=target_directory,
208 stderr=tmp_stderr.fileno())
190 return_code = proc.wait() 209 return_code = proc.wait()
191 if return_code: 210 if return_code:
192 tmp_stderr.flush() 211 tmp_stderr.flush()
193 tmp_stderr.seek(0) 212 tmp_stderr.seek(0)
194 print >> sys.stderr, "Error building diamond database:" 213 print("Error building diamond database:", file=sys.stderr)
195 while True: 214 while True:
196 chunk = tmp_stderr.read(CHUNK_SIZE) 215 chunk = tmp_stderr.read(CHUNK_SIZE)
197 if not chunk: 216 if not chunk:
198 break 217 break
199 sys.stderr.write(chunk) 218 sys.stderr.write(chunk.decode('utf-8'))
200 sys.exit(return_code) 219 sys.exit(return_code)
201 tmp_stderr.close() 220 tmp_stderr.close()
202 os.remove(temp_fasta.name) 221 os.remove(temp_fasta.name)
203 return dict(value=database_id, name=database_name, db_path="%s.dmnd" % fasta_base_filename) 222 return dict(value=database_id, name=database_name,
223 db_path="%s.dmnd" % fasta_base_filename)
204 224
205 225
206 def _create_symlink(input_filename, target_directory, database_id, database_name): 226 def _create_symlink(input_filename, target_directory, database_id, database_name):
207 fasta_base_filename = "%s.fa" % database_id 227 fasta_base_filename = "%s.fa" % database_id
208 fasta_filename = os.path.join(target_directory, fasta_base_filename) 228 fasta_filename = os.path.join(target_directory, fasta_base_filename)
209 os.symlink(input_filename, fasta_filename) 229 os.symlink(input_filename, fasta_filename)
210 return dict(value=database_id, name=database_name, db_path=fasta_base_filename) 230 return dict(value=database_id, name=database_name, db_path=fasta_base_filename)
211 231
212 232
213 REFERENCE_SOURCE_TO_DOWNLOAD = dict(ncbi=download_from_ncbi, url=download_from_url, history=download_from_history, directory=copy_from_directory) 233 REFERENCE_SOURCE_TO_DOWNLOAD = dict(ncbi=download_from_ncbi,
234 url=download_from_url,
235 history=download_from_history,
236 directory=copy_from_directory)
214 237
215 238
216 def main(): 239 def main():
217 # Parse Command Line 240 # Parse Command Line
218 parser = optparse.OptionParser() 241 parser = optparse.OptionParser()
219 parser.add_option('-d', '--dbkey_description', dest='dbkey_description', action='store', type="string", default=None, help='dbkey_description') 242 parser.add_option('-d', '--dbkey_description', dest='dbkey_description',
243 action='store', type="string", default=None,
244 help='dbkey_description')
220 (options, args) = parser.parse_args() 245 (options, args) = parser.parse_args()
221 246
222 filename = args[0] 247 filename = args[0]
223 248
224 params = json.loads(open(filename).read()) 249 with open(filename) as fp:
250 params = json.load(fp)
225 target_directory = params['output_data'][0]['extra_files_path'] 251 target_directory = params['output_data'][0]['extra_files_path']
226 os.mkdir(target_directory) 252 os.mkdir(target_directory)
227 data_manager_dict = {} 253 data_manager_dict = {}
228 254
229 database_id = params['param_dict']['database_id'] 255 param_dict = params['param_dict']
230 database_name = params['param_dict']['database_name'] 256 database_id = param_dict['database_id']
257 database_name = param_dict['database_name']
258 if param_dict['tax_cond']['tax_select'] == "ncbi":
259 param_dict['tax_cond']['ncbi_tax'] = args[1]
231 260
232 # Fetch the FASTA 261 # Fetch the FASTA
233 REFERENCE_SOURCE_TO_DOWNLOAD[params['param_dict']['reference_source']['reference_source_selector']](data_manager_dict, params, target_directory, database_id, database_name) 262 REFERENCE_SOURCE_TO_DOWNLOAD[param_dict['reference_source']['reference_source_selector']](data_manager_dict, param_dict, target_directory, database_id, database_name)
234 263
235 # save info to json file 264 # save info to json file
236 open(filename, 'w').write(json.dumps(data_manager_dict, sort_keys=True)) 265 open(filename, 'w').write(json.dumps(data_manager_dict, sort_keys=True))
237 266
238 267