diff w4mkmeans_routines.R @ 2:c415b7dc6f37 draft default tip

planemo upload for repository https://github.com/HegemanLab/w4mkmeans_galaxy_wrapper/tree/master commit 3e916537da6bb37e6f3927d7a11e98e0ab6ef5ec
author eschen42
date Mon, 05 Mar 2018 12:40:17 -0500
parents 02cafb660b72
children
line wrap: on
line diff
--- a/w4mkmeans_routines.R	Wed Aug 09 18:06:55 2017 -0400
+++ b/w4mkmeans_routines.R	Mon Mar 05 12:40:17 2018 -0500
@@ -4,11 +4,11 @@
 
 library(parallel)
 
-w4kmeans_usage <- function() {
-  return ( 
+w4mkmeans_usage <- function() {
+  return (
     c(
      "w4mkmeans: bad input.",
-     "# contract:",
+     "  contract:",
      "    required - caller will provide an environment comprising:",
      "      log_print          - a logging function with the signature function(x, ...) expecting strings as x and ...",
      "      variableMetadata   - the corresponding W4M data.frame having feature metadata",
@@ -18,8 +18,8 @@
      "    optional - environment may comprise:",
      "      kfeatures          - an array of integers, the k's to apply for clustering by feature (default, empty array)",
      "      ksamples           - an array of integers, the k's to apply for clustering by sample (default, empty array)",
-     "      iter.max           - the maximum number of iterations when calculating a cluster (default = 10)",
-     "      nstart             - how many random sets of centers should be chosen (default = 1)",
+     "      iter_max           - the maximum number of iterations when calculating a cluster (default = 20)",
+     "      nstart             - how many random sets of centers should be chosen (default = 20)",
      "      algorithm          - string from c('Hartigan-Wong', 'Lloyd', 'Forgy', 'MacQueen') (default = Hartigan-Wong)",
      "      categorical_prefix - string from c('Hartigan-Wong', 'Lloyd', 'Forgy', 'MacQueen') (default = Hartigan-Wong)",
      "      ",
@@ -35,13 +35,15 @@
 w4mkmeans <- function(env) {
   # abort if 'env' is null or is not an environment
   if ( is.null(env) || ! is.environment(env) ) {
-    lapply(w4kmeans_usage(),print)
-  } 
+    lapply(w4mkmeans_usage(),print)
+  }
+  # extract parameters from 'env'
+  log_action  <- env$log_print
   # supply default arguments
-  if ( ! exists("iter.max"          , env) ) env$iter.max  <- 10
-  if ( ! exists("nstart"            , env) ) env$nstart    <- 1
+  if ( ! exists("iter_max"          , env) ) env$iter_max  <- 20
+  if ( ! exists("nstart"            , env) ) env$nstart    <- 20
   if ( ! exists("algorithm"         , env) ) env$algorithm <- 'Hartigan-Wong'
-  if ( ! exists("categorical_prefix", env) ) env$categorical_prefix <- 'k'
+  if ( ! exists("categorical_prefix", env) ) env$categorical_prefix <- 'c'
   if ( ! exists("ksamples"          , env) ) env$ksamples  <- c()
   if ( ! exists("kfeatures"         , env) ) env$kfeatures <- c()
   # check mandatory arguments
@@ -55,11 +57,11 @@
   missing_from_env <- setdiff(expected, (ls(env)))
   if ( length(missing_from_env) > 0 ) {
     print(paste(c('expected environment members not found: ', as.character(missing_from_env)), collapse = ", "))
-    lapply(w4kmeans_usage(),print)
+    lapply(w4mkmeans_usage(),log_action)
     stop("w4mkmeans: contract has been broken")
-  } 
+  }
   # extract parameters from 'env'
-  failure_action  <- env$log_print
+  log_action  <- env$log_print
   scores          <- c( "clusterOn\tk\ttotalSS\tbetweenSS\tproportion" )
   sampleMetadata  <- env$sampleMetadata
   featureMetadata <- env$variableMetadata
@@ -70,39 +72,79 @@
     i <- i[i > 0]         # eliminate non-positive integers
     i <- unique(sort(i))  # eliminate redundancy and disorder
     if (length(a)!=length(i)) {
-      failure_action("Some values for '", what, "' were skipped where not unique, not positive, or not convertible to an integer.")
+      log_action("Some values for '", what, "' were skipped where not unique, not positive, or not convertible to an integer.")
     }
     return (i)            # return results, if any
   }
   ksamples        <- positive_ints(env$ksamples , "ksamples")
   kfeatures       <- positive_ints(env$kfeatures, "kfeatures")
 
+  log_action("w4mkmeans: preparing data matrix")
+  # prepare data matrix (normalize, eliminate zero-variance rows, etc.; no transformation)
+  dm_en <- new.env()
+  dm_en$log <- c()
+  preparation_result <- tryCatchFunc(function(){
+    dm <- prepare.data.matrix(
+            x.matrix = env$dataMatrix
+          , data.transformation = function(x) { x }
+          , en = dm_en
+          )
+    my_log <- dm_en$log
+    for ( i in 1:length(my_log) ) {
+      log_action(my_log[i])
+    }
+    dm
+  })
+  if ( !preparation_result$success ) {
+    postmortem <- paste("prepare.data.matrix failed:", preparation_result$msg)
+    log_action(postmortem)
+    stop(postmortem)
+  }
+
+  env$preparedDataMatrix <- preparation_result$value
+
+  log_action("w4mkmeans: determining evaluation mode")
+
   myLapply <- parLapply
-  # uncomment the next line to mimic parLapply, but without parallelization (for testing/experimentation)
-  # myLapply <- function(cl, ...) lapply(...)
   cl <- NULL
+  tryCatch(
+    expr = {
+      outfile <- ""
+      # outfile: Where to direct the stdout and stderr connection output from the workers.
+      #   - "" indicates no redirection (which may only be useful for workers on the local machine).
+      #   - Defaults to ‘/dev/null’ (‘nul:’ on Windows).
+      #   - The other possibility is a file path on the worker's host.
+      #     - Files will be opened in append mode, as all workers log to the same file.
+      cl <- makePSOCKcluster( names = slots, outfile = outfile )
+    }
+    , error = function(e) {
+      log_action(sprintf("w4kmeans: falling back to serial evaluation because makePSOCKcluster(names = %d) threw an exception", slots))
+      # mimic parLapply, but without parallelization (as a last resort)
+      myLapply <<- function(cl, ...) lapply(...)
+    }
+  )
   if ( identical(myLapply, parLapply) ) {
-    failure_action(sprintf("w4mkmeans: using parallel evaluation with %d slots", slots))
-    failure_action(names(cl))
-    cl <- makePSOCKcluster(names = slots)
+    log_action(sprintf("w4mkmeans: using parallel evaluation with %d slots", slots))
     # from ?makePSOCKcluster: "It is good practice to shut down the workers by calling stopCluster."
+    # note varlist: names of references must be passed to the cluster so that they can be resolved
     clusterExport(
       cl = cl
     , varlist = c(
-        "tryCatchFunc"
-      , "calc_kmeans_one_dimension_one_k"
-      , "prepare.data.matrix"
+        "tryCatchFunc"                     # required by calc_kmeans_one_dimension_one_k
+      , "format_error"                     # required by tryCatchFunc when errors are caught
+      , "iso_date"                         # required by log_print
+      , "log_print"                        # required by calc_kmeans_one_dimension_one_k
       )
     )
     final <- function(cl) {
       # from ?makePSOCKcluster: "It is good practice to shut down the workers by calling stopCluster."
       if ( !is.null(cl) ) {
-        failure_action("w4mkmeans: stopping cluster used for parallel evaluation")
+        log_action("w4mkmeans: stopping cluster used for parallel evaluation")
         stopCluster(cl)
       }
     }
   } else {
-    failure_action("w4mkmeans: using sequential evaluation (1 slot)")
+    log_action("w4mkmeans: using sequential evaluation (one slot)")
     final <- function(cl) { }
   }
 
@@ -115,7 +157,7 @@
       # for each $i in ksamples, append column 'k$i' to data frame sampleMetadata
       ksamples_length <- length(ksamples)
       if ( ksamples_length > 0 ) {
-        smpl_result_list <- myLapply( 
+        smpl_result_list <- myLapply(
             cl = cl
           , ksamples
           , calc_kmeans_one_dimension_one_k
@@ -134,7 +176,7 @@
       # for each $i in kfeatures, append column 'k$i' to data frame featureMetadata
       kfeatures_length <- length(kfeatures)
       if ( kfeatures_length > 0 ) {
-        feat_result_list <- myLapply( 
+        feat_result_list <- myLapply(
             cl = cl
           , kfeatures
           , calc_kmeans_one_dimension_one_k
@@ -150,22 +192,24 @@
         }
       }
 
-      return ( 
+      return (
         list(
           variableMetadata = featureMetadata
-        , sampleMetadata   = sampleMetadata  
-        , scores           = scores          
+        , sampleMetadata   = sampleMetadata
+        , scores           = scores
         )
       )
     }
-  , finally = final(cl)
+  , finally = {
+      final(cl)
+    }
   )
 }
 
 # calculate k-means for features or samples
 #   - recall that the dataMatrix has features in rows and samples in columns
 # return value:
-#   list(clusters = km$cluster, scores = scores) 
+#   list(clusters = km$cluster, scores = scores)
 # arguments:
 #   env:
 #     environment having dataMatrix
@@ -179,40 +223,64 @@
   # abort if environment is not as expected
   if ( is.null(env) || ! is.environment(env) ) {
     stop("calc_kmeans_one_dimension_one_k - argument 'env' is not an environment")
-  } 
+  }
   if ( ! exists("log_print", env) || ! is.function(env$log_print) ) {
     stop("calc_kmeans_one_dimension_one_k - argument 'env' - environment does not include log_print or it is not a function")
-  } 
+  }
+  log_action  <- env$log_print
   # abort if k is not as expected
   if ( ! is.numeric(k) ) {
     stop(sprintf("calc_kmeans_one_dimension_one_k - expected numeric argument 'k' but type is %s", typeof(k)))
-  } 
+  }
   k <- as.integer(k)
   # abort if dimension is not as expected
-  if (   ! is.character(dimension) 
+  if (   ! is.character(dimension)
       || ! Reduce( f =`|`, x = sapply(X = c("features","samples"), FUN = `==`, dimension), init = FALSE) ) {
     stop("calc_kmeans_one_dimension_one_k - argument 'dimension' is neither 'features' nor 'samples'")
-  } 
-  dm           <- env$dataMatrix
-  iter.max     <- env$iter.max
+  }
+  dm           <- env$preparedDataMatrix
+  iter_max     <- env$iter_max
   nstart       <- env$nstart
   algorithm    <- env$algorithm
   dim_features <- dimension == "features"
+
   # tryCatchFunc produces a list
-  #   On success of expr(), tryCatchFunc produces
-  #     list(success TRUE, value = expr(), msg = "")
-  #   On failure of expr(), tryCatchFunc produces
+  #   On success of func(), tryCatchFunc produces
+  #     list(success = TRUE, value = func(), msg = "")
+  #   On failure of func(), tryCatchFunc produces
   #     list(success = FALSE, value = NA, msg = "the error message")
-  result_list <- tryCatchFunc( expr = function() {
+  result_list <- tryCatchFunc( func = function() {
     # kmeans clusters the rows; features are the columns of args_env$dataMatrix; samples, the rows
     # - to calculate sample-clusters, no transposition is needed because samples are rows
     # - to calculate feature-clusters, transposition is needed so that features will be the rows
-    if ( ! dim_features ) dm <- t(dm)
-    dm <- prepare.data.matrix( x.matrix = dm, data.transformation = function(x) { x } )
+    if ( ! dim_features ) {
+      dm <- t(dm)
+    }
+
     # need to set.seed to get reproducible results from kmeans
     set.seed(4567)
+
     # do the k-means clustering
-    km <- kmeans( x = dm, centers = k, iter.max, nstart = nstart, algorithm = algorithm )
+    withCallingHandlers(
+      {
+        km <<- kmeans( x = dm, centers = k, iter.max = iter_max, nstart = nstart, algorithm = algorithm )
+      }
+    , warning = function(w) {
+        lw <- list(w)
+        smplwrn <- as.character(w[[1]])
+        log_print(
+            sprintf( "Warning for %s: center = %d, nstart = %d, iter_max = %d: %s"
+          , if (dim_features) "features" else "samples"
+          , k
+          , nstart
+          , iter_max
+          , smplwrn
+          )
+        )
+      }
+    )
+
+    # collect the scores
     scores <-
       sprintf("%s\t%d\t%0.5e\t%0.5e\t%0.5f"
              , dimension
@@ -221,8 +289,16 @@
              , km$betweenss
              , km$betweenss/km$totss
              )
+
+    # return list of results
     list(clusters = km$cluster, scores = scores)
   })
+
+  # return either
+  #     list(success = TRUE, value = func(), msg = "")
+  # or
+  #     list(success = FALSE, value = NA, msg = "the error message")
   return ( result_list )
 }
 
+# vim: sw=2 ts=2 et :