planemo/lib/python3.7/site-packages/galaxy/tool_util/provided_metadata.py (repos/guerler/springsuite, changeset 1:56ad4e20f292, draft)
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author: guerler
date: Fri, 31 Jul 2020 00:32:28 -0400
parent: 0:d30785e31577
import json
import logging
import os
import re

from galaxy.util import stringify_dictionary_keys

log = logging.getLogger(__name__)


def parse_tool_provided_metadata(meta_file, provided_metadata_style=None, job_wrapper=None):
    """Return a ToolProvidedMetadata object for the specified file path.

    If meta_file is absent, return a NullToolProvidedMetadata. If
    provided_metadata_style is None, attempt to guess the tool-provided
    metadata style.
    """
    if not os.path.exists(meta_file):
        return NullToolProvidedMetadata()
    if provided_metadata_style is None:
        provided_metadata_style = _guess_tool_provided_metadata_style(meta_file)

    assert provided_metadata_style in ["legacy", "default"]

    if provided_metadata_style == "legacy":
        return LegacyToolProvidedMetadata(meta_file, job_wrapper=job_wrapper)
    elif provided_metadata_style == "default":
        return ToolProvidedMetadata(meta_file)


def _guess_tool_provided_metadata_style(path):
    try:
        with open(path, "r") as f:
            metadata = json.load(f)
            metadata_type = metadata.get("type", None)
            return "legacy" if metadata_type in ["dataset", "new_primary_dataset"] else "default"
    except ValueError:
        # Either empty or multiple JSON lines; either way we can safely treat
        # it as legacy style.
        return "legacy"
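
# Illustrative only (not part of the original module): sketches of the two
# on-disk galaxy.json styles the guesser above distinguishes. A "legacy" file
# is a stream of JSON lines, each carrying a "type" key:
#
#     {"type": "dataset", "dataset": "galaxy_dataset_1.dat"}
#     {"type": "new_primary_dataset", "filename": "extra.txt"}
#
# A "default"-style file is a single JSON document keyed by output name:
#
#     {"output1": {"datasets": [{"filename": "extra.txt"}]}}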


class BaseToolProvidedMetadata(object):

    def get_new_datasets(self, output_name):
        """Find new datasets to discover for the specified output.

        Return a list of such datasets.

        Called only in the context of discovering datasets when
        discover_via="tool_provided_metadata" is defined in the tool.
        """
        return []

    def has_failed_outputs(self):
        """Determine whether generation of any of the outputs failed.

        If True, this method should also log information about at least the
        first such failed output.
        """
        return False

    def get_new_dataset_meta_by_basename(self, output_name, basename):
        """For a discovered dataset, get the corresponding metadata entry.

        The discovery may have been from explicit listing in this file
        (returned from get_new_datasets) or via file regex; either way the
        basename of the file is used to index the fetching of the metadata
        entry.
        """
        return {}

    def get_unnamed_outputs(self):
        """Return unnamed output datasets introduced for upload 2.0.

        Needs more formal specification, but see output_collect for how
        destinations, types, elements, etc. are consumed.
        """
        return []

    def get_dataset_meta(self, output_name, dataset_id, dataset_uuid):
        """Return primary dataset metadata for the specified output."""
        return {}

    def rewrite(self):
        """Write metadata back to the file system.

        If the metadata has not been changed (via outputs specified as
        mutable), the implementing class may opt not to re-write the file.
        """
        return None

    def get_new_datasets_for_metadata_collection(self):
        """Return all tracked datasets that are not explicit primary outputs."""
        return []


class NullToolProvidedMetadata(BaseToolProvidedMetadata):
    pass


class LegacyToolProvidedMetadata(BaseToolProvidedMetadata):

    def __init__(self, meta_file, job_wrapper=None):
        self.meta_file = meta_file
        self.tool_provided_job_metadata = []

        with open(meta_file, 'r') as f:
            for line in f:
                try:
                    line = stringify_dictionary_keys(json.loads(line))
                    assert 'type' in line
                except Exception:
                    log.exception('(%s) Got JSON data from tool, but the data is improperly formatted or has no "type" key' % getattr(job_wrapper, "job_id", None))
                    log.debug('Offending data was: %s' % line)
                    continue
                # Set the dataset id if it's a dataset entry and isn't set.
                # This isn't insecure. We loop over the job's output datasets
                # in the finish method, so if a tool writes out metadata for a
                # dataset id that it doesn't own, it'll just be ignored.
                dataset_id_not_specified = line['type'] == 'dataset' and 'dataset_id' not in line
                if dataset_id_not_specified:
                    dataset_basename = line['dataset']
                    if job_wrapper:
                        try:
                            line['dataset_id'] = job_wrapper.get_output_file_id(dataset_basename)
                        except KeyError:
                            log.warning('(%s) Tool provided job dataset-specific metadata without specifying a dataset' % job_wrapper.job_id)
                            continue
                    else:
                        # Fall back to parsing the numeric id or uuid out of a
                        # Galaxy-style dataset filename such as
                        # "galaxy_dataset_<id>.dat" or "dataset_<uuid>.dat".
                        match = re.match(r'(galaxy_)?dataset_(.*)\.dat', dataset_basename)
                        if match is None:
                            raise Exception("processing tool_provided_metadata (e.g. galaxy.json) entry with invalid dataset name [%s]" % dataset_basename)
                        dataset_id = match.group(2)
                        if dataset_id.isdigit():
                            line['dataset_id'] = dataset_id
                        else:
                            line['dataset_uuid'] = dataset_id

                self.tool_provided_job_metadata.append(line)

    def get_dataset_meta(self, output_name, dataset_id, dataset_uuid):
        for meta in self.tool_provided_job_metadata:
            if meta['type'] == 'dataset' and 'dataset_id' in meta and int(meta['dataset_id']) == dataset_id:
                return meta
            if meta['type'] == 'dataset' and 'dataset_uuid' in meta and meta['dataset_uuid'] == dataset_uuid:
                return meta
        return {}

    def get_new_dataset_meta_by_basename(self, output_name, basename):
        for meta in self.tool_provided_job_metadata:
            if meta['type'] == 'new_primary_dataset' and meta['filename'] == basename:
                return meta
        # No matching entry; fall back to the base-class contract.
        return {}

    def get_new_datasets(self, output_name):
        log.warning("Called get_new_datasets with the legacy tool metadata provider; that is unimplemented.")
        return []

    def has_failed_outputs(self):
        found_failed = False
        for meta in self.tool_provided_job_metadata:
            if meta.get("failed", False):
                log.info("At least one tool output is marked as failed (%s)." % meta)
                found_failed = True

        return found_failed

    def get_unnamed_outputs(self):
        return []

    def rewrite(self):
        with open(self.meta_file, 'wt') as job_metadata_fh:
            for meta in self.tool_provided_job_metadata:
                job_metadata_fh.write("%s\n" % (json.dumps(meta)))

    def get_new_datasets_for_metadata_collection(self):
        for meta in self.tool_provided_job_metadata:
            if meta['type'] == 'new_primary_dataset':
                yield meta
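
# Illustrative only (not part of the original module): a legacy-style entry
# without an explicit dataset_id, as resolved by the filename fallback above.
# Given the line
#
#     {"type": "dataset", "dataset": "galaxy_dataset_42.dat"}
#
# the regex r'(galaxy_)?dataset_(.*)\.dat' extracts "42", which is numeric,
# so the entry gains "dataset_id": "42"; a non-numeric capture such as a
# UUID would instead be stored under "dataset_uuid".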


class ToolProvidedMetadata(BaseToolProvidedMetadata):

    def __init__(self, meta_file):
        self.meta_file = meta_file
        with open(meta_file, 'r') as f:
            self.tool_provided_job_metadata = json.load(f)

    def get_dataset_meta(self, output_name, dataset_id, dataset_uuid):
        return self.tool_provided_job_metadata.get(output_name, {})

    def get_new_dataset_meta_by_basename(self, output_name, basename):
        datasets = self.tool_provided_job_metadata.get(output_name, {}).get("datasets", [])
        for meta in datasets:
            if meta['filename'] == basename:
                return meta
        # No matching entry; fall back to the base-class contract.
        return {}

    def get_new_datasets(self, output_name):
        datasets = self.tool_provided_job_metadata.get(output_name, {}).get("datasets", [])
        if not datasets:
            elements = self.tool_provided_job_metadata.get(output_name, {}).get("elements", [])
            if elements:
                datasets = self._elements_to_datasets(elements)
        return datasets

    def _elements_to_datasets(self, elements, level=0):
        for element in elements:
            extra_kwds = {"identifier_%d" % level: element["name"]}
            if "elements" in element:
                # Recurse into nested collections, prefixing each yielded
                # dataset with this level's element identifier.
                for inner_element in self._elements_to_datasets(element["elements"], level=level + 1):
                    dataset = extra_kwds.copy()
                    dataset.update(inner_element)
                    yield dataset
            else:
                extra_kwds.update(element)
                yield extra_kwds
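
    # Illustrative only (not part of the original module): _elements_to_datasets
    # flattens a nested "elements" tree into one dict per dataset, adding an
    # "identifier_<level>" key for each nesting level. For example,
    #
    #     [{"name": "a", "elements": [{"name": "b", "filename": "x.txt"}]}]
    #
    # yields
    #
    #     {"identifier_0": "a", "identifier_1": "b", "name": "b", "filename": "x.txt"}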

    def has_failed_outputs(self):
        found_failed = False
        for output_name, meta in self.tool_provided_job_metadata.items():
            if output_name == "__unnamed_outputs":
                continue

            if meta.get("failed", False):
                log.info("At least one tool output is marked as failed (%s)." % meta)
                found_failed = True

        return found_failed

    def get_unnamed_outputs(self):
        log.debug("unnamed outputs [%s]" % self.tool_provided_job_metadata)
        return self.tool_provided_job_metadata.get("__unnamed_outputs", [])

    def rewrite(self):
        with open(self.meta_file, 'wt') as job_metadata_fh:
            json.dump(self.tool_provided_job_metadata, job_metadata_fh)
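

if __name__ == "__main__":
    # A minimal usage sketch, not part of the original module: write a
    # "default"-style galaxy.json to a temporary file and parse it back. The
    # output name "output1" and filename "part1.tsv" are illustrative only.
    import tempfile

    with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
        json.dump({"output1": {"datasets": [{"filename": "part1.tsv"}]}}, f)
    metadata = parse_tool_provided_metadata(f.name)  # guessed as "default" style
    assert isinstance(metadata, ToolProvidedMetadata)
    print(metadata.get_new_dataset_meta_by_basename("output1", "part1.tsv"))
    # -> {'filename': 'part1.tsv'}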