view rankfilter_GCMS/rankfilter.py @ 16:fe4682eb938c

small improvement
author pieter.lukasse@wur.nl
date Mon, 23 Mar 2015 08:40:42 +0100
parents 346ff9ad8c7a
children
line wrap: on
line source

"""
Copyright (C) 2011 by Velitchka Mihaleva, Wageningen University

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""

#Library functions definition
#----------Begin-------------
from numpy import array, linalg, ones
from numpy.polynomial import polynomial
import math
import sys
import csv

def calibrate(standards):
    '''
    Calculates the RT to RI conversion: RI = a + b*RT
    @param standards: variable containing RI and RT data
    '''
    A = ones((len(standards['R.T.']), 2), dtype=float)
    A[:, 0] = array(map(float, standards['R.T.']))
    [coeff, res, r, s] = linalg.lstsq(A, array(map(float, standards['RI'])))

    return coeff


def calibrate_poly(standards):
    '''
    Calculates the RT to RI conversion using a polynomial model
    @param standards: variable containing RI and RT data
    '''
    retention_time = array(map(float, standards['R.T.']))
    retention_index = array(map(float, standards['RI']))

    # Fit a 3rd degree polynomial
    fit = polynomial.polyfit(retention_time, retention_index, 3)[::-1]
    return [fit[0], fit[1], fit[2], fit[3]]


def calculate_poly(retention_time, poly_cal):
    '''
    Converts a given retention time to retention index using the calculated polynomial model
    @param retention_time: retention_time to convert to retention index
    @param poly_cal: result from calculating regression
    '''
    # Calculates RI based on given retention_time using polynomial function
    retention_time = array(map(float, retention_time))
    if len(retention_time) > 1:
        ri_exp = []
        for i in retention_time:
            ri_exp.append(poly_cal[0] * (i ** 3) + poly_cal[1] * (i ** 2) + (i * poly_cal[2]) + poly_cal[3])
        return ri_exp
    else:
        return poly_cal[0] * (retention_time ** 3) + poly_cal[1] * (retention_time ** 2) + (retention_time * poly_cal[2]) + poly_cal[3]


def convert_rt(hit_list, coeff):
    '''
    Converts a given retention time to retention index using the linear model
    @param hit_list: list holding the retention time
    @param coeff: calculated coefficient (slope and intercept) using the linear model
    '''
    #Convert RT to RI
    hit_list['RIexp'] = array(map(float, hit_list['R.T.'])) * coeff[0] + coeff[1]
    return hit_list


def convert_rt_poly(hit_list, poly_cal):
    '''
    Calls the actual RT to RI converter and returns the updated list with added RIexp value
    @param hit_list: result list containing the retention time
    '''
    hit_list['RIexp'] = array(map(float, calculate_poly(hit_list['R.T.'], poly_cal)))
    return hit_list


def get_data(libdata, LabelCol):
    '''
    Retrieves datacolumns indicated by LabelCol from libdata (generic function)
    Returns a dict with the requested column names as keys
    @param libdata: file from which data is loaded
    @param LabelCol: columns to retrieve
    '''
    from numpy import take
    LibData = open(libdata, 'r').read().split('\n')

    #Get the labels of the columns in the file
    FirstLine = LibData.pop(0).split('\t')

    FirstLine[-1] = FirstLine[-1].replace('\r', '')

    # Create a temporate variable containing the all data
    tmp_data = []
    for ll in LibData:
        if ll != '':
            tmp_data.append(ll.split('\t'))

    # Find the indices of the desired data
    ind = []
    try:
        for key in LabelCol:
            ind.append(FirstLine.index(key))
    except:
        print str(key) + " not found in first line of library (" + str(libdata) + ")"
        print"the folowing items are found in the first line of the library:\n" + str(FirstLine)
        sys.exit(1)
    # Extract the desired data
    data = []
    for x in tmp_data:
        data.append(take(array(x), ind))

    # library_data = dict(zip(LabelCol,transpose(data)))
    library_data = dict(zip(LabelCol, map(lambda *x: list(x), *data)))
    return library_data


def rank_hit(hit_list, library_data, window):
    '''
    Computes the Rank and % relative error
    @param hit_list: input data
    @param library_data: library used for reading the RIsvr data 
    @param window: error window
    '''
    hit_match_ripred = []
    hit_match_syn = []
    # Convert 'Name' data to list in order to be indexed
    # library_data['Name']=list(library_data['Name'])

    # tries to match on CAS first. If this is not possible (cas is 'undef' 
    # or not found) then tries to match on name:
    for hit_cas, hit_name in zip(hit_list['CAS'], hit_list['Name']):
        index = 0
        if hit_cas != 'undef':
            try:
                index = library_data['CAS'].index(hit_cas.replace(' ', '').replace('-', ''))
            except:
                try:
                    index = library_data['Name'].index(hit_name.replace(' ', ''))
                except:
                    # If for any reason the hit is not present
                    # in the mainlib library indicate this with -999 
                    index = 0
        else:
            try:
                index = library_data['Name'].index(hit_name.replace(' ', ''))
            except:
                # If for any reason the hit is not present
                # in the mainlib library indicate this with -999 
                index = 0
        if index != 0:
            hit_match_ripred.append(float(library_data['RIsvr'][index]))
            hit_match_syn.append(library_data['Synonyms'][index])
        else:
            hit_match_ripred.append(-999)
            hit_match_syn.append('None')
    hit_list['RIsvr'] = hit_match_ripred
    hit_list['Synonyms'] = hit_match_syn

    # Determine the relative difference between the experimental
    # and the predicted RI
    ri_err = []

    for ri_exp, ri_qsar in zip(hit_list['RIexp'], hit_list['RIsvr']):
        if int(ri_qsar) != -999:
            ri_err.append(float(int(float(ri_qsar)) - int(float(ri_exp))) / int(float(ri_exp)) * 100)
        else:
            ri_err.append(-999)

    # Define the rank of the hits
    hit_rank = []

    for tt in ri_err:
        if tt == -999:
            # The name of the hit generated with AMDIS did not match a name
            # in the mainlib library
            hit_rank.append(4)
        else:
            # Rank 1 - ri_err is within +/- window/2
            if abs(tt) <= float(window) / 2:
                hit_rank.append(1)
            # Rank 2 - window/2 < ri_err <= window 
            if abs(tt) > float(window) / 2 and abs(tt) <= float(window):
                hit_rank.append(2)
            # Rank 3 - ri_err > window
            if abs(tt) > float(window):
                hit_rank.append(3)
    hit_list['Rank'] = hit_rank
    hit_list['%rel.err'] = ri_err
    return hit_list

def print_to_file(hit_list, filename, keys_to_print, print_subsets=True):
    '''
    Writes output data to files (four output files are generated):
        filename_ranked - the hits are ranked
        filename_filter_in - only hits with rank 1 and 2 are retained
        filename_filter_out - hits with rank 3 are filtered out
        filename_filter_missed - hits with rank 4 - there was no match with the
                                 library data
    @param hit_list: a dictionary with the ranked hits
    @param filename: the core of the output file names
    @param keys_to_print: determines the order in which the data are printed
    @param print_subsets:
    '''
    from numpy import take

    out_ranked = open(filename["ranked"], 'w')
    if (print_subsets):
        out_in = open(filename["filter_in"], 'w')
        out_out = open(filename["filter_out"], 'w')
        out_missed = open(filename["missed"], 'w')

    #Convert RIexp and RIsvr into integer for printing
    hit_list['RIexp'] = map(int, map(math.ceil, hit_list['RIexp']))
    hit_list['RIsvr'] = map(int, map(math.ceil, hit_list['RIsvr']))

    #Establish the right order of the data to be printed    
    tmp_items = hit_list.items()
    tmp_keys = hit_list.keys()
    ind = []

    for key in keys_to_print:
        ind.append(tmp_keys.index(key))

    #Print the labels of the columns
    line = '\t'.join(take(array(tmp_keys), ind))
    out_ranked.write('%s\n' % line)
    if (print_subsets):
        out_in.write('%s\n' % line)
        out_out.write('%s\n' % line)
        out_missed.write('%s\n' % line)

    #Collect the data for each hit in the right order and print them
    #in the output file
    i = 0
    for name in hit_list['Name']:
        tt = []
        for x in iter(tmp_items):
            # trim the value if it is a string:
            if isinstance(x[1][i], basestring):
                tt.append(x[1][i].strip())
            else:
                tt.append(x[1][i])
        tmp1 = take(array(tt), ind)
        line = '\t'.join(tmp1.tolist())

        out_ranked.write('%s\n' % line)
        if(print_subsets):
            if hit_list['Rank'][i] == 4:
                out_missed.write('%s\n' % line)
            if hit_list['Rank'][i] == 3:
                out_out.write('%s\n' % line)
            if hit_list['Rank'][i] == 1 or hit_list['Rank'][i] == 2:
                out_in.write('%s\n' % line)

        i = i + 1

def read_tabular(in_csv):
    '''
    Parses a tab-separated file returning a dictionary with named columns
    @param in_csv: input filename to be parsed
    '''
    data = list(csv.reader(open(in_csv, 'rU'), delimiter='\t'))
    header = data.pop(0)
    # Create dictionary with column name as key
    output = {}
    for index in xrange(len(header)):
        output[header[index]] = [row[index] for row in data]
    return output

#---------End--------------
def main():
    #Ranking and filtering procedure
    if len(sys.argv) < 2:
        print "Usage:"
        print "python RankFilter_GC-MS.py input \n"
        print "input is a text file that specifies the names and the location"
        print "of the files with the sample, compounds for calibration, library"
        print "data, the core of the name ot the outputs, and the value of the"
        print "window used for the filtering \n"

        sys.exit(1)

    #------Read the input file------
    try:
        ifile = open(sys.argv[1], 'r').read().split('\n')
    except:
        print sys.argv[1], " file can not be found"
        sys.exit()

    #Get the file names for the data
    #labels - the type of input files
    #filenames - the names of the input files
    labels = []
    filenames = []

    for l in ifile:
        l = l.strip()
        if l != '':
            labels.append(l.split('=')[0].replace(' ', '').replace('\r', ''))
            filenames.append(l.split('=')[1].replace(' ', '').replace('\r', ''))

    InputData = dict(zip(labels, filenames))

    #this part checkes if the ouput option is set. The output files are saved as the output variable as prefix for the output files
    #if the output is not found , each output file has to be selected by forehand. This comes in handy for pipeline tools as galaxy
    print_subsets = True
    NDIS_is_tabular = False

    if 'output' in InputData:
        output_files = {"ranked":InputData['output'] + "_ranked", \
        "filter_in":InputData['output'] + "_filter_in", \
        "filter_out":InputData['output'] + "filter_out", \
        "missed":InputData['output'] + "_missed", \
        "missed_parse_pdf":InputData['output'] + "_missed_parse_pdf"}
    elif 'tabular' in InputData:
        NDIS_is_tabular = True
        if(not "onefile" in InputData):
            output_files = {"ranked":InputData['ranked'], \
            "filter_in":InputData['filter_in'], \
            "filter_out":InputData['filter_out'], \
            "missed":InputData['missed']}
        else:
            print_subsets = False
            output_files = {"ranked":InputData['onefile']}
    else:
        output_files = {"ranked":InputData['ranked'], \
        "filter_in":InputData['filter_in'], \
        "filter_out":InputData['filter_out'], \
        "missed":InputData['missed'], \
        "missed_parse_pdf":InputData['missed_parse_pdf']}

    #------Start with calibration------
    #Check whether a file with data for the calibration is specified
    #Specify which data to be read from the file with standard compounds
    LabelColStand = ['Name', 'R.T.', 'RI']

    if InputData['calibration'] != 'none':
        #get the coeffiecients for the RT to RI convertion

        try:
            ifile = open(InputData['calibration'], 'r')
        except:
            print "file", InputData['calibration'], "can not be found"
            sys.exit(1)

        standards = get_data(InputData['calibration'], LabelColStand)
        if InputData['model'] == 'linear':
            coeff = calibrate(standards)
        elif InputData['model'] == 'poly':
            poly_cal = calibrate_poly(standards)
        else:
            print "error: model ", InputData['model'], " can not be found. Use 'linear' or 'poly' "
            sys.exit(1)
    else:
        #No file has been specified for the calibration
        #Use the default coefficients
        print 'No file has been specified for the calibration'
        print 'WARNING: the default coefficients will be used'
        coeff = array([29.4327, 454.5260])

    if InputData['analysis_type'] == 'AMDIS':
        try:
            AmdisOut = open(InputData['sample'], 'r')
            print("open ok")
            #Specify which data to be extracted from the AMDIS output table
            #Weighted and Reverse are measure of matching between the experimental
            #and the library spectra. The experimental spectrum is used as template
            #for the calculation of Weighted, whereas for Reverse the template is the
            #library spectrum
            LabelCol = ['CAS', 'Name', 'R.T.', 'Weighted', 'Reverse', 'Purity']

            #Get the data from the AMDIS output file
            HitList = get_data(InputData['sample'], LabelCol)
            #Remove '>' from the names
            HitList['Name'] = [s.replace('>', '') for s in HitList['Name']]
        except:
            print "the file", InputData['sample'], "can not be found"
            sys.exit(1)
    if InputData['analysis_type'] == 'NIST':
        #HitList_missed - a variable of type dictionary containing the hits with the symbol ";"
        #in the name
        # HITLIST = the NIST results file given here as input:
        HitList = read_tabular(InputData['sample'])

    #Convert RT to RI
    if InputData['model'] == 'linear':
            HitList = convert_rt(HitList, coeff)
    if InputData['model'] == 'poly':
        print "Executing convert_rt_poly().."
        HitList = convert_rt_poly(HitList, poly_cal)

    #------Read the library data with the predicted RI------
    try:
        LibData = open(InputData['lib_data'], 'r')
    except:
        print "the file", InputData['lib_data'], "can not be found"
        sys.exit(1)

    #Specify which data to be extracted from the library data file
    LabelColLib = ['CAS', 'Name', 'RIsvr', 'Synonyms']
    LibraryData = get_data(InputData['lib_data'], LabelColLib)

    #------Match the hits with the library data and rank them------
    if InputData['window'] != '':
        HitList = rank_hit(HitList, LibraryData, InputData['window'])
    else:
        print "No value for the window used for the filtering is specified \n"
        sys.exit(1)

    #------Print the ranked and filtered hits------
    #Specify which data to be printed
    if InputData['analysis_type'] == 'AMDIS':
        keys_to_print = ['R.T.', 'CAS', 'Name', 'Formula','Rank', 'RIexp', 'RIsvr', '%rel.err', 'Weighted', 'Reverse', 'Synonyms']
    else:
        keys_to_print = ['ID', 'R.T.', 'Name', 'Formula', 'CAS', 'Rank', 'RIexp', 'RIsvr', '%rel.err', 'Forward', 'Reverse', 'Synonyms', 'Library']



    print_to_file(HitList, output_files, keys_to_print, print_subsets)

if __name__ == '__main__':
    main()