comparison json_collect_data_source.py @ 34:d65de900967e draft
Uploaded 20170613
author | fabio |
---|---|
date | Tue, 13 Jun 2017 16:39:40 -0400 |
parents | 1edc869cd008 |
children |
33:228038cd0683 | 34:d65de900967e |
---|---|
4 import urllib | 4 import urllib |
5 import os.path | 5 import os.path |
6 import os | 6 import os |
7 from operator import itemgetter | 7 from operator import itemgetter |
8 import tarfile | 8 import tarfile |
| 9 import zipfile |
9 | 10 |
10 __version__ = "1.0.0" | 11 __version__ = "1.0.0" |
11 CHUNK_SIZE = 2**20 #1mb | 12 CHUNK_SIZE = 2**20 #1mb |
12 VALID_CHARS = '.-()[]0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ ' | 13 VALID_CHARS = '.-()[]0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ ' |
13 | 14 |
14 | 15 |
15 def splitext(path): | 16 def splitext(path): |
| 17 # extract the folder path and extension of a file from its path |
16 for ext in ['.tar.gz', '.tar.bz2']: | 18 for ext in ['.tar.gz', '.tar.bz2']: |
17 if path.endswith(ext): | 19 if path.endswith(ext): |
18 path, ext = path[:-len(ext)], path[-len(ext):] | 20 path, ext = path[:-len(ext)], path[-len(ext):] |
19 break | 21 break |
20 else: | 22 else: |
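The loop above exists because a plain os.path.splitext would split 'data.tar.gz' into ('data.tar', '.gz'). The else branch falls outside this hunk, so the fallback below is an assumption; a minimal self-contained sketch of the intended behavior:

    import os.path

    def splitext(path):
        # compound suffixes that a single-extension split would cut in half
        for ext in ['.tar.gz', '.tar.bz2']:
            if path.endswith(ext):
                return path[:-len(ext)], path[-len(ext):]
        # assumption: the elided else branch falls back to the standard split
        return os.path.splitext(path)

    print(splitext('data.tar.gz'))  # ('data', '.tar.gz'), not ('data.tar', '.gz')
    print(splitext('reads.bed'))    # ('reads', '.bed')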
55 output_stream = open( target_output_filename, 'wb' ) | 57 output_stream = open( target_output_filename, 'wb' ) |
56 chunk_write( query_stream, output_stream ) | 58 chunk_write( query_stream, output_stream ) |
57 query_stream.close() | 59 query_stream.close() |
58 output_stream.close() | 60 output_stream.close() |
59 | 61 |
60 def store_file_from_archive( file_object, target_output_filename, isString=False ): | 62 def store_file_from_tarfile( file_object, target_output_filename, isString=False ): |
61 """ Store file after extracting from archive and organize them as a collection using the structure | 63 # store the file_object (from tarfile) on the filesystem |
62 (collection-name)_(file-name).ext as file name | |
63 """ | |
64 output_stream = open( target_output_filename, 'wb' ) | 64 output_stream = open( target_output_filename, 'wb' ) |
65 #chunk_write( file_object.read(), output_stream ) | 65 output_stream.write(file_object.read()) |
66 if not isString: | |
67 output_stream.write(file_object.read()) | |
68 else: | |
69 output_stream.write(file_object) | |
70 output_stream.close() | 66 output_stream.close() |
71 | 67 |
72 | 68 |
73 def download_extra_data( query_ext_data, base_path ): | 69 def download_extra_data( query_ext_data, base_path ): |
74 """ Download any extra data defined in the JSON. | 70 """ Download any extra data defined in the JSON. |
103 else: | 99 else: |
104 meta_dict[ 'dataset_id' ] = dataset_id | 100 meta_dict[ 'dataset_id' ] = dataset_id |
105 return "%s\n" % json.dumps( meta_dict ) | 101 return "%s\n" % json.dumps( meta_dict ) |
106 | 102 |
107 | 103 |
108 def walk_on_archive(target_output_filename, check_ext, archive_name, appdata_path, db_key="?"): | 104 def walk_on_archive(target_output_filename, check_ext, archive_library, archive_name, appdata_path, db_key="?"): |
| 105 # fix archive name using valid chars only |
| 106 archive_name = ''.join(e for e in archive_name if e in VALID_CHARS) |
109 archive_name = archive_name.replace("_", "-").replace(".", "-") | 107 archive_name = archive_name.replace("_", "-").replace(".", "-") |
110 with tarfile.open( target_output_filename, check_ext ) as tf: | 108 if archive_library == "zipfile": |
111 for entry in tf: | 109 # iterate over entries inside the archive [zip] |
112 if entry.isfile(): | 110 with zipfile.ZipFile( target_output_filename, check_ext ) as zf: |
113 fileobj = tf.extractfile( entry ) | 111 for entry in zf.namelist(): |
114 # reserve the underscore for the collection separator | 112 # skip directory entries (zip directory names end with "/") |
115 filename = os.path.basename( entry.name ).replace("_", "-") | 113 if not entry.endswith("/"): |
116 extension = splitext( filename )[1] | 114 # retrieve file name |
117 # pattern: (?P<identifier_0>[^_]+)_(?P<identifier_1>[^_]+)_(?P<ext>[^_]+)_(?P<dbkey>[^_]+) | 115 # the underscore character is reserved |
118 if (len(extension) > 0): | 116 filename = os.path.basename( entry.split("/")[-1] ).replace("_", "-") |
119 filename = (filename[0:len(filename)-(len(extension)+1)]).replace(".", "-") + "." + extension + "_" + extension | 117 # retrieve file extension |
120 else: | 118 extension = splitext( filename )[1] |
121 extension = "auto" | 119 # if no extension use 'auto' |
122 filename_with_collection_prefix = archive_name + "_" + filename + "_" + db_key | 120 if (len(extension) == 0): |
123 target_entry_output_filename = os.path.join(appdata_path, filename_with_collection_prefix) | 121 extension = "auto" |
124 store_file_from_archive( fileobj, target_entry_output_filename ) | 122 # pattern: (?P<identifier_0>[^_]+)_(?P<identifier_1>[^_]+)_(?P<ext>[^_]+)_(?P<dbkey>[^_]+) |
| 123 filename_with_collection_prefix = archive_name + "_" + filename + "_" + extension + "_" + db_key |
| 124 # store current entry on filesystem |
| 125 open( os.path.join(appdata_path, filename_with_collection_prefix), 'wb' ).write( zf.read(entry) ) |
| 126 elif archive_library == "tarfile": |
| 127 # iterate over entries inside the archive [gz, bz2, tar] |
| 128 with tarfile.open( target_output_filename, check_ext ) as tf: |
| 129 for entry in tf: |
| 130 if entry.isfile(): |
| 131 fileobj = tf.extractfile( entry ) |
| 132 # retrieve file name |
| 133 # the underscore character is reserved |
| 134 filename = os.path.basename( (entry.name).split("/")[-1] ).replace("_", "-") |
| 135 # retrieve file extension |
| 136 extension = splitext( filename )[1] |
| 137 # if no extension use 'auto' |
| 138 if (len(extension) == 0): |
| 139 extension = "auto" |
| 140 # pattern: (?P<identifier_0>[^_]+)_(?P<identifier_1>[^_]+)_(?P<ext>[^_]+)_(?P<dbkey>[^_]+) |
| 141 filename_with_collection_prefix = archive_name + "_" + filename + "_" + extension + "_" + db_key |
| 142 target_entry_output_filename = os.path.join(appdata_path, filename_with_collection_prefix) |
| 143 # store current entry on filesystem |
| 144 store_file_from_tarfile( fileobj, target_entry_output_filename ) |
125 return True | 145 return True |
126 | 146 |
127 | 147 |
128 def download_files_and_write_metadata(query_item, json_params, output_base_path, metadata_parameter_file, primary, appdata_path, options, args): | 148 def download_files_and_write_metadata(query_item, json_params, output_base_path, metadata_parameter_file, primary, appdata_path, options, args): |
129 """ Main work function that operates on the JSON representation of | 149 """ Main work function that operates on the JSON representation of |
135 hda_id, dataset_id = set_up_config_values(json_params) | 155 hda_id, dataset_id = set_up_config_values(json_params) |
136 extension = query_item.get( 'extension' ) | 156 extension = query_item.get( 'extension' ) |
137 url = query_item.get( 'url' ) | 157 url = query_item.get( 'url' ) |
138 filename = query_item.get( 'name' ) | 158 filename = query_item.get( 'name' ) |
139 | 159 |
| 160 # the organize parameter is considered for archives only |
| 161 organize = query_item.get( 'organize', None ) |
| 162 if organize is None: |
| 163 organize = False |
| 164 else: |
| 165 if organize.lower() == "true": |
| 166 organize = True |
| 167 elif organize.lower() == "false": |
| 168 organize = False |
| 169 else: |
| 170 # if organize parameter is malformed -> set organize to False |
| 171 organize = False |
| 172 |
| 173 # check file extension |
| 174 # if the file is an archive -> extract its content instead of writing metadata |
140 check_ext = "" | 175 check_ext = "" |
| 176 archive_library = None |
141 if ( url.endswith( "gz" ) ): | 177 if ( url.endswith( "gz" ) ): |
142 check_ext = "r:gz" | 178 check_ext = "r:gz" |
| 179 archive_library = "tarfile" |
143 elif ( url.endswith( "bz2" ) ): | 180 elif ( url.endswith( "bz2" ) ): |
144 check_ext = "r:bz2" | 181 check_ext = "r:bz2" |
| 182 archive_library = "tarfile" |
145 elif ( url.endswith( "tar" ) ): | 183 elif ( url.endswith( "tar" ) ): |
146 check_ext = "r:" | 184 check_ext = "r:" |
| 185 archive_library = "tarfile" |
| 186 elif ( url.endswith( "zip" ) ): |
| 187 check_ext = "r" |
| 188 archive_library = "zipfile" |
147 isArchive = bool( check_ext and check_ext.strip() ) | 189 isArchive = bool( check_ext and check_ext.strip() ) |
148 | 190 |
149 extra_data = query_item.get( 'extra_data', None ) | 191 extra_data = query_item.get( 'extra_data', None ) |
150 if primary: | 192 if primary: |
151 filename = ''.join( c in VALID_CHARS and c or '-' for c in filename ) | 193 filename = ''.join( c in VALID_CHARS and c or '-' for c in filename ) |
152 name = construct_multi_filename( hda_id, filename, extension ) | 194 name = construct_multi_filename( hda_id, filename, extension ) |
153 target_output_filename = os.path.normpath( '/'.join( [ output_base_path, name ] ) ) | 195 target_output_filename = os.path.normpath( '/'.join( [ output_base_path, name ] ) ) |
154 if isArchive is False: | 196 if not (isArchive and organize): |
155 metadata_parameter_file.write( metadata_to_json( dataset_id, query_item, | 197 metadata_parameter_file.write( metadata_to_json( dataset_id, query_item, |
156 target_output_filename, | 198 target_output_filename, |
157 ds_type='new_primary_dataset', | 199 ds_type='new_primary_dataset', |
158 primary=primary) ) | 200 primary=primary) ) |
159 else: | 201 else: |
160 target_output_filename = output_filename | 202 target_output_filename = output_filename |
161 if isArchive is False: | 203 if not (isArchive and organize): |
162 metadata_parameter_file.write( metadata_to_json( dataset_id, query_item, | 204 metadata_parameter_file.write( metadata_to_json( dataset_id, query_item, |
163 target_output_filename, | 205 target_output_filename, |
164 ds_type='dataset', | 206 ds_type='dataset', |
165 primary=primary) ) | 207 primary=primary) ) |
166 | 208 |
167 if isArchive is False: | 209 if not (isArchive and organize): |
168 download_from_query( query_item, target_output_filename ) | 210 download_from_query( query_item, target_output_filename ) |
169 else: | 211 else: |
| 212 # if the current entry is an archive, download it into the appdata folder |
170 target_output_path = os.path.join(appdata_path, filename) | 213 target_output_path = os.path.join(appdata_path, filename) |
171 download_from_query( query_item, target_output_path ) | 214 download_from_query( query_item, target_output_path ) |
172 if extra_data: | 215 if extra_data: |
| 216 # just download extra data |
173 extra_files_path = ''.join( [ target_output_filename, 'files' ] ) | 217 extra_files_path = ''.join( [ target_output_filename, 'files' ] ) |
174 download_extra_data( extra_data, extra_files_path ) | 218 download_extra_data( extra_data, extra_files_path ) |
175 | 219 |
176 """ the following code handles archives and decompress them in a collection """ | 220 # if the current file is an archive and want to organize the content |
177 if ( isArchive ): | 221 # -> decompress the archive and populate the collection (has to be defined in the tool xml schema) |
| 222 if isArchive and organize: |
| 223 # set the same db_key for each file inside the archive |
| 224 # use the db_key associated with the archive (if it exists) |
178 db_key = "?" | 225 db_key = "?" |
179 archive_metadata = query_item.get( 'metadata', None ) | 226 archive_metadata = query_item.get( 'metadata', None ) |
180 if archive_metadata is not None: | 227 if archive_metadata is not None: |
181 try: | 228 try: |
182 db_key = archive_metadata.get( 'db_key' ) | 229 db_key = archive_metadata.get( 'db_key' ) |
183 except: | 230 except: |
184 pass | 231 pass |
185 walk_on_archive(target_output_path, check_ext, filename, appdata_path, db_key) | 232 archive_name = query_item.get( 'name', None ) |
| 233 if archive_name is None: |
| 234 archive_name = filename |
| 235 # iterate over the archive content |
| 236 walk_on_archive(target_output_path, check_ext, archive_library, archive_name, appdata_path, db_key) |
186 | 237 |
187 return True | 238 return True |
188 | 239 |
189 | 240 |
190 def set_up_config_values(json_params): | 241 def set_up_config_values(json_params): |
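The elif chain above pairs each URL suffix with a tarfile/zipfile open mode and the library used to read it. A table-driven sketch of the same dispatch (detect_archive is a hypothetical helper, not part of the script):

    ARCHIVE_MODES = {
        ".gz":  ("r:gz",  "tarfile"),
        ".bz2": ("r:bz2", "tarfile"),
        ".tar": ("r:",    "tarfile"),
        ".zip": ("r",     "zipfile"),
    }

    def detect_archive(url):
        # returns (open mode, library name), or ("", None) for non-archives
        for suffix, (mode, library) in ARCHIVE_MODES.items():
            if url.endswith(suffix):
                return mode, library
        return "", None

    print(detect_archive("http://example.org/batch.tar.gz"))  # ('r:gz', 'tarfile')
    print(detect_archive("http://example.org/table.bed"))     # ('', None)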
212 if not os.path.exists(appdata_path): | 263 if not os.path.exists(appdata_path): |
213 os.makedirs(appdata_path) | 264 os.makedirs(appdata_path) |
214 | 265 |
215 # read tool job configuration file and parse parameters we need | 266 # read tool job configuration file and parse parameters we need |
216 json_params = json.loads( open( options.json_param_file, 'r' ).read() ) | 267 json_params = json.loads( open( options.json_param_file, 'r' ).read() ) |
217 print("json_params: "+str(json_params)) | 268 #print("json_params: "+str(json_params)) |
218 | 269 |
219 dataset_url, output_filename, \ | 270 dataset_url, output_filename, \ |
220 extra_files_path, file_name, \ | 271 extra_files_path, file_name, \ |
221 ext, out_data_name, \ | 272 ext, out_data_name, \ |
222 hda_id, dataset_id = set_up_config_values(json_params) | 273 hda_id, dataset_id = set_up_config_values(json_params) |
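The parameter file is consumed with a bare open(...).read(); an equivalent sketch that closes the handle deterministically (same behavior, json.load in place of json.loads):

    import json

    with open(options.json_param_file, 'r') as fh:
        json_params = json.load(fh)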
248 | 299 |
249 Schema | 300 Schema |
250 ------ | 301 ------ |
251 | 302 |
252 [ {"url":"http://url_of_file", | 303 [ {"url":"http://url_of_file", |
253 "name":"encode WigData", | 304 "name":"My Archive", |
254 "extension":"wig", | 305 "extension":"tar.gz", |
255 "metadata":{"db_key":"hg19"}, | 306 "organize":"true", |
307 "metadata":{"db_key":"hg38"}, | |
256 "extra_data":[ {"url":"http://url_of_ext_file", | 308 "extra_data":[ {"url":"http://url_of_ext_file", |
257 "path":"rel/path/to/ext_file"} | 309 "path":"rel/path/to/ext_file"} |
258 ] | 310 ] |
259 } | 311 } |
260 ] | 312 ] |
261 | 313 |
262 """ | 314 """ |
263 # Parse the command line options | 315 # Parse the command line options |
264 usage = "Usage: json_data_source_mod.py max_size --json_param_file filename [options]" | 316 usage = "Usage: json_collect_data_source.py max_size --json_param_file filename [options]" |
265 parser = optparse.OptionParser(usage = usage) | 317 parser = optparse.OptionParser(usage = usage) |
266 parser.add_option("-j", "--json_param_file", type="string", | 318 parser.add_option("-j", "--json_param_file", type="string", |
267 action="store", dest="json_param_file", help="json schema return data") | 319 action="store", dest="json_param_file", help="json schema return data") |
268 parser.add_option("-p", "--path", type="string", | 320 parser.add_option("-p", "--path", type="string", |
269 action="store", dest="path", help="new file path") | 321 action="store", dest="path", help="new file path") |
| 322 # set appdata: temporary directory in which the archives will be decompressed |
270 parser.add_option("-a", "--appdata", type="string", | 323 parser.add_option("-a", "--appdata", type="string", |
271 action="store", dest="appdata", help="appdata folder name") | 324 action="store", dest="appdata", help="appdata folder name") |
272 parser.add_option("-v", "--version", action="store_true", dest="version", | 325 parser.add_option("-v", "--version", action="store_true", dest="version", |
273 default=False, help="display version and exit") | 326 default=False, help="display version and exit") |
274 | 327 |
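Putting the options together, a hypothetical invocation (file names and the positional max_size value are made up, following the usage string above):

    python json_collect_data_source.py 1048576 --json_param_file galaxy_params.json --path /tmp/out --appdata /tmp/out_appdata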