Mercurial > repos > lain > history_metadata_extractor
view history_metadata_extractor.py @ 1:c7f4f2ac38f2 draft default tip
" master branch Updating"
author | lain |
---|---|
date | Tue, 09 Aug 2022 15:19:45 +0000 |
parents | 426b0f85a311 |
children |
line wrap: on
line source
#!/usr/bin/env python ## **@AUTHOR**: Lain Pavot - lain.pavot@inrae.fr ## **@DATE**: 22/06/2022 import json import os import sys STATIC = os.path.join(sys.path[0], "static") VENDOR = os.path.join(sys.path[0], "vendor") with open(os.path.join(STATIC, "app.css")) as css: CSS_STYLES = css.read() with open(os.path.join(VENDOR, "bootstrap.min.css")) as bootstrap: CSS_STYLES = f"{CSS_STYLES}\n{bootstrap.read()}" with open(os.path.join(STATIC, "app.js")) as js: JAVASCRIPT = js.read() with open(os.path.join(STATIC, "app.template.html")) as template: PAGE_TEMPLATE = template.read() with open(os.path.join(STATIC, "title.template.html")) as template: TITLE_TEMPLATE = template.read() with open(os.path.join(STATIC, "table.template.html")) as template: TABLE_TEMPLATE = template.read() with open(os.path.join(STATIC, "header_list.template.html")) as template: HEADER_LIST_TEMPLATE = template.read() HEADER_LIST_TEMPLATE = '\n'.join(( "<thead>", " <tr>", "{header_list}", " </tr>", "</thead>", )) HEADER_TEMPLATE = "<th scope=\"col\">{}</th>" COLUMN_TEMPLATE = "<th scope=\"row\">{}</th>" TABLE_LINE_LIST_TEMPLATE = '\n'.join(( "<tr class=\"{classes}\">", "{table_lines}", "</tr>", )) TABLE_LINE_TEMPLATE = "<td>{}</td>" INDENT = " " HISTORY_CACHE = {} def indent(text): if text.startswith("\n"): return text.replace("\n", f"\n{INDENT}") else: return INDENT+text.replace("\n", f"\n{INDENT}") def noempty(ls, as_list=True): if as_list: return [x for x in ls if x] return (x for x in ls if x) def join_noempty(ls, sep=';'): return sep.join(noempty(ls, as_list=False)) def extract_dataset_attributes(dataset_attrs): for dataset_attr in dataset_attrs: HISTORY_CACHE[dataset_attr["encoded_id"]] = dataset_attr HISTORY_CACHE["dataset_attrs"] = dataset_attrs def convert_to_html(jobs_attrs, dataset_attrs=None): if dataset_attrs: extract_dataset_attributes(dataset_attrs) return PAGE_TEMPLATE.format( styles=CSS_STYLES.replace("\n<", "\n <"), javascript=JAVASCRIPT, title=indent(indent(get_title(jobs_attrs))), table_list=indent(indent(get_table_list(jobs_attrs))) ) def get_title(jobs_attrs): galaxy_version = jobs_attrs[0]["galaxy_version"] or "Unknown version" return TITLE_TEMPLATE.format(galaxy_version=galaxy_version) def get_table_list(jobs_attrs): return '\n'.join(( convert_item_to_table(job_attr, dataset_id) for job_attr in jobs_attrs for dataset_id_set in sorted(list(( job_attr["output_dataset_mapping"] or {1:"unknown"} ).values())) for dataset_id in sorted(dataset_id_set) )) def convert_item_to_table(job_attr, dataset_id): encoded_jid = job_attr.get("encoded_id") if HISTORY_CACHE: history = HISTORY_CACHE.get(dataset_id, {}) hid = history.get("hid", "DELETED") else: hid = "?" exit_code = job_attr.get("exit_code") if job_attr["exit_code"] == 0: status = f"Ok ({exit_code})" classes = "alert alert-success" else: status = f"Failed ({exit_code})" classes = "alert alert-danger" if hid == "DELETED": classes += " history_metadata_extractor_deleted" tool_name = job_attr["tool_id"] or "unknown" if tool_name.count("/") >= 4: tool_name = job_attr["tool_id"].split("/")[-2] tool_name = tool_name + " - " + job_attr["tool_version"] tool_name = f"[{hid}] - {tool_name}" return TABLE_TEMPLATE.format( classes=classes, tool_name=tool_name, tool_status=status, table=convert_parameters_to_html(job_attr) ) def convert_parameters_to_html(job_attr): params = job_attr["params"] params_enrichment(job_attr, params) keys = [ key for key in iter_parameter_keys(params) if key not in ("dbkey", "chromInfo", "__input_ext", "request_json") ] return '\n'.join(( indent(get_table_header(params, ["value", "name", "extension", "hid"])), indent(get_table_lines(params, keys)), )) def params_enrichment(job_attr, params): if ( all(map(params.__contains__, ("request_json", "files"))) and "encoded_id" in job_attr ): params.update(json.loads(params.pop("request_json"))) for i, target in enumerate(params.pop("targets")): files = target["elements"] params["files"][i]["hid"] = join_noempty( str(file["object_id"]) for file in files ) params["files"][i]["name"] = join_noempty( str(file["name"]) for file in files ) params["files"][i]["extension"] = join_noempty( str(file["ext"]) for file in files ) def iter_parameter_keys(params): for key in params: param = params[key] if param_is_file(param): yield key elif isinstance(param, dict): for subkey in iter_parameter_keys(param): if subkey not in ("__current_case__", ): yield f"{key}.{subkey}" else: yield key def param_is_file(param): return is_file_v1(param) or is_file_v2(param) def is_file_v1(param): return ( isinstance(param, dict) and all(map( param.__contains__, ("info", "peek", "name", "extension") )) ) def is_file_v2(param): return ( isinstance(param, dict) and "values" in param and isinstance(param["values"], list) and isinstance(param["values"][0], dict) and all(map(param["values"][0].__contains__, ("id", "src"))) ) def get_table_header(params, keys): return HEADER_LIST_TEMPLATE.format( header_list=indent(indent('\n'.join(map(HEADER_TEMPLATE.format, [""]+keys)))) ) def get_table_lines(params, keys): return ''.join(table_lines_iterator(params, keys)) def table_lines_iterator(params, param_names): keys = ("value", "name", "extension", "hid",) for param_name in param_names: classes = "" table_lines = [] subparam = params while '.' in param_name: subkey, param_name = param_name.split('.', 1) subparam = subparam[subkey] for key in keys: param = extract_param_info(key, subparam[param_name]) table_lines.append(param) yield TABLE_LINE_LIST_TEMPLATE.format( classes=classes, table_lines=( indent(COLUMN_TEMPLATE.format(param_name) + '\n') + indent('\n'.join(map( TABLE_LINE_TEMPLATE.format, table_lines ))) ) ) def extract_param_info(key, param): if key == "value": return extract_param_value(param) if isinstance(param, dict) and key in param: return str(param[key]) if isinstance(param, list): return join_noempty(extract_param_info(key, p) for p in param) return "" def extract_param_value(param): if isinstance(param, dict): if "__current_case__" in param: return join_dict_key_values(param, ignore=("__current_case__", )) for acceptable_value in ("file_data", "file_name"): if acceptable_value in param: return f"{acceptable_value}: {param[acceptable_value]}" if "values" in param: ids = [] for file_id in param["values"]: file_id = file_id["id"] if file_id in HISTORY_CACHE: file_info = HISTORY_CACHE[file_id] param["name"] = file_info["name"] param["hid"] = file_info["hid"] param["extension"] = file_info["extension"] ids.append(file_id) return join_noempty(ids) if isinstance(param, (str, int, float)): return str(param) if isinstance(param, (list, tuple)): return join_noempty(map(extract_param_value, param)) return str(param) def join_dict_key_values(dico, ignore=()): return join_noempty( f"{name}: {dico[name]}" for name in dico if name not in ignore ) if __name__ == "__main__": import optparse parser = optparse.OptionParser() parser.add_option( "-j", "--jobs-attrs", dest="jobs_attrs", help="write report of FILE", metavar="FILE", default="jobs_attrs.txt" ) parser.add_option( "-d", "--dataset-attrs", dest="dataset_attrs", help="extract additional info from this file", metavar="FILE", default=None, ) parser.add_option( "-o", "--output", dest="output", help="write report to FILE", metavar="FILE", default="out.html" ) parser.add_option( "-v", "--version", action="store_true", help="Show this script's version and exits", ) (options, args) = parser.parse_args() if options.version: import re with open(os.path.join(sys.path[0], "README.md")) as readme: for line in readme.readlines(): if "**@VERSION**" in line: print(re.search(r"\d+\.\d+\.\d+", line)[0]) sys.exit(0) with open(options.jobs_attrs) as j: jobs_attrs = json.load(j) if options.dataset_attrs is not None: with open(options.dataset_attrs) as ds: dataset_attrs = json.load(ds) else: dataset_attrs = {} jobs_attrs = [{ key: jobs_attr.get(key) for key in ( "galaxy_version", "tool_id", "tool_version", "encoded_id", "params", "output_datasets", "exit_code", "output_dataset_mapping", ) } for jobs_attr in jobs_attrs] if jobs_attrs and jobs_attrs[0].get("output_datasets"): jobs_attrs = sorted( jobs_attrs, key=lambda x:x["output_datasets"][0] ) with open(options.output, "w") as o: o.write(convert_to_html(jobs_attrs, dataset_attrs=dataset_attrs))