comparison json_data_source_mod.py @ 12:80593f75d74a draft

Uploaded
author fabio
date Tue, 30 May 2017 12:26:32 -0400
parents c0be9583df97
children babc444d4bd0
11:9d24947d4335 12:80593f75d74a
55 output_stream = open( target_output_filename, 'wb' ) 55 output_stream = open( target_output_filename, 'wb' )
56 chunk_write( query_stream, output_stream ) 56 chunk_write( query_stream, output_stream )
57 query_stream.close() 57 query_stream.close()
58 output_stream.close() 58 output_stream.close()
59 59
60 def store_file_from_archive( file_object, target_output_filename ): 60 def store_file_from_archive( file_object, target_output_filename, isString=False ):
61 """ Store file after extracting from archive and organize them as a collection using the structure 61 """ Store file after extracting from archive and organize them as a collection using the structure
62 (collection-name)_(file-name).ext as file name 62 (collection-name)_(file-name).ext as file name
63 """ 63 """
64 output_stream = open( target_output_filename, 'wb' ) 64 output_stream = open( target_output_filename, 'wb' )
65 chunk_write( file_object.read(), output_stream ) 65 #chunk_write( file_object.read(), output_stream )
66 if not isString:
67 output_stream.write(file_object.read())
68 else:
69 output_stream.write(file_object)
66 output_stream.close() 70 output_stream.close()
67 71
68 72
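The revised store_file_from_archive accepts either a file-like object or contents that are already in memory, selected by the new isString flag. A minimal standalone sketch of the two call styles, using io.BytesIO and made-up output file names:

import io

def store_file_from_archive(file_object, target_output_filename, isString=False):
    # sketch of the revised helper: isString=True means the payload is the raw
    # contents themselves rather than a file-like object to read from
    output_stream = open(target_output_filename, 'wb')
    if not isString:
        output_stream.write(file_object.read())
    else:
        output_stream.write(file_object)
    output_stream.close()

# a file-like object, e.g. what tarfile.extractfile() returns
store_file_from_archive(io.BytesIO(b"read-1\t12\n"), "mycollection_reads.tabular")
# contents already read into memory
store_file_from_archive(b"read-1\t12\n", "mycollection_reads.tabular", isString=True)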
69 def download_extra_data( query_ext_data, base_path ): 73 def download_extra_data( query_ext_data, base_path ):
70 """ Download any extra data defined in the JSON. 74 """ Download any extra data defined in the JSON.
81 os.makedirs( os.path.normpath( '/'.join( [ base_path, os.path.dirname( ext_path ) ] ) ) ) 85 os.makedirs( os.path.normpath( '/'.join( [ base_path, os.path.dirname( ext_path ) ] ) ) )
82 output_stream = open( os.path.normpath( '/'.join( [ base_path, ext_path ] ) ), 'wb' ) 86 output_stream = open( os.path.normpath( '/'.join( [ base_path, ext_path ] ) ), 'wb' )
83 chunk_write( query_stream, output_stream ) 87 chunk_write( query_stream, output_stream )
84 query_stream.close() 88 query_stream.close()
85 output_stream.close() 89 output_stream.close()
86
87
88 def metadata_to_json_for_archive_entry( dataset_id, extension, metaname, filename, ds_type='dataset', primary=False ):
89 """ Return line separated JSON """
90 meta_dict = dict( type = ds_type,
91 ext = extension,
92 filename = filename,
93 name = metaname,
94 metadata = {} )
95 if primary:
96 meta_dict[ 'base_dataset_id' ] = dataset_id
97 else:
98 meta_dict[ 'dataset_id' ] = dataset_id
99 return "%s\n" % json.dumps( meta_dict )
100 90
101 91
102 def metadata_to_json( dataset_id, metadata, filename, ds_type='dataset', primary=False): 92 def metadata_to_json( dataset_id, metadata, filename, ds_type='dataset', primary=False):
103 """ Return line separated JSON """ 93 """ Return line separated JSON """
104 meta_dict = dict( type = ds_type, 94 meta_dict = dict( type = ds_type,
113 else: 103 else:
114 meta_dict[ 'dataset_id' ] = dataset_id 104 meta_dict[ 'dataset_id' ] = dataset_id
115 return "%s\n" % json.dumps( meta_dict ) 105 return "%s\n" % json.dumps( meta_dict )
116 106
117 107
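Both metadata helpers append one JSON object per line to the tool-provided metadata file. A sketch of what a single record could look like, modeled on the fields assembled above; every value here is illustrative only:

import json

meta_dict = dict(type='new_primary_dataset',      # or 'dataset' for the non-primary case
                 ext='tabular',
                 filename='/path/to/output/file',  # hypothetical target path
                 name='example element',
                 metadata={})
meta_dict['base_dataset_id'] = 42                  # primary=True branch; otherwise 'dataset_id'
print("%s\n" % json.dumps(meta_dict))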
118 def download_files_and_write_metadata(query_item, json_params, output_base_path, metadata_parameter_file, primary, appdata_path): 108 def walk_on_archive(target_output_filename, check_ext, archive_name, appdata_path):
109 archive_name = archive_name.replace("_", "-").replace(".", "-")
110 with tarfile.open( target_output_filename, check_ext ) as tf:
111 for entry in tf:
112 if entry.isfile():
113 fileobj = tf.extractfile( entry )
114 # reserve the underscore for the collection separator
115 filename = os.path.basename( entry.name ).replace("_", "-")
116 extension = splitext( filename )[1]
117 # pattern: (?P<identifier_0>[^_]+)_(?P<identifier_1>[^_]+)
118 if (len(extension) > 0):
119 filename = (filename[0:len(filename)-(len(extension)+1)]).replace(".", "-") + "." + extension
120 else:
121 extension = "auto"
122 filename_with_collection_prefix = archive_name + "_" + filename
123 target_entry_output_filename = os.path.join(appdata_path, filename_with_collection_prefix)
124 store_file_from_archive( fileobj, target_entry_output_filename )
125 return True
126
127
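walk_on_archive reserves the underscore as the separator between the collection name and the element name, so underscores and stray dots inside either part are rewritten as dashes. A standalone sketch of that naming scheme only, using the standard os.path.splitext for simplicity:

import os

def to_collection_element_name(archive_name, entry_name):
    # underscores are reserved for the collection separator, so replace them
    # (and dots in the archive name) with dashes
    archive_name = archive_name.replace("_", "-").replace(".", "-")
    filename = os.path.basename(entry_name).replace("_", "-")
    base, extension = os.path.splitext(filename)
    if extension:
        # dots left inside the base name would break later parsing of the pattern
        # (?P<identifier_0>[^_]+)_(?P<identifier_1>[^_]+), so dash them as well
        filename = base.replace(".", "-") + extension
    return archive_name + "_" + filename

# e.g. "sample_data.tar.gz" and member "reads/set_1.fastq"
print(to_collection_element_name("sample_data.tar.gz", "reads/set_1.fastq"))
# -> sample-data-tar-gz_set-1.fastq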
128 def download_files_and_write_metadata(query_item, json_params, output_base_path, metadata_parameter_file, primary, appdata_path, options, args):
119 """ Main work function that operates on the JSON representation of 129 """ Main work function that operates on the JSON representation of
120 one dataset and its metadata. Returns True. 130 one dataset and its metadata. Returns True.
121 """ 131 """
122 dataset_url, output_filename, \ 132 dataset_url, output_filename, \
123 extra_files_path, file_name, \ 133 extra_files_path, file_name, \
124 ext, out_data_name, \ 134 ext, out_data_name, \
125 hda_id, dataset_id = set_up_config_values(json_params) 135 hda_id, dataset_id = set_up_config_values(json_params)
126 extension = query_item.get( 'extension' ) 136 extension = query_item.get( 'extension' )
127 filename = query_item.get( 'url' ) 137 #filename = query_item.get( 'url' )
138 filename = query_item.get( 'name' )
139
140 check_ext = ""
141 if ( filename.endswith( "gz" ) ):
142 check_ext = "r:gz"
143 elif ( filename.endswith( "bz2" ) ):
144 check_ext = "r:bz2"
145 elif ( filename.endswith( "tar" ) ):
146 check_ext = "r:"
147 isArchive = bool( check_ext and check_ext.strip() )
148
128 extra_data = query_item.get( 'extra_data', None ) 149 extra_data = query_item.get( 'extra_data', None )
129 if primary: 150 if primary:
130 filename = ''.join( c in VALID_CHARS and c or '-' for c in filename ) 151 filename = ''.join( c in VALID_CHARS and c or '-' for c in filename )
131 name = construct_multi_filename( hda_id, filename, extension ) 152 name = construct_multi_filename( hda_id, filename, extension )
132 target_output_filename = os.path.normpath( '/'.join( [ output_base_path, name ] ) ) 153 target_output_filename = os.path.normpath( '/'.join( [ output_base_path, name ] ) )
133 metadata_parameter_file.write( metadata_to_json( dataset_id, query_item, 154 if isArchive is False:
134 target_output_filename, 155 metadata_parameter_file.write( metadata_to_json( dataset_id, query_item,
135 ds_type='new_primary_dataset', 156 target_output_filename,
136 primary=primary) ) 157 ds_type='new_primary_dataset',
158 primary=primary) )
137 else: 159 else:
138 target_output_filename = output_filename 160 target_output_filename = output_filename
139 metadata_parameter_file.write( metadata_to_json( dataset_id, query_item, 161 if isArchive is False:
140 target_output_filename, 162 metadata_parameter_file.write( metadata_to_json( dataset_id, query_item,
141 ds_type='dataset', 163 target_output_filename,
142 primary=primary) ) 164 ds_type='dataset',
165 primary=primary) )
166
143 download_from_query( query_item, target_output_filename ) 167 download_from_query( query_item, target_output_filename )
144 if extra_data: 168 if extra_data:
145 extra_files_path = ''.join( [ target_output_filename, 'files' ] ) 169 extra_files_path = ''.join( [ target_output_filename, 'files' ] )
146 download_extra_data( extra_data, extra_files_path ) 170 download_extra_data( extra_data, extra_files_path )
147 171
148 """ the following code handles archives and decompress them in a collection """ 172 """ the following code handles archives and decompress them in a collection """
149 check_ext = "" 173 if ( isArchive ):
150 if ( fname.endswith( "gz" ) ): 174 walk_on_archive(target_output_filename, check_ext, query_item.get( 'name' ), appdata_path)
151 check_ext = "r:gz" 175
152 elif ( fname.endswith( "bz2" ) ):
153 check_ext = "r:bz2"
154 elif ( fname.endswith( "tar" ) ):
155 check_ext = "r:"
156 if ( bool( check_ext and check_ext.strip() ) ):
157 with tarfile.open( target_output_filename, check_ext ) as tf:
158 for entry in tf:
159 fileobj = tf.extractfile( entry )
160 if entry.isfile():
161
162 #dataset_url, output_filename, \
163 # extra_files_path, file_name, \
164 # ext, out_data_name, \
165 # hda_id, dataset_id = set_up_config_values(json_params)
166
167 filename = os.path.basename( entry.name )
168 extension = splitext( filename )
169 extra_data = None
170 #target_output_filename = output_filename
171 # (?P<archive_name>.*)_(?P<file_name>.*)\..*
172 filename_with_collection_prefix = query_item.get( 'name' ) + "_" + filename
173 target_output_filename = os.path.join(appdata_path, filename_with_collection_prefix)
174
175 #metadata_parameter_file.write( metadata_to_json_for_archive_entry( dataset_id, extension,
176 # filename, target_output_filename,
177 # ds_type='dataset',
178 # primary=primary) )
179
180 store_file_from_archive( fileobj, target_output_filename )
181
182 return True 176 return True
183 177
184
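Earlier in download_files_and_write_metadata the archive type is inferred from the file name suffix and mapped to a tarfile read mode. A standalone sketch of that mapping, with made-up file names; an empty mode string means the item is not treated as an archive:

def archive_read_mode(filename):
    # suffix -> mode string accepted by tarfile.open(); "" means not an archive
    if filename.endswith("gz"):
        return "r:gz"
    elif filename.endswith("bz2"):
        return "r:bz2"
    elif filename.endswith("tar"):
        return "r:"
    return ""

print(archive_read_mode("sample_data.tar.gz"))   # -> r:gz
print(archive_read_mode("results.tabular"))      # -> empty, handled as a plain dataset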
185 def set_up_config_values():
186 extra_files_path, file_name, ext, out_data_name, hda_id, dataset_id = \
187 itemgetter('extra_files_path', 'file_name', 'ext', 'out_data_name', 'hda_id', 'dataset_id')(output_data[0])
188 178
189 def set_up_config_values(json_params): 179 def set_up_config_values(json_params):
190 """ Parse json_params file and return a tuple of necessary configuration 180 """ Parse json_params file and return a tuple of necessary configuration
191 values. 181 values.
192 """ 182 """
200 extra_files_path, file_name, 190 extra_files_path, file_name,
201 ext, out_data_name, 191 ext, out_data_name,
202 hda_id, dataset_id) 192 hda_id, dataset_id)
203 193
204 194
205 def download_from_json_data( options, args ): 195 def download_from_json_data( options, args, json_params=None, json_dataset_url=None ):
206 """ Parse the returned JSON data and download files. Write metadata 196 """ Parse the returned JSON data and download files. Write metadata
207 to flat JSON file. 197 to flat JSON file.
208 """ 198 """
209 output_base_path = options.path 199 output_base_path = options.path
210 appdata_path = options.appdata 200 appdata_path = options.appdata
211 if not os.path.exists(appdata_path): 201 if not os.path.exists(appdata_path):
212 os.makedirs(appdata_path) 202 os.makedirs(appdata_path)
213 203
214 # read tool job configuration file and parse parameters we need 204 # read tool job configuration file and parse parameters we need
215 json_params = json.loads( open( options.json_param_file, 'r' ).read() ) 205 if json_params is None:
206 json_params = json.loads( open( options.json_param_file, 'r' ).read() )
207
216 dataset_url, output_filename, \ 208 dataset_url, output_filename, \
217 extra_files_path, file_name, \ 209 extra_files_path, file_name, \
218 ext, out_data_name, \ 210 ext, out_data_name, \
219 hda_id, dataset_id = set_up_config_values(json_params) 211 hda_id, dataset_id = set_up_config_values(json_params)
220 # line separated JSON file to contain all dataset metadata 212 # line separated JSON file to contain all dataset metadata
221 metadata_parameter_file = open( json_params['job_config']['TOOL_PROVIDED_JOB_METADATA_FILE'], 'wb' ) 213 metadata_parameter_file = open( json_params['job_config']['TOOL_PROVIDED_JOB_METADATA_FILE'], 'wb' )
222 214
223 # get JSON response from data source 215 # get JSON response from data source
224 # TODO: make sure response is not enormous 216 # TODO: make sure response is not enormous
225 query_params = json.loads(urllib.urlopen( dataset_url ).read()) 217 if json_dataset_url is None:
218 query_params = json.loads(urllib.urlopen( dataset_url ).read())
219 else:
220 query_params = json.loads(urllib.urlopen( json_dataset_url ).read())
226 # download and write files 221 # download and write files
227 primary = False 222 primary = False
228 # query_item, hda_id, output_base_path, dataset_id 223 # query_item, hda_id, output_base_path, dataset_id
229 for query_item in query_params: 224 for query_item in query_params:
230 if isinstance( query_item, list ): 225 if isinstance( query_item, list ):
231 # TODO: do something with the nested list as a collection 226 # TODO: do something with the nested list as a collection
232 for query_subitem in query_item: 227 for query_subitem in query_item:
233 primary = download_files_and_write_metadata(query_subitem, json_params, output_base_path, 228 primary = download_files_and_write_metadata(query_subitem, json_params, output_base_path,
234 metadata_parameter_file, primary, appdata_path) 229 metadata_parameter_file, primary, appdata_path, options, args)
235 230
236 elif isinstance( query_item, dict ): 231 elif isinstance( query_item, dict ):
237 primary = download_files_and_write_metadata(query_item, json_params, output_base_path, 232 primary = download_files_and_write_metadata(query_item, json_params, output_base_path,
238 metadata_parameter_file, primary, appdata_path) 233 metadata_parameter_file, primary, appdata_path, options, args)
239 metadata_parameter_file.close() 234 metadata_parameter_file.close()
240 235
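The new optional json_params and json_dataset_url arguments let a caller hand download_from_json_data an already-parsed parameter block and an explicit data-source URL instead of re-reading options.json_param_file. A small sketch of that fall-back pattern, with hypothetical names:

import json

def load_params(json_param_file, json_params=None):
    # use the pre-parsed parameters when supplied, otherwise read the job's param file
    if json_params is None:
        with open(json_param_file, 'r') as fh:
            json_params = json.loads(fh.read())
    return json_params

# pre-parsed parameters win, so no file is read here
print(load_params(None, json_params={'job_config': {}}))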
241 def __main__(): 236 def __main__():
242 """ Read the JSON return from a data source. Parse each line and request 237 """ Read the JSON return from a data source. Parse each line and request
243 the data, download to "newfilepath", and write metadata. 238 the data, download to "newfilepath", and write metadata.