Mercurial > repos > galaxyp > maxquant
diff mqparam.py @ 4:dcd39bcc7481 draft
"planemo upload for repository https://github.com/galaxyproteomics/tools-galaxyp/tree/master/tools/maxquant commit da342a782ccc391b87fb4fead956b7b3cbd21258"
author | galaxyp |
---|---|
date | Sat, 11 Apr 2020 11:49:19 -0400 |
parents | 175e062b6a17 |
children | 2133b0be850a |
line wrap: on
line diff
--- a/mqparam.py Thu Aug 15 08:09:00 2019 -0400 +++ b/mqparam.py Sat Apr 11 11:49:19 2020 -0400 @@ -1,113 +1,174 @@ """ Create a project-specific MaxQuant parameter file. - -TODO: check validity of parsed experimental design template - add support for parameter groups - add reporter ion MS2 - add label free quantification - don't hardcode parse rules for fasta files - -Author: Damian Glaetzer <d.glaetzer@mailbox.org> """ +import copy import ntpath import os import re +import yaml import xml.etree.ElementTree as ET from itertools import zip_longest from xml.dom import minidom +def et_add_child(el, name, text, attrib=None): + "Add a child element to an xml.etree.ElementTree.Element" + child = ET.SubElement(el, name, attrib=attrib if attrib else {}) + child.text = str(text) + return child + + +class ParamGroup: + """Represents one parameter Group + """ + + def __init__(self, root): + """Initialize with its xml.etree.ElementTree root Element. + """ + self._root = copy.deepcopy(root) + + def set_list_param(self, key, vals): + """Set a list parameter. + """ + node = self._root.find(key) + if node is None: + raise ValueError('Element {} not found in parameter file' + .format(key)) + node.clear() + node.tag = key + for e in vals: + et_add_child(node, name='string', text=e) + + def set_simple_param(self, key, value): + """Set a simple parameter. + """ + node = self._root.find(key) + if node is None: + raise ValueError('Element {} not found in parameter file' + .format(key)) + node.text = str(value) + + def set_silac(self, light_labels, medium_labels, heavy_labels): + """Set label modifications. + """ + if medium_labels and not (heavy_labels or light_labels): # medium omly with heavy and light + raise Exception("Incorrect SILAC specification. Use medium only together with light and heavy labels.") + multiplicity = 3 if medium_labels else 2 if heavy_labels else 1 + max_label = str(max(len(light_labels) if light_labels else 0, + len(medium_labels) if medium_labels else 0, + len(heavy_labels) if heavy_labels else 0)) + self._root.find('multiplicity').text = str(multiplicity) + self._root.find('maxLabeledAa').text = max_label + node = self._root.find('labelMods') + node[0].text = ';'.join(light_labels) if light_labels else '' + if multiplicity == 3: + et_add_child(node, name='string', text=';'.join(medium_labels)) + if multiplicity > 1: + et_add_child(node, name='string', + text=';'.join(heavy_labels) if heavy_labels else '') + + def set_isobaric_label(self, internalLabel, terminalLabel, + cm2, cm1, cp1, cp2, tmtLike): + """Add isobaric label info. + Args: + internalLabel: string + terminalLabel: string + cm2: (float) correction factor + cm1: (float) correction factor + cp1: (float) correction factor + cp2: (float) correction factor + tmtLike: bool or string + Returns: + None + """ + iso_labels_node = self._root.find('isobaricLabels') + label = et_add_child(iso_labels_node, 'IsobaricLabelInfo', '') + et_add_child(label, 'internalLabel', internalLabel) + et_add_child(label, 'terminalLabel', terminalLabel) + for num, factor in (('M2', cm2), ('M1', cm1), ('P1', cp1), ('P2', cp2)): + et_add_child(label, 'correctionFactor' + num, + str(float(factor) if factor % 1 else int(factor))) + et_add_child(label, 'tmtLike', str(tmtLike)) + + class MQParam: """Represents a mqpar.xml and provides methods to modify some of its parameters. """ - fasta_template = """<FastaFileInfo> - <fastaFilePath></fastaFilePath> - <identifierParseRule></identifierParseRule> - <descriptionParseRule></descriptionParseRule> - <taxonomyParseRule></taxonomyParseRule> - <variationParseRule></variationParseRule> - <modificationParseRule></modificationParseRule> - <taxonomyId></taxonomyId> - </FastaFileInfo>""" - - def __init__(self, mqpar_out, mqpar_in, exp_design, - substitution_rx=r'[^\s\S]'): # no sub by default + def __init__(self, mqpar_in, exp_design=None, yaml=None, substitution_rx=r'[^\s\S]'): # no sub by default """Initialize MQParam class. mqpar_in can either be a template or a already suitable mqpar file. - >>> t = MQParam("test", './test-data/template.xml', None) - >>> t.root.tag - 'MaxQuantParams' - >>> (t.root.find('maxQuantVersion')).text - '1.6.3.4' + Args: + mqpar_in: a template parameter file + exp_design: a experimental design template (see MaxQuant documentation), + can be None + substitution_rx: a regular expression for replacements in the file names. + It is applied before comparing input file names (e.g. from the exp. design) """ - self.orig_mqpar = mqpar_in self.exp_design = exp_design - self.mqpar_out = mqpar_out - self.root = ET.parse(mqpar_in).getroot() - self.version = self.root.find('maxQuantVersion').text + self._root = ET.parse(mqpar_in).getroot() + self.version = self._root.find('maxQuantVersion').text # regex for substitution of certain file name characters self.substitution_rx = substitution_rx - - @staticmethod - def _add_child(el, name, text, attrib=None): - """Add a child element to an element. + self.pg_node = copy.deepcopy(self._root.find('parameterGroups')[0]) + self._paramGroups = [] + self.fasta_file_node = copy.deepcopy(self._root.find('fastaFiles')[0]) + if yaml: + self._from_yaml(yaml) - >>> t = MQParam("test", './test-data/template.xml', None) - >>> MQParam._add_child(t.root, "test", "test") - >>> t.root.find('test').text == "test" - True + def __getitem__(self, index): + """Return paramGroup if indexed with integer, else try to find + matching Element in XML root and return its text or None. """ - - child = ET.SubElement(el, name, attrib=attrib if attrib else {}) - child.text = str(text) + try: + return self._paramGroups[index] + except TypeError: + ret = self._root.find(index) + return ret.text if ret is not None else None @staticmethod def _check_validity(design, len_infiles): - "Perform some checks on the exp. design template" + """Perform some checks on the exp. design template""" design_len = len(design['Name']) + # 'Name' can be None, we need at least len_infiles valid entries match = len(list(filter(lambda x: bool(x), design['Name']))) if match < len_infiles: - raise Exception("Error parsing experimental design template: " + - "Found only {} matching entries ".format(design_len) + - "for {} input files".format(len_infiles)) + raise Exception(' '.join(["Error parsing experimental design template:", + "Found only {} matching entries".format(match), + "for {} input files".format(len_infiles)])) for i in range(0, design_len): - msg = "Error in line " + str(i + 2) + " of experimental design: " - if not (design['Name'][i] and design['Experiment'][i]): - raise Exception(msg + " Name or Experiment is empty.") + msg = "(in line " + str(i + 2) + " of experimental design) " + if not design['Experiment'][i]: + raise ValueError(msg + " Experiment is empty.") if design['PTM'][i].lower() not in ('true', 'false'): - raise Exception(msg + "Defines invalid PTM value, " + - "should be 'True' or 'False'.") + raise ValueError(msg + "Defines invalid PTM value, should be 'True' or 'False'.") try: int(design['Fraction'][i]) except ValueError as e: - raise Exception(msg + str(e)) + raise ValueError(msg + str(e)) - def _make_exp_design(self, infiles): - """Create a dict representing an experimental design from - an experimental design template and a list of input files. + def _make_exp_design(self, groups, files): + """Create a dict representing an experimental design from an + experimental design template and a list input files. If the experimental design template is None, create a default - design with one experiment for each input file, no fractions and - parameter group 0 for all files. - >>> t2 = MQParam("test", './test-data/template.xml', \ - './test-data/two/exp_design_template.txt') - >>> design = t2._make_exp_design(['./test-data/BSA_min_21.mzXML', \ - './test-data/BSA_min_22.mzXML']) - >>> design['Name'] - ['./test-data/BSA_min_21.mzXML', './test-data/BSA_min_22.mzXML'] - >>> design['Fraction'] - ['1', '2'] + design with one experiment for each input file and no fractions + for all files. + Args: + files: list of input file paths + groups: list of parameter group indices + Returns: + dict: The (complete) experimental design template """ - - design = {s: [] for s in ("Name", "PTM", "Fraction", "Experiment")} + design = {s: [] for s in ("Name", "PTM", "Fraction", "Experiment", "paramGroup")} if not self.exp_design: - design["Name"] = infiles - design["Fraction"] = ('32767',) * len(infiles) - design["Experiment"] = [os.path.split(f)[1] for f in infiles] - design["PTM"] = ('False',) * len(infiles) + design["Name"] = files + design["Fraction"] = ('32767',) * len(files) + design["Experiment"] = [os.path.split(f)[1] for f in files] + design["PTM"] = ('False',) * len(files) + design["paramGroup"] = groups else: with open(self.exp_design) as design_file: index_line = design_file.readline().strip() @@ -116,25 +177,22 @@ if i in design: index.append(i) else: - raise Exception("Invalid column index in experimental" - + " design template: {}".format(i)) - + raise Exception("Invalid column index in experimental design template: {}".format(i)) for line in design_file: row = line.strip().split('\t') for e, i in zip_longest(row, index): - if i == "Fraction" and e == '': - e = 32767 + if i == "Fraction" and not e: + e = '32767' elif i == "PTM" and not e: e = 'False' design[i].append(e) - - # map infiles to names in exp. design template + # map files to names in exp. design template names = [] names_to_paths = {} # strip path and extension - for f in infiles: + for f in files: b = os.path.basename(f) - basename = b[:-6] if b.endswith('.mzXML') else b[:-11] + basename = b[:-11] if b.lower().endswith('.thermo.raw') else b.rsplit('.', maxsplit=1)[0] names_to_paths[basename] = f for name in design['Name']: # same substitution as in maxquant.xml, @@ -144,236 +202,157 @@ else None) # replace orig. file names with matching links to galaxy datasets design['Name'] = names - MQParam._check_validity(design, len(infiles)) - + design['paramGroup'] = groups + MQParam._check_validity(design, len(files)) return design - def add_infiles(self, infiles, interactive): + def add_infiles(self, infiles): """Add a list of raw/mzxml files to the mqpar.xml. If experimental design template was specified, modify other parameters accordingly. The files must be specified as absolute paths for maxquant to find them. - >>> t1 = MQParam("test", './test-data/template.xml', None) - >>> t1.add_infiles(('test1', ), True) - >>> t1.root.find("filePaths")[0].text - 'test1' - >>> t1.root.find("fractions")[0].text - '32767' - >>> len(t1.root.find("fractions")) - 1 - >>> t2 = MQParam("test", './test-data/template.xml', \ - './test-data/exp_design_test.txt') - >>> t2.add_infiles(('test-data/QEplus021874.thermo.raw', \ - 'test-data/QEplus021876.thermo.raw'), True) - >>> len(t2.root.find("filePaths")) - 2 - >>> t2.root.find("filePaths")[1].text - 'test-data/QEplus021876.thermo.raw' - >>> t2.root.find("experiments")[1].text - '2' - >>> t2.root.find("fractions")[0].text - '3' + Also add parameter Groups. + Args: + infiles: a list of infile lists. first dimension denotes the + parameter group. + Returns: + None """ - - # Create experimental design for interactive mode. - # In non-interactive mode only filepaths are modified, but - # their order from the original mqpar must be kept. - if interactive: - index = range(len(infiles)) - nodenames = ('filePaths', 'experiments', 'fractions', - 'ptms', 'paramGroupIndices', 'referenceChannel') - design = self._make_exp_design(infiles) - else: - index = [-1] * len(infiles) - # kind of a BUG: fails if filename starts with '.' - infilenames = [os.path.basename(f).split('.')[0] for f in infiles] - i = 0 - for child in self.root.find('filePaths'): - # either windows or posix path - win = ntpath.basename(child.text) - posix = os.path.basename(child.text) - basename = win if len(win) < len(posix) else posix - basename_with_sub = re.sub(self.substitution_rx, '_', - basename.split('.')[0]) - # match infiles to their names in mqpar.xml, - # ignore files missing in mqpar.xml - if basename_with_sub in infilenames: - index[i] = infilenames.index(basename_with_sub) - i += 1 - else: - raise ValueError("no matching infile found for " - + child.text) - - nodenames = ('filePaths', ) - design = {'Name': infiles} - + groups, files = zip(*[(num, f) for num, l in enumerate(infiles) for f in l]) + self._paramGroups = [ParamGroup(self.pg_node) for i in range(len(infiles))] + nodenames = ('filePaths', 'experiments', 'fractions', + 'ptms', 'paramGroupIndices', 'referenceChannel') + design = self._make_exp_design(groups, files) # Get parent nodes from document nodes = dict() for nodename in nodenames: - node = self.root.find(nodename) + node = self._root.find(nodename) if node is None: raise ValueError('Element {} not found in parameter file' .format(nodename)) nodes[nodename] = node node.clear() node.tag = nodename + # Append sub-elements to nodes (one per file) + for i, name in enumerate(design['Name']): + if name: + et_add_child(nodes['filePaths'], 'string', name) + et_add_child(nodes['experiments'], 'string', + design['Experiment'][i]) + et_add_child(nodes['fractions'], 'short', + design['Fraction'][i]) + et_add_child(nodes['ptms'], 'boolean', + design['PTM'][i]) + et_add_child(nodes['paramGroupIndices'], 'int', + design['paramGroup'][i]) + et_add_child(nodes['referenceChannel'], 'string', '') - # Append sub-elements to nodes (one per file) - for i in index: - if i > -1 and design['Name'][i]: - MQParam._add_child(nodes['filePaths'], 'string', - design['Name'][i]) - if interactive: - MQParam._add_child(nodes['experiments'], 'string', - design['Experiment'][i]) - MQParam._add_child(nodes['fractions'], 'short', - design['Fraction'][i]) - MQParam._add_child(nodes['ptms'], 'boolean', - design['PTM'][i]) - MQParam._add_child(nodes['paramGroupIndices'], 'int', 0) - MQParam._add_child(nodes['referenceChannel'], 'string', '') + def translate(self, infiles): + """Map a list of given infiles to the files specified in the parameter file. + Needed for the mqpar upload in galaxy. Removes the path and then tries + to match the files. + Args: + infiles: list or tuple of the input + Returns: + None + """ + # kind of a BUG: fails if filename starts with '.' + infilenames = [os.path.basename(f).split('.')[0] for f in infiles] + filesNode = self._root.find('filePaths') + files_from_mqpar = [e.text for e in filesNode] + filesNode.clear() + filesNode.tag = 'filePaths' + for f in files_from_mqpar: + # either windows or posix path + win = ntpath.basename(f) + posix = os.path.basename(f) + basename = win if len(win) < len(posix) else posix + basename_with_sub = re.sub(self.substitution_rx, '_', + basename.split('.')[0]) + # match infiles to their names in mqpar.xml, + # ignore files missing in mqpar.xml + if basename_with_sub in infilenames: + i = infilenames.index(basename_with_sub) + et_add_child(filesNode, 'string', infiles[i]) + else: + raise ValueError("no matching infile found for " + f) - def add_fasta_files(self, files, - identifier=r'>([^\s]*)', - description=r'>(.*)'): + def add_fasta_files(self, files, parse_rules={}): """Add fasta file groups. - >>> t = MQParam('test', './test-data/template.xml', None) - >>> t.add_fasta_files(('test1', 'test2')) - >>> len(t.root.find('fastaFiles')) - 2 - >>> t.root.find('fastaFiles')[0].find("fastaFilePath").text - 'test1' + Args: + files: (list) of fasta file paths + parseRules: (dict) the parse rules as (tag, text)-pairs + Returns: + None """ - fasta_node = self.root.find("fastaFiles") + fasta_node = self._root.find('fastaFiles') fasta_node.clear() - fasta_node.tag = "fastaFiles" - - for index in range(len(files)): - filepath = '<fastaFilePath>' + files[index] - identifier = identifier.replace('<', '<') - description = description.replace('<', '<') - fasta = self.fasta_template.replace('<fastaFilePath>', filepath) - fasta = fasta.replace('<identifierParseRule>', - '<identifierParseRule>' + identifier) - fasta = fasta.replace('<descriptionParseRule>', - '<descriptionParseRule>' + description) - ff_node = self.root.find('.fastaFiles') - fastaentry = ET.fromstring(fasta) - ff_node.append(fastaentry) + for f in files: + fasta_node.append(copy.deepcopy(self.fasta_file_node)) + fasta_node[-1].find('fastaFilePath').text = f + for rule in parse_rules: + fasta_node[-1].find(rule).text = parse_rules[rule] def set_simple_param(self, key, value): """Set a simple parameter. - >>> t = MQParam(None, './test-data/template.xml', None) - >>> t.set_simple_param('min_unique_pep', 4) - >>> t.root.find('.minUniquePeptides').text - '4' + Args: + key: (string) XML tag of the parameter + value: the text of the parameter XML node + Returns: + None """ - # map simple params to their node in the xml tree - simple_params = {'missed_cleavages': - '.parameterGroups/parameterGroup/maxMissedCleavages', - 'min_unique_pep': '.minUniquePeptides', - 'num_threads': 'numThreads', - 'calc_peak_properties': '.calcPeakProperties', - 'write_mztab': 'writeMzTab', - 'min_peptide_len': 'minPepLen', - 'max_peptide_mass': 'maxPeptideMass', - 'match_between_runs': 'matchBetweenRuns', - 'ibaq': 'ibaq', # lfq global options - 'ibaq_log_fit': 'ibaqLogFit', - 'separate_lfq': 'separateLfq', - 'lfq_stabilize_large_ratios': - 'lfqStabilizeLargeRatios', - 'lfq_require_msms': 'lfqRequireMsms', - 'advanced_site_intensities': - 'advancedSiteIntensities', - 'lfq_mode': # lfq param group options - '.parameterGroups/parameterGroup/lfqMode', - 'lfq_skip_norm': - '.parameterGroups/parameterGroup/lfqSkipNorm', - 'lfq_min_edges_per_node': - '.parameterGroups/parameterGroup/lfqMinEdgesPerNode', - 'lfq_avg_edges_per_node': - '.parameterGroups/parameterGroup/lfqAvEdgesPerNode', - 'lfq_min_ratio_count': - '.parameterGroups/parameterGroup/lfqMinRatioCount'} - - if key in simple_params: - node = self.root.find(simple_params[key]) - if node is None: - raise ValueError('Element {} not found in parameter file' - .format(simple_params[key])) - node.text = str(value) - else: - raise ValueError("Parameter not found.") + node = self._root.find(key) + if node is None: + raise ValueError('Element {} not found in parameter file' + .format(key)) + node.text = str(value) - def set_silac(self, light_mods, medium_mods, heavy_mods): - """Set label modifications. - >>> t1 = MQParam('test', './test-data/template.xml', None) - >>> t1.set_silac(None, ('test1', 'test2'), None) - >>> t1.root.find('.parameterGroups/parameterGroup/maxLabeledAa').text - '2' - >>> t1.root.find('.parameterGroups/parameterGroup/multiplicity').text - '3' - >>> t1.root.find('.parameterGroups/parameterGroup/labelMods')[1].text - 'test1;test2' - >>> t1.root.find('.parameterGroups/parameterGroup/labelMods')[2].text - '' + def _from_yaml(self, conf): + """Read a yaml config file. + Args: + conf: (string) path to the yaml conf file + Returns: + None """ - multiplicity = 3 if medium_mods else 2 if heavy_mods else 1 - max_label = str(max(len(light_mods) if light_mods else 0, - len(medium_mods) if medium_mods else 0, - len(heavy_mods) if heavy_mods else 0)) - multiplicity_node = self.root.find('.parameterGroups/parameterGroup/' - + 'multiplicity') - multiplicity_node.text = str(multiplicity) - max_label_node = self.root.find('.parameterGroups/parameterGroup/' - + 'maxLabeledAa') - max_label_node.text = max_label - - node = self.root.find('.parameterGroups/parameterGroup/labelMods') - node[0].text = ';'.join(light_mods) if light_mods else '' - if multiplicity == 3: - MQParam._add_child(node, name='string', text=';'.join(medium_mods)) - if multiplicity > 1: - MQParam._add_child(node, name='string', - text=';'.join(heavy_mods) if heavy_mods else '') + with open(conf) as f: + conf_dict = yaml.safe_load(f.read()) + paramGroups = conf_dict.pop('paramGroups') + self.add_infiles([pg.pop('files') for pg in paramGroups]) + for i, pg in enumerate(paramGroups): + silac = pg.pop('labelMods', False) + if silac: + self[i].set_silac(*silac) + isobaricLabels = pg.pop('isobaricLabels', False) + if isobaricLabels: + for l in isobaricLabels: + self[i].set_isobaric_label(*l) + for el in ['fixedModifications', 'variableModifications', 'enzymes']: + lst = pg.pop(el, None) + if lst is not None: + self[i].set_list_param(el, lst) + for key in pg: + self[i].set_simple_param(key, pg[key]) + fastafiles = conf_dict.pop('fastaFiles', False) + if fastafiles: + self.add_fasta_files(fastafiles, parse_rules=conf_dict.pop('parseRules', {})) + else: + raise Exception('No fasta files provided.') + for key in conf_dict: + self.set_simple_param(key, conf_dict[key]) - def set_list_params(self, key, vals): - """Set a list parameter. - >>> t = MQParam(None, './test-data/template.xml', None) - >>> t.set_list_params('proteases', ('test 1', 'test 2')) - >>> len(t.root.find('.parameterGroups/parameterGroup/enzymes')) - 2 - >>> t.set_list_params('var_mods', ('Oxidation (M)', )) - >>> var_mods = '.parameterGroups/parameterGroup/variableModifications' - >>> t.root.find(var_mods)[0].text - 'Oxidation (M)' + def write(self, mqpar_out): + """Write pretty formatted xml parameter file. + Compose it from global parameters and parameter Groups. """ - - params = {'var_mods': - '.parameterGroups/parameterGroup/variableModifications', - 'fixed_mods': - '.parameterGroups/parameterGroup/fixedModifications', - 'proteases': - '.parameterGroups/parameterGroup/enzymes'} - - if key in params: - node = self.root.find(params[key]) - if node is None: - raise ValueError('Element {} not found in parameter file' - .format(params[key])) - node.clear() - node.tag = params[key].split('/')[-1] - for e in vals: - MQParam._add_child(node, name='string', text=e) - else: - raise ValueError("Parameter {} not found.".format(key)) - - def write(self): - rough_string = ET.tostring(self.root, 'utf-8', short_empty_elements=False) + if self._paramGroups: + pg_node = self._root.find('parameterGroups') + pg_node.remove(pg_node[0]) + for group in self._paramGroups: + pg_node.append(group._root) + rough_string = ET.tostring(self._root, 'utf-8', short_empty_elements=False) reparsed = minidom.parseString(rough_string) pretty = reparsed.toprettyxml(indent="\t") even_prettier = re.sub(r"\n\s+\n", r"\n", pretty) - with open(self.mqpar_out, 'w') as f: + with open(mqpar_out, 'w') as f: print(even_prettier, file=f)