Mercurial > repos > bgruening > data_manager_diamond_database_builder
comparison data_manager/data_manager_diamond_database_builder.py @ 0:ce62d0912b10 draft
Imported from capsule None
| author | bgruening |
|---|---|
| date | Sun, 08 Feb 2015 10:05:20 -0500 |
| parents | |
| children | 5a0d0bee4f8d |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:ce62d0912b10 |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 import sys | |
| 3 import os | |
| 4 import tempfile | |
| 5 import shutil | |
| 6 import optparse | |
| 7 import urllib2 | |
| 8 import subprocess | |
| 9 from ftplib import FTP | |
| 10 import tarfile | |
| 11 import zipfile | |
| 12 import gzip | |
| 13 import bz2 | |
| 14 | |
| 15 from galaxy.util.json import from_json_string, to_json_string | |
| 16 | |
# Size in bytes (1 MiB) of each read when copying data between streams.
CHUNK_SIZE = 1024 * 1024
| 18 | |
def cleanup_before_exit( tmp_dir ):
    """Recursively delete ``tmp_dir`` when it is set and exists; otherwise do nothing."""
    if not tmp_dir:
        return
    if not os.path.exists( tmp_dir ):
        return
    shutil.rmtree( tmp_dir )
| 22 | |
| 23 | |
def stop_err(msg):
    """Write ``msg`` to stderr verbatim (no newline appended) and exit with status 1."""
    sys.stderr.write(msg)
    raise SystemExit(1)
| 27 | |
| 28 | |
| 29 def _get_files_in_ftp_path( ftp, path ): | |
| 30 path_contents = [] | |
| 31 ftp.retrlines( 'MLSD %s' % ( path ), path_contents.append ) | |
| 32 return [ line.split( ';' )[ -1 ].lstrip() for line in path_contents ] | |
| 33 | |
| 34 | |
| 35 def _get_stream_readers_for_tar( file_obj, tmp_dir ): | |
| 36 fasta_tar = tarfile.open( fileobj=file_obj, mode='r:*' ) | |
| 37 return [ fasta_tar.extractfile( member ) for member in fasta_tar.getmembers() ] | |
| 38 | |
| 39 | |
| 40 def _get_stream_readers_for_zip( file_obj, tmp_dir ): | |
| 41 fasta_zip = zipfile.ZipFile( file_obj, 'r' ) | |
| 42 rval = [] | |
| 43 for member in fasta_zip.namelist(): | |
| 44 fasta_zip.extract( member, tmp_dir ) | |
| 45 rval.append( open( os.path.join( tmp_dir, member ), 'rb' ) ) | |
| 46 return rval | |
| 47 | |
| 48 | |
| 49 def _get_stream_readers_for_gzip( file_obj, tmp_dir ): | |
| 50 return [ gzip.GzipFile( fileobj=file_obj, mode='rb' ) ] | |
| 51 | |
| 52 | |
| 53 def _get_stream_readers_for_bz2( file_obj, tmp_dir ): | |
| 54 return [ bz2.BZ2File( file_obj.name, 'rb' ) ] | |
| 55 | |
| 56 | |
def _fetch_ncbi_archive( ncbi_identifier, download_dir ):
    """Download the NCBI FASTA archive for ``ncbi_identifier`` into ``download_dir``.

    The archive is looked up in ``/blast/db/FASTA/`` on the NCBI FTP server under
    each supported compressed extension, in order.

    :returns: ``(local_path, get_stream_reader)``, where ``get_stream_reader`` is the
        ``_get_stream_readers_for_*`` helper matching the archive's extension.
    :raises Exception: if no archive with a supported extension is listed.
    """
    NCBI_FTP_SERVER = 'ftp.ncbi.nlm.nih.gov'
    NCBI_DOWNLOAD_PATH = '/blast/db/FASTA/'
    # Order matters: '.tar.gz' / '.tar.bz2' must be tried before '.gz' / '.bz2'
    # so that tarballs are unpacked rather than merely decompressed.
    COMPRESSED_EXTENSIONS = [ ( '.tar.gz', _get_stream_readers_for_tar ), ( '.tar.bz2', _get_stream_readers_for_tar ), ( '.zip', _get_stream_readers_for_zip ), ( '.gz', _get_stream_readers_for_gzip ), ( '.bz2', _get_stream_readers_for_bz2 ) ]

    ftp = FTP( NCBI_FTP_SERVER )
    try:
        ftp.login()
        path_contents = _get_files_in_ftp_path( ftp, NCBI_DOWNLOAD_PATH )

        for ext, get_stream_reader in COMPRESSED_EXTENSIONS:
            remote_base_name = "%s%s" % ( ncbi_identifier, ext )
            if remote_base_name in path_contents:
                break
        else:
            raise Exception( 'Unable to determine filename for NCBI database for %s: %s' % ( ncbi_identifier, path_contents ) )

        local_path = os.path.join( download_dir, remote_base_name )
        with open( local_path, 'wb' ) as local_file:
            ftp.retrbinary( 'RETR %s%s' % ( NCBI_DOWNLOAD_PATH, remote_base_name ), local_file.write )
    finally:
        # close() drops the connection without sending QUIT, so cleanup does not
        # depend on the server still answering.
        ftp.close()
    return local_path, get_stream_reader


def download_from_ncbi( data_manager_dict, params, target_directory, database_id, database_name ):
    """Build a DIAMOND database from a FASTA archive on the NCBI FTP server.

    The archive named by ``params['param_dict']['reference_source']['requested_identifier']``
    is downloaded and unpacked inside a private temporary directory. Its FASTA
    content is passed to ``_stream_fasta_to_file``, and the resulting entry is
    appended to ``data_manager_dict``. The temporary directory, the open file
    handles and the FTP connection are released even if a step fails.
    """
    ncbi_identifier = params['param_dict']['reference_source']['requested_identifier']

    tmp_dir = tempfile.mkdtemp( prefix='tmp-data-manager-ncbi-' )
    try:
        archive_path, get_stream_reader = _fetch_ncbi_archive( ncbi_identifier, tmp_dir )

        tmp_extract_dir = os.path.join( tmp_dir, 'extracted_fasta' )
        os.mkdir( tmp_extract_dir )

        with open( archive_path, 'rb' ) as archive:
            fasta_readers = get_stream_reader( archive, tmp_extract_dir )
            try:
                data_table_entry = _stream_fasta_to_file( fasta_readers, target_directory, database_id, database_name, params )
            finally:
                # _stream_fasta_to_file already closes the readers on success;
                # closing them again is harmless and also covers the error path.
                for fasta_reader in fasta_readers:
                    fasta_reader.close()
        _add_data_table_entry( data_manager_dict, data_table_entry )
    finally:
        cleanup_before_exit( tmp_dir )
| 105 | |
| 106 | |
def download_from_url( data_manager_dict, params, target_directory, database_id, database_name ):
    """Build a DIAMOND database from FASTA data fetched from one or more URLs.

    ``params['param_dict']['reference_source']['user_url']`` holds one URL per line;
    surrounding whitespace and blank lines are ignored. The downloaded data is
    passed through as-is (no decompression).
    """
    # TODO: we should automatically do decompression here
    url_lines = params['param_dict']['reference_source']['user_url'].split( '\n' )
    urls = [ stripped for stripped in ( line.strip() for line in url_lines ) if stripped ]
    fasta_reader = [ urllib2.urlopen( url ) for url in urls ]

    data_table_entry = _stream_fasta_to_file( fasta_reader, target_directory, database_id, database_name, params )
    _add_data_table_entry( data_manager_dict, data_table_entry )
| 114 | |
| 115 | |
def download_from_history( data_manager_dict, params, target_directory, database_id, database_name ):
    """Build a DIAMOND database from one or more FASTA datasets given as local paths.

    ``params['param_dict']['reference_source']['input_fasta']`` may be a single path
    or a list of paths. Every file is opened in binary mode and handed to
    ``_stream_fasta_to_file``, which concatenates multiple inputs.
    """
    input_filename = params['param_dict']['reference_source']['input_fasta']
    if isinstance( input_filename, list ):
        fasta_reader = [ open( filename, 'rb' ) for filename in input_filename ]
    else:
        # Binary mode, matching the list branch: the bytes are copied verbatim
        # into the binary temporary FASTA written by _stream_fasta_to_file.
        fasta_reader = open( input_filename, 'rb' )

    data_table_entry = _stream_fasta_to_file( fasta_reader, target_directory, database_id, database_name, params )
    _add_data_table_entry( data_manager_dict, data_table_entry )
| 126 | |
| 127 | |
def copy_from_directory( data_manager_dict, params, target_directory, database_id, database_name ):
    """Register FASTA file(s) that already exist on the server.

    When ``create_symlink`` equals ``'create_symlink'``, the file is only symlinked
    into ``target_directory``. Otherwise the file(s), a single path or a list of
    paths, are opened in binary mode and passed to ``_stream_fasta_to_file``.
    """
    reference_source = params['param_dict']['reference_source']
    input_filename = reference_source['fasta_filename']
    create_symlink = reference_source['create_symlink'] == 'create_symlink'
    if create_symlink:
        # NOTE(review): this branch builds no DIAMOND database, and the entry's
        # db_path names the linked FASTA rather than a .dmnd file. Confirm that
        # consumers of the diamond_database table expect this.
        data_table_entry = _create_symlink( input_filename, target_directory, database_id, database_name )
    else:
        if isinstance( input_filename, list ):
            fasta_reader = [ open( filename, 'rb' ) for filename in input_filename ]
        else:
            # Binary mode, matching the list branch and the binary temporary FASTA.
            fasta_reader = open( input_filename, 'rb' )
        data_table_entry = _stream_fasta_to_file( fasta_reader, target_directory, database_id, database_name, params )
    _add_data_table_entry( data_manager_dict, data_table_entry )
| 140 | |
| 141 | |
| 142 def _add_data_table_entry( data_manager_dict, data_table_entry ): | |
| 143 data_manager_dict['data_tables'] = data_manager_dict.get( 'data_tables', {} ) | |
| 144 data_manager_dict['data_tables']['diamond_database'] = data_manager_dict['data_tables'].get( 'diamond_database', [] ) | |
| 145 data_manager_dict['data_tables']['diamond_database'].append( data_table_entry ) | |
| 146 return data_manager_dict | |
| 147 | |
| 148 | |
def _concatenate_fasta_streams( fasta_stream, fasta_writer, close_stream ):
    """Copy one FASTA stream, or a list of them, into ``fasta_writer`` in CHUNK_SIZE pieces."""
    if isinstance( fasta_stream, list ) and len( fasta_stream ) == 1:
        fasta_stream = fasta_stream[0]

    if isinstance( fasta_stream, list ):
        last_char = None
        for fh in fasta_stream:
            # Start each new input on a fresh line so its first header is not
            # glued onto the previous input's last line.
            if last_char not in ( None, b'\n', b'\r' ):
                fasta_writer.write( b'\n' )
            while True:
                data = fh.read( CHUNK_SIZE )
                if not data:
                    break
                fasta_writer.write( data )
                # Slice rather than index, so this is a 1-character string on
                # Python 2 and 1-byte bytes (not an int) on Python 3.
                last_char = data[-1:]
            if close_stream:
                fh.close()
    else:
        while True:
            data = fasta_stream.read( CHUNK_SIZE )
            if not data:
                break
            fasta_writer.write( data )
        if close_stream:
            fasta_stream.close()


def _run_diamond_makedb( fasta_path, db_path, cwd ):
    """Run ``diamond makedb``. On failure, echo its stderr and exit with its return code."""
    args = [ 'diamond', 'makedb', '--in', fasta_path, '--db', db_path ]

    tmp_stderr = tempfile.NamedTemporaryFile( prefix="tmp-data-manager-diamond-database-builder-stderr" )
    try:
        proc = subprocess.Popen( args=args, shell=False, cwd=cwd, stderr=tmp_stderr.fileno() )
        return_code = proc.wait()
        if return_code:
            tmp_stderr.flush()
            tmp_stderr.seek( 0 )
            sys.stderr.write( "Error building diamond database:\n" )
            while True:
                chunk = tmp_stderr.read( CHUNK_SIZE )
                if not chunk:
                    break
                sys.stderr.write( chunk )
            sys.exit( return_code )
    finally:
        tmp_stderr.close()


def _stream_fasta_to_file( fasta_stream, target_directory, database_id, database_name, params, close_stream=True ):
    """Concatenate FASTA input into a temporary file and build a DIAMOND database from it.

    :param fasta_stream: a readable file-like object, or a list of them. A list is
        concatenated, with a newline inserted between inputs when needed.
    :param target_directory: working directory for ``diamond makedb`` and the
        location of the database it writes.
    :param database_id: the database is built with ``--db <target_directory>/<database_id>.fa``.
    :param database_name: human-readable name stored in the returned entry.
    :param params: Galaxy parameter dict; not used here.
    :param close_stream: close every input stream once it has been consumed.
    :returns: data table entry dict with keys ``value``, ``name`` and ``db_path``.
        ``db_path`` is ``<database_id>.fa.dmnd``, which assumes diamond appends
        ``.dmnd`` to the ``--db`` name.

    If ``diamond makedb`` fails, its stderr is echoed and the process exits with
    its return code. The temporary FASTA is removed on every path.
    """
    fasta_base_filename = "%s.fa" % database_id
    fasta_filename = os.path.join( target_directory, fasta_base_filename )

    temp_fasta = tempfile.NamedTemporaryFile( delete=False, suffix=".fasta" )
    temp_fasta.close()
    try:
        with open( temp_fasta.name, 'wb+' ) as fasta_writer:
            _concatenate_fasta_streams( fasta_stream, fasta_writer, close_stream )
        _run_diamond_makedb( temp_fasta.name, fasta_filename, target_directory )
    finally:
        # Runs even on sys.exit (SystemExit), so a potentially huge temporary
        # FASTA is never left behind.
        os.remove( temp_fasta.name )
    return dict( value=database_id, name=database_name, db_path="%s.dmnd" % fasta_base_filename )
| 204 | |
| 205 | |
| 206 def _create_symlink( input_filename, target_directory, database_id, database_name ): | |
| 207 fasta_base_filename = "%s.fa" % database_id | |
| 208 fasta_filename = os.path.join( target_directory, fasta_base_filename ) | |
| 209 os.symlink( input_filename, fasta_filename ) | |
| 210 return dict( value=database_id, name=database_name, db_path=fasta_base_filename ) | |
| 211 | |
| 212 | |
# Maps the ``reference_source_selector`` parameter value to the handler that
# obtains the FASTA input for that source and records the database entry.
REFERENCE_SOURCE_TO_DOWNLOAD = {
    'ncbi': download_from_ncbi,
    'url': download_from_url,
    'history': download_from_history,
    'directory': copy_from_directory,
}
| 214 | |
def main():
    """Entry point of the Galaxy data manager.

    Reads the JSON parameter file given as the first positional argument and
    creates the ``extra_files_path`` output directory. It then dispatches on
    ``reference_source_selector`` to build the DIAMOND database, and finally
    overwrites the same JSON file with the resulting data table entries.
    """
    # Parse command line; --dbkey_description is accepted but not used below.
    parser = optparse.OptionParser()
    parser.add_option( '-d', '--dbkey_description', dest='dbkey_description', action='store', type="string", default=None, help='dbkey_description' )
    (options, args) = parser.parse_args()
    if not args:
        # parser.error() prints usage and exits with status 2.
        parser.error( 'missing JSON parameter file argument' )

    filename = args[0]

    with open( filename ) as params_file:
        params = from_json_string( params_file.read() )
    target_directory = params[ 'output_data' ][0]['extra_files_path']
    os.mkdir( target_directory )
    data_manager_dict = {}

    database_id = params['param_dict']['database_id']
    database_name = params['param_dict']['database_name']

    # Fetch the FASTA and build the database with the handler for the chosen source.
    reference_source = params['param_dict']['reference_source']['reference_source_selector']
    REFERENCE_SOURCE_TO_DOWNLOAD[ reference_source ]( data_manager_dict, params, target_directory, database_id, database_name )

    # Save info to the JSON file; the with-block guarantees it is flushed and closed.
    with open( filename, 'wb' ) as output_file:
        output_file.write( to_json_string( data_manager_dict ) )


if __name__ == "__main__":
    main()
