Mercurial > repos > pieterlukasse > prims_metabolomics2
diff library_lookup.py @ 0:dffc38727496
initial commit
author | pieter.lukasse@wur.nl |
---|---|
date | Sat, 07 Feb 2015 22:02:00 +0100 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/library_lookup.py Sat Feb 07 22:02:00 2015 +0100 @@ -0,0 +1,327 @@ +''' +Logic for searching a Retention Index database file given output from NIST +''' +import match_library +import re +import sys +import csv + +__author__ = "Marcel Kempenaar" +__contact__ = "brs@nbic.nl" +__copyright__ = "Copyright, 2012, Netherlands Bioinformatics Centre" +__license__ = "MIT" + +def create_lookup_table(library_file, column_type_name, statphase): + ''' + Creates a dictionary holding the contents of the library to be searched + @param library_file: library to read + @param column_type_name: the columns type name + @param statphase: the columns stationary phase + ''' + (data, header) = match_library.read_library(library_file) + # Test for presence of required columns + if ('columntype' not in header or + 'columnphasetype' not in header or + 'cas' not in header): + raise IOError('Missing columns in ', library_file) + + column_type_column = header.index("columntype") + statphase_column = header.index("columnphasetype") + cas_column = header.index("cas") + + filtered_library = [line for line in data if line[column_type_column] == column_type_name + and line[statphase_column] == statphase] + lookup_dict = {} + for element in filtered_library: + # Here the cas_number is set to the numeric part of the cas_column value, so if the + # cas_column value is 'C1433' then cas_number will be '1433' + cas_number = str(re.findall(r'\d+', (element[cas_column]).strip())[0]) + try: + lookup_dict[cas_number].append(element) + except KeyError: + lookup_dict[cas_number] = [element] + return lookup_dict + + +def _preferred(hits, pref, ctype, polar, model, method): + ''' + Returns all entries in the lookup_dict that have the same column name, type and polarity + as given by the user, uses regression if selected given the model and method to use. The + regression is applied on the column with the best R-squared value in the model + @param hits: all entries in the lookup_dict for the given CAS number + @param pref: preferred GC-column, can be one or more names + @param ctype: column type (capillary etc.) + @param polar: polarity (polar / non-polar etc.) + @param model: data loaded from file containing regression models + @param method: supported regression method (i.e. poly(nomial) or linear) + ''' + match = [] + for column in pref: + for hit in hits: + if hit[4] == ctype and hit[5] == polar and hit[6] == column: + # Create copy of found hit since it will be altered downstream + match.extend(hit) + return match, False + + # No hit found for current CAS number, return if not performing regression + if not model: + return False, False + + # Perform regression + for column in pref: + if column not in model: + break + # Order regression candidates by R-squared value (last element) + order = sorted(model[column].items(), key=lambda col: col[1][-1]) + # Create list of regression candidate column names + regress_columns = list(reversed([column for (column, _) in order])) + # Names of available columns + available = [hit[6] for hit in hits] + + # TODO: combine Rsquared and number of datapoints to get the best regression match + ''' + # Iterate regression columns (in order) and retrieve their models + models = {} + for col in regress_columns: + if col in available: + hit = list(hits[available.index(col)]) + if hit[4] == ctype: + # models contains all model data including residuals [-2] and rsquared [-1] + models[pref[0]] = model[pref[0]][hit[6]] + # Get the combined maximum for residuals and rsquared + best_match = models[] + # Apply regression + if method == 'poly': + regressed = _apply_poly_regression(best_match, hit[6], float(hit[3]), model) + if regressed: + hit[3] = regressed + else: + return False, False + else: + hit[3] = _apply_linear_regression(best_match, hit[6], float(hit[3]), model) + match.extend(hit) + return match, hit[6] + ''' + + for col in regress_columns: + if col in available: + hit = list(hits[available.index(col)]) + if hit[4] == ctype: + # Perform regression using a column for which regression is possible + if method == 'poly': + # Polynomial is only possible within a set border, if the RI falls outside + # of this border, skip this lookup + regressed = _apply_poly_regression(pref[0], hit[6], float(hit[3]), model) + if regressed: + hit[3] = regressed + else: + return False, False + else: + hit[3] = _apply_linear_regression(pref[0], hit[6], float(hit[3]), model) + match.extend(hit) + return match, hit[6] + + return False, False + + + +def default_hit(row, cas_nr, compound_id): + ''' + This method will return a "default"/empty hit for cases where the + method _preferred() returns False (i.e. a RI could not be found + for the given cas nr, also not via regression. + ''' + return [ + #'CAS', + 'C' + cas_nr, + #'NAME', + '', + #'FORMULA', + '', + #'RI', + '0.0', + #'Column.type', + '', + #'Column.phase.type', + '', + #'Column.name', + '', + #'phase.coding', + ' ', + #'CAS_column.Name', + '', + #'Centrotype', -> NOTE THAT compound_id is not ALWAYS centrotype...depends on MsClust algorithm used...for now only one MsClust algorithm is used so it is not an issue, but this should be updated/corrected once that changes + compound_id, + #'Regression.Column.Name', + '', + #'min', + '', + #'max', + '', + #'nr.duplicates', + ''] + + +def format_result(lookup_dict, nist_tabular_filename, pref, ctype, polar, model, method): + ''' + Looks up the compounds in the library lookup table and formats the results + @param lookup_dict: dictionary containing the library to be searched + @param nist_tabular_filename: NIST output file to be matched + @param pref: (list of) column-name(s) to look for + @param ctype: column type of interest + @param polar: polarity of the used column + @param model: data loaded from file containing regression models + @param method: supported regression method (i.e. poly(nomial) or linear) + ''' + (nist_tabular_list, header_clean) = match_library.read_library(nist_tabular_filename) + # Retrieve indices of the CAS and compound_id columns (exit if not present) + try: + casi = header_clean.index("cas") + idi = header_clean.index("id") + except: + raise IOError("'CAS' or 'compound_id' not found in header of library file") + + data = [] + for row in nist_tabular_list: + casf = str(row[casi].replace('-', '').strip()) + compound_id = str(row[idi].split('-')[0]) + if casf in lookup_dict: + found_hit, regress = _preferred(lookup_dict[casf], pref, ctype, polar, model, method) + if found_hit: + # Keep cas nr as 'C'+ numeric part: + found_hit[0] = 'C' + casf + # Add compound id + found_hit.insert(9, compound_id) + # Add information on regression process + found_hit.insert(10, regress if regress else 'None') + # Replace column index references with actual number of duplicates + dups = len(found_hit[-1].split(',')) + if dups > 1: + found_hit[-1] = str(dups + 1) + else: + found_hit[-1] = '0' + data.append(found_hit) + found_hit = '' + else: + data.append(default_hit(row, casf, compound_id)) + else: + data.append(default_hit(row, casf, compound_id)) + + casf = '' + compound_id = '' + found_hit = [] + dups = [] + return data + + +def _save_data(content, outfile): + ''' + Write to output file + @param content: content to write + @param outfile: file to write to + ''' + # header + header = ['CAS', + 'NAME', + 'FORMULA', + 'RI', + 'Column.type', + 'Column.phase.type', + 'Column.name', + 'phase.coding', + 'CAS_column.Name', + 'Centrotype', + 'Regression.Column.Name', + 'min', + 'max', + 'nr.duplicates'] + output_handle = csv.writer(open(outfile, 'wb'), delimiter="\t") + output_handle.writerow(header) + for entry in content: + output_handle.writerow(entry) + + +def _read_model(model_file): + ''' + Creates an easy to search dictionary for getting the regression parameters + for each valid combination of GC-columns + @param model_file: filename containing the regression models + ''' + regress = list(csv.reader(open(model_file, 'rU'), delimiter='\t')) + if len(regress.pop(0)) > 9: + method = 'poly' + else: + method = 'linear' + + model = {} + # Create new dictionary for each GC-column + for line in regress: + model[line[0]] = {} + + # Add data + for line in regress: + if method == 'poly': + model[line[0]][line[1]] = [float(col) for col in line[2:11]] + else: # linear + model[line[0]][line[1]] = [float(col) for col in line[2:9]] + + return model, method + + +def _apply_poly_regression(column1, column2, retention_index, model): + ''' + Calculates a new retention index (RI) value using a given 3rd-degree polynomial + model based on data from GC columns 1 and 2 + @param column1: name of the selected GC-column + @param column2: name of the GC-column to use for regression + @param retention_index: RI to convert + @param model: dictionary containing model information for all GC-columns + ''' + coeff = model[column1][column2] + # If the retention index to convert is within range of the data the model is based on, perform regression + if coeff[4] < retention_index < coeff[5]: + return (coeff[3] * (retention_index ** 3) + coeff[2] * (retention_index ** 2) + + (retention_index * coeff[1]) + coeff[0]) + else: + return False + + +def _apply_linear_regression(column1, column2, retention_index, model): + ''' + Calculates a new retention index (RI) value using a given linear model based on data + from GC columns 1 and 2 + @param column1: name of the selected GC-column + @param column2: name of the GC-column to use for regression + @param retention_index: RI to convert + @param model: dictionary containing model information for all GC-columns + ''' + # TODO: No use of limits + coeff = model[column1][column2] + return coeff[1] * retention_index + coeff[0] + + +def main(): + ''' + Library Lookup main function + ''' + library_file = sys.argv[1] + nist_tabular_filename = sys.argv[2] + ctype = sys.argv[3] + polar = sys.argv[4] + outfile = sys.argv[5] + pref = sys.argv[6:-1] + regress = sys.argv[-1] + + if regress != 'False': + model, method = _read_model(regress) + else: + model, method = False, None + + lookup_dict = create_lookup_table(library_file, ctype, polar) + data = format_result(lookup_dict, nist_tabular_filename, pref, ctype, polar, model, method) + + _save_data(data, outfile) + + +if __name__ == "__main__": + main()