Mercurial > repos > bgruening > data_manager_diamond_database_builder
comparison data_manager/data_manager_diamond_database_builder.py @ 0:ce62d0912b10 draft
Imported from capsule None
author | bgruening |
---|---|
date | Sun, 08 Feb 2015 10:05:20 -0500 |
parents | |
children | 5a0d0bee4f8d |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:ce62d0912b10 |
---|---|
1 #!/usr/bin/env python | |
2 import sys | |
3 import os | |
4 import tempfile | |
5 import shutil | |
6 import optparse | |
7 import urllib2 | |
8 import subprocess | |
9 from ftplib import FTP | |
10 import tarfile | |
11 import zipfile | |
12 import gzip | |
13 import bz2 | |
14 | |
15 from galaxy.util.json import from_json_string, to_json_string | |
16 | |
CHUNK_SIZE = 2**20  # 1 MB: buffer size used when streaming FASTA data between files
18 | |
def cleanup_before_exit( tmp_dir ):
    """Delete the temporary working directory *tmp_dir*, if it was created."""
    if not tmp_dir:
        return
    if os.path.exists( tmp_dir ):
        shutil.rmtree( tmp_dir )
22 | |
23 | |
def stop_err(msg):
    """Write *msg* to standard error and terminate the process with status 1."""
    sys.stderr.write(msg)
    raise SystemExit(1)
27 | |
28 | |
29 def _get_files_in_ftp_path( ftp, path ): | |
30 path_contents = [] | |
31 ftp.retrlines( 'MLSD %s' % ( path ), path_contents.append ) | |
32 return [ line.split( ';' )[ -1 ].lstrip() for line in path_contents ] | |
33 | |
34 | |
35 def _get_stream_readers_for_tar( file_obj, tmp_dir ): | |
36 fasta_tar = tarfile.open( fileobj=file_obj, mode='r:*' ) | |
37 return [ fasta_tar.extractfile( member ) for member in fasta_tar.getmembers() ] | |
38 | |
39 | |
40 def _get_stream_readers_for_zip( file_obj, tmp_dir ): | |
41 fasta_zip = zipfile.ZipFile( file_obj, 'r' ) | |
42 rval = [] | |
43 for member in fasta_zip.namelist(): | |
44 fasta_zip.extract( member, tmp_dir ) | |
45 rval.append( open( os.path.join( tmp_dir, member ), 'rb' ) ) | |
46 return rval | |
47 | |
48 | |
49 def _get_stream_readers_for_gzip( file_obj, tmp_dir ): | |
50 return [ gzip.GzipFile( fileobj=file_obj, mode='rb' ) ] | |
51 | |
52 | |
53 def _get_stream_readers_for_bz2( file_obj, tmp_dir ): | |
54 return [ bz2.BZ2File( file_obj.name, 'rb' ) ] | |
55 | |
56 | |
def download_from_ncbi( data_manager_dict, params, target_directory, database_id, database_name ):
    """Fetch a FASTA archive from the NCBI BLAST FTP site and build a DIAMOND db.

    The requested identifier is looked up under /blast/db/FASTA/ against every
    supported compressed extension; the first match is downloaded to a scratch
    directory, decompressed via the matching reader factory, and streamed
    through ``_stream_fasta_to_file``, which registers the resulting database
    entry in *data_manager_dict*.

    Raises:
        Exception: if no file matching the identifier is present on the server.
    """
    NCBI_FTP_SERVER = 'ftp.ncbi.nlm.nih.gov'
    NCBI_DOWNLOAD_PATH = '/blast/db/FASTA/'
    # Order matters: multi-part suffixes must be tested before plain '.gz'/'.bz2'
    # so that e.g. 'nr.tar.gz' is handled as a tar archive, not raw gzip.
    COMPRESSED_EXTENSIONS = [ ( '.tar.gz', _get_stream_readers_for_tar ),
                              ( '.tar.bz2', _get_stream_readers_for_tar ),
                              ( '.zip', _get_stream_readers_for_zip ),
                              ( '.gz', _get_stream_readers_for_gzip ),
                              ( '.bz2', _get_stream_readers_for_bz2 ) ]

    ncbi_identifier = params['param_dict']['reference_source']['requested_identifier']
    ftp = FTP( NCBI_FTP_SERVER )
    ftp.login()

    path_contents = _get_files_in_ftp_path( ftp, NCBI_DOWNLOAD_PATH )

    ncbi_file_name = None
    get_stream_reader = None
    ext = None
    for ext, get_stream_reader in COMPRESSED_EXTENSIONS:
        if "%s%s" % ( ncbi_identifier, ext ) in path_contents:
            ncbi_file_name = "%s%s%s" % ( NCBI_DOWNLOAD_PATH, ncbi_identifier, ext )
            break

    if not ncbi_file_name:
        raise Exception( 'Unable to determine filename for NCBI database for %s: %s' % ( ncbi_identifier, path_contents ) )

    tmp_dir = tempfile.mkdtemp( prefix='tmp-data-manager-ncbi-' )
    try:
        ncbi_fasta_filename = os.path.join( tmp_dir, "%s%s" % ( ncbi_identifier, ext ) )

        tmp_extract_dir = os.path.join( tmp_dir, 'extracted_fasta' )
        os.mkdir( tmp_extract_dir )

        # Download the compressed archive into the scratch dir, then rewind it
        # so the reader factory can decompress from the start.
        tmp_fasta = open( ncbi_fasta_filename, 'wb+' )
        try:
            ftp.retrbinary( 'RETR %s' % ncbi_file_name, tmp_fasta.write )
            tmp_fasta.flush()
            tmp_fasta.seek( 0 )

            fasta_readers = get_stream_reader( tmp_fasta, tmp_extract_dir )

            data_table_entry = _stream_fasta_to_file( fasta_readers, target_directory, database_id, database_name, params )
            _add_data_table_entry( data_manager_dict, data_table_entry )

            for fasta_reader in fasta_readers:
                fasta_reader.close()
        finally:
            tmp_fasta.close()
    finally:
        # Always remove the download scratch space, even when an error occurs.
        cleanup_before_exit( tmp_dir )
105 | |
106 | |
def download_from_url( data_manager_dict, params, target_directory, database_id, database_name ):
    """Stream one or more user-supplied URLs (one per line) into a DIAMOND database."""
    #TODO: we should automatically do decompression here
    raw_lines = params['param_dict']['reference_source']['user_url'].split( '\n' )
    urls = [ line.strip() for line in raw_lines if line.strip() ]
    fasta_reader = [ urllib2.urlopen( url ) for url in urls ]

    data_table_entry = _stream_fasta_to_file( fasta_reader, target_directory, database_id, database_name, params )
    _add_data_table_entry( data_manager_dict, data_table_entry )
114 | |
115 | |
def download_from_history( data_manager_dict, params, target_directory, database_id, database_name ):
    """Build a DIAMOND database from FASTA dataset(s) selected from the history."""
    #TODO: allow multiple FASTA input files
    input_filename = params['param_dict']['reference_source']['input_fasta']
    if not isinstance( input_filename, list ):
        fasta_reader = open( input_filename )
    else:
        fasta_reader = [ open( name, 'rb' ) for name in input_filename ]

    data_table_entry = _stream_fasta_to_file( fasta_reader, target_directory, database_id, database_name, params )
    _add_data_table_entry( data_manager_dict, data_table_entry )
126 | |
127 | |
def copy_from_directory( data_manager_dict, params, target_directory, database_id, database_name ):
    """Use a FASTA file from a server directory: either symlink it in place or
    rebuild the DIAMOND database from its contents."""
    source = params['param_dict']['reference_source']
    input_filename = source['fasta_filename']
    if source['create_symlink'] == 'create_symlink':
        data_table_entry = _create_symlink( input_filename, target_directory, database_id, database_name )
    else:
        if not isinstance( input_filename, list ):
            fasta_reader = open( input_filename )
        else:
            fasta_reader = [ open( name, 'rb' ) for name in input_filename ]
        data_table_entry = _stream_fasta_to_file( fasta_reader, target_directory, database_id, database_name, params )
    _add_data_table_entry( data_manager_dict, data_table_entry )
140 | |
141 | |
142 def _add_data_table_entry( data_manager_dict, data_table_entry ): | |
143 data_manager_dict['data_tables'] = data_manager_dict.get( 'data_tables', {} ) | |
144 data_manager_dict['data_tables']['diamond_database'] = data_manager_dict['data_tables'].get( 'diamond_database', [] ) | |
145 data_manager_dict['data_tables']['diamond_database'].append( data_table_entry ) | |
146 return data_manager_dict | |
147 | |
148 | |
def _stream_fasta_to_file( fasta_stream, target_directory, database_id, database_name, params, close_stream=True ):
    """Concatenate FASTA stream(s) into a temp file and run ``diamond makedb`` on it.

    fasta_stream may be a single file-like object or a list of them; a list is
    concatenated in order, with a newline inserted between sources when needed.
    On success, returns the data table entry dict for the built database; on a
    diamond failure, relays diamond's stderr and exits the process with
    diamond's return code.
    """
    fasta_base_filename = "%s.fa" % database_id
    # fasta_filename is passed to diamond as --db; diamond appends the .dmnd
    # suffix itself, so the actual output is '<database_id>.fa.dmnd'.
    fasta_filename = os.path.join( target_directory, fasta_base_filename )

    # Create a named temp file for the concatenated FASTA; closed immediately
    # and reopened so we control the mode, removed manually on success.
    temp_fasta = tempfile.NamedTemporaryFile( delete=False, suffix=".fasta" )
    temp_fasta.close()
    fasta_writer = open( temp_fasta.name, 'wb+' )

    # A single-element list is unwrapped so the simpler single-stream path runs.
    if isinstance( fasta_stream, list ) and len( fasta_stream ) == 1:
        fasta_stream = fasta_stream[0]

    if isinstance( fasta_stream, list ):
        # Concatenate multiple sources, ensuring each new source starts on a
        # fresh line so FASTA headers are not glued to the previous sequence.
        last_char = None
        for fh in fasta_stream:
            if last_char not in [ None, '\n', '\r' ]:
                fasta_writer.write( '\n' )
            while True:
                data = fh.read( CHUNK_SIZE )
                if data:
                    fasta_writer.write( data )
                    last_char = data[-1]
                else:
                    break
            if close_stream:
                fh.close()
    else:
        # Single source: copy it through in CHUNK_SIZE pieces.
        while True:
            data = fasta_stream.read( CHUNK_SIZE )
            if data:
                fasta_writer.write( data )
            else:
                break
        if close_stream:
            fasta_stream.close()

    fasta_writer.close()

    # Build the DIAMOND database; requires the 'diamond' binary on PATH.
    args = [ 'diamond', 'makedb', '--in', temp_fasta.name, '--db', fasta_filename]

    # Capture diamond's stderr so it can be relayed if the build fails.
    tmp_stderr = tempfile.NamedTemporaryFile( prefix = "tmp-data-manager-diamond-database-builder-stderr" )
    proc = subprocess.Popen( args=args, shell=False, cwd=target_directory, stderr=tmp_stderr.fileno() )
    return_code = proc.wait()
    if return_code:
        tmp_stderr.flush()
        tmp_stderr.seek(0)
        # NOTE(review): Python 2 print-statement syntax; this module is Python 2 only.
        print >> sys.stderr, "Error building diamond database:"
        while True:
            chunk = tmp_stderr.read( CHUNK_SIZE )
            if not chunk:
                break
            sys.stderr.write( chunk )
        sys.exit( return_code )
    tmp_stderr.close()
    os.remove( temp_fasta.name )
    return dict( value=database_id, name=database_name, db_path="%s.dmnd" % fasta_base_filename )
204 | |
205 | |
206 def _create_symlink( input_filename, target_directory, database_id, database_name ): | |
207 fasta_base_filename = "%s.fa" % database_id | |
208 fasta_filename = os.path.join( target_directory, fasta_base_filename ) | |
209 os.symlink( input_filename, fasta_filename ) | |
210 return dict( value=database_id, name=database_name, db_path=fasta_base_filename ) | |
211 | |
212 | |
# Dispatch table: maps the tool's 'reference_source_selector' value to the
# handler that fetches or builds the FASTA from that source.
REFERENCE_SOURCE_TO_DOWNLOAD = dict( ncbi=download_from_ncbi, url=download_from_url, history=download_from_history, directory=copy_from_directory )
214 | |
def main():
    """Entry point: read the Galaxy params JSON named on the command line,
    build the requested DIAMOND database, and write the resulting data table
    entries back to the same file for Galaxy to pick up."""
    parser = optparse.OptionParser()
    parser.add_option( '-d', '--dbkey_description', dest='dbkey_description', action='store', type="string", default=None, help='dbkey_description' )
    (options, args) = parser.parse_args()

    filename = args[0]

    params = from_json_string( open( filename ).read() )
    target_directory = params[ 'output_data' ][0]['extra_files_path']
    os.mkdir( target_directory )
    data_manager_dict = {}

    param_dict = params['param_dict']
    database_id = param_dict['database_id']
    database_name = param_dict['database_name']

    # Fetch the FASTA from whichever source the user selected.
    selector = param_dict['reference_source']['reference_source_selector']
    handler = REFERENCE_SOURCE_TO_DOWNLOAD[ selector ]
    handler( data_manager_dict, params, target_directory, database_id, database_name )

    # Save the collected data table entries back to the JSON file.
    open( filename, 'wb' ).write( to_json_string( data_manager_dict ) )

if __name__ == "__main__":
    main()