view w4mkmeans_routines.R @ 2:c415b7dc6f37 draft default tip

planemo upload for repository https://github.com/HegemanLab/w4mkmeans_galaxy_wrapper/tree/master commit 3e916537da6bb37e6f3927d7a11e98e0ab6ef5ec
author eschen42
date Mon, 05 Mar 2018 12:40:17 -0500
parents 02cafb660b72
children
line wrap: on
line source

##------------------------------------------------------------------------------------------------------
## these are the batch-independent and file-structure-independent routines to support the w4mkmeans tool
##------------------------------------------------------------------------------------------------------

library(parallel)

w4mkmeans_usage <- function() {
  return (
    c(
     "w4mkmeans: bad input.",
     "  contract:",
     "    required - caller will provide an environment comprising:",
     "      log_print          - a logging function with the signature function(x, ...) expecting strings as x and ...",
     "      variableMetadata   - the corresponding W4M data.frame having feature metadata",
     "      sampleMetdata      - the corresponding W4M data.frame having sample metadata",
     "      dataMatrix         - the corresponding W4M matrix",
     "      slots              - the number of parallel slots for calculating kmeans",
     "    optional - environment may comprise:",
     "      kfeatures          - an array of integers, the k's to apply for clustering by feature (default, empty array)",
     "      ksamples           - an array of integers, the k's to apply for clustering by sample (default, empty array)",
     "      iter_max           - the maximum number of iterations when calculating a cluster (default = 20)",
     "      nstart             - how many random sets of centers should be chosen (default = 20)",
     "      algorithm          - string from c('Hartigan-Wong', 'Lloyd', 'Forgy', 'MacQueen') (default = Hartigan-Wong)",
     "      categorical_prefix - string from c('Hartigan-Wong', 'Lloyd', 'Forgy', 'MacQueen') (default = Hartigan-Wong)",
     "      ",
     "    this routine will return a list comprising:",
     "      variableMetadata   - the input variableMetadata data.frame with updates, if any",
     "      sampleMetadata     - the input sampleMetadata data.frame with updates, if any",
     "      scores             - an array of strings, each representing a line of a tsv having the following header:",
     "                             clusterOn TAB k TAB totalSS TAB betweenSS TAB proportion"
    )
  )
}

w4mkmeans <- function(env) {
  # abort if 'env' is null or is not an environment
  if ( is.null(env) || ! is.environment(env) ) {
    lapply(w4mkmeans_usage(),print)
  }
  # extract parameters from 'env'
  log_action  <- env$log_print
  # supply default arguments
  if ( ! exists("iter_max"          , env) ) env$iter_max  <- 20
  if ( ! exists("nstart"            , env) ) env$nstart    <- 20
  if ( ! exists("algorithm"         , env) ) env$algorithm <- 'Hartigan-Wong'
  if ( ! exists("categorical_prefix", env) ) env$categorical_prefix <- 'c'
  if ( ! exists("ksamples"          , env) ) env$ksamples  <- c()
  if ( ! exists("kfeatures"         , env) ) env$kfeatures <- c()
  # check mandatory arguments
  expected <- c(
    "log_print"
  , "variableMetadata"
  , "sampleMetadata"
  , "dataMatrix"
  , "slots"
  )
  missing_from_env <- setdiff(expected, (ls(env)))
  if ( length(missing_from_env) > 0 ) {
    print(paste(c('expected environment members not found: ', as.character(missing_from_env)), collapse = ", "))
    lapply(w4mkmeans_usage(),log_action)
    stop("w4mkmeans: contract has been broken")
  }
  # extract parameters from 'env'
  log_action  <- env$log_print
  scores          <- c( "clusterOn\tk\ttotalSS\tbetweenSS\tproportion" )
  sampleMetadata  <- env$sampleMetadata
  featureMetadata <- env$variableMetadata
  slots           <- env$slots
  positive_ints <- function(a, what) {
    i <- as.integer(a)    # may introduce NAs by coercion
    i <- i[!is.na(i)]     # eliminate NAs
    i <- i[i > 0]         # eliminate non-positive integers
    i <- unique(sort(i))  # eliminate redundancy and disorder
    if (length(a)!=length(i)) {
      log_action("Some values for '", what, "' were skipped where not unique, not positive, or not convertible to an integer.")
    }
    return (i)            # return results, if any
  }
  ksamples        <- positive_ints(env$ksamples , "ksamples")
  kfeatures       <- positive_ints(env$kfeatures, "kfeatures")

  log_action("w4mkmeans: preparing data matrix")
  # prepare data matrix (normalize, eliminate zero-variance rows, etc.; no transformation)
  dm_en <- new.env()
  dm_en$log <- c()
  preparation_result <- tryCatchFunc(function(){
    dm <- prepare.data.matrix(
            x.matrix = env$dataMatrix
          , data.transformation = function(x) { x }
          , en = dm_en
          )
    my_log <- dm_en$log
    for ( i in 1:length(my_log) ) {
      log_action(my_log[i])
    }
    dm
  })
  if ( !preparation_result$success ) {
    postmortem <- paste("prepare.data.matrix failed:", preparation_result$msg)
    log_action(postmortem)
    stop(postmortem)
  }

  env$preparedDataMatrix <- preparation_result$value

  log_action("w4mkmeans: determining evaluation mode")

  myLapply <- parLapply
  cl <- NULL
  tryCatch(
    expr = {
      outfile <- ""
      # outfile: Where to direct the stdout and stderr connection output from the workers.
      #   - "" indicates no redirection (which may only be useful for workers on the local machine).
      #   - Defaults to ‘/dev/null’ (‘nul:’ on Windows).
      #   - The other possibility is a file path on the worker's host.
      #     - Files will be opened in append mode, as all workers log to the same file.
      cl <- makePSOCKcluster( names = slots, outfile = outfile )
    }
    , error = function(e) {
      log_action(sprintf("w4kmeans: falling back to serial evaluation because makePSOCKcluster(names = %d) threw an exception", slots))
      # mimic parLapply, but without parallelization (as a last resort)
      myLapply <<- function(cl, ...) lapply(...)
    }
  )
  if ( identical(myLapply, parLapply) ) {
    log_action(sprintf("w4mkmeans: using parallel evaluation with %d slots", slots))
    # from ?makePSOCKcluster: "It is good practice to shut down the workers by calling stopCluster."
    # note varlist: names of references must be passed to the cluster so that they can be resolved
    clusterExport(
      cl = cl
    , varlist = c(
        "tryCatchFunc"                     # required by calc_kmeans_one_dimension_one_k
      , "format_error"                     # required by tryCatchFunc when errors are caught
      , "iso_date"                         # required by log_print
      , "log_print"                        # required by calc_kmeans_one_dimension_one_k
      )
    )
    final <- function(cl) {
      # from ?makePSOCKcluster: "It is good practice to shut down the workers by calling stopCluster."
      if ( !is.null(cl) ) {
        log_action("w4mkmeans: stopping cluster used for parallel evaluation")
        stopCluster(cl)
      }
    }
  } else {
    log_action("w4mkmeans: using sequential evaluation (one slot)")
    final <- function(cl) { }
  }

  tryCatch(
    expr = {
      # These myLapply calls produce lists of lists of results:
      #   - The outer list has no keys and its members are accessed by index
      #   - The inner list has keys "clusters" and "scores"

      # for each $i in ksamples, append column 'k$i' to data frame sampleMetadata
      ksamples_length <- length(ksamples)
      if ( ksamples_length > 0 ) {
        smpl_result_list <- myLapply(
            cl = cl
          , ksamples
          , calc_kmeans_one_dimension_one_k
          , env = env
          , dimension = "samples"
          )
        for ( i in 1:ksamples_length ) {
          result <- smpl_result_list[[i]]
          if (result$success) {
            sampleMetadata[sprintf("k%d",ksamples[i])] <- sprintf("%s%d", env$categorical_prefix, result$value$clusters)
            scores <- c(scores, result$value$scores)
          }
        }
      }

      # for each $i in kfeatures, append column 'k$i' to data frame featureMetadata
      kfeatures_length <- length(kfeatures)
      if ( kfeatures_length > 0 ) {
        feat_result_list <- myLapply(
            cl = cl
          , kfeatures
          , calc_kmeans_one_dimension_one_k
          , env = env
          , dimension = "features"
          )
        for ( i in 1:kfeatures_length ) {
          result <- feat_result_list[[i]]
          if (result$success) {
            featureMetadata[sprintf("k%d",kfeatures[i])] <- sprintf("%s%d", env$categorical_prefix, result$value$clusters)
            scores <- c(scores, result$value$scores)
          }
        }
      }

      return (
        list(
          variableMetadata = featureMetadata
        , sampleMetadata   = sampleMetadata
        , scores           = scores
        )
      )
    }
  , finally = {
      final(cl)
    }
  )
}

# calculate k-means for features or samples
#   - recall that the dataMatrix has features in rows and samples in columns
# return value:
#   list(clusters = km$cluster, scores = scores)
# arguments:
#   env:
#     environment having dataMatrix
#   dimension:
#   - "samples":  produce clusters column to add to the sampleMetadata table
#     - this is the default case
#   - "variables":  produce clusters column to add to the variableMetadata table
#   k:
#     integer, the number of clusters to make
calc_kmeans_one_dimension_one_k <- function(k, env, dimension = "samples") {
  # abort if environment is not as expected
  if ( is.null(env) || ! is.environment(env) ) {
    stop("calc_kmeans_one_dimension_one_k - argument 'env' is not an environment")
  }
  if ( ! exists("log_print", env) || ! is.function(env$log_print) ) {
    stop("calc_kmeans_one_dimension_one_k - argument 'env' - environment does not include log_print or it is not a function")
  }
  log_action  <- env$log_print
  # abort if k is not as expected
  if ( ! is.numeric(k) ) {
    stop(sprintf("calc_kmeans_one_dimension_one_k - expected numeric argument 'k' but type is %s", typeof(k)))
  }
  k <- as.integer(k)
  # abort if dimension is not as expected
  if (   ! is.character(dimension)
      || ! Reduce( f =`|`, x = sapply(X = c("features","samples"), FUN = `==`, dimension), init = FALSE) ) {
    stop("calc_kmeans_one_dimension_one_k - argument 'dimension' is neither 'features' nor 'samples'")
  }
  dm           <- env$preparedDataMatrix
  iter_max     <- env$iter_max
  nstart       <- env$nstart
  algorithm    <- env$algorithm
  dim_features <- dimension == "features"

  # tryCatchFunc produces a list
  #   On success of func(), tryCatchFunc produces
  #     list(success = TRUE, value = func(), msg = "")
  #   On failure of func(), tryCatchFunc produces
  #     list(success = FALSE, value = NA, msg = "the error message")
  result_list <- tryCatchFunc( func = function() {
    # kmeans clusters the rows; features are the columns of args_env$dataMatrix; samples, the rows
    # - to calculate sample-clusters, no transposition is needed because samples are rows
    # - to calculate feature-clusters, transposition is needed so that features will be the rows
    if ( ! dim_features ) {
      dm <- t(dm)
    }

    # need to set.seed to get reproducible results from kmeans
    set.seed(4567)

    # do the k-means clustering
    withCallingHandlers(
      {
        km <<- kmeans( x = dm, centers = k, iter.max = iter_max, nstart = nstart, algorithm = algorithm )
      }
    , warning = function(w) {
        lw <- list(w)
        smplwrn <- as.character(w[[1]])
        log_print(
            sprintf( "Warning for %s: center = %d, nstart = %d, iter_max = %d: %s"
          , if (dim_features) "features" else "samples"
          , k
          , nstart
          , iter_max
          , smplwrn
          )
        )
      }
    )

    # collect the scores
    scores <-
      sprintf("%s\t%d\t%0.5e\t%0.5e\t%0.5f"
             , dimension
             , k
             , km$totss
             , km$betweenss
             , km$betweenss/km$totss
             )

    # return list of results
    list(clusters = km$cluster, scores = scores)
  })

  # return either
  #     list(success = TRUE, value = func(), msg = "")
  # or
  #     list(success = FALSE, value = NA, msg = "the error message")
  return ( result_list )
}

# vim: sw=2 ts=2 et :