Mercurial > repos > bgruening > data_manager_diamond_database_builder
comparison data_manager/data_manager_diamond_database_builder.py @ 1:5a0d0bee4f8d draft
"planemo upload for repository https://github.com/bgruening/galaxytools/tree/master/data_managers/data_manager_diamond_database_builder commit b2d290a8b609ebbc7f4b93716370143c41062ad4"
author | bgruening |
---|---|
date | Tue, 03 Dec 2019 17:39:48 -0500 |
parents | ce62d0912b10 |
children | 5558f74bd296 |
comparison
equal
deleted
inserted
replaced
0:ce62d0912b10 | 1:5a0d0bee4f8d |
---|---|
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 import json | |
2 import sys | 3 import sys |
3 import os | 4 import os |
4 import tempfile | 5 import tempfile |
5 import shutil | 6 import shutil |
6 import optparse | 7 import optparse |
10 import tarfile | 11 import tarfile |
11 import zipfile | 12 import zipfile |
12 import gzip | 13 import gzip |
13 import bz2 | 14 import bz2 |
14 | 15 |
# Buffer size used for all chunked stream copies in this script: 1 MiB.
CHUNK_SIZE = 2**20  # 1mb
def cleanup_before_exit(tmp_dir):
    """Remove the temporary working directory *tmp_dir*, if it was created.

    A falsy or already-missing path is silently ignored.
    """
    if not tmp_dir:
        return
    if os.path.exists(tmp_dir):
        shutil.rmtree(tmp_dir)
def stop_err(msg):
    """Write *msg* to standard error and terminate the process with status 1."""
    sys.stderr.write(msg)
    raise SystemExit(1)
29 def _get_files_in_ftp_path( ftp, path ): | 29 def _get_files_in_ftp_path(ftp, path): |
30 path_contents = [] | 30 path_contents = [] |
31 ftp.retrlines( 'MLSD %s' % ( path ), path_contents.append ) | 31 ftp.retrlines('MLSD %s' % (path), path_contents.append) |
32 return [ line.split( ';' )[ -1 ].lstrip() for line in path_contents ] | 32 return [line.split(';')[-1].lstrip() for line in path_contents] |
33 | 33 |
34 | 34 |
35 def _get_stream_readers_for_tar( file_obj, tmp_dir ): | 35 def _get_stream_readers_for_tar(file_obj, tmp_dir): |
36 fasta_tar = tarfile.open( fileobj=file_obj, mode='r:*' ) | 36 fasta_tar = tarfile.open(fileobj=file_obj, mode='r:*') |
37 return [ fasta_tar.extractfile( member ) for member in fasta_tar.getmembers() ] | 37 return [fasta_tar.extractfile(member) for member in fasta_tar.getmembers()] |
38 | 38 |
39 | 39 |
40 def _get_stream_readers_for_zip( file_obj, tmp_dir ): | 40 def _get_stream_readers_for_zip(file_obj, tmp_dir): |
41 fasta_zip = zipfile.ZipFile( file_obj, 'r' ) | 41 fasta_zip = zipfile.ZipFile(file_obj, 'r') |
42 rval = [] | 42 rval = [] |
43 for member in fasta_zip.namelist(): | 43 for member in fasta_zip.namelist(): |
44 fasta_zip.extract( member, tmp_dir ) | 44 fasta_zip.extract(member, tmp_dir) |
45 rval.append( open( os.path.join( tmp_dir, member ), 'rb' ) ) | 45 rval.append(open(os.path.join(tmp_dir, member), 'rb')) |
46 return rval | 46 return rval |
47 | 47 |
48 | 48 |
49 def _get_stream_readers_for_gzip( file_obj, tmp_dir ): | 49 def _get_stream_readers_for_gzip(file_obj, tmp_dir): |
50 return [ gzip.GzipFile( fileobj=file_obj, mode='rb' ) ] | 50 return [gzip.GzipFile(fileobj=file_obj, mode='rb')] |
51 | 51 |
52 | 52 |
53 def _get_stream_readers_for_bz2( file_obj, tmp_dir ): | 53 def _get_stream_readers_for_bz2(file_obj, tmp_dir): |
54 return [ bz2.BZ2File( file_obj.name, 'rb' ) ] | 54 return [bz2.BZ2File(file_obj.name, 'rb')] |
55 | 55 |
56 | 56 |
def download_from_ncbi(data_manager_dict, params, target_directory, database_id, database_name):
    """Download a FASTA database from the NCBI BLAST FTP site, build the
    DIAMOND database from it and register the result in *data_manager_dict*.

    Fixes vs. original: the original opened an unused ``fasta_writer`` handle
    on ``<database_id>.fa`` in *target_directory* — never written, never
    closed — leaking the handle and leaving a stray empty file (all writing is
    done inside ``_stream_fasta_to_file``). The FTP control connection is now
    closed after the transfer, and the temporary directory is cleaned up even
    if extraction or database building fails.
    """
    NCBI_FTP_SERVER = 'ftp.ncbi.nlm.nih.gov'
    NCBI_DOWNLOAD_PATH = '/blast/db/FASTA/'
    COMPRESSED_EXTENSIONS = [('.tar.gz', _get_stream_readers_for_tar), ('.tar.bz2', _get_stream_readers_for_tar), ('.zip', _get_stream_readers_for_zip), ('.gz', _get_stream_readers_for_gzip), ('.bz2', _get_stream_readers_for_bz2)]

    ncbi_identifier = params['param_dict']['reference_source']['requested_identifier']
    ftp = FTP(NCBI_FTP_SERVER)
    ftp.login()

    path_contents = _get_files_in_ftp_path(ftp, NCBI_DOWNLOAD_PATH)

    # Probe the known compressed extensions until one matches a remote file;
    # ext/get_stream_reader keep the values from the matching iteration.
    ncbi_file_name = None
    get_stream_reader = None
    ext = None
    for ext, get_stream_reader in COMPRESSED_EXTENSIONS:
        if "%s%s" % (ncbi_identifier, ext) in path_contents:
            ncbi_file_name = "%s%s%s" % (NCBI_DOWNLOAD_PATH, ncbi_identifier, ext)
            break

    if not ncbi_file_name:
        raise Exception('Unable to determine filename for NCBI database for %s: %s' % (ncbi_identifier, path_contents))

    tmp_dir = tempfile.mkdtemp(prefix='tmp-data-manager-ncbi-')
    try:
        ncbi_fasta_filename = os.path.join(tmp_dir, "%s%s" % (ncbi_identifier, ext))

        tmp_extract_dir = os.path.join(tmp_dir, 'extracted_fasta')
        os.mkdir(tmp_extract_dir)

        tmp_fasta = open(ncbi_fasta_filename, 'wb+')
        ftp.retrbinary('RETR %s' % ncbi_file_name, tmp_fasta.write)
        ftp.quit()  # transfer complete; release the control connection

        tmp_fasta.flush()
        tmp_fasta.seek(0)

        fasta_readers = get_stream_reader(tmp_fasta, tmp_extract_dir)

        data_table_entry = _stream_fasta_to_file(fasta_readers, target_directory, database_id, database_name, params)
        _add_data_table_entry(data_manager_dict, data_table_entry)

        for fasta_reader in fasta_readers:
            fasta_reader.close()
        tmp_fasta.close()
    finally:
        cleanup_before_exit(tmp_dir)
def download_from_url(data_manager_dict, params, target_directory, database_id, database_name):
    """Fetch one FASTA stream per user-supplied URL (newline-separated list)
    and build/register the DIAMOND database from them."""
    # TODO: we should automatically do decompression here
    raw_urls = params['param_dict']['reference_source']['user_url'].split('\n')
    urls = [url.strip() for url in raw_urls if url.strip()]
    fasta_reader = [urllib2.urlopen(url) for url in urls]

    data_table_entry = _stream_fasta_to_file(fasta_reader, target_directory, database_id, database_name, params)
    _add_data_table_entry(data_manager_dict, data_table_entry)
def download_from_history(data_manager_dict, params, target_directory, database_id, database_name):
    """Build and register the DIAMOND database from FASTA dataset(s) already
    present in the user's Galaxy history."""
    # TODO: allow multiple FASTA input files
    source = params['param_dict']['reference_source']['input_fasta']
    if isinstance(source, list):
        fasta_reader = [open(name, 'rb') for name in source]
    else:
        fasta_reader = open(source)

    entry = _stream_fasta_to_file(fasta_reader, target_directory, database_id, database_name, params)
    _add_data_table_entry(data_manager_dict, entry)
def copy_from_directory(data_manager_dict, params, target_directory, database_id, database_name):
    """Register a FASTA file from a server-side directory, either by
    symlinking it into place or by streaming it through the DIAMOND builder."""
    source = params['param_dict']['reference_source']['fasta_filename']
    use_symlink = params['param_dict']['reference_source']['create_symlink'] == 'create_symlink'
    if use_symlink:
        entry = _create_symlink(source, target_directory, database_id, database_name)
    else:
        if isinstance(source, list):
            fasta_reader = [open(name, 'rb') for name in source]
        else:
            fasta_reader = open(source)
        entry = _stream_fasta_to_file(fasta_reader, target_directory, database_id, database_name, params)
    _add_data_table_entry(data_manager_dict, entry)
142 def _add_data_table_entry( data_manager_dict, data_table_entry ): | 142 def _add_data_table_entry(data_manager_dict, data_table_entry): |
143 data_manager_dict['data_tables'] = data_manager_dict.get( 'data_tables', {} ) | 143 data_manager_dict['data_tables'] = data_manager_dict.get('data_tables', {}) |
144 data_manager_dict['data_tables']['diamond_database'] = data_manager_dict['data_tables'].get( 'diamond_database', [] ) | 144 data_manager_dict['data_tables']['diamond_database'] = data_manager_dict['data_tables'].get('diamond_database', []) |
145 data_manager_dict['data_tables']['diamond_database'].append( data_table_entry ) | 145 data_manager_dict['data_tables']['diamond_database'].append(data_table_entry) |
146 return data_manager_dict | 146 return data_manager_dict |
147 | 147 |
148 | 148 |
def _stream_fasta_to_file(fasta_stream, target_directory, database_id, database_name, params, close_stream=True):
    """Concatenate the FASTA stream(s) into a temporary file, run
    ``diamond makedb`` on it, and return the resulting data table entry.

    *fasta_stream* may be a single file-like object or a list of them; list
    entries are concatenated, inserting a newline between streams whose
    boundary would otherwise merge two records onto one line.  Streams are
    closed afterwards unless *close_stream* is False.

    Returns ``dict(value=..., name=..., db_path=...)`` describing the
    ``<database_id>.fa.dmnd`` file created in *target_directory*.  On diamond
    failure, its stderr is echoed and the process exits with diamond's
    return code.

    Fixes vs. original: error reporting uses ``sys.stderr.write`` (the same
    convention as ``stop_err``) instead of the Python-2-only ``print >>``
    statement, and the temporary FASTA/stderr files are cleaned up on the
    failure path as well, via try/finally.
    """
    fasta_base_filename = "%s.fa" % database_id
    fasta_filename = os.path.join(target_directory, fasta_base_filename)

    # Create a named temp file, then reopen it ourselves for binary writing.
    temp_fasta = tempfile.NamedTemporaryFile(delete=False, suffix=".fasta")
    temp_fasta.close()
    fasta_writer = open(temp_fasta.name, 'wb+')

    # A single-element list behaves exactly like a bare stream.
    if isinstance(fasta_stream, list) and len(fasta_stream) == 1:
        fasta_stream = fasta_stream[0]

    if isinstance(fasta_stream, list):
        last_char = None
        for fh in fasta_stream:
            # Separate adjacent streams unless the previous one already ended
            # with a line break.
            if last_char not in [None, '\n', '\r']:
                fasta_writer.write('\n')
            while True:
                data = fh.read(CHUNK_SIZE)
                if data:
                    fasta_writer.write(data)
                    last_char = data[-1]
                else:
                    break
            if close_stream:
                fh.close()
    else:
        while True:
            data = fasta_stream.read(CHUNK_SIZE)
            if data:
                fasta_writer.write(data)
            else:
                break
        if close_stream:
            fasta_stream.close()

    fasta_writer.close()

    args = ['diamond', 'makedb', '--in', temp_fasta.name, '--db', fasta_filename]

    tmp_stderr = tempfile.NamedTemporaryFile(prefix="tmp-data-manager-diamond-database-builder-stderr")
    try:
        proc = subprocess.Popen(args=args, shell=False, cwd=target_directory, stderr=tmp_stderr.fileno())
        return_code = proc.wait()
        if return_code:
            # Echo diamond's captured stderr before aborting with its code.
            tmp_stderr.flush()
            tmp_stderr.seek(0)
            sys.stderr.write("Error building diamond database:\n")
            while True:
                chunk = tmp_stderr.read(CHUNK_SIZE)
                if not chunk:
                    break
                sys.stderr.write(chunk)
            sys.exit(return_code)
    finally:
        tmp_stderr.close()
        os.remove(temp_fasta.name)
    return dict(value=database_id, name=database_name, db_path="%s.dmnd" % fasta_base_filename)
206 def _create_symlink( input_filename, target_directory, database_id, database_name ): | 206 def _create_symlink(input_filename, target_directory, database_id, database_name): |
207 fasta_base_filename = "%s.fa" % database_id | 207 fasta_base_filename = "%s.fa" % database_id |
208 fasta_filename = os.path.join( target_directory, fasta_base_filename ) | 208 fasta_filename = os.path.join(target_directory, fasta_base_filename) |
209 os.symlink( input_filename, fasta_filename ) | 209 os.symlink(input_filename, fasta_filename) |
210 return dict( value=database_id, name=database_name, db_path=fasta_base_filename ) | 210 return dict(value=database_id, name=database_name, db_path=fasta_base_filename) |
211 | 211 |
212 | 212 |
# Dispatch table: maps the tool's reference_source_selector value to the
# handler that fetches/builds the database from that source.
REFERENCE_SOURCE_TO_DOWNLOAD = {
    'ncbi': download_from_ncbi,
    'url': download_from_url,
    'history': download_from_history,
    'directory': copy_from_directory,
}
def main():
    """Entry point for the Galaxy data manager.

    Reads the data-manager JSON params file (first positional argument),
    fetches/builds the requested DIAMOND database via the selected source
    handler, and writes the resulting data table entries back to the same
    file for Galaxy to pick up.

    Fix vs. original: the params file was opened for reading and for writing
    without ever closing the handles; both are now context-managed.
    """
    # Parse command line.
    parser = optparse.OptionParser()
    parser.add_option('-d', '--dbkey_description', dest='dbkey_description', action='store', type="string", default=None, help='dbkey_description')
    (options, args) = parser.parse_args()

    filename = args[0]

    with open(filename) as fh:
        params = json.loads(fh.read())
    target_directory = params['output_data'][0]['extra_files_path']
    os.mkdir(target_directory)
    data_manager_dict = {}

    database_id = params['param_dict']['database_id']
    database_name = params['param_dict']['database_name']

    # Fetch the FASTA and build the database via the selected source handler.
    REFERENCE_SOURCE_TO_DOWNLOAD[params['param_dict']['reference_source']['reference_source_selector']](data_manager_dict, params, target_directory, database_id, database_name)

    # save info to json file
    with open(filename, 'w') as fh:
        fh.write(json.dumps(data_manager_dict, sort_keys=True))


if __name__ == "__main__":
    main()