comparison data_manager/data_manager_diamond_database_builder.py @ 1:5a0d0bee4f8d draft

"planemo upload for repository https://github.com/bgruening/galaxytools/tree/master/data_managers/data_manager_diamond_database_builder commit b2d290a8b609ebbc7f4b93716370143c41062ad4"
author bgruening
date Tue, 03 Dec 2019 17:39:48 -0500
parents ce62d0912b10
children 5558f74bd296
comparison
equal deleted inserted replaced
0:ce62d0912b10 1:5a0d0bee4f8d
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 import json
2 import sys 3 import sys
3 import os 4 import os
4 import tempfile 5 import tempfile
5 import shutil 6 import shutil
6 import optparse 7 import optparse
10 import tarfile 11 import tarfile
11 import zipfile 12 import zipfile
12 import gzip 13 import gzip
13 import bz2 14 import bz2
14 15
# Number of bytes requested per read() call when copying streams in this
# module (2**20 bytes = 1 MiB).
CHUNK_SIZE = 2**20  # 1mb
def cleanup_before_exit(tmp_dir):
    """Delete the directory tree at *tmp_dir* if it is set and exists.

    Falsy values (``None``, ``""``) and paths that do not exist are ignored.
    """
    if not tmp_dir:
        return
    if not os.path.exists(tmp_dir):
        return
    shutil.rmtree(tmp_dir)
22 22
23 23
def stop_err(msg):
    """Write *msg* to standard error and terminate with exit status 1."""
    sys.stderr.write(msg)
    raise SystemExit(1)
27 27
28 28
def _get_files_in_ftp_path(ftp, path):
    """Return the entry names reported by an ``MLSD`` listing of *path*.

    Each listing line is cut at its last ``;`` and the remainder, with
    leading whitespace removed, is taken as the entry name.
    """
    listing = []
    ftp.retrlines('MLSD %s' % (path), listing.append)

    names = []
    for entry in listing:
        # Everything after the final ';' is the name; the facts precede it.
        names.append(entry.rpartition(';')[2].lstrip())
    return names
33 33
34 34
def _get_stream_readers_for_tar(file_obj, tmp_dir):
    """Return readable file objects for the members of a tar archive.

    *file_obj* is opened with transparent compression detection
    (``mode='r:*'``). *tmp_dir* is unused; it is accepted so that all
    ``_get_stream_readers_for_*`` helpers share one signature.

    Members for which ``extractfile()`` returns ``None`` (directories and
    other non-file entries) are skipped, so every returned object supports
    ``read()`` and ``close()``.
    """
    # The archive is left open on purpose: the returned readers pull their
    # data from it when they are read.
    fasta_tar = tarfile.open(fileobj=file_obj, mode='r:*')
    readers = []
    for member in fasta_tar.getmembers():
        reader = fasta_tar.extractfile(member)
        if reader is not None:
            readers.append(reader)
    return readers
38 38
39 39
def _get_stream_readers_for_zip(file_obj, tmp_dir):
    """Extract a zip archive into *tmp_dir* and open each extracted file.

    Returns a list of binary-mode file objects, one per file member of the
    archive read from *file_obj*. Directory entries (names ending in ``/``)
    are skipped because they cannot be opened for reading.
    """
    rval = []
    # ZipFile does not close a file object that was passed in, so the caller
    # keeps ownership of file_obj.
    with zipfile.ZipFile(file_obj, 'r') as fasta_zip:
        for member in fasta_zip.namelist():
            if member.endswith('/'):
                continue
            # extract() returns the path it actually wrote, which can differ
            # from a naive join when the member name had to be sanitised.
            extracted_path = fasta_zip.extract(member, tmp_dir)
            rval.append(open(extracted_path, 'rb'))
    return rval
47 47
48 48
def _get_stream_readers_for_gzip(file_obj, tmp_dir):
    """Wrap *file_obj* in a gzip reader and return it in a one-item list.

    *tmp_dir* is unused; it is accepted so that all
    ``_get_stream_readers_for_*`` helpers share one signature.
    """
    gzip_reader = gzip.GzipFile(mode='rb', fileobj=file_obj)
    return [gzip_reader]
51 51
52 52
def _get_stream_readers_for_bz2(file_obj, tmp_dir):
    """Open the bzip2 file named by ``file_obj.name`` and return it in a list.

    The archive is re-opened by path rather than read through *file_obj*, so
    *file_obj* must have a ``name`` attribute naming a file on disk.
    *tmp_dir* is unused.
    """
    bz2_reader = bz2.BZ2File(file_obj.name, 'rb')
    return [bz2_reader]
55 55
56 56
def download_from_ncbi(data_manager_dict, params, target_directory, database_id, database_name):
    """Download a FASTA archive from the NCBI FTP server and build a database.

    The identifier in
    ``params['param_dict']['reference_source']['requested_identifier']`` is
    looked up under ``/blast/db/FASTA/`` with each supported compression
    extension in turn. The first match is downloaded to a temporary
    directory, unpacked, and handed to ``_stream_fasta_to_file``. The
    resulting data table entry is added to *data_manager_dict*.

    Raises:
        Exception: if no file for the identifier is found on the server.
    """
    NCBI_FTP_SERVER = 'ftp.ncbi.nlm.nih.gov'
    NCBI_DOWNLOAD_PATH = '/blast/db/FASTA/'
    # Order matters: '.tar.gz' must be tried before the plain '.gz' suffix.
    COMPRESSED_EXTENSIONS = [
        ('.tar.gz', _get_stream_readers_for_tar),
        ('.tar.bz2', _get_stream_readers_for_tar),
        ('.zip', _get_stream_readers_for_zip),
        ('.gz', _get_stream_readers_for_gzip),
        ('.bz2', _get_stream_readers_for_bz2),
    ]

    ncbi_identifier = params['param_dict']['reference_source']['requested_identifier']
    ftp = FTP(NCBI_FTP_SERVER)
    tmp_dir = None
    try:
        ftp.login()

        path_contents = _get_files_in_ftp_path(ftp, NCBI_DOWNLOAD_PATH)

        ncbi_file_name = None
        get_stream_reader = None
        ext = None
        for ext, get_stream_reader in COMPRESSED_EXTENSIONS:
            if "%s%s" % (ncbi_identifier, ext) in path_contents:
                ncbi_file_name = "%s%s%s" % (NCBI_DOWNLOAD_PATH, ncbi_identifier, ext)
                break

        if not ncbi_file_name:
            raise Exception('Unable to determine filename for NCBI database for %s: %s' % (ncbi_identifier, path_contents))

        tmp_dir = tempfile.mkdtemp(prefix='tmp-data-manager-ncbi-')
        ncbi_fasta_filename = os.path.join(tmp_dir, "%s%s" % (ncbi_identifier, ext))

        tmp_extract_dir = os.path.join(tmp_dir, 'extracted_fasta')
        os.mkdir(tmp_extract_dir)

        fasta_readers = []
        tmp_fasta = open(ncbi_fasta_filename, 'wb+')
        try:
            ftp.retrbinary('RETR %s' % ncbi_file_name, tmp_fasta.write)

            # Rewind so the stream readers see the archive from its start.
            tmp_fasta.flush()
            tmp_fasta.seek(0)

            fasta_readers = get_stream_reader(tmp_fasta, tmp_extract_dir)

            data_table_entry = _stream_fasta_to_file(fasta_readers, target_directory, database_id, database_name, params)
            _add_data_table_entry(data_manager_dict, data_table_entry)
        finally:
            for fasta_reader in fasta_readers:
                fasta_reader.close()
            tmp_fasta.close()
    finally:
        # Release the connection and the scratch directory even on failure.
        ftp.close()
        cleanup_before_exit(tmp_dir)
105 105
106 106
def download_from_url(data_manager_dict, params, target_directory, database_id, database_name):
    """Fetch FASTA data from one or more URLs and build a database from it.

    ``params['param_dict']['reference_source']['user_url']`` holds
    newline-separated URLs; blank lines and surrounding whitespace are
    ignored. All responses are passed, in order, to
    ``_stream_fasta_to_file`` and the resulting data table entry is added to
    *data_manager_dict*.
    """
    # urllib2 only exists on Python 2; import locally so the function works
    # on either major version.
    try:
        from urllib.request import urlopen
    except ImportError:
        from urllib2 import urlopen

    # TODO: we should automatically do decompression here
    user_url = params['param_dict']['reference_source']['user_url']
    urls = [line.strip() for line in user_url.split('\n') if line.strip()]
    fasta_reader = [urlopen(url) for url in urls]

    data_table_entry = _stream_fasta_to_file(fasta_reader, target_directory, database_id, database_name, params)
    _add_data_table_entry(data_manager_dict, data_table_entry)
114 114
115 115
def download_from_history(data_manager_dict, params, target_directory, database_id, database_name):
    """Build a database from FASTA file(s) given by path in *params*.

    ``params['param_dict']['reference_source']['input_fasta']`` is either a
    single path or a list of paths. The opened file(s) are passed to
    ``_stream_fasta_to_file`` and the resulting data table entry is added to
    *data_manager_dict*.
    """
    # TODO: allow multiple FASTA input files
    input_filename = params['param_dict']['reference_source']['input_fasta']
    if isinstance(input_filename, list):
        fasta_reader = [open(filename, 'rb') for filename in input_filename]
    else:
        # Binary mode, matching the list branch: the data is copied verbatim
        # into a file that is opened for binary writing.
        fasta_reader = open(input_filename, 'rb')

    data_table_entry = _stream_fasta_to_file(fasta_reader, target_directory, database_id, database_name, params)
    _add_data_table_entry(data_manager_dict, data_table_entry)
126 126
127 127
def copy_from_directory(data_manager_dict, params, target_directory, database_id, database_name):
    """Register FASTA file(s) from a filesystem path, by symlink or by build.

    Reads ``fasta_filename`` and ``create_symlink`` from
    ``params['param_dict']['reference_source']``. When ``create_symlink``
    equals the string ``'create_symlink'`` the input is symlinked into
    *target_directory*; otherwise the file(s) are passed to
    ``_stream_fasta_to_file``. Either way the resulting data table entry is
    added to *data_manager_dict*.
    """
    input_filename = params['param_dict']['reference_source']['fasta_filename']
    create_symlink = params['param_dict']['reference_source']['create_symlink'] == 'create_symlink'
    if create_symlink:
        data_table_entry = _create_symlink(input_filename, target_directory, database_id, database_name)
    else:
        if isinstance(input_filename, list):
            fasta_reader = [open(filename, 'rb') for filename in input_filename]
        else:
            # Binary mode, matching the list branch: the data is copied
            # verbatim into a file that is opened for binary writing.
            fasta_reader = open(input_filename, 'rb')
        data_table_entry = _stream_fasta_to_file(fasta_reader, target_directory, database_id, database_name, params)
    _add_data_table_entry(data_manager_dict, data_table_entry)
140 140
141 141
def _add_data_table_entry(data_manager_dict, data_table_entry):
    """Append *data_table_entry* to the ``diamond_database`` data table.

    The ``data_tables`` mapping and its ``diamond_database`` list are created
    inside *data_manager_dict* when missing. The dictionary is modified in
    place and also returned.
    """
    tables = data_manager_dict.setdefault('data_tables', {})
    entries = tables.setdefault('diamond_database', [])
    entries.append(data_table_entry)
    return data_manager_dict
147 147
148 148
def _stream_fasta_to_file(fasta_stream, target_directory, database_id, database_name, params, close_stream=True):
    """Concatenate FASTA stream(s) and run ``diamond makedb`` on the result.

    Args:
        fasta_stream: a readable object or a list of readable objects. Both
            bytes and text streams are accepted; text is encoded as UTF-8.
        target_directory: working directory for diamond and location of the
            database output.
        database_id: used as the data table ``value`` and to derive the
            database file name (``<database_id>.fa``).
        database_name: used as the data table ``name``.
        params: unused here.
        close_stream: when true, each input stream is closed after reading.

    Returns:
        dict with ``value``, ``name`` and ``db_path`` (``<database_id>.fa.dmnd``).

    On a non-zero diamond exit status its stderr is copied to ``sys.stderr``
    and the process exits with that status.
    """
    fasta_base_filename = "%s.fa" % database_id
    fasta_filename = os.path.join(target_directory, fasta_base_filename)

    if isinstance(fasta_stream, list):
        fasta_streams = fasta_stream
    else:
        fasta_streams = [fasta_stream]

    temp_fasta = tempfile.NamedTemporaryFile(delete=False, suffix=".fasta")
    try:
        try:
            last_char = None
            for fh in fasta_streams:
                # Make sure the next input starts on its own line when the
                # previous one did not end with a line break.
                if last_char not in (None, b'\n', b'\r'):
                    temp_fasta.write(b'\n')
                while True:
                    data = fh.read(CHUNK_SIZE)
                    if not data:
                        break
                    if not isinstance(data, bytes):
                        data = data.encode('utf-8')
                    temp_fasta.write(data)
                    # Slice rather than index: indexing bytes yields an int
                    # on Python 3, which would never equal b'\n'.
                    last_char = data[-1:]
                if close_stream:
                    fh.close()
        finally:
            temp_fasta.close()

        args = ['diamond', 'makedb', '--in', temp_fasta.name, '--db', fasta_filename]

        tmp_stderr = tempfile.NamedTemporaryFile(prefix="tmp-data-manager-diamond-database-builder-stderr")
        try:
            proc = subprocess.Popen(args=args, shell=False, cwd=target_directory, stderr=tmp_stderr.fileno())
            return_code = proc.wait()
            if return_code:
                tmp_stderr.flush()
                tmp_stderr.seek(0)
                sys.stderr.write("Error building diamond database:\n")
                while True:
                    chunk = tmp_stderr.read(CHUNK_SIZE)
                    if not chunk:
                        break
                    # The temp file is binary; sys.stderr wants text on
                    # Python 3.
                    if not isinstance(chunk, str):
                        chunk = chunk.decode('utf-8', 'replace')
                    sys.stderr.write(chunk)
                sys.exit(return_code)
        finally:
            tmp_stderr.close()
    finally:
        # Remove the concatenated FASTA whether or not diamond succeeded.
        os.remove(temp_fasta.name)
    return dict(value=database_id, name=database_name, db_path="%s.dmnd" % fasta_base_filename)
204 204
205 205
def _create_symlink(input_filename, target_directory, database_id, database_name):
    """Symlink *input_filename* into *target_directory* as ``<database_id>.fa``.

    Returns the data table entry dict; its ``db_path`` is the link's base
    name.
    """
    link_name = "%s.fa" % database_id
    os.symlink(input_filename, os.path.join(target_directory, link_name))
    return {'value': database_id, 'name': database_name, 'db_path': link_name}
211 211
212 212
# Maps each reference source selector value to the function that handles it.
REFERENCE_SOURCE_TO_DOWNLOAD = {
    'ncbi': download_from_ncbi,
    'url': download_from_url,
    'history': download_from_history,
    'directory': copy_from_directory,
}
214
214 215
def main():
    """Command-line entry point.

    Reads the JSON parameter file named by the first positional argument,
    creates the output directory named in it, dispatches to the handler for
    the selected reference source, and overwrites the same file with the
    resulting data manager dictionary as JSON.
    """
    # Parse Command Line
    parser = optparse.OptionParser()
    parser.add_option('-d', '--dbkey_description', dest='dbkey_description', action='store', type="string", default=None, help='dbkey_description')
    (options, args) = parser.parse_args()

    if not args:
        # Exit with a usage message instead of an IndexError traceback.
        parser.error('missing required argument: path to the JSON parameter file')
    filename = args[0]

    with open(filename) as params_handle:
        params = json.loads(params_handle.read())
    target_directory = params['output_data'][0]['extra_files_path']
    os.mkdir(target_directory)
    data_manager_dict = {}

    database_id = params['param_dict']['database_id']
    database_name = params['param_dict']['database_name']

    # Fetch the FASTA
    REFERENCE_SOURCE_TO_DOWNLOAD[params['param_dict']['reference_source']['reference_source_selector']](data_manager_dict, params, target_directory, database_id, database_name)

    # save info to json file
    with open(filename, 'w') as output_handle:
        output_handle.write(json.dumps(data_manager_dict, sort_keys=True))
237
236 238
# Run main() only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()