json_collect_data_source.py @ 34:d65de900967e (draft)

Uploaded: 2017-06-13
Author:   fabio
Date:     Tue, 13 Jun 2017 16:39:40 -0400
Parents:  1edc869cd008
Children: (none)
Comparison of changesets 33:228038cd0683 (parent) and 34:d65de900967e, rendered below as a unified diff:
@@ -4,17 +4,19 @@
 import urllib
 import os.path
 import os
 from operator import itemgetter
 import tarfile
+import zipfile
 
 __version__ = "1.0.0"
 CHUNK_SIZE = 2**20 #1mb
 VALID_CHARS = '.-()[]0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ '
 
 
 def splitext(path):
+    # extract the folder path and extension of a file from its path
     for ext in ['.tar.gz', '.tar.bz2']:
         if path.endswith(ext):
             path, ext = path[:-len(ext)], path[-len(ext):]
             break
     else:
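Note: splitext() special-cases the compound extensions '.tar.gz' and '.tar.bz2' so they are not truncated to a bare '.gz' or '.bz2'. A standalone illustration of the branch visible in this hunk (the else fallback lies outside the context window):

    # illustration of the compound-extension branch of splitext()
    path = "collections/sample.tar.gz"
    for ext in ['.tar.gz', '.tar.bz2']:
        if path.endswith(ext):
            path, ext = path[:-len(ext)], path[-len(ext):]
            break
    print(path, ext)  # -> collections/sample .tar.gz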
@@ -55,20 +57,14 @@
     output_stream = open( target_output_filename, 'wb' )
     chunk_write( query_stream, output_stream )
     query_stream.close()
     output_stream.close()
 
-def store_file_from_archive( file_object, target_output_filename, isString=False ):
-    """ Store file after extracting from archive and organize them as a collection using the structure
-        (collection-name)_(file-name).ext as file name
-    """
+def store_file_from_tarfile( file_object, target_output_filename, isString=False ):
+    # store the file_object (from tarfile) on the filesystem
     output_stream = open( target_output_filename, 'wb' )
-    #chunk_write( file_object.read(), output_stream )
-    if not isString:
-        output_stream.write(file_object.read())
-    else:
-        output_stream.write(file_object)
+    output_stream.write(file_object.read())
     output_stream.close()
 
 
 def download_extra_data( query_ext_data, base_path ):
     """ Download any extra data defined in the JSON.
@@ -103,27 +99,53 @@
     else:
         meta_dict[ 'dataset_id' ] = dataset_id
     return "%s\n" % json.dumps( meta_dict )
 
 
-def walk_on_archive(target_output_filename, check_ext, archive_name, appdata_path, db_key="?"):
+def walk_on_archive(target_output_filename, check_ext, archive_library, archive_name, appdata_path, db_key="?"):
+    # keep valid chars only in the archive name
+    archive_name = ''.join(e for e in archive_name if e in VALID_CHARS)
     archive_name = archive_name.replace("_", "-").replace(".", "-")
-    with tarfile.open( target_output_filename, check_ext ) as tf:
-        for entry in tf:
-            if entry.isfile():
-                fileobj = tf.extractfile( entry )
-                # reserve the underscore for the collection searator
-                filename = os.path.basename( entry.name ).replace("_", "-")
-                extension = splitext( filename )[1]
-                # pattern: (?P<identifier_0>[^_]+)_(?P<identifier_1>[^_]+)_(?P<ext>[^_]+)_(?P<dbkey>[^_]+)
-                if (len(extension) > 0):
-                    filename = (filename[0:len(filename)-(len(extension)+1)]).replace(".", "-") + "." + extension + "_" + extension
-                else:
-                    extension = "auto"
-                filename_with_collection_prefix = archive_name + "_" + filename + "_" + db_key
-                target_entry_output_filename = os.path.join(appdata_path, filename_with_collection_prefix)
-                store_file_from_archive( fileobj, target_entry_output_filename )
+    if archive_library == "zipfile":
+        # iterate over the entries inside the archive [zip]
+        with zipfile.ZipFile( target_output_filename, check_ext ) as zf:
+            for entry in zf.namelist():
+                # process the entry only if it is a file (directory names end with "/")
+                if not entry.endswith("/"):
+                    # retrieve the file name
+                    # the underscore character is reserved for the collection separator
+                    filename = os.path.basename( entry.split("/")[-1] ).replace("_", "-")
+                    # retrieve the file extension
+                    extension = splitext( filename )[1]
+                    # if there is no extension, use 'auto'
+                    if (len(extension) == 0):
+                        extension = "auto"
+                    # pattern: (?P<identifier_0>[^_]+)_(?P<identifier_1>[^_]+)_(?P<ext>[^_]+)_(?P<dbkey>[^_]+)
+                    filename_with_collection_prefix = archive_name + "_" + filename + "_" + extension + "_" + db_key
+                    # store the current entry on the filesystem
+                    target_entry_output_filename = os.path.join(appdata_path, filename_with_collection_prefix)
+                    with open( target_entry_output_filename, 'wb' ) as output_stream:
+                        output_stream.write( zf.read( entry ) )
+    elif archive_library == "tarfile":
+        # iterate over the entries inside the archive [gz, bz2, tar]
+        with tarfile.open( target_output_filename, check_ext ) as tf:
+            for entry in tf:
+                if entry.isfile():
+                    fileobj = tf.extractfile( entry )
+                    # retrieve the file name
+                    # the underscore character is reserved for the collection separator
+                    filename = os.path.basename( entry.name.split("/")[-1] ).replace("_", "-")
+                    # retrieve the file extension
+                    extension = splitext( filename )[1]
+                    # if there is no extension, use 'auto'
+                    if (len(extension) == 0):
+                        extension = "auto"
+                    # pattern: (?P<identifier_0>[^_]+)_(?P<identifier_1>[^_]+)_(?P<ext>[^_]+)_(?P<dbkey>[^_]+)
+                    filename_with_collection_prefix = archive_name + "_" + filename + "_" + extension + "_" + db_key
+                    target_entry_output_filename = os.path.join(appdata_path, filename_with_collection_prefix)
+                    # store the current entry on the filesystem
+                    store_file_from_tarfile( fileobj, target_entry_output_filename )
     return True
 
 
 def download_files_and_write_metadata(query_item, json_params, output_base_path, metadata_parameter_file, primary, appdata_path, options, args):
     """ Main work function that operates on the JSON representation of
@@ -135,56 +157,84 @@
     hda_id, dataset_id = set_up_config_values(json_params)
     extension = query_item.get( 'extension' )
     url = query_item.get( 'url' )
     filename = query_item.get( 'name' )
 
+    # the organize parameter is considered for archives only
+    organize = query_item.get( 'organize', None )
+    if organize is None:
+        organize = False
+    elif organize.lower() == "true":
+        organize = True
+    else:
+        # "false", or any malformed value -> do not organize
+        organize = False
+
+    # check the file extension:
+    # if the file is an archive -> do not write metadata and extract the files instead
     check_ext = ""
+    archive_library = None
     if ( url.endswith( "gz" ) ):
         check_ext = "r:gz"
+        archive_library = "tarfile"
     elif ( url.endswith( "bz2" ) ):
         check_ext = "r:bz2"
+        archive_library = "tarfile"
     elif ( url.endswith( "tar" ) ):
         check_ext = "r:"
+        archive_library = "tarfile"
+    elif ( url.endswith( "zip" ) ):
+        check_ext = "r"
+        archive_library = "zipfile"
     isArchive = bool( check_ext and check_ext.strip() )
 
     extra_data = query_item.get( 'extra_data', None )
     if primary:
         filename = ''.join( c in VALID_CHARS and c or '-' for c in filename )
         name = construct_multi_filename( hda_id, filename, extension )
         target_output_filename = os.path.normpath( '/'.join( [ output_base_path, name ] ) )
-        if isArchive is False:
+        if not (isArchive and organize):
             metadata_parameter_file.write( metadata_to_json( dataset_id, query_item,
                                                              target_output_filename,
                                                              ds_type='new_primary_dataset',
                                                              primary=primary) )
     else:
         target_output_filename = output_filename
-        if isArchive is False:
+        if not (isArchive and organize):
             metadata_parameter_file.write( metadata_to_json( dataset_id, query_item,
                                                              target_output_filename,
                                                              ds_type='dataset',
                                                              primary=primary) )
 
-    if isArchive is False:
+    if not (isArchive and organize):
         download_from_query( query_item, target_output_filename )
     else:
+        # the current entry is an archive -> download it inside the appdata folder
         target_output_path = os.path.join(appdata_path, filename)
         download_from_query( query_item, target_output_path )
     if extra_data:
+        # just download the extra data
        extra_files_path = ''.join( [ target_output_filename, 'files' ] )
        download_extra_data( extra_data, extra_files_path )
 
-    """ the following code handles archives and decompress them in a collection """
-    if ( isArchive ):
+    # if the current file is an archive and its content should be organized
+    # -> decompress the archive and populate the collection (which has to be defined in the tool XML schema)
+    if isArchive and organize:
+        # set the same db_key for each file inside the archive,
+        # using the db_key associated to the archive (if it exists)
         db_key = "?"
         archive_metadata = query_item.get( 'metadata', None )
         if archive_metadata is not None:
             try:
                 db_key = archive_metadata.get( 'db_key' )
             except:
                 pass
-        walk_on_archive(target_output_path, check_ext, filename, appdata_path, db_key)
+        archive_name = query_item.get( 'name', None )
+        if archive_name is None:
+            archive_name = filename
+        # iterate over the archive content
+        walk_on_archive(target_output_path, check_ext, archive_library, archive_name, appdata_path, db_key)
 
     return True
 
 
 def set_up_config_values(json_params):
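Note: the organize flag gates both the metadata writing and the extraction: a plain file, or an archive with organize disabled, keeps the previous behaviour, while an archive with organize enabled is downloaded into the appdata folder and exploded by walk_on_archive(). A minimal sketch of the string-to-boolean coercion above, assuming the JSON field arrives as a string (any value other than "true" disables organizing):

    # hypothetical helper equivalent to the inline coercion above
    def parse_organize(value):
        return value is not None and value.lower() == "true"

    assert parse_organize("true") is True
    assert parse_organize("False") is False
    assert parse_organize(None) is False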
@@ -212,11 +262,11 @@
     if not os.path.exists(appdata_path):
         os.makedirs(appdata_path)
 
     # read tool job configuration file and parse parameters we need
     json_params = json.loads( open( options.json_param_file, 'r' ).read() )
-    print("json_params: "+str(json_params))
+    #print("json_params: "+str(json_params))
 
     dataset_url, output_filename, \
     extra_files_path, file_name, \
     ext, out_data_name, \
     hda_id, dataset_id = set_up_config_values(json_params)
@@ -248,27 +298,29 @@
 
     Schema
     ------
 
         [ {"url":"http://url_of_file",
-            "name":"encode WigData",
-            "extension":"wig",
-            "metadata":{"db_key":"hg19"},
+            "name":"My Archive",
+            "extension":"tar.gz",
+            "organize":"true",
+            "metadata":{"db_key":"hg38"},
             "extra_data":[ {"url":"http://url_of_ext_file",
                             "path":"rel/path/to/ext_file"}
                          ]
           }
         ]
 
     """
     # Parse the command line options
-    usage = "Usage: json_data_source_mod.py max_size --json_param_file filename [options]"
+    usage = "Usage: json_collect_data_source.py max_size --json_param_file filename [options]"
     parser = optparse.OptionParser(usage = usage)
     parser.add_option("-j", "--json_param_file", type="string",
                       action="store", dest="json_param_file", help="json schema return data")
     parser.add_option("-p", "--path", type="string",
                       action="store", dest="path", help="new file path")
+    # appdata: temporary directory in which the archives will be decompressed
     parser.add_option("-a", "--appdata", type="string",
                       action="store", dest="appdata", help="appdata folder name")
     parser.add_option("-v", "--version", action="store_true", dest="version",
                       default=False, help="display version and exit")
 
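Note: following the corrected usage string, a hypothetical invocation of the renamed script could look like this (file names and paths are illustrative only):

    python json_collect_data_source.py 1048576 --json_param_file params.json \
        --path /tmp/output --appdata /tmp/output/appdata

where params.json follows the schema shown in the docstring above.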