diff library_lookup.py @ 0:dffc38727496

initial commit
author pieter.lukasse@wur.nl
date Sat, 07 Feb 2015 22:02:00 +0100
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/library_lookup.py	Sat Feb 07 22:02:00 2015 +0100
@@ -0,0 +1,327 @@
+'''
+Logic for searching a Retention Index database file given output from NIST
+'''
+import match_library
+import re
+import sys
+import csv
+
+__author__ = "Marcel Kempenaar"
+__contact__ = "brs@nbic.nl"
+__copyright__ = "Copyright, 2012, Netherlands Bioinformatics Centre"
+__license__ = "MIT"
+
+def create_lookup_table(library_file, column_type_name, statphase):
+    '''
+    Creates a dictionary holding the contents of the library to be searched
+    @param library_file: library to read
+    @param column_type_name: the columns type name
+    @param statphase: the columns stationary phase
+    '''
+    (data, header) = match_library.read_library(library_file)
+    # Test for presence of required columns
+    if ('columntype' not in header or
+        'columnphasetype' not in header or
+        'cas' not in header):
+        raise IOError('Missing columns in ', library_file)
+
+    column_type_column = header.index("columntype")
+    statphase_column = header.index("columnphasetype")
+    cas_column = header.index("cas")
+
+    filtered_library = [line for line in data if line[column_type_column] == column_type_name
+                        and line[statphase_column] == statphase]
+    lookup_dict = {}
+    for element in filtered_library:
+        # Here the cas_number is set to the numeric part of the cas_column value, so if the 
+        # cas_column value is 'C1433' then cas_number will be '1433'
+        cas_number = str(re.findall(r'\d+', (element[cas_column]).strip())[0])
+        try:
+            lookup_dict[cas_number].append(element)
+        except KeyError:
+            lookup_dict[cas_number] = [element]
+    return lookup_dict
+
+
+def _preferred(hits, pref, ctype, polar, model, method):
+    '''
+    Returns all entries in the lookup_dict that have the same column name, type and polarity
+    as given by the user, uses regression if selected given the model and method to use. The
+    regression is applied on the column with the best R-squared value in the model
+    @param hits: all entries in the lookup_dict for the given CAS number
+    @param pref: preferred GC-column, can be one or more names
+    @param ctype: column type (capillary etc.)
+    @param polar: polarity (polar / non-polar etc.)
+    @param model: data loaded from file containing regression models
+    @param method: supported regression method (i.e. poly(nomial) or linear)
+    '''
+    match = []
+    for column in pref:
+        for hit in hits:
+            if hit[4] == ctype and hit[5] == polar and hit[6] == column:
+                # Create copy of found hit since it will be altered downstream
+                match.extend(hit)
+                return match, False
+
+    # No hit found for current CAS number, return if not performing regression
+    if not model:
+        return False, False
+
+    # Perform regression
+    for column in pref:
+        if column not in model:
+            break
+        # Order regression candidates by R-squared value (last element)
+        order = sorted(model[column].items(), key=lambda col: col[1][-1])
+        # Create list of regression candidate column names
+        regress_columns = list(reversed([column for (column, _) in order]))
+        # Names of available columns
+        available = [hit[6] for hit in hits]
+        
+        # TODO: combine Rsquared and number of datapoints to get the best regression match
+        '''
+        # Iterate regression columns (in order) and retrieve their models
+        models = {}
+        for col in regress_columns:
+            if col in available:
+                hit = list(hits[available.index(col)])
+                if hit[4] == ctype:
+                    # models contains all model data including residuals [-2] and rsquared [-1]
+                    models[pref[0]] = model[pref[0]][hit[6]] 
+        # Get the combined maximum for residuals and rsquared
+        best_match = models[]
+        # Apply regression
+        if method == 'poly':
+            regressed = _apply_poly_regression(best_match, hit[6], float(hit[3]), model)
+            if regressed:
+                hit[3] = regressed
+            else:
+                return False, False
+            else:
+                hit[3] = _apply_linear_regression(best_match, hit[6], float(hit[3]), model)
+                match.extend(hit)
+            return match, hit[6]
+        '''
+        
+        for col in regress_columns:
+            if col in available:
+                hit = list(hits[available.index(col)])
+                if hit[4] == ctype:
+                    # Perform regression using a column for which regression is possible
+                    if method == 'poly':
+                        # Polynomial is only possible within a set border, if the RI falls outside
+                        # of this border, skip this lookup
+                        regressed = _apply_poly_regression(pref[0], hit[6], float(hit[3]), model)
+                        if regressed:
+                            hit[3] = regressed
+                        else:
+                            return False, False
+                    else:
+                        hit[3] = _apply_linear_regression(pref[0], hit[6], float(hit[3]), model)
+                    match.extend(hit)
+                    return match, hit[6]
+
+    return False, False
+
+
+
+def default_hit(row, cas_nr, compound_id):
+    '''
+    This method will return a "default"/empty hit for cases where the
+    method _preferred() returns False (i.e. a RI could not be found 
+    for the given cas nr, also not via regression.
+    '''
+    return [
+            #'CAS', 
+            'C' + cas_nr,
+            #'NAME', 
+            '',
+            #'FORMULA', 
+            '',
+            #'RI', 
+            '0.0',
+            #'Column.type', 
+            '',
+            #'Column.phase.type', 
+            '',
+            #'Column.name', 
+            '',
+            #'phase.coding', 
+            ' ',
+            #'CAS_column.Name', 
+            '',
+            #'Centrotype', -> NOTE THAT compound_id is not ALWAYS centrotype...depends on MsClust algorithm used...for now only one MsClust algorithm is used so it is not an issue, but this should be updated/corrected once that changes
+            compound_id,
+            #'Regression.Column.Name', 
+            '',
+            #'min', 
+            '',
+            #'max', 
+            '',
+            #'nr.duplicates', 
+            '']
+    
+
+def format_result(lookup_dict, nist_tabular_filename, pref, ctype, polar, model, method):
+    '''
+    Looks up the compounds in the library lookup table and formats the results
+    @param lookup_dict: dictionary containing the library to be searched
+    @param nist_tabular_filename: NIST output file to be matched
+    @param pref: (list of) column-name(s) to look for
+    @param ctype: column type of interest
+    @param polar: polarity of the used column
+    @param model: data loaded from file containing regression models
+    @param method: supported regression method (i.e. poly(nomial) or linear)
+    '''
+    (nist_tabular_list, header_clean) = match_library.read_library(nist_tabular_filename)
+    # Retrieve indices of the CAS and compound_id columns (exit if not present)
+    try:
+        casi = header_clean.index("cas")
+        idi = header_clean.index("id")
+    except:
+        raise IOError("'CAS' or 'compound_id' not found in header of library file")
+
+    data = []
+    for row in nist_tabular_list:
+        casf = str(row[casi].replace('-', '').strip())
+        compound_id = str(row[idi].split('-')[0])
+        if casf in lookup_dict:
+            found_hit, regress = _preferred(lookup_dict[casf], pref, ctype, polar, model, method)
+            if found_hit:
+                # Keep cas nr as 'C'+ numeric part:
+                found_hit[0] = 'C' + casf
+                # Add compound id
+                found_hit.insert(9, compound_id)
+                # Add information on regression process
+                found_hit.insert(10, regress if regress else 'None')
+                # Replace column index references with actual number of duplicates
+                dups = len(found_hit[-1].split(','))
+                if dups > 1:
+                    found_hit[-1] = str(dups + 1)
+                else:
+                    found_hit[-1] = '0'
+                data.append(found_hit)
+                found_hit = ''
+            else:
+                data.append(default_hit(row, casf, compound_id))
+        else:
+            data.append(default_hit(row, casf, compound_id))
+            
+        casf = ''
+        compound_id = ''
+        found_hit = []
+        dups = []
+    return data
+
+
+def _save_data(content, outfile):
+    '''
+    Write to output file
+    @param content: content to write
+    @param outfile: file to write to
+    '''
+    # header
+    header = ['CAS',
+              'NAME',
+              'FORMULA',
+              'RI',
+              'Column.type',
+              'Column.phase.type',
+              'Column.name',
+              'phase.coding',
+              'CAS_column.Name',
+              'Centrotype',
+              'Regression.Column.Name',
+              'min',
+              'max',
+              'nr.duplicates']
+    output_handle = csv.writer(open(outfile, 'wb'), delimiter="\t")
+    output_handle.writerow(header)
+    for entry in content:
+        output_handle.writerow(entry)
+
+
+def _read_model(model_file):
+    '''
+    Creates an easy to search dictionary for getting the regression parameters
+    for each valid combination of GC-columns
+    @param model_file: filename containing the regression models
+    '''
+    regress = list(csv.reader(open(model_file, 'rU'), delimiter='\t'))
+    if len(regress.pop(0)) > 9:
+        method = 'poly'
+    else:
+        method = 'linear'
+
+    model = {}
+    # Create new dictionary for each GC-column
+    for line in regress:
+        model[line[0]] = {}
+
+    # Add data
+    for line in regress:
+        if method == 'poly':
+            model[line[0]][line[1]] = [float(col) for col in line[2:11]]
+        else:  # linear
+            model[line[0]][line[1]] = [float(col) for col in line[2:9]]
+
+    return model, method
+
+
+def _apply_poly_regression(column1, column2, retention_index, model):
+    '''
+    Calculates a new retention index (RI) value using a given 3rd-degree polynomial
+    model based on data from GC columns 1 and 2
+    @param column1: name of the selected GC-column
+    @param column2: name of the GC-column to use for regression
+    @param retention_index: RI to convert
+    @param model: dictionary containing model information for all GC-columns
+    '''
+    coeff = model[column1][column2]
+    # If the retention index to convert is within range of the data the model is based on, perform regression
+    if coeff[4] < retention_index < coeff[5]:
+        return (coeff[3] * (retention_index ** 3) + coeff[2] * (retention_index ** 2) + 
+                (retention_index * coeff[1]) + coeff[0])
+    else:
+        return False
+
+
+def _apply_linear_regression(column1, column2, retention_index, model):
+    '''
+    Calculates a new retention index (RI) value using a given linear model based on data
+    from GC columns 1 and 2
+    @param column1: name of the selected GC-column
+    @param column2: name of the GC-column to use for regression
+    @param retention_index: RI to convert
+    @param model: dictionary containing model information for all GC-columns
+    '''
+    # TODO: No use of limits
+    coeff = model[column1][column2]
+    return coeff[1] * retention_index + coeff[0]
+
+
+def main():
+    '''
+    Library Lookup main function
+    '''
+    library_file = sys.argv[1]
+    nist_tabular_filename = sys.argv[2]
+    ctype = sys.argv[3]
+    polar = sys.argv[4]
+    outfile = sys.argv[5]
+    pref = sys.argv[6:-1]
+    regress = sys.argv[-1]
+
+    if regress != 'False':
+        model, method = _read_model(regress)
+    else:
+        model, method = False, None
+
+    lookup_dict = create_lookup_table(library_file, ctype, polar)
+    data = format_result(lookup_dict, nist_tabular_filename, pref, ctype, polar, model, method)
+
+    _save_data(data, outfile)
+
+
+if __name__ == "__main__":
+    main()