Mercurial > repos > bgruening > data_manager_diamond_database_builder
comparison data_manager/data_manager_diamond_database_builder.py @ 2:5558f74bd296 draft
"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/data_managers/data_manager_diamond_database_builder commit 75abf7d4b23ed7ae8abce80609d81b20bc882863"
author | iuc |
---|---|
date | Sun, 21 Mar 2021 12:07:34 +0000 |
parents | 5a0d0bee4f8d |
children |
comparison
equal
deleted
inserted
replaced
1:5a0d0bee4f8d | 2:5558f74bd296 |
---|---|
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 import bz2 | |
3 import gzip | |
2 import json | 4 import json |
5 import optparse | |
6 import os | |
7 import shutil | |
8 import subprocess | |
3 import sys | 9 import sys |
4 import os | 10 import tarfile |
5 import tempfile | 11 import tempfile |
6 import shutil | 12 import urllib.error |
7 import optparse | 13 import urllib.parse |
8 import urllib2 | 14 import urllib.request |
9 import subprocess | 15 import zipfile |
10 from ftplib import FTP | 16 from ftplib import FTP |
11 import tarfile | |
12 import zipfile | |
13 import gzip | |
14 import bz2 | |
15 | 17 |
16 CHUNK_SIZE = 2**20 # 1mb | 18 CHUNK_SIZE = 2**20 # 1mb |
17 | 19 |
18 | 20 |
19 def cleanup_before_exit(tmp_dir): | 21 def cleanup_before_exit(tmp_dir): |
20 if tmp_dir and os.path.exists(tmp_dir): | 22 if tmp_dir and os.path.exists(tmp_dir): |
21 shutil.rmtree(tmp_dir) | 23 shutil.rmtree(tmp_dir) |
22 | |
23 | |
24 def stop_err(msg): | |
25 sys.stderr.write(msg) | |
26 sys.exit(1) | |
27 | 24 |
28 | 25 |
29 def _get_files_in_ftp_path(ftp, path): | 26 def _get_files_in_ftp_path(ftp, path): |
30 path_contents = [] | 27 path_contents = [] |
31 ftp.retrlines('MLSD %s' % (path), path_contents.append) | 28 ftp.retrlines('MLSD %s' % (path), path_contents.append) |
52 | 49 |
53 def _get_stream_readers_for_bz2(file_obj, tmp_dir): | 50 def _get_stream_readers_for_bz2(file_obj, tmp_dir): |
54 return [bz2.BZ2File(file_obj.name, 'rb')] | 51 return [bz2.BZ2File(file_obj.name, 'rb')] |
55 | 52 |
56 | 53 |
57 def download_from_ncbi(data_manager_dict, params, target_directory, database_id, database_name): | 54 def download_from_ncbi(data_manager_dict, params, target_directory, |
55 database_id, database_name): | |
58 NCBI_FTP_SERVER = 'ftp.ncbi.nlm.nih.gov' | 56 NCBI_FTP_SERVER = 'ftp.ncbi.nlm.nih.gov' |
59 NCBI_DOWNLOAD_PATH = '/blast/db/FASTA/' | 57 NCBI_DOWNLOAD_PATH = '/blast/db/FASTA/' |
60 COMPRESSED_EXTENSIONS = [('.tar.gz', _get_stream_readers_for_tar), ('.tar.bz2', _get_stream_readers_for_tar), ('.zip', _get_stream_readers_for_zip), ('.gz', _get_stream_readers_for_gzip), ('.bz2', _get_stream_readers_for_bz2)] | 58 COMPRESSED_EXTENSIONS = [('.tar.gz', _get_stream_readers_for_tar), |
61 | 59 ('.tar.bz2', _get_stream_readers_for_tar), |
62 ncbi_identifier = params['param_dict']['reference_source']['requested_identifier'] | 60 ('.zip', _get_stream_readers_for_zip), |
61 ('.gz', _get_stream_readers_for_gzip), | |
62 ('.bz2', _get_stream_readers_for_bz2)] | |
63 | |
64 ncbi_identifier = params['reference_source']['requested_identifier'] | |
63 ftp = FTP(NCBI_FTP_SERVER) | 65 ftp = FTP(NCBI_FTP_SERVER) |
64 ftp.login() | 66 ftp.login() |
65 | 67 |
66 path_contents = _get_files_in_ftp_path(ftp, NCBI_DOWNLOAD_PATH) | 68 path_contents = _get_files_in_ftp_path(ftp, NCBI_DOWNLOAD_PATH) |
67 | 69 |
77 raise Exception('Unable to determine filename for NCBI database for %s: %s' % (ncbi_identifier, path_contents)) | 79 raise Exception('Unable to determine filename for NCBI database for %s: %s' % (ncbi_identifier, path_contents)) |
78 | 80 |
79 tmp_dir = tempfile.mkdtemp(prefix='tmp-data-manager-ncbi-') | 81 tmp_dir = tempfile.mkdtemp(prefix='tmp-data-manager-ncbi-') |
80 ncbi_fasta_filename = os.path.join(tmp_dir, "%s%s" % (ncbi_identifier, ext)) | 82 ncbi_fasta_filename = os.path.join(tmp_dir, "%s%s" % (ncbi_identifier, ext)) |
81 | 83 |
82 fasta_base_filename = "%s.fa" % database_id | 84 # fasta_base_filename = "%s.fa" % database_id |
83 fasta_filename = os.path.join(target_directory, fasta_base_filename) | 85 # fasta_filename = os.path.join(target_directory, fasta_base_filename) |
84 fasta_writer = open(fasta_filename, 'wb+') | 86 # fasta_writer = open(fasta_filename, 'wb+') |
85 | 87 |
86 tmp_extract_dir = os.path.join(tmp_dir, 'extracted_fasta') | 88 tmp_extract_dir = os.path.join(tmp_dir, 'extracted_fasta') |
87 os.mkdir(tmp_extract_dir) | 89 os.mkdir(tmp_extract_dir) |
88 | 90 |
89 tmp_fasta = open(ncbi_fasta_filename, 'wb+') | 91 tmp_fasta = open(ncbi_fasta_filename, 'wb+') |
104 cleanup_before_exit(tmp_dir) | 106 cleanup_before_exit(tmp_dir) |
105 | 107 |
106 | 108 |
107 def download_from_url(data_manager_dict, params, target_directory, database_id, database_name): | 109 def download_from_url(data_manager_dict, params, target_directory, database_id, database_name): |
108 # TODO: we should automatically do decompression here | 110 # TODO: we should automatically do decompression here |
109 urls = filter(bool, map(lambda x: x.strip(), params['param_dict']['reference_source']['user_url'].split('\n'))) | 111 urls = list(filter(bool, [x.strip() for x in params['reference_source']['user_url'].split('\n')])) |
110 fasta_reader = [urllib2.urlopen(url) for url in urls] | 112 fasta_reader = [urllib.request.urlopen(url) for url in urls] |
111 | 113 |
112 data_table_entry = _stream_fasta_to_file(fasta_reader, target_directory, database_id, database_name, params) | 114 data_table_entry = _stream_fasta_to_file(fasta_reader, target_directory, database_id, database_name, params) |
113 _add_data_table_entry(data_manager_dict, data_table_entry) | 115 _add_data_table_entry(data_manager_dict, data_table_entry) |
114 | 116 |
115 | 117 |
116 def download_from_history(data_manager_dict, params, target_directory, database_id, database_name): | 118 def download_from_history(data_manager_dict, params, target_directory, database_id, database_name): |
117 # TODO: allow multiple FASTA input files | 119 # TODO: allow multiple FASTA input files |
118 input_filename = params['param_dict']['reference_source']['input_fasta'] | 120 input_filename = params['reference_source']['input_fasta'] |
119 if isinstance(input_filename, list): | 121 if isinstance(input_filename, list): |
120 fasta_reader = [open(filename, 'rb') for filename in input_filename] | 122 fasta_reader = [open(filename, 'rb') for filename in input_filename] |
121 else: | 123 else: |
122 fasta_reader = open(input_filename) | 124 fasta_reader = open(input_filename, 'rb') |
123 | 125 |
124 data_table_entry = _stream_fasta_to_file(fasta_reader, target_directory, database_id, database_name, params) | 126 data_table_entry = _stream_fasta_to_file(fasta_reader, target_directory, database_id, database_name, params) |
125 _add_data_table_entry(data_manager_dict, data_table_entry) | 127 _add_data_table_entry(data_manager_dict, data_table_entry) |
126 | 128 |
127 | 129 |
128 def copy_from_directory(data_manager_dict, params, target_directory, database_id, database_name): | 130 def copy_from_directory(data_manager_dict, params, target_directory, database_id, database_name): |
129 input_filename = params['param_dict']['reference_source']['fasta_filename'] | 131 input_filename = params['reference_source']['fasta_filename'] |
130 create_symlink = params['param_dict']['reference_source']['create_symlink'] == 'create_symlink' | 132 create_symlink = params['reference_source']['create_symlink'] == 'create_symlink' |
131 if create_symlink: | 133 if create_symlink: |
132 data_table_entry = _create_symlink(input_filename, target_directory, database_id, database_name) | 134 data_table_entry = _create_symlink(input_filename, target_directory, database_id, database_name) |
133 else: | 135 else: |
134 if isinstance(input_filename, list): | 136 if isinstance(input_filename, list): |
135 fasta_reader = [open(filename, 'rb') for filename in input_filename] | 137 fasta_reader = [open(filename, 'rb') for filename in input_filename] |
144 data_manager_dict['data_tables']['diamond_database'] = data_manager_dict['data_tables'].get('diamond_database', []) | 146 data_manager_dict['data_tables']['diamond_database'] = data_manager_dict['data_tables'].get('diamond_database', []) |
145 data_manager_dict['data_tables']['diamond_database'].append(data_table_entry) | 147 data_manager_dict['data_tables']['diamond_database'].append(data_table_entry) |
146 return data_manager_dict | 148 return data_manager_dict |
147 | 149 |
148 | 150 |
149 def _stream_fasta_to_file(fasta_stream, target_directory, database_id, database_name, params, close_stream=True): | 151 def _stream_fasta_to_file(fasta_stream, target_directory, database_id, |
152 database_name, params, close_stream=True): | |
150 fasta_base_filename = "%s.fa" % database_id | 153 fasta_base_filename = "%s.fa" % database_id |
151 fasta_filename = os.path.join(target_directory, fasta_base_filename) | 154 fasta_filename = os.path.join(target_directory, fasta_base_filename) |
152 | 155 |
153 temp_fasta = tempfile.NamedTemporaryFile(delete=False, suffix=".fasta") | 156 temp_fasta = tempfile.NamedTemporaryFile(delete=False, suffix=".fasta") |
154 temp_fasta.close() | 157 temp_fasta.close() |
155 fasta_writer = open(temp_fasta.name, 'wb+') | 158 fasta_writer = open(temp_fasta.name, 'wb+') |
156 | 159 |
157 if isinstance(fasta_stream, list) and len(fasta_stream) == 1: | 160 if not isinstance(fasta_stream, list): |
158 fasta_stream = fasta_stream[0] | 161 fasta_stream = [fasta_stream] |
159 | 162 |
160 if isinstance(fasta_stream, list): | 163 last_char = None |
161 last_char = None | 164 for fh in fasta_stream: |
162 for fh in fasta_stream: | 165 if last_char not in [None, '\n', '\r']: |
163 if last_char not in [None, '\n', '\r']: | 166 fasta_writer.write('\n') |
164 fasta_writer.write('\n') | |
165 while True: | |
166 data = fh.read(CHUNK_SIZE) | |
167 if data: | |
168 fasta_writer.write(data) | |
169 last_char = data[-1] | |
170 else: | |
171 break | |
172 if close_stream: | |
173 fh.close() | |
174 else: | |
175 while True: | 167 while True: |
176 data = fasta_stream.read(CHUNK_SIZE) | 168 data = fh.read(CHUNK_SIZE) |
177 if data: | 169 if data: |
178 fasta_writer.write(data) | 170 fasta_writer.write(data) |
171 last_char = data[-1] | |
179 else: | 172 else: |
180 break | 173 break |
181 if close_stream: | 174 if close_stream: |
182 fasta_stream.close() | 175 fh.close() |
183 | 176 |
184 fasta_writer.close() | 177 fasta_writer.close() |
185 | 178 |
186 args = ['diamond', 'makedb', '--in', temp_fasta.name, '--db', fasta_filename] | 179 args = ['diamond', 'makedb', |
180 '--in', temp_fasta.name, | |
181 '--db', fasta_filename] | |
182 if params['tax_cond']['tax_select'] == "history": | |
183 for i in ["taxonmap", "taxonnodes", "taxonnames"]: | |
184 args.extend(['--' + i, params['tax_cond'][i]]) | |
185 elif params['tax_cond']['tax_select'] == "ncbi": | |
186 if os.path.isfile(os.path.join(params['tax_cond']['ncbi_tax'], 'prot.accession2taxid.FULL.gz')): | |
187 args.extend(['--taxonmap', | |
188 os.path.join(params['tax_cond']['ncbi_tax'], 'prot.accession2taxid.FULL.gz')]) | |
189 elif os.path.isfile(os.path.join(params['tax_cond']['ncbi_tax'], 'prot.accession2taxid.FULL')): | |
190 args.extend(['--taxonmap', | |
191 os.path.join(params['tax_cond']['ncbi_tax'], 'prot.accession2taxid.FULL')]) | |
192 elif os.path.isfile(os.path.join(params['tax_cond']['ncbi_tax'], 'prot.accession2taxid.gz')): | |
193 args.extend(['--taxonmap', | |
194 os.path.join(params['tax_cond']['ncbi_tax'], 'prot.accession2taxid.gz')]) | |
195 elif os.path.isfile(os.path.join(params['tax_cond']['ncbi_tax'], 'prot.accession2taxid')): | |
196 args.extend(['--taxonmap', | |
197 os.path.join(params['tax_cond']['ncbi_tax'], 'prot.accession2taxid')]) | |
198 else: | |
199 raise Exception('Unable to find prot.accession2taxid file in %s' % (params['tax_cond']['ncbi_tax'])) | |
200 | |
201 args.extend(['--taxonnodes', | |
202 os.path.join(params['tax_cond']['ncbi_tax'], 'nodes.dmp')]) | |
203 args.extend(['--taxonnames', | |
204 os.path.join(params['tax_cond']['ncbi_tax'], 'names.dmp')]) | |
187 | 205 |
188 tmp_stderr = tempfile.NamedTemporaryFile(prefix="tmp-data-manager-diamond-database-builder-stderr") | 206 tmp_stderr = tempfile.NamedTemporaryFile(prefix="tmp-data-manager-diamond-database-builder-stderr") |
189 proc = subprocess.Popen(args=args, shell=False, cwd=target_directory, stderr=tmp_stderr.fileno()) | 207 proc = subprocess.Popen(args=args, shell=False, cwd=target_directory, |
208 stderr=tmp_stderr.fileno()) | |
190 return_code = proc.wait() | 209 return_code = proc.wait() |
191 if return_code: | 210 if return_code: |
192 tmp_stderr.flush() | 211 tmp_stderr.flush() |
193 tmp_stderr.seek(0) | 212 tmp_stderr.seek(0) |
194 print >> sys.stderr, "Error building diamond database:" | 213 print("Error building diamond database:", file=sys.stderr) |
195 while True: | 214 while True: |
196 chunk = tmp_stderr.read(CHUNK_SIZE) | 215 chunk = tmp_stderr.read(CHUNK_SIZE) |
197 if not chunk: | 216 if not chunk: |
198 break | 217 break |
199 sys.stderr.write(chunk) | 218 sys.stderr.write(chunk.decode('utf-8')) |
200 sys.exit(return_code) | 219 sys.exit(return_code) |
201 tmp_stderr.close() | 220 tmp_stderr.close() |
202 os.remove(temp_fasta.name) | 221 os.remove(temp_fasta.name) |
203 return dict(value=database_id, name=database_name, db_path="%s.dmnd" % fasta_base_filename) | 222 return dict(value=database_id, name=database_name, |
223 db_path="%s.dmnd" % fasta_base_filename) | |
204 | 224 |
205 | 225 |
206 def _create_symlink(input_filename, target_directory, database_id, database_name): | 226 def _create_symlink(input_filename, target_directory, database_id, database_name): |
207 fasta_base_filename = "%s.fa" % database_id | 227 fasta_base_filename = "%s.fa" % database_id |
208 fasta_filename = os.path.join(target_directory, fasta_base_filename) | 228 fasta_filename = os.path.join(target_directory, fasta_base_filename) |
209 os.symlink(input_filename, fasta_filename) | 229 os.symlink(input_filename, fasta_filename) |
210 return dict(value=database_id, name=database_name, db_path=fasta_base_filename) | 230 return dict(value=database_id, name=database_name, db_path=fasta_base_filename) |
211 | 231 |
212 | 232 |
213 REFERENCE_SOURCE_TO_DOWNLOAD = dict(ncbi=download_from_ncbi, url=download_from_url, history=download_from_history, directory=copy_from_directory) | 233 REFERENCE_SOURCE_TO_DOWNLOAD = dict(ncbi=download_from_ncbi, |
234 url=download_from_url, | |
235 history=download_from_history, | |
236 directory=copy_from_directory) | |
214 | 237 |
215 | 238 |
216 def main(): | 239 def main(): |
217 # Parse Command Line | 240 # Parse Command Line |
218 parser = optparse.OptionParser() | 241 parser = optparse.OptionParser() |
219 parser.add_option('-d', '--dbkey_description', dest='dbkey_description', action='store', type="string", default=None, help='dbkey_description') | 242 parser.add_option('-d', '--dbkey_description', dest='dbkey_description', |
243 action='store', type="string", default=None, | |
244 help='dbkey_description') | |
220 (options, args) = parser.parse_args() | 245 (options, args) = parser.parse_args() |
221 | 246 |
222 filename = args[0] | 247 filename = args[0] |
223 | 248 |
224 params = json.loads(open(filename).read()) | 249 with open(filename) as fp: |
250 params = json.load(fp) | |
225 target_directory = params['output_data'][0]['extra_files_path'] | 251 target_directory = params['output_data'][0]['extra_files_path'] |
226 os.mkdir(target_directory) | 252 os.mkdir(target_directory) |
227 data_manager_dict = {} | 253 data_manager_dict = {} |
228 | 254 |
229 database_id = params['param_dict']['database_id'] | 255 param_dict = params['param_dict'] |
230 database_name = params['param_dict']['database_name'] | 256 database_id = param_dict['database_id'] |
257 database_name = param_dict['database_name'] | |
258 if param_dict['tax_cond']['tax_select'] == "ncbi": | |
259 param_dict['tax_cond']['ncbi_tax'] = args[1] | |
231 | 260 |
232 # Fetch the FASTA | 261 # Fetch the FASTA |
233 REFERENCE_SOURCE_TO_DOWNLOAD[params['param_dict']['reference_source']['reference_source_selector']](data_manager_dict, params, target_directory, database_id, database_name) | 262 REFERENCE_SOURCE_TO_DOWNLOAD[param_dict['reference_source']['reference_source_selector']](data_manager_dict, param_dict, target_directory, database_id, database_name) |
234 | 263 |
235 # save info to json file | 264 # save info to json file |
236 open(filename, 'w').write(json.dumps(data_manager_dict, sort_keys=True)) | 265 open(filename, 'w').write(json.dumps(data_manager_dict, sort_keys=True)) |
237 | 266 |
238 | 267 |