lain / ms_to_peakforest_it: diff server.py @ 0:b58b229c4cbf (draft)
planemo upload commit 523a9c8df173302ad38e9f15e7d82eab01736551-dirty

author:   lain
date:     Fri, 03 Mar 2023 14:10:24 +0000
parents:  (none)
children: 7e3085fc60c1
--- /dev/null  Thu Jan 01 00:00:00 1970 +0000
+++ b/server.py  Fri Mar 03 14:10:24 2023 +0000
@@ -0,0 +1,1120 @@
#!/usr/bin/env python3

import atexit
import csv
import http.server
import json
import logging
import os
import re
import shutil
import socketserver
import sys
import tempfile
import yaml

TAB_LIST_PLACEHOLDER = "TAB_LIST_PLACEHOLDER"
MS_PEAK_VALUES_PLACEHOLDER = "MS_PEAK_VALUES_PLACEHOLDER"
COMPOUND_NAME_PLACEHOLDER = "COMPOUND_NAME_PLACEHOLDER"
TAB_INDEX_PLACEHOLDER = "TAB_INDEX_PLACEHOLDER"
EMBED_JS_PLACEHOLDER = "EMBED_JS"
ACTIVE_TAB_PLACEHOLDER = "ACTIVE_TAB_PLACEHOLDER"
ADD_SPECTRUM_FORM = "ADD_SPECTRUM_FORM"
PRODUCE_JSON_PLACEHOLDER = "PRODUCE_JSON_PLACEHOLDER"

COMPOUND_REF = "compound-ref"
COMPOUND_MIX = "compound-mix"

END_MS_PEAK_VALUES_PLACEHOLDER = "  ]"
MS_DATA_COLUMN_NUMBER = 9
DEFAULT_MS_PEAK_VALUES = (
    "[\n"
    + ("  [" + ','.join([' ""'] * MS_DATA_COLUMN_NUMBER) + "],\n") * 17
    + END_MS_PEAK_VALUES_PLACEHOLDER
)

FRAGNOT_HEADER = {
    "m/z": "fragment_mz",
    "absolute_intensity": "abs_intensity",
    "relative_intensity": "rel_intensity",
    "theo_mass": "",
    "delta_ppm": "ppm",
    "rdbequiv": "",
    "composition": "",
    "attribution": "fragment",
}

MS_2_SNOOP_HEADER = {
    "name": str,
    "inchikey": str,
    "composition": str,
    "fragment": str,
    "fragment_mz": str,
    "ppm": str,
    "fileid": str,
    "correlation": str,
    "abs_intensity": lambda x: float(x) * 100,
    "rel_intensity": lambda x: float(x) * 100,
    "valid_corelation": str,
}


class ConfigException(ValueError):
    """
    An exception raised when something went wrong in the config and we
    cannot continue - i.e. when there's no token for peakforest.
    """


class YAMLConfig(dict):

    """
    Dictionary that handles keys with dots in them:
        test["truc.chose"]
    is equivalent to
        test["truc"]["chose"]
    Assignation works too.
    Adds the possibility to use placeholders:
    --- yaml
    test: {{ truc.chose }}
    truc:
      chose: bidule
    ---
    here, test's value is "bidule"
    """

    def __init__(self, *args, **kwargs):
        meta_conf = kwargs.pop("__meta_config__", {})
        self._debug = meta_conf.get("__debug__", False)
        self._stream_name = meta_conf.get("__debug_stream__", "stdout")
        self._debug_stream = getattr(sys, self._stream_name)
        self._only_root_debug = meta_conf.get("__only_root_debug__", False)
        if "__root__" in kwargs:
            if self._only_root_debug:
                self._debug = False
            self._name = kwargs.pop("__name__")
            self._debugger("Is not root config.")
            self._root = kwargs.pop("__root__")
        else:
            self._name = "root"
            self._debugger("Is root config.")
            self._root = self
        super().__init__(*args, **kwargs)
        for key, value in self.copy().items():
            if isinstance(value, dict) and not isinstance(value, YAMLConfig):
                self._debugger(f"Parsing sub-config for {key}")
                self[key] = self._propagate(value, key)
        self._replace_placeholders(self)
        self._extract_defaults()

    def _propagate(self, sub_dict, name):
        if isinstance(sub_dict, dict) and not isinstance(sub_dict, self.__class__):
            return YAMLConfig(
                **sub_dict,
                __name__=name,
                __root__=self._root,
                __meta_config__={
                    "__debug__": self._debug,
                    "__debug_stream__": self._stream_name,
                    "__only_root_debug__": self._only_root_debug,
                }
            )
        return sub_dict

    def _debugger(self, message):
        if self._debug:
            self._debug_stream.write(f"[{self._name}]: {message}\n")
            self._debug_stream.flush()

    def __getattr__(self, attr):
        if attr in self:
            return self[attr]
        if '.' in attr:
            attr, sub = attr.split('.', 1)
            return getattr(getattr(self, attr), sub)
        return super().__getattribute__(attr)

    def _replace_placeholders(self, subpart):
        self._debugger("Replacing placeholders...")
        for sub_key, sub_item in subpart.copy().items():
            if isinstance(sub_item, str):
                for placeholder in re.findall("{{ (?P<placeholder>.*?) }}", sub_item):
                    if placeholder not in self._root:
                        self._debugger(f"Could not find replacement for {placeholder}")
                        continue
                    replacement = self._root[placeholder]
                    if isinstance(replacement, str):
                        self._debugger(f"Found placeholder: {placeholder} -> {replacement}")
                        sub_item = sub_item.replace(
                            "{{ " + placeholder + " }}",
                            replacement
                        )
                    else:
                        self._debugger(
                            f"Found placeholder: {placeholder} -> "
                            f"{replacement.__class__.__name__}"
                        )
                        sub_item = self._propagate(replacement, placeholder)
                dict.__setitem__(subpart, sub_key, sub_item)
            elif isinstance(sub_item, dict):
                super().__setitem__(sub_key, self._propagate(sub_item, sub_key))

    def _extract_defaults(self):
        if self._root is not self:
            return
        if "defaults" not in self:
            self._debugger("No defaults here.")
            return
        if "arguments" not in self:
            self._debugger("Arguments creation...")
            self["arguments"] = self._propagate({}, "arguments")
        self._debugger("Populating arguments with defaults values")
        for key, value in self.defaults.items():
            if key not in self:
                if isinstance(value, dict):
                    value = self._propagate(value, key)
                self.arguments[key] = value
                self._debugger(f"Default {key} = {value}")

    def __setitem__(self, key, value):
        if isinstance(value, dict):
            value = self._propagate(value, key)
        if "." not in key:
            return super().__setitem__(key, value)
        key, subkey = key.rsplit(".", 1)
        self[key][subkey] = value

    def __getitem__(self, key):
        if super().__contains__(key):
            return super().__getitem__(key)
        if "." not in key:
            return super().__getitem__(key)
        curent = self
        while "." in key:
            key, subkey = key.split(".", 1)
            curent = curent[key]
            key = subkey
        if subkey not in curent:
            curent[subkey] = self._propagate({}, subkey)
        result = curent[subkey]
        return result

    def __contains__(self, key):
        if "." not in key:
            return super().__contains__(key)
        key, subkey = key.split(".", 1)
        if not super().__contains__(key):
            return False
        return subkey in self[key]

    def copy(self):
        return {
            key: (
                value if not isinstance(value, dict)
                else value.copy()
            ) for key, value in self.items()
        }
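
The snippet below is a minimal usage sketch, not part of server.py: it exercises the dotted-key access and "{{ }}" placeholder resolution described in the YAMLConfig docstring, assuming server.py is importable as the module "server".

from server import YAMLConfig

config = YAMLConfig(**{
    "test": "{{ truc.chose }}",
    "truc": {"chose": "bidule"},
})
print(config["test"])            # -> "bidule" (placeholder resolved at construction)
print(config["truc.chose"])      # -> "bidule" (dotted item access)
print(config.truc.chose)         # -> "bidule" (dotted attribute access)
config["truc.autre"] = "machin"  # dotted assignment updates the nested mapping
print(config["truc"]["autre"])   # -> "machin"
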
+ """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._errors = list() + if not self.parameters.shortcuts: + self.parameters["shortcuts"] = YAMLConfig() + self._mandatory = self.parameters.mandatory + self._optional = self.parameters.optional + self._flags = { + flag: False + for flag in self.parameters.flags + } + self._all_params = self._optional.copy() + self._all_params.update(self._mandatory) + self._all_params.update(self._flags) + self._small_params = dict() + self._determine_small_params() + + @property + def in_error(self): + return bool(self._errors) + + @property + def sorted_keys(self): + return sorted(self._all_params.keys()) + + @property + def sorted_items(self): + return sorted(self._all_params.items()) + + def _determine_small_params(self, verbose=False): + self._small_params = (self.parameters.shortcuts or {}).copy() + chars = list(map(chr, range(97, 123))) + list(map(chr, range(65, 91))) + all_params = self._all_params.copy() + for long, short in self._small_params.items(): + chars.remove(short) + del all_params[long] + for param in all_params.copy().keys(): + for operation in ( + lambda x:x[0], ## select first char + lambda x:x.split('-', 1)[-1][0], ## first char after - + lambda x:x.split('_', 1)[-1][0], ## first char after _ + lambda x:x.split('.', 1)[-1][0], ## first char after . + lambda x:x[0].upper(), ## select first char + lambda x:x.split('-', 1)[-1][0].upper(), ## first char after - + lambda x:x.split('_', 1)[-1][0].upper(), ## first char after _ + lambda x:x.split('.', 1)[-1][0].upper(), ## first char after . + lambda x: chars[0], ## first letter in the alphabet + ): + char = operation(param) + if char not in self._small_params.values(): + self._small_params[param] = char + chars.remove(char) + del all_params[param] + break + + def _get_parameter_index(self, parameter, original): + if f"--{parameter}" in sys.argv: + return sys.argv.index(f"--{parameter}") + parameter = self._small_params[original] + if f"-{parameter}" in sys.argv: + return sys.argv.index(f"-{parameter}") + return None + + def as_parameter(self, string): + return ( + string + .replace('.', '-') + .replace('_', '-') + ) + + def show_version(self): + print(self.parameters.meta.version) + + def show_help(self): + parameters = [ + f"-{self._small_params[arg]}|--{self.as_parameter(arg)} {arg}" + for arg in self._mandatory + ] + [ + f"[-{self._small_params[arg]}|--{self.as_parameter(arg)} {arg}]" + for arg in self._optional + ] + [ + f"[-{self._small_params[arg]}|--{self.as_parameter(arg)}]" + for arg in self._flags + ] + print( + f"Usage: {__file__} " + ' '.join(parameters) + + "\n\n" + + '\n'.join( + f" -{self._small_params[args]}|--{self.as_parameter(args)}: {help_str}" + for args, help_str in self.sorted_items + ) + + "\n\n" + + '\n'.join( + f"{key}: {value}" + for key, value in self.parameters.meta.items() + ) + ) + sys.exit(0) + + def parse_args(self): + errors = list() + for kind in ("mandatory", "optional", "flags"): + keys = list(sorted(getattr(self, f"_{kind}").keys())) + for original_param, actual_param in zip( + keys, + map(self.as_parameter, keys), + ): + if original_param in self.defaults: + self.arguments[original_param] = self.defaults[original_param] + elif kind == "flags": + self.arguments[original_param] = False + parser = getattr(self, f"parse_{kind}") + if (error := parser(original_param, actual_param)): + errors.append(error) + self._errors = errors + return self + + def parse_mandatory(self, original, actual): + if (index := 
def parse_config(**kwargs):
    """
    opens the config file, loads it with pyyaml's safe loader
    and tries to extract and apply as many directives as possible
    from the config:
        - token retrieval
        - workdir management
        - tempfile management
    """
    root_dir = os.path.dirname(os.path.abspath(__file__))
    with open(os.path.join(root_dir, "config.yml")) as config_file:
        config = YAMLConfig(
            **yaml.load(config_file.read(), Loader=yaml.SafeLoader),
            **kwargs
        )

    if not config.token.value:
        if config.token.use_file:
            if (not os.path.exists(path := config.token.file_path)):
                raise ConfigException("Missing token value or token file.")
            with open(path) as token_file:
                config.token["value"] = token_file.read()
        elif config.defaults.peakforest.token:
            config.token["value"] = config.defaults.peakforest.token

    if config.workdir.create_tmp:
        tmp_dir = tempfile.mkdtemp()
        atexit.register(lambda: shutil.rmtree(tmp_dir))
    else:
        tmp_dir = tempfile.gettempdir()
    config.workdir["tmp_dir"] = tmp_dir

    config["root_dir"] = root_dir
    config["tab_list"] = []
    config["form_template"] = os.path.join(root_dir, config.templates.form)
    config["meta_template"] = os.path.join(root_dir, config.templates.main)
    config["js_template"] = os.path.join(root_dir, config.templates.js)
    config["tab_list_template"] = os.path.join(root_dir, config.templates.tab_list)
    config["placeholders"] = dict()
    config.placeholders[MS_PEAK_VALUES_PLACEHOLDER] = DEFAULT_MS_PEAK_VALUES
    config.placeholders[TAB_INDEX_PLACEHOLDER] = "1"
    config.placeholders[ACTIVE_TAB_PLACEHOLDER] = "active"
    config.placeholders[ADD_SPECTRUM_FORM] = ""
    config.placeholders[EMBED_JS_PLACEHOLDER] = ""
    config.placeholders[TAB_LIST_PLACEHOLDER] = ""
    config.placeholders["DEFAULT_MIN_MZ"] = "50"
    config.placeholders["DEFAULT_MAX_MZ"] = "500"
    config.placeholders["DEFAULT_RESOLUTION_LOW"] = ""
    config.placeholders["DEFAULT_RESOLUTION_HIGH"] = "selected=\"selected\""
    config.placeholders["DEFAULT_RESOLUTION_UNSET"] = ""
    config.placeholders["DEFAULT_MIN_RT"] = "0.9"
    config.placeholders["DEFAULT_MAX_RT"] = "1.4"
    return config
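
For orientation only, a rough sketch of the kind of config.yml parse_config() expects. The keys mirror the accesses in the code above; every value is an invented example, and the real config.yml shipped with the tool also carries the defaults, parameters, regex and logging sections (plus the form_mix/main_mix/form_ref/main_ref template variants) used elsewhere in this file.

import yaml
from server import YAMLConfig

EXAMPLE_CONFIG_YML = """
token:
  value: ""
  use_file: false
  file_path: ""
workdir:
  create_tmp: true
  work_in_tmp: false
  generate_in_tmp: true
templates:
  main: main.template.html
  form: form.template.html
  js: add-one-spectrum.template.js
  tab_list: tab-list.template.html
  placeholders:
    start: "{{ "
    stop: " }}"
network:
  ip: 0.0.0.0
  port: 8080
generated:
  html: index.html
  js: "add-one-spectrum-{{ index }}.js"
"""

example = YAMLConfig(**yaml.safe_load(EXAMPLE_CONFIG_YML))
print(example["network.port"])               # -> 8080
print(example.templates.placeholders.start)  # -> "{{ "
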
+ """ + parameters = YAMLParameters(**config) + parameters.parse_args() + + parameters["json_result"] = [] + + get_logger(parameters) + + arguments = parameters.arguments + if arguments.help: + parameters.show_help() + sys.exit(0) + + if arguments.version: + parameters.show_version() + sys.exit(0) + + if parameters.in_error: + raise ValueError( + "Some errors occured during parameters extraction: \n" + + '\n'.join(parameters.errors) + ) + + if arguments.sample_type == COMPOUND_MIX: + parameters["form_template"] = os.path.join( + parameters["root_dir"], + parameters.templates.form_mix + ) + parameters["meta_template"] = os.path.join( + parameters["root_dir"], + parameters.templates.main_mix + ) + elif arguments.sample_type == COMPOUND_REF: + parameters["form_template"] = os.path.join( + parameters["root_dir"], + parameters.templates.form_ref + ) + parameters["meta_template"] = os.path.join( + parameters["root_dir"], + parameters.templates.main_ref + ) + + arguments["produce_json"] = ( + "output_json" in arguments + and arguments["output_json"] != "" + ) + if arguments.produce_json: + parameters.placeholders[PRODUCE_JSON_PLACEHOLDER] = "true" + parameters.json_result = [] + arguments["output_json"] = os.path.abspath(arguments["output_json"]) + atexit.register(save_json, parameters) + else: + parameters.placeholders[PRODUCE_JSON_PLACEHOLDER] = "false" + + if arguments.run_dry_html: + arguments["do_run_dry"] = True + parameters.generated["html"] = os.path.abspath(arguments.run_dry_html) + + if arguments.run_dry_js: + arguments["do_run_dry"] = True + parameters.generated["js"] = os.path.abspath(arguments.run_dry_js) + + if arguments.do_run_dry: + parameters.logger.info("Dry run. Server will ne be run.") + if arguments.run_dry_html: + parameters.logger.info(f"HTML file will be put in {arguments.run_dry_html}") + if arguments.run_dry_js: + parameters.logger.info(f"JS file will be put in {arguments.run_dry_js}") + + if arguments.peakforest.token: + config.token["value"] = arguments.peakforest.token + if not config.token.value: + raise ConfigException( + "No token provided. We will not be able to connect to peakforest." 
+ ) + + if os.path.exists(arguments.input): + single_file = True + file_paths = [arguments.input] + else: + path_list = arguments.input.split(',') + if all(map(os.path.exists, path_list)): + single_file = False + file_paths = path_list + else: + raise ValueError( + f"Some files cannot be found: " + + ', '.join( + path for path in path_list + if not os.path.exists(path) + ) + ) + arguments["input"] = list(map(os.path.abspath, file_paths)) + + if single_file: + arguments["name"] = [arguments.name] + arguments["raw_metadata"] = [arguments.raw_metadata] + parameters.logger.info(f"Single file processing: {arguments.input}") + else: + parameters.logger.info(f"Multiple file processing:") + arguments["raw_metadata"] = arguments.raw_metadata.split( + arguments.raw_metadata_sep + ) + if not arguments.name: + arguments["name"] = arguments["raw_metadata"] + else: + arguments["name"] = arguments.name.split(',') + for i in range(len(arguments.name)): + parameters.logger.info(f" - file: {arguments.input[i]}") + parameters.logger.info(f" - name: {arguments.name[i]}") + parameters.logger.info(f" - metadata: {arguments.raw_metadata[i]}") + parameters.logger.info(f" ") + if ( + len(arguments.name) != len(arguments.raw_metadata) + or len(arguments.name) != len(arguments.input) + ): + raise ValueError( + "name, raw_metadata and input parameters have different lengths: \n" + f"input is {len(arguments.input)} elements long, " + f"raw_metadata is {len(arguments.raw_metadata)} elements long " + f"and name is {len(arguments.name)} elements long." + ) + if arguments.spectrum_type == "LC_MS": + arguments["scan_type"] = "ms" + elif arguments.spectrum_type == "LC_MSMS": + arguments["scan_type"] = "ms2" + if arguments.method == "test": + if arguments.spectrum_type == "LC_MS": + arguments["method"] = "cf_pfem_urine_qtof" + else: + arguments["method"] = "cf_pfem_urine_method1_qtof-msms" + if arguments["sample_type"] == COMPOUND_MIX: + check_mix_compound_files(parameters) + more_info_in_logs(parameters) + return parameters + +def check_mix_compound_files(parameters): + arguments = parameters.arguments + try: + numbarz = [ + list(map(int, os.path.basename(metadata).split("_", 1)[0].split("-"))) + for metadata in arguments.raw_metadata + ] + except ValueError: + parameters.logger.error( + "Metadata/file names does not start with `[0-9]+-[0-9]+_.*` . " + "This is necessary in the case of compounds mix." + ) + sys.exit(-1) + runs, samples = zip(*numbarz) + if not all(runs[0] == i for i in runs[1:]): + parameters.logger.error( + "Run numbers in metadata/file names are not identical. " + "You mixed some files." + ) + sys.exit(-1) + length = len(samples) + if list(sorted(samples)) != list(range(1, length+1)): + if not all(samples.count(i) == 1 for i in samples): + parameters.logger.error("Some samples are duplicated. ") + else: + parameters.logger.error("Some samples files are missing. 
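
A small illustration, not part of server.py, of the file-naming convention this check enforces for compound mixes: names must start with "<run>-<sample>_", and the sample numbers of a single run must form a complete 1..N series.

import os

# Made-up file names for one run (run 3) with three samples:
names = ["3-1_mix_A.tsv", "3-2_mix_A.tsv", "3-3_mix_A.tsv"]
print([
    list(map(int, os.path.basename(name).split("_", 1)[0].split("-")))
    for name in names
])
# -> [[3, 1], [3, 2], [3, 3]]: same run number everywhere, samples 1..3 all present.
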
") + sys.exit(-1) + +def more_info_in_logs(config): + arguments = config.arguments + if arguments.embed_js: + config.logger.info(f"JS will be embed in HTML page to form a HTML bundle.") + else: + config.logger.info(f"JS are separated files, needed to be served.") + config.logger.info(f"Choosen parameters:") + config.logger.info(f" - method: {arguments.method}") + config.logger.info(f" - peakforest instance: {arguments.peakforest.url}") + config.logger.info(f" - polarity instance: {arguments.polarity}") + config.logger.info(f" - spectrum type: {arguments.spectrum_type}") + config.logger.info(f" - scan type: {arguments.scan_type}") + config.logger.info(f" - produce JSON: {arguments.produce_json}") + config.logger.info(f" - sample type: {arguments.sample_type}") + +def process_all_files(config): + """ + for each file and its metadata, read and process them, + then fills the meta html template file with the whole result. + """ + arguments = config.arguments + extra_defaults = [ + process_fragnot_metadata(metadata, config) + for metadata in arguments.raw_metadata + ] + for i, name in enumerate(arguments.name): + extra_defaults[i]["name"] = name + + if not extra_defaults: + extra_defaults = [{}] * len(arguments.input) + + index = 0 + for input_path, extra_default in zip(arguments.input, extra_defaults): + config.logger.info(f"Processing file at {input_path}...") + curent_defaults = arguments.copy() + curent_defaults.update(extra_default) + if config.arguments.verbose: + config.logger.info( + "[VERBOSE] Defaults for curent file: " + + ';'.join(f"{key}={value}" for key, value in curent_defaults.items()) + ) + tsv_content, tsv_data_extractor = read_input(input_path, config) + index = process_tsv( + tsv_content, + tsv_data_extractor, + config, + defaults_data = curent_defaults, + index = index+1, + ) + if arguments.embed_js: + config.logger.info(f"Embeding JS in HTML file... ") + for index in range(len(config.tab_list)): + config.placeholders[EMBED_JS_PLACEHOLDER] += "<script type='text/javascript'>" + with open(f"add-one-spectrum-{index+1}.js") as js_file: + config.placeholders[EMBED_JS_PLACEHOLDER] += js_file.read() + config.placeholders[EMBED_JS_PLACEHOLDER] += "</script>" + config.placeholders[EMBED_JS_PLACEHOLDER] += "\n" + config.logger.info(f" - add-one-spectrum-{index+1}.js embed.") + config.placeholders[TAB_LIST_PLACEHOLDER] = "\n".join(config.tab_list) + else: + config.placeholders[EMBED_JS_PLACEHOLDER] += "<script type='text/javascript'>" + config.placeholders[EMBED_JS_PLACEHOLDER] += "</script>" + config.placeholders[EMBED_JS_PLACEHOLDER] += "\n".join( + [""] + [ + " "*12 + f"<script src=\"./add-one-spectrum-{index+1}.js\"></script>" + for index in range(len(config.tab_list)) + ] + ) + config.placeholders[EMBED_JS_PLACEHOLDER] += "\n" + config.placeholders[TAB_LIST_PLACEHOLDER] = "\n".join(config.tab_list) + + fill_template("meta_template", "pf_path", config) + +def fill_template( + template_name, + output_name, + config, + additional_placeholders=dict() +): + """ + Fills a template, replaces the placeholders. + Either outputs the result in a given file, or returns it if path is none. 
+ """ + template_path = config[template_name] + config.logger.debug(f"Filling template {template_name} at {template_path}...") + with open(template_path) as template_file: + template_content = template_file.read() + placeholders = config.placeholders.copy() + placeholders.update(additional_placeholders) + for placeholder, replacement in placeholders.items(): + if not placeholder.startswith(config.templates.placeholders.start): + placeholder = placeholder.join(( + config.templates.placeholders.start, + config.templates.placeholders.stop + )) + template_content = template_content.replace(placeholder, replacement) + if output_name is None: + config.logger.debug(f"Returning template content") + return template_content + output_path = config[output_name] + if "{{ index }}" in output_path: + index_value = additional_placeholders["{{ index }}"] + config.logger.debug(f"Changing index value for {index_value}") + output_path = output_path.replace("{{ index }}", index_value) + config.logger.debug(f"Full output path {output_path}") + with open(output_path, "w") as output_file: + output_file.write(template_content) + +def read_input(input_path, config): + """ + reads a tsv file and determin its processor, based on its header. + """ + with open(input_path) as input_file: + config.logger.info(f"Reading {input_path}...") + tsv_file = csv.reader(input_file, delimiter='\t') + header = next(tsv_file) + tsv_file = list(tsv_file) + config.logger.info(f"Header is: {', '.join(header)}") + if header == list(FRAGNOT_HEADER): + config.logger.info(f"Fragnot recognized.") + processor = fragnot_extractor + return uniformize_fragnot(tsv_file, header), processor + else: + config.logger.info(f"MS2Snoop recognized.") + processor = ms2snoop_extractor + return uniformize_ms2snoop(tsv_file, header), processor + +def uniformize_fragnot(content, header): + """ + sorts fragnot data so they appear always in the same order + """ + return sorted(content, key=lambda x:(float(x[0]), float(x[4]))) + +def uniformize_ms2snoop(content, header): + """ + sorts ms2snoop data so they appear always in the same order + """ + return sorted(content, key=lambda x:(x[0], float(x[4]))) + +def process_fragnot_metadata(raw_metadata, config): + """ + Tries to extract informations from the metadata provided by fragnot + files names. + Heavily based on regex defined in conf file. + """ + regex = config.regex.copy() + del regex["values"] + result = {} + config.logger.info(f"Extracting info from {raw_metadata}...") + count = 0 + for name, expression in regex.items(): + if (match := re.search(expression, raw_metadata)): + result[name] = match[name] + count += 1 + did = "+ did" + else: + did = "- did not" + if config.arguments.verbose: + config.logger.info(f" {did} match {expression}") + config.logger.info(f"{count} useful informations extracted.") + return result + +def process_tsv( + tsv_content, + tsv_data_extractor, + config, + defaults_data={}, + index=1 +): + """ + processes one tsv file, containing one or multiple compounds. 
def process_tsv(
    tsv_content,
    tsv_data_extractor,
    config,
    defaults_data={},
    index=1
):
    """
    processes one tsv file, containing one or multiple compounds.
    Creates the peak table for each compound.
    """
    tsv_content = list(tsv_content)
    curent_name, ms_data = get_ms_data(
        tsv_content[0],
        tsv_data_extractor,
        defaults_data,
        config
    )
    _, second_ms_data = get_ms_data(
        tsv_content[1],
        tsv_data_extractor,
        defaults_data,
        config
    )
    ms_peak_table = []
    config.logger.info(f"Processing compound {curent_name}...")

    for line in tsv_content:
        name, new_ms_data = get_ms_data(line, tsv_data_extractor, defaults_data, config)
        if name != curent_name:
            new_compound(curent_name, index, ms_data, config, ms_peak_table)
            curent_name = name
            index += 1
            config.logger.info(f"Processing compound {curent_name}...")
            ms_peak_table = []
        ms_data = new_ms_data
        ms_peak_table.append(
            ", ".join(
                f'"{value}"' if value not in ("na", "NA")
                else '""'
                for value in (
                    ms_data["fragment_mz"],
                    ms_data["abs_intensity"],
                    ms_data["rel_intensity"],
                    ms_data["ppm"],
                    ms_data["composition"],
                    ms_data["fragment"],
                    str(ms_data["valid_corelation"] == "TRUE").lower(),
                    "true" if ms_data.get("correlation") == "1" else "false"
                )
            )
        )
    new_compound(curent_name, index, ms_data, config, ms_peak_table)
    return index


def get_ms_data(line, extractor, defaults, config):
    ms_data = defaults.copy()
    ms_data.update(extractor(config, *line))
    return ms_data["name"], ms_data


def new_compound(name, index, ms_data, config, ms_peak_table):
    """
    aggregates information to form the peak table,
    adds the compound to the tab list,
    creates the js file for this tab
    """
    if len([x for x in ms_peak_table if x.split(", ")[7] == "\"true\""]) > 1:
        for i in range(len(ms_peak_table)):
            ms_peak_table[i] = ", ".join(
                ms_peak_table[i].split(", ")[:-1] + [", \"false\""]
            )
    config.placeholders[MS_PEAK_VALUES_PLACEHOLDER] = f"""[
        {','.join('[' + line + ']' for line in ms_peak_table)}
    ]"""
    tab_list = fill_template(
        "tab_list_template",
        None,
        config, {
            COMPOUND_NAME_PLACEHOLDER: name,
            TAB_INDEX_PLACEHOLDER: str(index),
        })
    config.tab_list.append(tab_list)
    create_js_file(index, ms_data, config)
    config.placeholders[ADD_SPECTRUM_FORM] += fill_template(
        "form_template",
        None,
        config,
        {TAB_INDEX_PLACEHOLDER: str(index)},
    )
    if index == 1:
        config.placeholders[ACTIVE_TAB_PLACEHOLDER] = ""


def fragnot_extractor(config, *line):
    """
    Fragnot processor - extracts one fragnot line of content and
    produces a uniform output.
    """
    fragnot_data = {
        FRAGNOT_HEADER[header]: line[i].strip()
        for i, header in enumerate(FRAGNOT_HEADER)
    }
    fragnot_data["composition"] = "unknown"
    fragnot_data["valid_corelation"] = config.arguments.validation
    return fragnot_data


def ms2snoop_extractor(config, *line):
    """
    MS2Snoop processor - extracts one ms2snoop line of content and
    produces a uniform output.
    """
    ms2snoop_data = {
        header: MS_2_SNOOP_HEADER[header](line[i])
        for i, header in enumerate(MS_2_SNOOP_HEADER)
    }
    return ms2snoop_data


def create_js_file(index, ms_data, config):
    """
    fills the js template file for one tab (compound)
    """
    if (method := ms_data["method"]):
        method = f'"{method}"'
    else:
        method = "null"
    if config.arguments.verbose:
        config.logger.info(
            "[VERBOSE] "
            + ';'.join(f"{key}={value}" for key, value in ms_data.items())
        )
    fill_template(
        "js_template",
        "js_file",
        config,
        {
            TAB_INDEX_PLACEHOLDER: str(index),
            "INCHIKEY_PLACEHOLDER": ms_data["inchikey"],
            "DEFAULT_DATA": f"""{{
                name: "{ms_data["name"]}",
                inchikey: "{ms_data["inchikey"]}",
                method: {method},
                spectrum_type: "{ms_data["spectrum_type"]}",
                scan_type: "{ms_data["scan_type"]}",
                polarity: "{ms_data["polarity"]}",
                resolution: "{ms_data["resolution"]}",
                sample_type: "{ms_data["sample_type"]}",
            }}""",
            "{{ index }}": str(index)
        },
    )


def prepare_workplace(config):
    """
    prepares the directory we will work in.
    """
    if config.workdir.work_in_tmp:
        os.chdir(config.workdir.tmp_dir)
        config.logger.info(f"Moving to {os.getcwd()}")
    if config.workdir.generate_in_tmp:
        gen_dir = config.workdir.tmp_dir
    else:
        gen_dir = tempfile.gettempdir()
        config.workdir.tmp_dir = gen_dir
    shutil.copy(os.path.join(config["root_dir"], "common.js"), gen_dir)
    config.logger.info(f"Outputs will be generated in {config.workdir.tmp_dir}")
    return gen_dir


def get_hander_for(directory, config):
    """
    generates the handler class for the directory we provide.
    """
    config["json_result"] = [{}] * len(config.tab_list)

    class HTTPHandler(http.server.SimpleHTTPRequestHandler):

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs, directory=directory)

        def do_POST(self):
            content_length = int(self.headers.get("Content-Length"))
            json_bytes = self.rfile.read(content_length).decode("utf-8")
            json_list = json.loads(json_bytes)
            for i, obj in enumerate(json_list):
                print(obj)
                if obj:
                    config["json_result"][i] = obj
            save_json(config)
            self.send_head()
            self.wfile.write(json_bytes.encode("utf-8"))
            return

        def do_GET(self):
            if self.path == "/quit":
                self.path = "/"
                super().do_GET()
                exit(0)
            self.path = os.path.join(directory, self.path)
            if self.path == "/":
                self.path = config.generated.html
            return super().do_GET()

    return HTTPHandler


def save_json(config):
    json_string = json.dumps(config["json_result"])
    print(json_string)
    with open(config.arguments.output_json, "w") as json_file:
        json_file.write(json_string)


def run_server(config):
    """
    prepares and runs the server, with the handler for the given directory
    """
    ip, port = config.network.ip, config.network.port
    config.logger.debug(f"IP and port: {ip}:{port}")
    socketserver.TCPServer.allow_reuse_address = True
    config.logger.debug("Allow reuse address.")
    handler = get_hander_for(config.workdir.tmp_dir, config)
    config.logger.debug(f"Created server handler for {config.workdir.tmp_dir}")
    config.logger.debug(
        f"Content of directory {config.workdir.tmp_dir}: "
        + "\n"
        + '\n'.join(sorted(
            f"  - {path}" for path in os.listdir(config.workdir.tmp_dir)
        ))
    )
    config.logger.debug("Creating TCP server...")
    server = socketserver.TCPServer((ip, port), handler)
    if ip == "0.0.0.0":
        displayed_ip = "localhost"
    else:
        displayed_ip = ip
    config.logger.debug("Serving...")
    print()
    print(f"http://{displayed_ip}:{port}")
    server.serve_forever()
def get_logger(config, dummy=False):
    dummy_log = lambda msg: dummy and config.logger.info(msg)
    arguments = config.arguments
    if not dummy:
        logger = logging.getLogger(__file__)
    if arguments.debug:
        dummy_log("Output debug info.")
        level = logging.DEBUG
    else:
        level = logging.INFO
    if not dummy:
        logger.setLevel(level)
    formatter = logging.Formatter(
        "%(asctime)s - %(levelname)s - %(message)s"
    )
    if arguments.logging.std == "err":
        dummy_log("Handler added to output logs in stderr.")
        if not dummy:
            handler = logging.StreamHandler(sys.stderr)
            handler.setLevel(level)
            handler.setFormatter(formatter)
            logger.addHandler(handler)
    elif arguments.logging.std == "out":
        dummy_log("Handler added to output logs in stdout.")
        if not dummy:
            handler = logging.StreamHandler(sys.stdout)
            handler.setLevel(level)
            handler.setFormatter(formatter)
            logger.addHandler(handler)
    else:
        dummy_log("Logs will not be output in stderr nor stdout.")
    if (path := arguments.logging.file.path):
        dummy_log(f"Add log file: {arguments.logging.file.path}.")
        if not arguments.logging.file.append:
            dummy_log("Log file content cleaned.")
            with open(path, "w"):
                pass
        else:
            dummy_log("Logs appended to log file.")
        if not dummy:
            file_handler = logging.FileHandler(filename=path)
            file_handler.setLevel(level)
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)
    if not dummy:
        config["logger"] = logger
        starting_sequence(logger)
        get_logger(config, dummy=True)
        return logger


def starting_sequence(logger):
    logger.info("*bip* *bop*")
    logger.info("starting...")
    logger.info("program...")
    logger.info("MS2PF is running...")
    logger.info("*bip* *bop* am a robot")
    atexit.register(stoping_sequence, logger)


def stoping_sequence(logger):
    logger.info("*bip* *bop*")
    logger.info("ending...")
    logger.info("program...")
    logger.info("MS2PF is shutting down...")
    logger.info("...robot")
    logger.info("*bip* *bop*")
    logger.info("shutdown")
    logger.info("...")


if __name__ == "__main__":

    base_config = parse_config()
    config = parse_parameters(base_config)

    # The config contains the result of the parsed config file.
    arguments = config.arguments

    config.logger.info(f"Starting MS2PF from {os.getcwd()}")

    gen_dir = prepare_workplace(config)

    config["pf_path"] = os.path.join(gen_dir, config.generated.html)
    config.logger.info(f"HTML output file will be {config.pf_path}")
    config["js_file"] = os.path.join(gen_dir, config.generated.js)
    config.logger.info(f"JS output files will look like {config.js_file}")
    config.placeholders["PF_URL_PLACEHOLDER"] = arguments.peakforest.url
    config.placeholders["PF_TOKEN_PLACEHOLDER"] = (
        arguments.peakforest.token
        or config.token.value
    )
    if (token := config.placeholders.PF_TOKEN_PLACEHOLDER):
        config.logger.info(f"Using a token for authentication - length: {len(token)}")
    else:
        config.logger.info("No token provided for peakforest authentication.")

    process_all_files(config)

    if not arguments.do_run_dry:
        config.logger.debug("Running the server.")
        if arguments.firefox or arguments.chromium:
            config.logger.debug("Opening the browser.")
            import threading
            import time
            if arguments.firefox:
                browser = "firefox"
            else:
                browser = "chromium"
            if (ip := config.network.ip) == "0.0.0.0":
                ip = "localhost"
            adress = f"http://{ip}:{config.network.port}"
            threading.Thread(
                target=lambda: (
                    time.sleep(1),
                    os.system(f"{browser} {adress}")
                ),
                daemon=True
            ).start()
        run_server(config)
    else:
        config.logger.debug("Server not run.")