diff planemo/lib/python3.7/site-packages/galaxy/tool_util/provided_metadata.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler
---|---
date | Fri, 31 Jul 2020 00:32:28 -0400
parents |
children |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/planemo/lib/python3.7/site-packages/galaxy/tool_util/provided_metadata.py	Fri Jul 31 00:32:28 2020 -0400
@@ -0,0 +1,236 @@
+import json
+import logging
+import os
+import re
+
+from galaxy.util import stringify_dictionary_keys
+
+log = logging.getLogger(__name__)
+
+
+def parse_tool_provided_metadata(meta_file, provided_metadata_style=None, job_wrapper=None):
+    """Return a ToolProvidedMetadata object for specified file path.
+
+    If meta_file is absent, return a NullToolProvidedMetadata. If provided_metadata_style is None
+    attempt to guess tool provided metadata type.
+    """
+    if not os.path.exists(meta_file):
+        return NullToolProvidedMetadata()
+    if provided_metadata_style is None:
+        provided_metadata_style = _guess_tool_provided_metadata_style(meta_file)
+
+    assert provided_metadata_style in ["legacy", "default"]
+
+    if provided_metadata_style == "legacy":
+        return LegacyToolProvidedMetadata(meta_file, job_wrapper=job_wrapper)
+    elif provided_metadata_style == "default":
+        return ToolProvidedMetadata(meta_file)
+
+
+def _guess_tool_provided_metadata_style(path):
+    try:
+        with open(path, "r") as f:
+            metadata = json.load(f)
+        metadata_type = metadata.get("type", None)
+        return "legacy" if metadata_type in ["dataset", "new_primary_dataset"] else "default"
+    except ValueError:
+        # Either empty or multiple JSON lines, either way we can safely treat
+        # it as legacy style.
+        return "legacy"
+
+
+class BaseToolProvidedMetadata(object):
+
+    def get_new_datasets(self, output_name):
+        """Find new datasets for dataset discovery for specified output.
+
+        Return a list of such datasets.
+
+        Called only in the context of discovering datasets when
+        discover_via="tool_provided_metadata" is defined in the tool.
+        """
+        return []
+
+    def has_failed_outputs(self):
+        """Determine if generation of any of the outputs failed.
+
+        If True, this method should also log information about at least the first such failed output.
+        """
+        return False
+
+    def get_new_dataset_meta_by_basename(self, output_name, basename):
+        """For a discovered dataset, get the corresponding metadata entry.
+
+        The discovery may have been from explicit listing in this file (returned
+        from get_new_datasets) or via file regex, either way the basename of the
+        file is used to index the fetching of the metadata entry.
+        """
+        return {}
+
+    def get_unnamed_outputs(self):
+        """Return unnamed outputs dataset introduced for upload 2.0.
+
+        Needs more formal specification but see output_collect for how destinations,
+        types, elements, etc... are consumed.
+        """
+        return []
+
+    def get_dataset_meta(self, output_name, dataset_id, dataset_uuid):
+        """Return primary dataset metadata for specified output.
+        """
+        return {}
+
+    def rewrite(self):
+        """Write metadata back to the file system.
+
+        If metadata has not changed via outputs specified as mutable, the
+        implementation class may opt to not re-write the file.
+        """
+        return None
+
+    def get_new_datasets_for_metadata_collection(self):
+        """Return all datasets tracked that are not explicit primary outputs.
+ """ + return [] + + +class NullToolProvidedMetadata(BaseToolProvidedMetadata): + pass + + +class LegacyToolProvidedMetadata(BaseToolProvidedMetadata): + + def __init__(self, meta_file, job_wrapper=None): + self.meta_file = meta_file + self.tool_provided_job_metadata = [] + + with open(meta_file, 'r') as f: + for line in f: + try: + line = stringify_dictionary_keys(json.loads(line)) + assert 'type' in line + except Exception: + log.exception('(%s) Got JSON data from tool, but data is improperly formatted or no "type" key in data' % getattr(job_wrapper, "job_id", None)) + log.debug('Offending data was: %s' % line) + continue + # Set the dataset id if it's a dataset entry and isn't set. + # This isn't insecure. We loop the job's output datasets in + # the finish method, so if a tool writes out metadata for a + # dataset id that it doesn't own, it'll just be ignored. + dataset_id_not_specified = line['type'] == 'dataset' and 'dataset_id' not in line + if dataset_id_not_specified: + dataset_basename = line['dataset'] + if job_wrapper: + try: + line['dataset_id'] = job_wrapper.get_output_file_id(dataset_basename) + except KeyError: + log.warning('(%s) Tool provided job dataset-specific metadata without specifying a dataset' % job_wrapper.job_id) + continue + else: + match = re.match(r'(galaxy_)?dataset_(.*)\.dat', dataset_basename) + if match is None: + raise Exception("processing tool_provided_metadata (e.g. galaxy.json) entry with invalid dataset name [%s]" % dataset_basename) + dataset_id = match.group(2) + if dataset_id.isdigit(): + line['dataset_id'] = dataset_id + else: + line['dataset_uuid'] = dataset_id + + self.tool_provided_job_metadata.append(line) + + def get_dataset_meta(self, output_name, dataset_id, dataset_uuid): + for meta in self.tool_provided_job_metadata: + if meta['type'] == 'dataset' and 'dataset_id' in meta and int(meta['dataset_id']) == dataset_id: + return meta + if meta['type'] == 'dataset' and 'dataset_uuid' in meta and meta['dataset_uuid'] == dataset_uuid: + return meta + return {} + + def get_new_dataset_meta_by_basename(self, output_name, basename): + for meta in self.tool_provided_job_metadata: + if meta['type'] == 'new_primary_dataset' and meta['filename'] == basename: + return meta + + def get_new_datasets(self, output_name): + log.warning("Called get_new_datasets with legacy tool metadata provider - that is unimplemented.") + return [] + + def has_failed_outputs(self): + found_failed = False + for meta in self.tool_provided_job_metadata: + if meta.get("failed", False): + log.info("One or more tool outputs is marked as failed (%s)." 
+                found_failed = True
+
+        return found_failed
+
+    def get_unnamed_outputs(self):
+        return []
+
+    def rewrite(self):
+        with open(self.meta_file, 'wt') as job_metadata_fh:
+            for meta in self.tool_provided_job_metadata:
+                job_metadata_fh.write("%s\n" % (json.dumps(meta)))
+
+    def get_new_datasets_for_metadata_collection(self):
+        for meta in self.tool_provided_job_metadata:
+            if meta['type'] == 'new_primary_dataset':
+                yield meta
+
+
+class ToolProvidedMetadata(BaseToolProvidedMetadata):
+
+    def __init__(self, meta_file):
+        self.meta_file = meta_file
+        with open(meta_file, 'r') as f:
+            self.tool_provided_job_metadata = json.load(f)
+
+    def get_dataset_meta(self, output_name, dataset_id, dataset_uuid):
+        return self.tool_provided_job_metadata.get(output_name, {})
+
+    def get_new_dataset_meta_by_basename(self, output_name, basename):
+        datasets = self.tool_provided_job_metadata.get(output_name, {}).get("datasets", [])
+        for meta in datasets:
+            if meta['filename'] == basename:
+                return meta
+
+    def get_new_datasets(self, output_name):
+        datasets = self.tool_provided_job_metadata.get(output_name, {}).get("datasets", [])
+        if not datasets:
+            elements = self.tool_provided_job_metadata.get(output_name, {}).get("elements", [])
+            if elements:
+                datasets = self._elements_to_datasets(elements)
+        return datasets
+
+    def _elements_to_datasets(self, elements, level=0):
+        for element in elements:
+            extra_kwds = {"identifier_%d" % level: element["name"]}
+            if "elements" in element:
+                for inner_element in self._elements_to_datasets(element["elements"], level=level + 1):
+                    dataset = extra_kwds.copy()
+                    dataset.update(inner_element)
+                    yield dataset
+            else:
+                dataset = extra_kwds
+                dataset.update(element)
+                yield dataset
+
+    def has_failed_outputs(self):
+        found_failed = False
+        for output_name, meta in self.tool_provided_job_metadata.items():
+            if output_name == "__unnamed_outputs":
+                continue
+
+            if meta.get("failed", False):
+                log.info("One or more tool outputs is marked as failed (%s)." % meta)
+                found_failed = True
+
+        return found_failed
+
+    def get_unnamed_outputs(self):
+        log.debug("unnamed outputs [%s]" % self.tool_provided_job_metadata)
+        return self.tool_provided_job_metadata.get("__unnamed_outputs", [])
+
+    def rewrite(self):
+        with open(self.meta_file, 'wt') as job_metadata_fh:
+            json.dump(self.tool_provided_job_metadata, job_metadata_fh)
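
Example usage (editor's sketch, not part of the changeset): the snippet below assumes the vendored module above is importable as galaxy.tool_util.provided_metadata and writes two hypothetical galaxy.json files, one per metadata style, to show how parse_tool_provided_metadata guesses the style and exposes the entries. The output and file names (out1, part1.tabular, part2.tabular) are illustrative only.

import json
import os
import tempfile

from galaxy.tool_util.provided_metadata import parse_tool_provided_metadata

workdir = tempfile.mkdtemp()
meta_file = os.path.join(workdir, "galaxy.json")  # hypothetical tool working directory

# "default" style: a single JSON document keyed by output name.
with open(meta_file, "w") as f:
    json.dump({"out1": {"datasets": [{"filename": "part1.tabular", "name": "Part 1", "ext": "tabular"}]}}, f)

metadata = parse_tool_provided_metadata(meta_file)            # style guessed as "default"
print(type(metadata).__name__)                                # ToolProvidedMetadata
print(metadata.get_new_datasets("out1"))                      # the "datasets" list above
print(metadata.get_new_dataset_meta_by_basename("out1", "part1.tabular"))
print(metadata.has_failed_outputs())                          # False

# "legacy" style: one JSON object per line, each carrying a "type" key.
with open(meta_file, "w") as f:
    f.write(json.dumps({"type": "new_primary_dataset", "filename": "part2.tabular", "ext": "tabular"}) + "\n")

metadata = parse_tool_provided_metadata(meta_file)            # guessed as "legacy"
print(type(metadata).__name__)                                # LegacyToolProvidedMetadata
print(metadata.get_new_dataset_meta_by_basename("out1", "part2.tabular"))

Note that the style guess keys off the top-level "type" field: a one-line legacy file still parses as a single JSON document and is classified by its "type" value, while a multi-line legacy file makes json.load raise ValueError and falls through to "legacy".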