#!/usr/bin/env python
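"""
Read the JSON produced by a Galaxy data source, download every file it
references (unpacking archives into a collection when "organize" is set),
and write line-separated JSON metadata describing the created datasets.
"""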
import json
import optparse
import urllib
import os.path
import os
from operator import itemgetter
import tarfile
import zipfile

__version__ = "1.0.0"
CHUNK_SIZE = 2**20  # 1 MB
VALID_CHARS = '.-()[]0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ '


def splitext(path):
    # extract the folder path and extension of a file from its path
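    # e.g. splitext("reads.tar.gz") -> ("reads", "tar.gz")
    #      splitext("table.tsv")    -> ("table", "tsv")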
    for ext in ['.tar.gz', '.tar.bz2']:
        if path.endswith(ext):
            path, ext = path[:-len(ext)], path[-len(ext):]
            break
    else:
        path, ext = os.path.splitext(path)
    return path, ext[1:]


def chunk_write( source_stream, target_stream, source_method="read", target_method="write" ):
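    """ Copy data from source_stream to target_stream in CHUNK_SIZE blocks,
    so large downloads never have to be held in memory at once.
    """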
    source_method = getattr( source_stream, source_method )
    target_method = getattr( target_stream, target_method )
    while True:
        chunk = source_method( CHUNK_SIZE )
        if chunk:
            target_method( chunk )
        else:
            break


def deconstruct_multi_filename( multi_filename ):
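    # split a "primary_<id>_<name>_visible_<file_type>" filename back into its
    # parts; inverse of construct_multi_filename below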
    keys = [ 'primary', 'id', 'name', 'visible', 'file_type' ]
    return dict( zip( keys, multi_filename.split('_') ) )


def construct_multi_filename( id, name, file_type ):
    """ Implementation of *Number of Output datasets cannot be determined until tool run* from documentation_.

    .. _documentation: http://wiki.galaxyproject.org/Admin/Tools/Multiple%20Output%20Files
    """
    filename = "%s_%s_%s_%s_%s" % ( 'primary', id, name, 'visible', file_type )
    return filename


def download_from_query( query_data, target_output_filename ):
    """ Download file from the json data and write it to target_output_filename.
    """
    query_url = query_data.get( 'url' )
    query_file_type = query_data.get( 'extension' )
    query_stream = urllib.urlopen( query_url )
    output_stream = open( target_output_filename, 'wb' )
    chunk_write( query_stream, output_stream )
    query_stream.close()
    output_stream.close()


def store_file_from_tarfile( file_object, target_output_filename, isString=False ):
    # store the file_object (from tarfile) on the filesystem
    output_stream = open( target_output_filename, 'wb' )
    output_stream.write( file_object.read() )
    output_stream.close()


def download_extra_data( query_ext_data, base_path ):
    """ Download any extra data defined in the JSON.
    NOTE: the "path" value is a relative path to the file on our
    file system. This is slightly dangerous and we should make every effort
    to avoid a malicious absolute path to write the file elsewhere on the
    filesystem.
    """
    for ext_data in query_ext_data:
        if not os.path.exists( base_path ):
            os.mkdir( base_path )
        query_stream = urllib.urlopen( ext_data.get( 'url' ) )
        ext_path = ext_data.get( 'path' )
        # create the target directory only if it does not already exist, so a flat
        # or repeated "path" value does not make os.makedirs raise an OSError
        ext_dirname = os.path.normpath( '/'.join( [ base_path, os.path.dirname( ext_path ) ] ) )
        if not os.path.exists( ext_dirname ):
            os.makedirs( ext_dirname )
        output_stream = open( os.path.normpath( '/'.join( [ base_path, ext_path ] ) ), 'wb' )
        chunk_write( query_stream, output_stream )
        query_stream.close()
        output_stream.close()


def metadata_to_json( dataset_id, metadata, filename, ds_type='dataset', primary=False ):
    """ Return line separated JSON """
    meta_dict = dict( type=ds_type,
                      ext=metadata.get( 'extension' ),
                      filename=filename,
                      name=metadata.get( 'name' ),
                      metadata=metadata.get( 'metadata', {} ) )
    if metadata.get( 'extra_data', None ):
        meta_dict[ 'extra_files' ] = '_'.join( [ filename, 'files' ] )
    if primary:
        meta_dict[ 'base_dataset_id' ] = dataset_id
    else:
        meta_dict[ 'dataset_id' ] = dataset_id
    return "%s\n" % json.dumps( meta_dict )


def walk_on_archive(target_output_filename, check_ext, archive_library, archive_name, appdata_path, db_key="?"):
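    # Extract every regular file from the archive into appdata_path, renaming each
    # entry to "<archive_name>_<filename>_<extension>_<db_key>" so that a collection
    # defined in the tool XML can recover identifier, extension and dbkey from the
    # name (see the pattern comments below).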
    # fix archive name using valid chars only
    archive_name = ''.join(e for e in archive_name if e in VALID_CHARS)
    archive_name = archive_name.replace("_", "-").replace(".", "-")
    if archive_library == "zipfile":
        # iterate over entries inside the archive [zip]
        with zipfile.ZipFile( target_output_filename, check_ext ) as zf:
            for entry in zf.namelist():
                # skip directory entries, keep regular files only
                if not entry.endswith("/"):
                    # retrieve file name
                    # the underscore character is reserved
                    filename = os.path.basename( entry.split("/")[-1] ).replace("_", "-")
                    # retrieve file extension
                    extension = splitext( filename )[1]
                    # if no extension use 'auto'
                    if len(extension) == 0:
                        extension = "auto"
                    # pattern: (?P<identifier_0>[^_]+)_(?P<identifier_1>[^_]+)_(?P<ext>[^_]+)_(?P<dbkey>[^_]+)
                    filename_with_collection_prefix = archive_name + "_" + filename + "_" + extension + "_" + db_key
                    # store current entry on filesystem under the prefixed name
                    # (the member keeps its original name inside the zip, so read its
                    # bytes and write them to the renamed target instead of extracting)
                    target_entry_output_filename = os.path.join(appdata_path, filename_with_collection_prefix)
                    output_stream = open( target_entry_output_filename, 'wb' )
                    output_stream.write( zf.read( entry ) )
                    output_stream.close()
    elif archive_library == "tarfile":
        # iterate over entries inside the archive [gz, bz2, tar]
        with tarfile.open( target_output_filename, check_ext ) as tf:
            for entry in tf:
                if entry.isfile():
                    fileobj = tf.extractfile( entry )
                    # retrieve file name
                    # the underscore character is reserved
                    filename = os.path.basename( entry.name.split("/")[-1] ).replace("_", "-")
                    # retrieve file extension
                    extension = splitext( filename )[1]
                    # if no extension use 'auto'
                    if len(extension) == 0:
                        extension = "auto"
                    # pattern: (?P<identifier_0>[^_]+)_(?P<identifier_1>[^_]+)_(?P<ext>[^_]+)_(?P<dbkey>[^_]+)
                    filename_with_collection_prefix = archive_name + "_" + filename + "_" + extension + "_" + db_key
                    target_entry_output_filename = os.path.join(appdata_path, filename_with_collection_prefix)
                    # store current entry on filesystem
                    store_file_from_tarfile( fileobj, target_entry_output_filename )
    return True


def download_files_and_write_metadata(query_item, json_params, output_base_path, metadata_parameter_file, primary, appdata_path, options, args):
    """ Main work function that operates on the JSON representation of
    one dataset and its metadata. Returns True.
    """
    dataset_url, output_filename, \
        extra_files_path, file_name, \
        ext, out_data_name, \
        hda_id, dataset_id = set_up_config_values(json_params)
    extension = query_item.get( 'extension' )
    url = query_item.get( 'url' )
    filename = query_item.get( 'name' )

    # the organize parameter is considered for archives only
    organize = query_item.get( 'organize', None )
    if organize is None:
        organize = False
    elif organize.lower() == "true":
        organize = True
    else:
        # a "false" or malformed organize parameter means "do not organize"
        organize = False

    # check file extension
    # if the file is an archive -> do not write metadata and extract files
    check_ext = ""
    archive_library = None
    if url.endswith( "gz" ):
        check_ext = "r:gz"
        archive_library = "tarfile"
    elif url.endswith( "bz2" ):
        check_ext = "r:bz2"
        archive_library = "tarfile"
    elif url.endswith( "tar" ):
        check_ext = "r:"
        archive_library = "tarfile"
    elif url.endswith( "zip" ):
        check_ext = "r"
        archive_library = "zipfile"
    isArchive = bool( check_ext and check_ext.strip() )

    extra_data = query_item.get( 'extra_data', None )
    if primary:
        filename = ''.join( c if c in VALID_CHARS else '-' for c in filename )
        name = construct_multi_filename( hda_id, filename, extension )
        target_output_filename = os.path.normpath( '/'.join( [ output_base_path, name ] ) )
        if not ( isArchive and organize ):
            metadata_parameter_file.write( metadata_to_json( dataset_id, query_item,
                                                             target_output_filename,
                                                             ds_type='new_primary_dataset',
                                                             primary=primary ) )
    else:
        target_output_filename = output_filename
        if not ( isArchive and organize ):
            metadata_parameter_file.write( metadata_to_json( dataset_id, query_item,
                                                             target_output_filename,
                                                             ds_type='dataset',
                                                             primary=primary ) )

    if not ( isArchive and organize ):
        download_from_query( query_item, target_output_filename )
    else:
        # if the current entry is an archive download it inside appdata folder
        target_output_path = os.path.join(appdata_path, filename)
        download_from_query( query_item, target_output_path )
    if extra_data:
        # just download extra data
        extra_files_path = ''.join( [ target_output_filename, 'files' ] )
        download_extra_data( extra_data, extra_files_path )

    # if the current file is an archive and its content has to be organized
    # -> decompress the archive and populate the collection (has to be defined in the tool xml schema)
    if isArchive and organize:
        # set the same db_key for each file inside the archive
        # use the db_key associated to the archive (if it exists)
        db_key = "?"
        archive_metadata = query_item.get( 'metadata', None )
        if archive_metadata is not None:
            try:
                db_key = archive_metadata.get( 'db_key' )
            except Exception:
                pass
        archive_name = query_item.get( 'name', None )
        if archive_name is None:
            archive_name = filename
        # iterate over the archive content
        walk_on_archive(target_output_path, check_ext, archive_library, archive_name, appdata_path, db_key)

    return True


def set_up_config_values(json_params):
    """ Parse json_params file and return a tuple of necessary configuration
    values.
    """
    datasource_params = json_params.get( 'param_dict' )
    dataset_url = datasource_params.get( 'URL' )
    output_filename = datasource_params.get( 'output1', None )
    output_data = json_params.get( 'output_data' )
    extra_files_path, file_name, ext, out_data_name, hda_id, dataset_id = \
        itemgetter('extra_files_path', 'file_name', 'ext', 'out_data_name', 'hda_id', 'dataset_id')(output_data[0])
    return (dataset_url, output_filename,
            extra_files_path, file_name,
            ext, out_data_name,
            hda_id, dataset_id)


def download_from_json_data( options, args ):
    """ Parse the returned JSON data and download files. Write metadata
    to flat JSON file.
    """
    output_base_path = options.path
    appdata_path = options.appdata
    if not os.path.exists(appdata_path):
        os.makedirs(appdata_path)

    # read tool job configuration file and parse parameters we need
    json_params = json.loads( open( options.json_param_file, 'r' ).read() )

    dataset_url, output_filename, \
        extra_files_path, file_name, \
        ext, out_data_name, \
        hda_id, dataset_id = set_up_config_values(json_params)
    # line separated JSON file to contain all dataset metadata
    metadata_parameter_file = open( json_params['job_config']['TOOL_PROVIDED_JOB_METADATA_FILE'], 'wb' )

    # get JSON response from data source
    # TODO: make sure response is not enormous
    query_params = json.loads(urllib.urlopen( dataset_url ).read())
    # download and write files
    primary = False
    for query_item in query_params:
        if isinstance( query_item, list ):
            # TODO: do something with the nested list as a collection
            for query_subitem in query_item:
                primary = download_files_and_write_metadata(query_subitem, json_params, output_base_path,
                                                            metadata_parameter_file, primary, appdata_path, options, args)
        elif isinstance( query_item, dict ):
            primary = download_files_and_write_metadata(query_item, json_params, output_base_path,
                                                        metadata_parameter_file, primary, appdata_path, options, args)
    metadata_parameter_file.close()


def __main__():
    """ Read the JSON return from a data source. Parse each line and request
    the data, download to "newfilepath", and write metadata.

    Schema
    ------

        [ {"url":"http://url_of_file",
           "name":"My Archive",
           "extension":"tar.gz",
           "organize":"true",
           "metadata":{"db_key":"hg38"},
           "extra_data":[ {"url":"http://url_of_ext_file",
                           "path":"rel/path/to/ext_file"}
                        ]
          }
        ]

    """
    # Parse the command line options
    usage = "Usage: json_collect_data_source.py max_size --json_param_file filename [options]"
    parser = optparse.OptionParser(usage=usage)
    parser.add_option("-j", "--json_param_file", type="string",
                      action="store", dest="json_param_file", help="json schema return data")
    parser.add_option("-p", "--path", type="string",
                      action="store", dest="path", help="new file path")
    # set appdata: temporary directory in which the archives will be decompressed
    parser.add_option("-a", "--appdata", type="string",
                      action="store", dest="appdata", help="appdata folder name")
    parser.add_option("-v", "--version", action="store_true", dest="version",
                      default=False, help="display version and exit")

    (options, args) = parser.parse_args()
    if options.version:
        print __version__
    else:
        download_from_json_data( options, args )


if __name__ == "__main__":
    __main__()