view test/integration_tests.py @ 33:42880675f824

more mem
author pieter.lukasse@wur.nl
date Tue, 16 Sep 2014 21:02:23 +0200
parents 53e1eee93430
children
line wrap: on
line source

'''Integration tests for the GCMS project'''

from pkg_resources import resource_filename  # @UnresolvedImport # pylint: disable=E0611
from GCMS import library_lookup, combine_output
from GCMS.rankfilter_GCMS import rankfilter
import os.path
import sys
import unittest
import re


class IntegrationTest(unittest.TestCase):
    def test_library_lookup(self):
        '''
        Run main for data/NIST_tabular and compare produced files with references determined earlier.
        '''
        # Create out folder
        outdir = "output/" #tempfile.mkdtemp(prefix='test_library_lookup')
        if not os.path.exists(outdir):
            os.makedirs(outdir)
        outfile_base = os.path.join(outdir, 'produced_library_lookup')
        outfile_txt = outfile_base + '.txt'

        #Build up arguments and run
        input_txt = resource_filename(__name__, "data/NIST_tabular.txt")
        library = resource_filename(__name__, "data/RIDB_subset.txt")
        regress_model = resource_filename(__name__, "data/ridb_poly_regression.txt")
        sys.argv = ['test',
                    library,
                    input_txt,
                    'Capillary',
                    'Semi-standard non-polar',
                    outfile_txt,
                    'HP-5',
                    regress_model]
        # Execute main function with arguments provided through sys.argv
        library_lookup.main()
        #Compare with reference files
        reference_txt = resource_filename(__name__, 'reference/produced_library_lookup.txt')
        
        #read both the reference file  and actual output files
        expected = _read_file(reference_txt)
        actual = _read_file(outfile_txt)
        
        #convert the read in files to lists we can compare
        expected = expected.split()
        actual = actual.split()

        for exp, act in zip(expected, actual):
            if re.match('\\d+\\.\\d+', exp):
                exp = float(exp)
                act = float(act)
                self.assertAlmostEqual(exp, act, places=5)
            else:
                # compare values
                self.failUnlessEqual(expected, actual)


    def test_combine_output_simple(self):
        '''
        Run main for data/NIST_tabular and compare produced files with references determined earlier.
        '''
        # Create out folder
        outdir = "output/" #tempfile.mkdtemp(prefix='test_library_lookup')
        if not os.path.exists(outdir):
            os.makedirs(outdir)
        outfile_base = os.path.join(outdir, 'produced_combine_output')
        outfile_single_txt = outfile_base + '_single.txt'
        outfile_multi_txt = outfile_base + '_multi.txt'

        #Build up arguments and run
        input_rankfilter = resource_filename(__name__, "data/Rankfilter.txt")
        input_caslookup = resource_filename(__name__, "data/Caslookup.txt")
        sys.argv = ['test',
                    input_rankfilter,
                    input_caslookup,
                    outfile_single_txt,
                    outfile_multi_txt]
        # Execute main function with arguments provided through sys.argv
        combine_output.main()
        #Compare with reference files
        # reference_single_txt = resource_filename(__name__, 'reference/produced_combine_output_single.txt')
        # reference_multi_txt = resource_filename(__name__, 'reference/produced_combine_output_multi.txt')
        # self.failUnlessEqual(_read_file(reference_single_txt), _read_file(outfile_single_txt))
        # self.failUnlessEqual(_read_file(reference_multi_txt), _read_file(outfile_multi_txt))

        #Clean up
        #shutil.rmtree(tempdir)


        
    def def_test_rank_filter_advanced(self):
        '''
        Run main of RankFilter
        '''
        # Create out folder
        outdir = "output/integration/"
        if not os.path.exists(outdir):
            os.makedirs(outdir)

        #Build up arguments and run
        input_txt = resource_filename(__name__, "data/integration/RankFilterInput_conf.txt")
        sys.argv = ['test', 
                    input_txt]
        # Execute main function with arguments provided through sys.argv
        rankfilter.main()
        #Compare with reference files
               
    def def_test_library_lookup_advanced(self):
        '''
        Run main for data/NIST_tabular and compare produced files with references determined earlier.
        '''
        # Create out folder
        outdir = "output/integration/" 
        if not os.path.exists(outdir):
            os.makedirs(outdir)
        outfile_base = os.path.join(outdir, 'produced_library_lookup_ADVANCED')
        outfile_txt = outfile_base + '.txt'

        #Build up arguments and run
        input_txt = resource_filename(__name__, "data/integration/NIST_identification_results_tabular.txt")
        library = resource_filename(__name__, "../repositories/PRIMS-metabolomics/RI_DB_libraries/Library_RI_DB_capillary_columns-noDuplicates.txt")
        regress_model = resource_filename(__name__, "data/integration/regression_MODEL_for_columns.txt")
        sys.argv = ['test',
                    library,
                    input_txt,
                    'Capillary',
                    'Semi-standard non-polar',
                    outfile_txt,
                    'DB-5',
                    regress_model]
        # Execute main function with arguments provided through sys.argv
        library_lookup.main()


        
    def test_combine_output_advanced(self):
        '''
        Variant on test case above, but a bit more complex as some of the centrotypes have
        different NIST hits which should give them different RI values. This test also
        runs not only the combine output, but the other two preceding steps as well, 
        so it ensures the integration also works on the current code of all three tools. 
        '''
            
        # Run RankFilter 
        self.def_test_rank_filter_advanced()
        
        # Run library CAS RI lookup
        self.def_test_library_lookup_advanced()
        
        outdir = "output/integration/"    
        outfile_base = os.path.join(outdir, 'produced_combine_output')
        outfile_single_txt = outfile_base + '_single.txt'
        outfile_multi_txt = outfile_base + '_multi.txt'

        #Build up arguments and run
        input_rankfilter = resource_filename(__name__, "output/integration/produced_rank_filter_out.txt")
        input_caslookup = resource_filename(__name__, "output/integration/produced_library_lookup_ADVANCED.txt")
        sys.argv = ['test',
                    input_rankfilter,
                    input_caslookup,
                    outfile_single_txt,
                    outfile_multi_txt]
        # Execute main function with arguments provided through sys.argv
        combine_output.main()
        #Compare with reference files
#        reference_single_txt = resource_filename(__name__, 'reference/produced_combine_output_single.txt')
#        reference_multi_txt = resource_filename(__name__, 'reference/produced_combine_output_multi.txt')
#        self.failUnlessEqual(_read_file(reference_single_txt), _read_file(outfile_single_txt))
#        self.failUnlessEqual(_read_file(reference_multi_txt), _read_file(outfile_multi_txt))
        
        # Check 1: output single should have one record per centrotype:
        
        
        # Check 2: output single has more records than output single:
        combine_result_single_items =  combine_output._process_data(outfile_single_txt)
        combine_result_multi_items =  combine_output._process_data(outfile_multi_txt)
        self.assertGreater(len(combine_result_single_items['Centrotype']), 
                           len(combine_result_multi_items['Centrotype']))
        
        
        # Check 3: library_lookup RI column, centrotype column, ri_svr column are correct:
        caslookup_items = combine_output._process_data(input_caslookup)
        rankfilter_items = combine_output._process_data(input_rankfilter)
        
        # check that the caslookup RI column is correctly maintained in its original order in
        # the combined file:
        ri_caslookup = caslookup_items['RI']
        ri_combine_single = combine_result_single_items['RI']
        self.assertListEqual(ri_caslookup, ri_combine_single) 
        
        # check the centrotype column's integrity:
        centrotype_caslookup = caslookup_items['Centrotype']
        centrotype_combine_single = combine_result_single_items['Centrotype']
        centrotype_rankfilter = _get_centrotype_rankfilter(rankfilter_items['ID'])
        self.assertListEqual(centrotype_caslookup, centrotype_combine_single)
        self.assertListEqual(centrotype_caslookup, centrotype_rankfilter)
                
        # integration and integrity checks:
        file_NIST = resource_filename(__name__, "data/integration/NIST_identification_results_tabular.txt")
        file_NIST_items = combine_output._process_data(file_NIST)
        # check that rank filter output has exactly the same ID items as the original NIST input file:
        self.assertListEqual(file_NIST_items['ID'], rankfilter_items['ID']) 
        # check the same for the CAS column:
        self.assertListEqual(_get_strippedcas(file_NIST_items['CAS']), rankfilter_items['CAS'])
        # now check the NIST CAS column against the cas lookup results:  
        cas_NIST = _get_processedcas(file_NIST_items['CAS'])
        self.assertListEqual(cas_NIST, caslookup_items['CAS'])
        # now check the CAS of the combined result. If all checks are OK, it means the CAS column's order
        # and values remained stable throughout all steps: 
        self.assertListEqual(rankfilter_items['CAS'], combine_result_single_items['CAS']) 
        
        # check that the rankfilter RIsvr column is correctly maintained in its original order in
        # the combined file:
        risvr_rankfilter = rankfilter_items['RIsvr']
        risvr_combine_single = combine_result_single_items['RIsvr']
        self.assertListEqual(risvr_rankfilter, risvr_combine_single) 

        
   

def _get_centrotype_rankfilter(id_list):
    '''
    returns the list of centrotype ids given a list of ID in the
    form e.g. 74-1.0-564-1905200-7, where the numbers before the 
    first "-" are the centrotype id
    '''
    result = []
    for compound_id_idx in xrange(len(id_list)):
        compound_id = id_list[compound_id_idx]
        centrotype = compound_id.split('-')[0]
        result.append(centrotype) 

    return result


def _get_processedcas(cas_list):
    '''
    returns the list cas numbers in the form C64175 instead of 64-17-5
    '''
    result = []
    for cas_id_idx in xrange(len(cas_list)):
        cas = cas_list[cas_id_idx]
        processed_cas = 'C' + str(cas.replace('-', '').strip())
        result.append(processed_cas) 

    return result

def _get_strippedcas(cas_list):
    '''
    removes the leading white space from e.g. " 64-17-5"
    '''
    result = []
    for cas_id_idx in xrange(len(cas_list)):
        cas = cas_list[cas_id_idx]
        processed_cas = cas.strip()
        result.append(processed_cas) 

    return result


def _read_file(filename):
    '''
    Helper method to quickly read a file
    @param filename:
    '''
    with open(filename) as handle:
        return handle.read()