gdcwebapp: comparison of json_data_source_mod.py @ 12:80593f75d74a (draft)
Uploaded
author | fabio |
---|---|
date | Tue, 30 May 2017 12:26:32 -0400 |
parents | c0be9583df97 |
children | babc444d4bd0 |
11:9d24947d4335 | 12:80593f75d74a |
---|---|
55 output_stream = open( target_output_filename, 'wb' ) | 55 output_stream = open( target_output_filename, 'wb' ) |
56 chunk_write( query_stream, output_stream ) | 56 chunk_write( query_stream, output_stream ) |
57 query_stream.close() | 57 query_stream.close() |
58 output_stream.close() | 58 output_stream.close() |
59 | 59 |
60 def store_file_from_archive( file_object, target_output_filename ): | 60 def store_file_from_archive( file_object, target_output_filename, isString=False ): |
61 """ Store file after extracting from archive and organize them as a collection using the structure | 61 """ Store file after extracting from archive and organize them as a collection using the structure |
62 (collection-name)_(file-name).ext as file name | 62 (collection-name)_(file-name).ext as file name |
63 """ | 63 """ |
64 output_stream = open( target_output_filename, 'wb' ) | 64 output_stream = open( target_output_filename, 'wb' ) |
65 chunk_write( file_object.read(), output_stream ) | 65 #chunk_write( file_object.read(), output_stream ) |
| 66 if not isString: |
| 67 output_stream.write(file_object.read()) |
| 68 else: |
| 69 output_stream.write(file_object) |
66 output_stream.close() | 70 output_stream.close() |
67 | 71 |
68 | 72 |
69 def download_extra_data( query_ext_data, base_path ): | 73 def download_extra_data( query_ext_data, base_path ): |
70 """ Download any extra data defined in the JSON. | 74 """ Download any extra data defined in the JSON. |
81 os.makedirs( os.path.normpath( '/'.join( [ base_path, os.path.dirname( ext_path ) ] ) ) ) | 85 os.makedirs( os.path.normpath( '/'.join( [ base_path, os.path.dirname( ext_path ) ] ) ) ) |
82 output_stream = open( os.path.normpath( '/'.join( [ base_path, ext_path ] ) ), 'wb' ) | 86 output_stream = open( os.path.normpath( '/'.join( [ base_path, ext_path ] ) ), 'wb' ) |
83 chunk_write( query_stream, output_stream ) | 87 chunk_write( query_stream, output_stream ) |
84 query_stream.close() | 88 query_stream.close() |
85 output_stream.close() | 89 output_stream.close() |
86 | |
87 | |
88 def metadata_to_json_for_archive_entry( dataset_id, extension, metaname, filename, ds_type='dataset', primary=False ): | |
89 """ Return line separated JSON """ | |
90 meta_dict = dict( type = ds_type, | |
91 ext = extension, | |
92 filename = filename, | |
93 name = metaname, | |
94 metadata = {} ) | |
95 if primary: | |
96 meta_dict[ 'base_dataset_id' ] = dataset_id | |
97 else: | |
98 meta_dict[ 'dataset_id' ] = dataset_id | |
99 return "%s\n" % json.dumps( meta_dict ) | |
100 | 90 |
101 | 91 |
102 def metadata_to_json( dataset_id, metadata, filename, ds_type='dataset', primary=False): | 92 def metadata_to_json( dataset_id, metadata, filename, ds_type='dataset', primary=False): |
103 """ Return line separated JSON """ | 93 """ Return line separated JSON """ |
104 meta_dict = dict( type = ds_type, | 94 meta_dict = dict( type = ds_type, |
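For context, the deleted metadata_to_json_for_archive_entry on the left and the retained metadata_to_json both emit line separated JSON: one object per line, carrying base_dataset_id when primary is set and dataset_id otherwise. A small sketch of that format follows; the field values are invented for illustration.

```python
import json

def metadata_line(dataset_id, extension, metaname, filename, ds_type='dataset', primary=False):
    # same shape as the helpers in this file: one JSON object per line
    meta_dict = dict(type=ds_type, ext=extension, filename=filename,
                     name=metaname, metadata={})
    if primary:
        meta_dict['base_dataset_id'] = dataset_id
    else:
        meta_dict['dataset_id'] = dataset_id
    return "%s\n" % json.dumps(meta_dict)

# prints something like:
# {"type": "new_primary_dataset", "ext": "tsv", "filename": "/tmp/out.tsv",
#  "name": "sample-1", "metadata": {}, "base_dataset_id": "42"}
print(metadata_line("42", "tsv", "sample-1", "/tmp/out.tsv",
                    ds_type='new_primary_dataset', primary=True))
```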
113 else: | 103 else: |
114 meta_dict[ 'dataset_id' ] = dataset_id | 104 meta_dict[ 'dataset_id' ] = dataset_id |
115 return "%s\n" % json.dumps( meta_dict ) | 105 return "%s\n" % json.dumps( meta_dict ) |
116 | 106 |
117 | 107 |
118 def download_files_and_write_metadata(query_item, json_params, output_base_path, metadata_parameter_file, primary, appdata_path): | 108 def walk_on_archive(target_output_filename, check_ext, archive_name, appdata_path): |
| 109 archive_name = archive_name.replace("_", "-").replace(".", "-") |
| 110 with tarfile.open( target_output_filename, check_ext ) as tf: |
| 111 for entry in tf: |
| 112 if entry.isfile(): |
| 113 fileobj = tf.extractfile( entry ) |
| 114 # reserve the underscore for the collection separator |
| 115 filename = os.path.basename( entry.name ).replace("_", "-") |
| 116 extension = splitext( filename )[1] |
| 117 # pattern: (?P<identifier_0>[^_]+)_(?P<identifier_1>[^_]+) |
| 118 if (len(extension) > 0): |
| 119 filename = (filename[0:len(filename)-(len(extension)+1)]).replace(".", "-") + "." + extension |
| 120 else: |
| 121 extension = "auto" |
| 122 filename_with_collection_prefix = archive_name + "_" + filename |
| 123 target_entry_output_filename = os.path.join(appdata_path, filename_with_collection_prefix) |
| 124 store_file_from_archive( fileobj, target_entry_output_filename ) |
| 125 return True |
| 126 |
| 127 |
| 128 def download_files_and_write_metadata(query_item, json_params, output_base_path, metadata_parameter_file, primary, appdata_path, options, args): |
119 """ Main work function that operates on the JSON representation of | 129 """ Main work function that operates on the JSON representation of |
120 one dataset and its metadata. Returns True. | 130 one dataset and its metadata. Returns True. |
121 """ | 131 """ |
122 dataset_url, output_filename, \ | 132 dataset_url, output_filename, \ |
123 extra_files_path, file_name, \ | 133 extra_files_path, file_name, \ |
124 ext, out_data_name, \ | 134 ext, out_data_name, \ |
125 hda_id, dataset_id = set_up_config_values(json_params) | 135 hda_id, dataset_id = set_up_config_values(json_params) |
126 extension = query_item.get( 'extension' ) | 136 extension = query_item.get( 'extension' ) |
127 filename = query_item.get( 'url' ) | 137 #filename = query_item.get( 'url' ) |
| 138 filename = query_item.get( 'name' ) |
| 139 |
| 140 check_ext = "" |
| 141 if ( filename.endswith( "gz" ) ): |
| 142 check_ext = "r:gz" |
| 143 elif ( filename.endswith( "bz2" ) ): |
| 144 check_ext = "r:bz2" |
| 145 elif ( filename.endswith( "tar" ) ): |
| 146 check_ext = "r:" |
| 147 isArchive = bool( check_ext and check_ext.strip() ) |
| 148 |
128 extra_data = query_item.get( 'extra_data', None ) | 149 extra_data = query_item.get( 'extra_data', None ) |
129 if primary: | 150 if primary: |
130 filename = ''.join( c in VALID_CHARS and c or '-' for c in filename ) | 151 filename = ''.join( c in VALID_CHARS and c or '-' for c in filename ) |
131 name = construct_multi_filename( hda_id, filename, extension ) | 152 name = construct_multi_filename( hda_id, filename, extension ) |
132 target_output_filename = os.path.normpath( '/'.join( [ output_base_path, name ] ) ) | 153 target_output_filename = os.path.normpath( '/'.join( [ output_base_path, name ] ) ) |
133 metadata_parameter_file.write( metadata_to_json( dataset_id, query_item, | 154 if isArchive is False: |
134 target_output_filename, | 155 metadata_parameter_file.write( metadata_to_json( dataset_id, query_item, |
135 ds_type='new_primary_dataset', | 156 target_output_filename, |
136 primary=primary) ) | 157 ds_type='new_primary_dataset', |
| 158 primary=primary) ) |
137 else: | 159 else: |
138 target_output_filename = output_filename | 160 target_output_filename = output_filename |
139 metadata_parameter_file.write( metadata_to_json( dataset_id, query_item, | 161 if isArchive is False: |
140 target_output_filename, | 162 metadata_parameter_file.write( metadata_to_json( dataset_id, query_item, |
141 ds_type='dataset', | 163 target_output_filename, |
142 primary=primary) ) | 164 ds_type='dataset', |
| 165 primary=primary) ) |
| 166 |
143 download_from_query( query_item, target_output_filename ) | 167 download_from_query( query_item, target_output_filename ) |
144 if extra_data: | 168 if extra_data: |
145 extra_files_path = ''.join( [ target_output_filename, 'files' ] ) | 169 extra_files_path = ''.join( [ target_output_filename, 'files' ] ) |
146 download_extra_data( extra_data, extra_files_path ) | 170 download_extra_data( extra_data, extra_files_path ) |
147 | 171 |
148 """ the following code handles archives and decompress them in a collection """ | 172 """ the following code handles archives and decompress them in a collection """ |
149 check_ext = "" | 173 if ( isArchive ): |
150 if ( fname.endswith( "gz" ) ): | 174 walk_on_archive(target_output_filename, check_ext, query_item.get( 'name' ), appdata_path) |
151 check_ext = "r:gz" | 175 |
152 elif ( fname.endswith( "bz2" ) ): | |
153 check_ext = "r:bz2" | |
154 elif ( fname.endswith( "tar" ) ): | |
155 check_ext = "r:" | |
156 if ( bool( check_ext and check_ext.strip() ) ): | |
157 with tarfile.open( target_output_filename, check_ext ) as tf: | |
158 for entry in tf: | |
159 fileobj = tf.extractfile( entry ) | |
160 if entry.isfile(): | |
161 | |
162 #dataset_url, output_filename, \ | |
163 # extra_files_path, file_name, \ | |
164 # ext, out_data_name, \ | |
165 # hda_id, dataset_id = set_up_config_values(json_params) | |
166 | |
167 filename = os.path.basename( entry.name ) | |
168 extension = splitext( filename ) | |
169 extra_data = None | |
170 #target_output_filename = output_filename | |
171 # (?P<archive_name>.*)_(?P<file_name>.*)\..* | |
172 filename_with_collection_prefix = query_item.get( 'name' ) + "_" + filename | |
173 target_output_filename = os.path.join(appdata_path, filename_with_collection_prefix) | |
174 | |
175 #metadata_parameter_file.write( metadata_to_json_for_archive_entry( dataset_id, extension, | |
176 # filename, target_output_filename, | |
177 # ds_type='dataset', | |
178 # primary=primary) ) | |
179 | |
180 store_file_from_archive( fileobj, target_output_filename ) | |
181 | |
182 return True | 176 return True |
183 | 177 |
184 | |
185 def set_up_config_values(): | |
186 extra_files_path, file_name, ext, out_data_name, hda_id, dataset_id = \ | |
187 itemgetter('extra_files_path', 'file_name', 'ext', 'out_data_name', 'hda_id', 'dataset_id')(output_data[0]) | |
188 | 178 |
189 def set_up_config_values(json_params): | 179 def set_up_config_values(json_params): |
190 """ Parse json_params file and return a tuple of necessary configuration | 180 """ Parse json_params file and return a tuple of necessary configuration |
191 values. | 181 values. |
192 """ | 182 """ |
200 extra_files_path, file_name, | 190 extra_files_path, file_name, |
201 ext, out_data_name, | 191 ext, out_data_name, |
202 hda_id, dataset_id) | 192 hda_id, dataset_id) |
203 | 193 |
204 | 194 |
205 def download_from_json_data( options, args ): | 195 def download_from_json_data( options, args, json_params=None, json_dataset_url=None ): |
206 """ Parse the returned JSON data and download files. Write metadata | 196 """ Parse the returned JSON data and download files. Write metadata |
207 to flat JSON file. | 197 to flat JSON file. |
208 """ | 198 """ |
209 output_base_path = options.path | 199 output_base_path = options.path |
210 appdata_path = options.appdata | 200 appdata_path = options.appdata |
211 if not os.path.exists(appdata_path): | 201 if not os.path.exists(appdata_path): |
212 os.makedirs(appdata_path) | 202 os.makedirs(appdata_path) |
213 | 203 |
214 # read tool job configuration file and parse parameters we need | 204 # read tool job configuration file and parse parameters we need |
215 json_params = json.loads( open( options.json_param_file, 'r' ).read() ) | 205 if json_params is None: |
| 206 json_params = json.loads( open( options.json_param_file, 'r' ).read() ) |
| 207 |
216 dataset_url, output_filename, \ | 208 dataset_url, output_filename, \ |
217 extra_files_path, file_name, \ | 209 extra_files_path, file_name, \ |
218 ext, out_data_name, \ | 210 ext, out_data_name, \ |
219 hda_id, dataset_id = set_up_config_values(json_params) | 211 hda_id, dataset_id = set_up_config_values(json_params) |
220 # line separated JSON file to contain all dataset metadata | 212 # line separated JSON file to contain all dataset metadata |
221 metadata_parameter_file = open( json_params['job_config']['TOOL_PROVIDED_JOB_METADATA_FILE'], 'wb' ) | 213 metadata_parameter_file = open( json_params['job_config']['TOOL_PROVIDED_JOB_METADATA_FILE'], 'wb' ) |
222 | 214 |
223 # get JSON response from data source | 215 # get JSON response from data source |
224 # TODO: make sure response is not enormous | 216 # TODO: make sure response is not enormous |
225 query_params = json.loads(urllib.urlopen( dataset_url ).read()) | 217 if json_dataset_url is None: |
| 218 query_params = json.loads(urllib.urlopen( dataset_url ).read()) |
| 219 else: |
| 220 query_params = json.loads(urllib.urlopen( json_dataset_url ).read()) |
226 # download and write files | 221 # download and write files |
227 primary = False | 222 primary = False |
228 # query_item, hda_id, output_base_path, dataset_id | 223 # query_item, hda_id, output_base_path, dataset_id |
229 for query_item in query_params: | 224 for query_item in query_params: |
230 if isinstance( query_item, list ): | 225 if isinstance( query_item, list ): |
231 # TODO: do something with the nested list as a collection | 226 # TODO: do something with the nested list as a collection |
232 for query_subitem in query_item: | 227 for query_subitem in query_item: |
233 primary = download_files_and_write_metadata(query_subitem, json_params, output_base_path, | 228 primary = download_files_and_write_metadata(query_subitem, json_params, output_base_path, |
234 metadata_parameter_file, primary, appdata_path) | 229 metadata_parameter_file, primary, appdata_path, options, args) |
235 | 230 |
236 elif isinstance( query_item, dict ): | 231 elif isinstance( query_item, dict ): |
237 primary = download_files_and_write_metadata(query_item, json_params, output_base_path, | 232 primary = download_files_and_write_metadata(query_item, json_params, output_base_path, |
238 metadata_parameter_file, primary, appdata_path) | 233 metadata_parameter_file, primary, appdata_path, options, args) |
239 metadata_parameter_file.close() | 234 metadata_parameter_file.close() |
240 | 235 |
241 def __main__(): | 236 def __main__(): |
242 """ Read the JSON return from a data source. Parse each line and request | 237 """ Read the JSON return from a data source. Parse each line and request |
243 the data, download to "newfilepath", and write metadata. | 238 the data, download to "newfilepath", and write metadata. |
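With the new optional json_params and json_dataset_url arguments, download_from_json_data can be driven either from the command-line options alone, as before, or programmatically with a pre-parsed parameter dict and an explicit dataset URL. A hedged calling sketch follows; the option attribute names are inferred from what this module reads, and the paths and URL are placeholders.

```python
import json
from collections import namedtuple

from json_data_source_mod import download_from_json_data

# stand-in for the parsed command-line options (path, appdata and
# json_param_file are the attributes this module accesses)
Options = namedtuple('Options', ['path', 'appdata', 'json_param_file'])
options = Options(path='/tmp/primary_out.dat', appdata='/tmp/appdata',
                  json_param_file='galaxy.json')
args = []

# classic behaviour: parameters and the dataset URL come from the JSON parameter file
download_from_json_data(options, args)

# new behaviour: reuse an already-parsed parameter dict and query a different,
# hypothetical endpoint for the dataset listing
params = json.loads(open(options.json_param_file, 'r').read())
download_from_json_data(options, args, json_params=params,
                        json_dataset_url="https://example.org/api/datasets.json")
```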