Mercurial > repos > pieterlukasse > prims_metabolomics2
changeset 5:41f122255d14
small update
author | pieter.lukasse@wur.nl |
---|---|
date | Thu, 19 Mar 2015 12:13:13 +0100 |
parents | 0d1557b3d540 |
children | 4393f982d18f |
files | Rscripts/filter-RIDB.R Rscripts/ridb-regression.R __init__.py static_resources/README.txt test/__init__.py test/integration_tests.py test/test_combine_output.py test/test_export_to_metexp_tabular.py test/test_library_lookup.py test/test_match_library.py test/test_query_mass_repos.py test/test_query_metexp.py test/test_query_metexp_LARGE.py |
diffstat | 13 files changed, 3 insertions(+), 1208 deletions(-) [+] |
line wrap: on
line diff
--- a/Rscripts/filter-RIDB.R Thu Mar 19 12:10:19 2015 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,56 +0,0 @@ -## -# -# Removes duplicates from a RI-database -# -# Usage: -# Rscript filter-RIDB.R /path/to/retention_db.txt output_RIDB_file.txt -# -## - -# Commandline arguments -args <- commandArgs(TRUE) -ridb <- args[1] -out_file <- args[2] - -# Function to check duplicates -duplicates <- function(dat) { - s <- do.call("order", as.data.frame(dat)) - non.dup <- !duplicated(dat[s, ]) - orig.ind <- s[non.dup] - first.occ <- orig.ind[cumsum(non.dup)] - first.occ[non.dup] <- NA - first.occ[order(s)] -} - -# Load CSV file -ridb <- read.csv(ridb,header=TRUE, sep="\t") -## Filters on: CAS FORMULA Column type Column phase type Column name -filter_cols <- c(1, 3, 5, 6, 7) -cat("RIDB dimensions: ") -print(dim(ridb)) -deleted <- NULL -cat("Checking for duplicates...") -dups <- duplicates(ridb[,filter_cols]) -cat("\t[DONE]\nRemoving duplicates...") -newridb <- ridb -newridb["min"] <- NA -newridb["max"] <- NA -newridb["orig.columns"] <- NA -for (i in unique(dups)) { - if (!is.na(i)) { - rows <- which(dups == i) - duprows <- ridb[c(i, rows),] - # Replace duplicate rows with one row containing the median value - new_RI <- median(duprows$RI) - newridb$RI[i] <- median(duprows$RI) - newridb$min[i] <- min(duprows$RI) - newridb$max[i] <- max(duprows$RI) - newridb$orig.columns[i] <- paste(rows, collapse=",") - deleted <- c(deleted, rows) - } -} -cat("\t\t[DONE]\nCreating new dataset...") -out_ridb <- newridb[-deleted,] -cat("\t\t[DONE]\nWriting new dataset...") -write.table(out_ridb, na='', file=out_file, quote=T, sep="\t", row.names=F) -cat("\t\t[DONE]\n")
--- a/Rscripts/ridb-regression.R Thu Mar 19 12:10:19 2015 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,208 +0,0 @@ -## -# -# Performs regression analysis using either 3rd degree polynomial- or linear-method -# -## - -# Commandline arguments -args <- commandArgs(TRUE) -if (length(args) < 7) - stop(cat("Missing arguments, usage:\n\tRscript ridb-regression.R RI-database ", - "ouput_file logfile min_residuals range_mod pvalue rsquared method ", - "plot(yes/no) plot_archive")) - -ridb <- args[1] -out_file <- args[2] -logfile <- args[3] -min_residuals <- as.integer(args[4]) -range_mod <- as.integer(args[5]) -pvalue <- as.double(args[6]) -rsquared <- as.double(args[7]) -method <- args[8] -plot <- tolower(args[9]) -if (plot == 'true') - plot_archive = args[10] - -# Do not show warnings etc. -sink(file='/dev/null') - -progress <- c() -logger <- function(logdata) { - ## Logs progress, adds a timestamp for each event - #cat(paste(Sys.time(), "\t", logdata, "\n", sep="")) ## DEBUG - progress <<- c(progress, paste(Sys.time(), "\t", logdata, sep="")) -} - -logger("Reading Retention Index Database..") - -# Read Retention Index Database -ridb <- read.csv(ridb, header=TRUE, sep="\t") -logger(paste("\t", nrow(ridb), "records read..")) -# Get a unique list -gc_columns <- unique(as.vector(as.matrix(ridb['Column.name'])[,1])) -cas_numbers <- unique(as.vector(as.matrix(ridb['CAS'])[,1])) - -add_poly_fit <- function(fit, gc1_index, gc2_index, range) { - pval = anova.lm(fit)$Pr - r.squared = summary(fit)$r.squared - - data = rep(NA, 11) - # Append results to matrix - data[1] = gc_columns[gc1_index] # Column 1 - data[2] = gc_columns[gc2_index] # Column 2 - data[3] = coefficients(fit)[1] # The 4 coefficients - data[4] = coefficients(fit)[2] - data[5] = coefficients(fit)[3] - data[6] = coefficients(fit)[4] - data[7] = range[1] # Left limit - data[8] = range[2] # Right limit - data[9] = length(fit$residuals) # Number of datapoints analysed - data[10] = pval[1] # p-value for resulting fitting - data[11] = r.squared # R-squared - return(data) -} - - -add_linear_fit <- function(fit, gc1_index, gc2_index, range) { - pval = anova.lm(fit)$Pr - r.squared = summary(fit)$r.squared - - data = rep(NA, 7) - # Append results to matrix - data[1] = gc_columns[gc1_index] # Column 1 - data[2] = gc_columns[gc2_index] # Column 2 - data[3] = coefficients(fit)[1] # The 4 coefficients - data[4] = coefficients(fit)[2] - data[7] = length(fit$residuals) # Number of datapoints analysed - data[8] = pval[1] # p-value for resulting fitting - data[9] = r.squared # R-squared - return(data) -} - - -add_fit <- function(fit, gc1_index, gc2_index, range, method) { - if (method == 'poly') - return(add_poly_fit(fit, gc1_index, gc2_index, range)) - else - return(add_linear_fit(fit, gc1_index, gc2_index, range)) -} - - -plot_fit <- function(ri1, ri2, gc1_index, gc2_index, coeff, range, method) { - if (method == 'poly') - pol <- function(x) coeff[4]*x^3 + coeff[3]*x^2 + coeff[2]*x + coeff[1] - else - pol <- function(x) coeff[2]*x + coeff[1] - pdf(paste('regression_model_', - make.names(gc_columns[gc1_index]), '_vs_', - make.names(gc_columns[gc2_index]), '.pdf', sep='')) - curve(pol, 250:3750, col="red", lwd=2.5, main='Regression Model', xlab=gc_columns[gc1_index], - ylab=gc_columns[gc2_index], xlim=c(250, 3750), ylim=c(250, 3750)) - points(ri1, ri2, lwd=0.4) - # Add vertical lines showing left- and right limits when using poly method - if (method == 'poly') - abline(v=range, col="grey", lwd=1.5) - dev.off() -} - -# Initialize output dataframe -if (method == 'poly') { - m <- data.frame(matrix(ncol = 11, nrow = 10)) -} else { - m <- data.frame(matrix(ncol = 9, nrow = 10)) -} - - -get_fit <- function(gc1, gc2, method) { - if (method == 'poly') - return(lm(gc1 ~ poly(gc2, 3, raw=TRUE))) - else - return(lm(gc1 ~ gc2)) -} - -# Permutate -k <- 1 -logger(paste("Permutating (with ", length(gc_columns), " GC-columns)..", sep="")) - -for (i in 1:(length(gc_columns)-1)) { - logger(paste("\tCalculating model for ", gc_columns[i], "..", sep="")) - breaks <- 0 - for (j in (i+1):length(gc_columns)) { - col1 = ridb[which(ridb['Column.name'][,1] == gc_columns[i]),] - col2 = ridb[which(ridb['Column.name'][,1] == gc_columns[j]),] - - # Find CAS numbers for which both columns have data (intersect) - cas_intersect = intersect(col1[['CAS']], col2[['CAS']]) - - # Skip if number of shared CAS entries is < cutoff - if (length(cas_intersect) < min_residuals) { - breaks = breaks + 1 - next - } - # Gather Retention Indices - col1_data = col1[['RI']][match(cas_intersect, col1[['CAS']])] - col2_data = col2[['RI']][match(cas_intersect, col2[['CAS']])] - - # Calculate the range within which regression is possible (and move if 'range_mod' != 0) - range = c(min(c(min(col1_data), min(col2_data))), max(c(max(col1_data), max(col2_data)))) - if (range_mod != 0) { - # Calculate percentage and add/subtract from range - perc = diff(range) / 100 - perc_cutoff = range_mod * perc - range = as.integer(range + c(perc_cutoff, -perc_cutoff)) - } - - # Calculate model for column1 vs column2 and plot if requested - fit = get_fit(col1_data, col2_data, method) - m[k,] = add_fit(fit, i, j, range, method) - - if (plot == 'true') - plot_fit(col1_data, col2_data, i, j, coefficients(fit), range, method) - - # Calculate model for column2 vs column1 and plot if requested - fit = get_fit(col2_data, col1_data, method) - m[k + 1,] = add_fit(fit, j, i, range, method) - - if (plot == 'true') - plot_fit(col2_data, col1_data, j, i, coefficients(fit), range, method) - - k = k + 2 - } - logger(paste("\t\t", breaks, " comparisons have been skipped due to nr. of datapoints < cutoff", sep="")) -} - -# Filter on pvalue and R-squared -logger("Filtering on pvalue and R-squared..") -if (method == 'poly') { - pval_index <- which(m[,10] < pvalue) - rsquared_index <- which(m[,11] > rsquared) -} else { - pval_index <- which(m[,8] < pvalue) - rsquared_index <- which(m[,9] > rsquared) -} -logger(paste(nrow(m) - length(pval_index), " models discarded due to pvalue > ", pvalue, sep="")) - -logger(paste(nrow(m) - length(rsquared_index), " models discarded due to R-squared < ", rsquared, sep="")) - -# Remaining rows -index = unique(c(pval_index, rsquared_index)) - -# Reduce dataset -m = m[index,] -sink() - -# Place plots in the history as a ZIP file -if (plot == 'true') { - logger("Creating archive with model graphics..") - system(paste("zip -9 -r models.zip *.pdf > /dev/null", sep="")) - system(paste("cp models.zip ", plot_archive, sep="")) -} - -# Save dataframe as tab separated file -logger("All done, saving data..") -header = c("Column1", "Column2", "Coefficient1", "Coefficient2", "Coefficient3", "Coefficient4", - "LeftLimit", "RightLimit", "Residuals", "pvalue", "Rsquared") -if (method != 'poly') - header = header[c(1:4, 7:11)] -write(progress, logfile) -write.table(m, file=out_file, sep="\t", quote=FALSE, col.names=header, row.names=FALSE)
--- a/__init__.py Thu Mar 19 12:10:19 2015 +0100 +++ b/__init__.py Thu Mar 19 12:13:13 2015 +0100 @@ -1,6 +0,0 @@ -''' -Module containing Galaxy tools for the LC or GC/MS pipeline -Created on Mar , 2014 - -@author: pieter lukasse -''' \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/static_resources/README.txt Thu Mar 19 12:13:13 2015 +0100 @@ -0,0 +1,3 @@ +This folder and respective files should be deployed together with the following tools: + + - ../export_to_metexp_tabular.py \ No newline at end of file
--- a/test/__init__.py Thu Mar 19 12:10:19 2015 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1 +0,0 @@ -''' unit tests '''
--- a/test/integration_tests.py Thu Mar 19 12:10:19 2015 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,268 +0,0 @@ -'''Integration tests for the GCMS project''' - -from pkg_resources import resource_filename # @UnresolvedImport # pylint: disable=E0611 -from GCMS import library_lookup, combine_output -from GCMS.rankfilter_GCMS import rankfilter -import os.path -import sys -import unittest -import re - - -class IntegrationTest(unittest.TestCase): - def test_library_lookup(self): - ''' - Run main for data/NIST_tabular and compare produced files with references determined earlier. - ''' - # Create out folder - outdir = "output/" #tempfile.mkdtemp(prefix='test_library_lookup') - if not os.path.exists(outdir): - os.makedirs(outdir) - outfile_base = os.path.join(outdir, 'produced_library_lookup') - outfile_txt = outfile_base + '.txt' - - #Build up arguments and run - input_txt = resource_filename(__name__, "data/NIST_tabular.txt") - library = resource_filename(__name__, "data/RIDB_subset.txt") - regress_model = resource_filename(__name__, "data/ridb_poly_regression.txt") - sys.argv = ['test', - library, - input_txt, - 'Capillary', - 'Semi-standard non-polar', - outfile_txt, - 'HP-5', - regress_model] - # Execute main function with arguments provided through sys.argv - library_lookup.main() - #Compare with reference files - reference_txt = resource_filename(__name__, 'reference/produced_library_lookup.txt') - - #read both the reference file and actual output files - expected = _read_file(reference_txt) - actual = _read_file(outfile_txt) - - #convert the read in files to lists we can compare - expected = expected.split() - actual = actual.split() - - for exp, act in zip(expected, actual): - if re.match('\\d+\\.\\d+', exp): - exp = float(exp) - act = float(act) - self.assertAlmostEqual(exp, act, places=5) - else: - # compare values - self.failUnlessEqual(expected, actual) - - - def test_combine_output_simple(self): - ''' - Run main for data/NIST_tabular and compare produced files with references determined earlier. - ''' - # Create out folder - outdir = "output/" #tempfile.mkdtemp(prefix='test_library_lookup') - if not os.path.exists(outdir): - os.makedirs(outdir) - outfile_base = os.path.join(outdir, 'produced_combine_output') - outfile_single_txt = outfile_base + '_single.txt' - outfile_multi_txt = outfile_base + '_multi.txt' - - #Build up arguments and run - input_rankfilter = resource_filename(__name__, "data/Rankfilter.txt") - input_caslookup = resource_filename(__name__, "data/Caslookup.txt") - sys.argv = ['test', - input_rankfilter, - input_caslookup, - outfile_single_txt, - outfile_multi_txt] - # Execute main function with arguments provided through sys.argv - combine_output.main() - #Compare with reference files - # reference_single_txt = resource_filename(__name__, 'reference/produced_combine_output_single.txt') - # reference_multi_txt = resource_filename(__name__, 'reference/produced_combine_output_multi.txt') - # self.failUnlessEqual(_read_file(reference_single_txt), _read_file(outfile_single_txt)) - # self.failUnlessEqual(_read_file(reference_multi_txt), _read_file(outfile_multi_txt)) - - #Clean up - #shutil.rmtree(tempdir) - - - - def def_test_rank_filter_advanced(self): - ''' - Run main of RankFilter - ''' - # Create out folder - outdir = "output/integration/" - if not os.path.exists(outdir): - os.makedirs(outdir) - - #Build up arguments and run - input_txt = resource_filename(__name__, "data/integration/RankFilterInput_conf.txt") - sys.argv = ['test', - input_txt] - # Execute main function with arguments provided through sys.argv - rankfilter.main() - #Compare with reference files - - def def_test_library_lookup_advanced(self): - ''' - Run main for data/NIST_tabular and compare produced files with references determined earlier. - ''' - # Create out folder - outdir = "output/integration/" - if not os.path.exists(outdir): - os.makedirs(outdir) - outfile_base = os.path.join(outdir, 'produced_library_lookup_ADVANCED') - outfile_txt = outfile_base + '.txt' - - #Build up arguments and run - input_txt = resource_filename(__name__, "data/integration/NIST_identification_results_tabular.txt") - library = resource_filename(__name__, "../repositories/PRIMS-metabolomics/RI_DB_libraries/Library_RI_DB_capillary_columns-noDuplicates.txt") - regress_model = resource_filename(__name__, "data/integration/regression_MODEL_for_columns.txt") - sys.argv = ['test', - library, - input_txt, - 'Capillary', - 'Semi-standard non-polar', - outfile_txt, - 'DB-5', - regress_model] - # Execute main function with arguments provided through sys.argv - library_lookup.main() - - - - def test_combine_output_advanced(self): - ''' - Variant on test case above, but a bit more complex as some of the centrotypes have - different NIST hits which should give them different RI values. This test also - runs not only the combine output, but the other two preceding steps as well, - so it ensures the integration also works on the current code of all three tools. - ''' - - # Run RankFilter - self.def_test_rank_filter_advanced() - - # Run library CAS RI lookup - self.def_test_library_lookup_advanced() - - outdir = "output/integration/" - outfile_base = os.path.join(outdir, 'produced_combine_output') - outfile_single_txt = outfile_base + '_single.txt' - outfile_multi_txt = outfile_base + '_multi.txt' - - #Build up arguments and run - input_rankfilter = resource_filename(__name__, "output/integration/produced_rank_filter_out.txt") - input_caslookup = resource_filename(__name__, "output/integration/produced_library_lookup_ADVANCED.txt") - sys.argv = ['test', - input_rankfilter, - input_caslookup, - outfile_single_txt, - outfile_multi_txt] - # Execute main function with arguments provided through sys.argv - combine_output.main() - #Compare with reference files -# reference_single_txt = resource_filename(__name__, 'reference/produced_combine_output_single.txt') -# reference_multi_txt = resource_filename(__name__, 'reference/produced_combine_output_multi.txt') -# self.failUnlessEqual(_read_file(reference_single_txt), _read_file(outfile_single_txt)) -# self.failUnlessEqual(_read_file(reference_multi_txt), _read_file(outfile_multi_txt)) - - # Check 1: output single should have one record per centrotype: - - - # Check 2: output single has more records than output single: - combine_result_single_items = combine_output._process_data(outfile_single_txt) - combine_result_multi_items = combine_output._process_data(outfile_multi_txt) - self.assertGreater(len(combine_result_single_items['Centrotype']), - len(combine_result_multi_items['Centrotype'])) - - - # Check 3: library_lookup RI column, centrotype column, ri_svr column are correct: - caslookup_items = combine_output._process_data(input_caslookup) - rankfilter_items = combine_output._process_data(input_rankfilter) - - # check that the caslookup RI column is correctly maintained in its original order in - # the combined file: - ri_caslookup = caslookup_items['RI'] - ri_combine_single = combine_result_single_items['RI'] - self.assertListEqual(ri_caslookup, ri_combine_single) - - # check the centrotype column's integrity: - centrotype_caslookup = caslookup_items['Centrotype'] - centrotype_combine_single = combine_result_single_items['Centrotype'] - centrotype_rankfilter = _get_centrotype_rankfilter(rankfilter_items['ID']) - self.assertListEqual(centrotype_caslookup, centrotype_combine_single) - self.assertListEqual(centrotype_caslookup, centrotype_rankfilter) - - # integration and integrity checks: - file_NIST = resource_filename(__name__, "data/integration/NIST_identification_results_tabular.txt") - file_NIST_items = combine_output._process_data(file_NIST) - # check that rank filter output has exactly the same ID items as the original NIST input file: - self.assertListEqual(file_NIST_items['ID'], rankfilter_items['ID']) - # check the same for the CAS column: - self.assertListEqual(_get_strippedcas(file_NIST_items['CAS']), rankfilter_items['CAS']) - # now check the NIST CAS column against the cas lookup results: - cas_NIST = _get_processedcas(file_NIST_items['CAS']) - self.assertListEqual(cas_NIST, caslookup_items['CAS']) - # now check the CAS of the combined result. If all checks are OK, it means the CAS column's order - # and values remained stable throughout all steps: - self.assertListEqual(rankfilter_items['CAS'], combine_result_single_items['CAS']) - - # check that the rankfilter RIsvr column is correctly maintained in its original order in - # the combined file: - risvr_rankfilter = rankfilter_items['RIsvr'] - risvr_combine_single = combine_result_single_items['RIsvr'] - self.assertListEqual(risvr_rankfilter, risvr_combine_single) - - - - -def _get_centrotype_rankfilter(id_list): - ''' - returns the list of centrotype ids given a list of ID in the - form e.g. 74-1.0-564-1905200-7, where the numbers before the - first "-" are the centrotype id - ''' - result = [] - for compound_id_idx in xrange(len(id_list)): - compound_id = id_list[compound_id_idx] - centrotype = compound_id.split('-')[0] - result.append(centrotype) - - return result - - -def _get_processedcas(cas_list): - ''' - returns the list cas numbers in the form C64175 instead of 64-17-5 - ''' - result = [] - for cas_id_idx in xrange(len(cas_list)): - cas = cas_list[cas_id_idx] - processed_cas = 'C' + str(cas.replace('-', '').strip()) - result.append(processed_cas) - - return result - -def _get_strippedcas(cas_list): - ''' - removes the leading white space from e.g. " 64-17-5" - ''' - result = [] - for cas_id_idx in xrange(len(cas_list)): - cas = cas_list[cas_id_idx] - processed_cas = cas.strip() - result.append(processed_cas) - - return result - - -def _read_file(filename): - ''' - Helper method to quickly read a file - @param filename: - ''' - with open(filename) as handle: - return handle.read()
--- a/test/test_combine_output.py Thu Mar 19 12:10:19 2015 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,106 +0,0 @@ -''' -Created on Mar 27, 2012 - -@author: marcelk -''' -from GCMS import combine_output -from pkg_resources import resource_filename # @UnresolvedImport # pylint: disable=E0611 -import os -import shutil -import tempfile -import unittest - - -class Test(unittest.TestCase): - ''' - Tests for the 'combine_output' Galaxy tool - ''' - - def setUp(self): - self.rf_output = resource_filename(__name__, "data/RankFilter.txt") - self.cl_output = resource_filename(__name__, "data/CasLookup.txt") - - def test_process_data(self): - ''' - Tests the processing of the RankFilter and CasLookup files into dictionaries - ''' - rfdata = combine_output._process_data(self.rf_output) - cldata = combine_output._process_data(self.cl_output) - self.assertEqual(set([' 18457-04-0', ' 55133-95-4', ' 58-08-2', ' 112-34-5']), set(rfdata['CAS'])) - self.assertEqual(set(['C58082', 'C18457040', 'C55133954', 'C112345']), set(cldata['CAS'])) - - def test_add_hit(self): - ''' - Tests the combination of two records from both the RankFilter- and CasLookup-tools - ''' - rfdata = combine_output._process_data(self.rf_output) - cldata = combine_output._process_data(self.cl_output) - index = 0 - rf_record = dict(zip(rfdata.keys(), [rfdata[key][index] for key in rfdata.keys()])) - cl_record = dict(zip(cldata.keys(), [cldata[key][index] for key in cldata.keys()])) - - hit = combine_output._add_hit(rf_record, cl_record) - self.assertEqual(len(hit), 27) - - # Pass empty record, should fail combination - self.assertRaises(KeyError, combine_output._add_hit, rf_record, {}) - - def test_merge_data(self): - ''' - Tests the merging of the RankFilter and CasLookup data - ''' - rfdata = combine_output._process_data(self.rf_output) - cldata = combine_output._process_data(self.cl_output) - merged, _ = combine_output._merge_data(rfdata, cldata) - centrotypes = _get_centrotypes(merged) - self.failUnless(all(centrotype in centrotypes for centrotype in ('2716','12723', '3403', '12710'))) - -def _get_centrotypes(merged): - ''' - returns centrotype codes found in merged set - ''' - result = [] - for item_idx in xrange(len(merged)): - item = merged[item_idx] - centrotype = item[0][0] - result.append(centrotype) - - return result - - def test_remove_formula(self): - ''' - Tests the removal of the Formula from the 'Name' field (RankFilter output) - ''' - name = "Caffeine C8H10N4O2" - compound_name, compound_formula = combine_output._remove_formula(name) - self.assertEqual(compound_name, 'Caffeine') - self.assertEqual(compound_formula, 'C8H10N4O2') - name = "Ethanol C2H6O" - compound_name, compound_formula = combine_output._remove_formula(name) - self.assertEqual(compound_name, 'Ethanol') - self.assertEqual(compound_formula, 'C2H6O') - # No formula to remove - name = "Butanoic acid, 4-[(trimethylsilyl)oxy]-, trimethylsilyl ester" - compound_name, compound_formula = combine_output._remove_formula(name) - self.assertEqual(compound_name, name) - self.assertEqual(compound_formula, False) - - def test_save_data(self): - ''' - Tests the creation of the output tabular files (no content testing) - ''' - temp_folder = tempfile.mkdtemp(prefix='gcms_combine_output_') - saved_single_data = '{0}/{1}'.format(temp_folder, 'output_single.tsv') - saved_multi_data = '{0}/{1}'.format(temp_folder, 'output_multi.tsv') - rfdata = combine_output._process_data(self.rf_output) - cldata = combine_output._process_data(self.cl_output) - merged, nhits = combine_output._merge_data(rfdata, cldata) - combine_output._save_data(merged, nhits, saved_single_data, saved_multi_data) - self.failUnless(os.path.exists(saved_single_data)) - self.failUnless(os.path.exists(saved_multi_data)) - shutil.rmtree(temp_folder) - - -if __name__ == "__main__": - #import sys;sys.argv = ['', 'Test.testName'] - unittest.main()
--- a/test/test_export_to_metexp_tabular.py Thu Mar 19 12:10:19 2015 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,112 +0,0 @@ -'''Integration tests for the GCMS project''' - -from pkg_resources import resource_filename # @UnresolvedImport # pylint: disable=E0611 -from GCMS import export_to_metexp_tabular -import os.path -import sys -import unittest - - -class IntegrationTest(unittest.TestCase): - - - def test_MM_calculations(self): - ''' - test the implemented method for MM calculations for - given chemical formulas - ''' - export_to_metexp_tabular.init_elements_and_masses_map() - - formula = "C8H18O3" - # should be = 12.01*8 + 1.01*18 + 16*3 = 162.26 - result = export_to_metexp_tabular.get_molecular_mass(formula) - self.assertEqual(162.26, result) - - formula = "CH2O3Fe2Ni" - # should be = 12.01*1 + 1.01*2 + 16*3 + 55.85*2 + 58.71 = 232.44 - result = export_to_metexp_tabular.get_molecular_mass(formula) - self.assertAlmostEqual(232.44, result, 2) - - - - - - def test_combine_output_simple(self): - ''' - comment me - ''' - # Create out folder - outdir = "output/metexp/" - if not os.path.exists(outdir): - os.makedirs(outdir) - - #Build up arguments and run - - rankfilter_and_caslookup_combined_file = resource_filename(__name__, "data/dummy1_produced_combine_output_single.txt") - msclust_quantification_and_spectra_file = resource_filename(__name__, "data/dummy1_sim.txt") - output_csv = resource_filename(__name__, outdir + "metexp_tabular.txt") - - sys.argv = ['test', - rankfilter_and_caslookup_combined_file, - msclust_quantification_and_spectra_file, - output_csv, - 'tomato', - 'leafs', - 'test experiment', - 'pieter', - 'DB5 column'] - - # Execute main function with arguments provided through sys.argv - export_to_metexp_tabular.main() - - ''' - # Asserts are based on reading in with process_data and comparing values of - # certain columns - - # Check 3: library_lookup RI column, centrotype column, ri_svr column are correct: - caslookup_items = combine_output._process_data(input_caslookup) - rankfilter_items = combine_output._process_data(input_rankfilter) - - # check that the caslookup RI column is correctly maintained in its original order in - # the combined file: - ri_caslookup = caslookup_items['RI'] - ri_combine_single = combine_result_single_items['RI'] - self.assertListEqual(ri_caslookup, ri_combine_single) - - # check the centrotype column's integrity: - centrotype_caslookup = caslookup_items['Centrotype'] - centrotype_combine_single = combine_result_single_items['Centrotype'] - centrotype_rankfilter = _get_centrotype_rankfilter(rankfilter_items['ID']) - self.assertListEqual(centrotype_caslookup, centrotype_combine_single) - self.assertListEqual(centrotype_caslookup, centrotype_rankfilter) - - # integration and integrity checks: - file_NIST = resource_filename(__name__, "data/integration/NIST_identification_results_tabular.txt") - file_NIST_items = combine_output._process_data(file_NIST) - # check that rank filter output has exactly the same ID items as the original NIST input file: - self.assertListEqual(file_NIST_items['ID'], rankfilter_items['ID']) - # check the same for the CAS column: - self.assertListEqual(_get_strippedcas(file_NIST_items['CAS']), rankfilter_items['CAS']) - # now check the NIST CAS column against the cas lookup results: - cas_NIST = _get_processedcas(file_NIST_items['CAS']) - self.assertListEqual(cas_NIST, caslookup_items['CAS']) - # now check the CAS of the combined result. If all checks are OK, it means the CAS column's order - # and values remained stable throughout all steps: - self.assertListEqual(rankfilter_items['CAS'], combine_result_single_items['CAS']) - - # check that the rankfilter RIsvr column is correctly maintained in its original order in - # the combined file: - risvr_rankfilter = rankfilter_items['RIsvr'] - risvr_combine_single = combine_result_single_items['RIsvr'] - self.assertListEqual(risvr_rankfilter, risvr_combine_single) - ''' - - - -def _read_file(filename): - ''' - Helper method to quickly read a file - @param filename: - ''' - with open(filename) as handle: - return handle.read()
--- a/test/test_library_lookup.py Thu Mar 19 12:10:19 2015 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,180 +0,0 @@ -''' -Created on Mar 6, 2012 - -@author: marcelk -''' -from GCMS import library_lookup, match_library -from pkg_resources import resource_filename # @UnresolvedImport # pylint: disable=E0611 -import os -import shutil -import tempfile -import unittest - - -class Test(unittest.TestCase): - ''' - Tests the 'library_lookup' Galaxy tool - ''' - - def setUp(self): - self.ri_database = resource_filename(__name__, "data/RIDB_subset.txt") - self.nist_output = resource_filename(__name__, "data/NIST_tabular.txt") - self.ridb_poly_regress = resource_filename(__name__, "data/ridb_poly_regression.txt") - self.ridb_linear_regress = resource_filename(__name__, "data/ridb_linear_regression.txt") - - def test_create_lookup_table(self): - ''' - Tests the 'create_lookup_table' function - ''' - column_type = 'Capillary' - polarity = 'Semi-standard non-polar' - lookup_dict = library_lookup.create_lookup_table(self.ri_database, column_type, polarity) - self.assertFalse(False in [res[4] == 'Capillary' for res in lookup_dict['4177166']]) - self.assertEqual(['C51276336', '2,6-Dimethyl-octa-1,7-dien-3,6-diol', 'C10H18O2', - '1277', 'Capillary', 'Semi-standard non-polar', 'DB-5MS', '1', - 'C51276336_DB-5MS', '', '', ''], lookup_dict['51276336'][1]) - - def test_read_model(self): - ''' - Tests reading the regression model data containing the parameters required for converting - retention indices between GC-columns - ''' - model, _ = library_lookup._read_model(self.ridb_poly_regress) - # Order of values: coefficient 1 through 4, left limit, right limit - # Polynomial model - self.assertEqual([20.6155874639486, 0.945187096379008, 3.96480787567566e-05, -9.04377237159287e-09, - 628.0, 2944.0, 405.0, 0, 0.998685262365514], model['HP-5']['SE-54']) - self.assertEqual([-92.3963391356951, 1.26116176393346, -0.000191991657547972, 4.15387371263164e-08, - 494.0, 2198.0, 407.0, 0, 0.996665023122993], model['Apiezon L']['Squalane']) - # Linear model - model, _ = library_lookup._read_model(self.ridb_linear_regress) - self.assertEqual([2.81208738561543, 0.99482475526584, 628.0, 2944.0, 405.0, 0, 0.998643883946458], - model['HP-5']['SE-54']) - self.assertEqual([19.979922768462, 0.993741869298272, 494.0, 2198.0, 407.0, 0, 0.99636062891041], - model['Apiezon L']['Squalane']) - - def test_apply_regression(self): - ''' - Tests the regression model on some arbitrary retention indices - ''' - poly_model, _ = library_lookup._read_model(self.ridb_poly_regress) - linear_model, _ = library_lookup._read_model(self.ridb_linear_regress) - retention_indices = [1000, 1010, 1020, 1030, 1040, 1050] - converted_poly = [] - converted_linear = [] - for ri in retention_indices: - converted_poly.append(library_lookup._apply_poly_regression('HP-5', 'DB-5', ri, poly_model)) - converted_linear.append(library_lookup._apply_linear_regression('HP-5', 'DB-5', ri, linear_model)) - - self.assertEqual([1003.0566541860778, 1013.0979459524663, 1023.1358645806529, 1033.170466241159, - 1043.2018071045052, 1053.2299433412131], converted_poly) - self.assertEqual([1001.8127584915925, 1011.830140783027, 1021.8475230744615, 1031.864905365896, - 1041.8822876573306, 1051.899669948765], converted_linear) - - # Test polynomial limit detection, the following RI falls outside of the possible limits - ri = 3400 - converted_poly = library_lookup._apply_poly_regression('HP-5', 'DB-5', ri, poly_model) - self.assertEqual(False, converted_poly) - - def test_preferred_hit(self): - ''' Tests the matching of the hits with the preferred column, including regression ''' - model, method = library_lookup._read_model(self.ridb_poly_regress) - column_type = 'Capillary' - polarity = 'Semi-standard non-polar' - lookup_dict = library_lookup.create_lookup_table(self.ri_database, column_type, polarity) - hits = lookup_dict['150867'] - # No regression, should however consider order of given preference - match = library_lookup._preferred(hits, ['SE-52', 'DB-5', 'HP-5'], column_type, polarity, model, method) - expected = (['C150867', '(E)-phytol', 'C20H40O', '2110', 'Capillary', - 'Semi-standard non-polar', 'SE-52', '', 'C150867_SE-52', '', '', ''], False) - self.assertEqual(expected, match) - - # Perform regression by looking for 'OV-101' which isn't there. 'SE-52' has the best regression model - # of the available columns - match = library_lookup._preferred(hits, ['OV-101'], column_type, polarity, model, method) - expected = (['C150867', '(E)-phytol', 'C20H40O', 2158.5769891569125, 'Capillary', - 'Semi-standard non-polar', 'SE-52', '', 'C150867_SE-52', '', '', ''], 'SE-52') - self.assertEqual(expected, match) - - def test_format_result(self): - ''' - Tests the 'format_result' function - ''' - column_type = 'Capillary' - polarity = 'Semi-standard non-polar' - - # Look for DB-5 - pref_column = ['DB-5'] - model, method = library_lookup._read_model(self.ridb_poly_regress) - lookup_dict = library_lookup.create_lookup_table(self.ri_database, column_type, polarity) - data = library_lookup.format_result(lookup_dict, self.nist_output, pref_column, column_type, - polarity, model, method)#False, None) - - # remove non-hits from set: - data = _get_hits_only(data) - self.assertEqual(['C544354', 'Ethyl linoleate', 'C20H36O2', '2155', 'Capillary', 'Semi-standard non-polar', - 'DB-5', '1', 'C544354_DB-5', '1810', 'None', '', '', '0'], data[20]) - self.assertEqual(111, len(data)) - - # Look for both DB-5 and HP-5 - pref_column = ['DB-5', 'HP-5'] - data = library_lookup.format_result(lookup_dict, self.nist_output, pref_column, column_type, - polarity, False, None) - # remove non-hits from set: - data = _get_hits_only(data) - self.assertEqual(['C502614', '.beta.-(E)-Farnesene', 'C15H24', '1508', 'Capillary', 'Semi-standard non-polar', - 'DB-5', '1', 'C502614_DB-5', '942', 'None', '1482', '1522', '22'], data[50]) - self.assertEqual(106, len(data)) - - - def test_save_data(self): - ''' - Tests the creation of the output tabular file - ''' - temp_folder = tempfile.mkdtemp(prefix='gcms_combine_output_') - saved_data = '{0}/{1}'.format(temp_folder, 'output.tsv') - column_type = 'Capillary' - polarity = 'Semi-standard non-polar' - pref_column = ['DB-5'] - lookup_dict = library_lookup.create_lookup_table(self.ri_database, column_type, polarity) - data = library_lookup.format_result(lookup_dict, self.nist_output, pref_column, column_type, polarity, False, None) - library_lookup._save_data(data, saved_data) - self.failUnless(os.path.exists(saved_data)) - shutil.rmtree(temp_folder) - - - def test_match_library_get_lib_files(self): - ''' - Tests the match_library.py functionality - ''' - riqc_libs_dir = resource_filename(__name__, "../repositories/PRIMS-metabolomics/RI_DB_libraries") - get_library_files_output = match_library.get_directory_files(riqc_libs_dir) - self.assertEqual(2, len(get_library_files_output)) - self.assertEqual("Library_RI_DB_capillary_columns-noDuplicates", get_library_files_output[0][0]) - #TODO change assert below to assert that the result is a file, so the test can run on other dirs as well: - #self.assertEqual("E:\\workspace\\PRIMS-metabolomics\\python-tools\\tools\\GCMS\\test\\data\\riqc_libs\\RI DB library (capillary columns) Dec.2012.txt", get_library_files_output[0][1]) - #self.assertEqual("RI DB library (capillary columns) Jan.2013", get_library_files_output[1][0]) - try: - get_library_files_output = match_library.get_directory_files("/blah") - # should not come here - self.assertTrue(False) - except: - # should come here - self.assertTrue(True) - -def _get_hits_only(data): - ''' - removes items that have RI == 0.0 and Name == '' (these are dummy lines just for the output - ''' - result = [] - for item_idx in xrange(len(data)): - item = data[item_idx] - if item[1] != '' and item[3] > 0.0 : - result.append(item) - - return result - - -if __name__ == "__main__": - #import sys;sys.argv = ['', 'Test.testName'] - unittest.main()
--- a/test/test_match_library.py Thu Mar 19 12:10:19 2015 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,51 +0,0 @@ -''' -Created on Mar 6, 2012 - -@author: marcelk -''' -from GCMS import match_library -import unittest -from pkg_resources import resource_filename # @UnresolvedImport # pylint: disable=E0611 - - -class Test(unittest.TestCase): - ''' - Tests the 'match_library' Galaxy tool - ''' - nist_db = resource_filename(__name__, "data/RIDB_subset.txt") - - def test_get_column_type(self): - ''' - Tests the 'get_column_type' function that returns tuples of unique elements - for column types in the RI database - ''' - galaxy_output = match_library.get_column_type(self.nist_db) - self.assertEqual([('Capillary(9999)', 'Capillary', False)], galaxy_output) - - def test_filter_column(self): - ''' - Tests the 'filter_column' function showing the column phase for all 'Capillary' columns - ''' - galaxy_output = match_library.filter_column(self.nist_db, 'Capillary') - self.assertEqual([('Semi-standard non-polar(9999)', 'Semi-standard non-polar', False)], galaxy_output) - - def test_filter_column2(self): - ''' - Tests the 'filter_column' function showing all possibilities for columns having both the - 'Capillary' and 'Semi-standard non-polar' properties - ''' - galaxy_output = match_library.filter_column2(self.nist_db, 'Capillary', 'Semi-standard non-polar') - self.failUnless(('Apiezon M(6)', 'Apiezon M', False) in galaxy_output) - - def test_count_occurrence(self): - ''' - Tests the 'count_occurrence' function - ''' - test_list = [2, 0, 0, 2, 1, 3, 4, 5, 3, 2, 3, 4, 5, 5, 4, 2, 5, 3, 4, 3, 5, 4, 2, 0, 4] - counts = match_library.count_occurrence(test_list) - self.assertEqual({0: 3, 1: 1, 2: 5, 3: 5, 4: 6, 5: 5}, counts) - - -if __name__ == "__main__": - #import sys;sys.argv = ['', 'Test.testName'] - unittest.main()
--- a/test/test_query_mass_repos.py Thu Mar 19 12:10:19 2015 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,62 +0,0 @@ -'''Integration tests for the GCMS project''' - -from pkg_resources import resource_filename # @UnresolvedImport # pylint: disable=E0611 -from MS import query_mass_repos -import os.path -import sys -import unittest - - -class IntegrationTest(unittest.TestCase): - - - - - def test_simple(self): - ''' - Simple initial test - ''' - # Create out folder - outdir = "output/query_mass_repos/" - if not os.path.exists(outdir): - os.makedirs(outdir) - - #Build up arguments and run - - # input_file = sys.argv[1] - # molecular_mass_col = sys.argv[2] - # repository_file = sys.argv[3] - # mass_tolerance = float(sys.argv[4]) - # output_result = sys.argv[5] - - input_file = resource_filename(__name__, "data/service_query_tabular.txt") - - molecular_mass_col = "mass (Da)" - dblink_file = resource_filename(__name__, "data/MFSearcher ExactMassDB service.txt") - output_result = resource_filename(__name__, outdir + "metexp_query_results_added.txt") - - - - - sys.argv = ['test', - input_file, - molecular_mass_col, - dblink_file, - '0.001', - 'ms', - output_result] - - # Execute main function with arguments provided through sys.argv - query_mass_repos.main() - - - - - -def _read_file(filename): - ''' - Helper method to quickly read a file - @param filename: - ''' - with open(filename) as handle: - return handle.read()
--- a/test/test_query_metexp.py Thu Mar 19 12:10:19 2015 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,79 +0,0 @@ -'''Integration tests for the GCMS project''' - -from pkg_resources import resource_filename # @UnresolvedImport # pylint: disable=E0611 -from GCMS import query_metexp -import os.path -import sys -import unittest - - -class IntegrationTest(unittest.TestCase): - - -# def test_MM_calculations(self): -# ''' -# test the implemented method for MM calculations for -# given chemical formulas -# ''' -# export_to_metexp_tabular.init_elements_and_masses_map() -# -# formula = "C8H18O3" -# # should be = 12.01*8 + 1.01*18 + 16*3 = 162.26 -# result = export_to_metexp_tabular.get_molecular_mass(formula) -# self.assertEqual(162.26, result) -# -# formula = "CH2O3Fe2Ni" -# # should be = 12.01*1 + 1.01*2 + 16*3 + 55.85*2 + 58.71 = 232.44 -# result = export_to_metexp_tabular.get_molecular_mass(formula) -# self.assertAlmostEqual(232.44, result, 2) -# -# -# - - - def test_simple(self): - ''' - Simple initial test - ''' - # Create out folder - outdir = "output/metexp_query/" - if not os.path.exists(outdir): - os.makedirs(outdir) - - #Build up arguments and run - - # input_file = sys.argv[1] - # molecular_mass_col = sys.argv[2] - # formula_col = sys.argv[3] - # metexp_dblink_file = sys.argv[4] - # output_result = sys.argv[5] - - input_file = resource_filename(__name__, "data/metexp_query_tabular.txt") - casid_col = "CAS" - formula_col = "FORMULA" - molecular_mass_col = "MM" - metexp_dblink_file = resource_filename(__name__, "data/METEXP Test DB.txt") - output_result = resource_filename(__name__, outdir + "metexp_query_results_added.txt") - - sys.argv = ['test', - input_file, - casid_col, - formula_col, - molecular_mass_col, - metexp_dblink_file, - 'GC', - output_result] - - # Execute main function with arguments provided through sys.argv - query_metexp.main() - - - - -def _read_file(filename): - ''' - Helper method to quickly read a file - @param filename: - ''' - with open(filename) as handle: - return handle.read()
--- a/test/test_query_metexp_LARGE.py Thu Mar 19 12:10:19 2015 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,79 +0,0 @@ -'''Integration tests for the GCMS project''' - -from pkg_resources import resource_filename # @UnresolvedImport # pylint: disable=E0611 -from GCMS import query_metexp -import os.path -import sys -import unittest - - -class IntegrationTest(unittest.TestCase): - - -# def test_MM_calculations(self): -# ''' -# test the implemented method for MM calculations for -# given chemical formulas -# ''' -# export_to_metexp_tabular.init_elements_and_masses_map() -# -# formula = "C8H18O3" -# # should be = 12.01*8 + 1.01*18 + 16*3 = 162.26 -# result = export_to_metexp_tabular.get_molecular_mass(formula) -# self.assertEqual(162.26, result) -# -# formula = "CH2O3Fe2Ni" -# # should be = 12.01*1 + 1.01*2 + 16*3 + 55.85*2 + 58.71 = 232.44 -# result = export_to_metexp_tabular.get_molecular_mass(formula) -# self.assertAlmostEqual(232.44, result, 2) -# -# -# - - - def test_large(self): - ''' - Simple test, but on larger set, last test executed in 28s - ''' - # Create out folder - outdir = "output/metexp_query/" - if not os.path.exists(outdir): - os.makedirs(outdir) - - #Build up arguments and run - - # input_file = sys.argv[1] - # molecular_mass_col = sys.argv[2] - # formula_col = sys.argv[3] - # metexp_dblink_file = sys.argv[4] - # output_result = sys.argv[5] - - input_file = resource_filename(__name__, "data/metexp_query_tabular_large.txt") - casid_col = "CAS" - formula_col = "FORMULA" - molecular_mass_col = "MM" - metexp_dblink_file = resource_filename(__name__, "data/METEXP Test DB.txt") - output_result = resource_filename(__name__, outdir + "metexp_query_results_added_LARGE.txt") - - sys.argv = ['test', - input_file, - casid_col, - formula_col, - molecular_mass_col, - metexp_dblink_file, - 'GC', - output_result] - - # Execute main function with arguments provided through sys.argv - query_metexp.main() - - - - -def _read_file(filename): - ''' - Helper method to quickly read a file - @param filename: - ''' - with open(filename) as handle: - return handle.read()