diff --git a/DESCRIPTION b/DESCRIPTION index 45db214..f4f1f28 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -12,7 +12,7 @@ LazyData: true Imports: future, future.apply, - pbapply, + progressr, irlba, NMF (>= 0.23.0), ggalluvial, diff --git a/R/analysis.R b/R/analysis.R index 9dff159..17e0bef 100644 --- a/R/analysis.R +++ b/R/analysis.R @@ -190,10 +190,9 @@ netAnalysis_contribution <- function(object, signaling, signaling.name = NULL, s #' @param net compute the centrality measures on a specific signaling network given by a 2 or 3 dimemsional array net #' @param net.name a character vector giving the name of signaling networks #' @param thresh threshold of the p-value for determining significant interaction -#' @importFrom future nbrOfWorkers #' @importFrom methods slot #' @importFrom future.apply future_sapply -#' @importFrom pbapply pbsapply +#' @importFrom progressr progressor #' #' @return #' @export @@ -211,17 +210,16 @@ netAnalysis_computeCentrality <- function(object = NULL, slot.name = "netP", net } if (length(dim(net)) == 3) { nrun <- dim(net)[3] - my.sapply <- ifelse( - test = future::nbrOfWorkers() == 1, - yes = pbapply::pbsapply, - no = future.apply::future_sapply - ) - centr.all = my.sapply( + p <- progressr::progressor(nrun) + centr.all <- future.apply::future_sapply( X = 1:nrun, FUN = function(x) { + Sys.sleep(1/nrun) + p(sprintf("%g of %g", x, nrun)) # Use with_progress() to see progress bar in client-side net0 <- net[ , , x] - return(computeCentralityLocal(net0)) + computeCentralityLocal(net0) }, + future.seed = TRUE, simplify = FALSE ) } else { @@ -705,9 +703,9 @@ netEmbedding <- function(object, slot.name = "netP", type = c("functional","stru #' @param nCores number of workers when doing parallel #' @param k.eigen the number of eigenvalues used when doing spectral clustering #' @importFrom methods slot -#' @importFrom future nbrOfWorkers plan +#' @importFrom future plan #' @importFrom future.apply future_sapply -#' @importFrom pbapply pbsapply +#' @importFrom progressr progressor #' @return #' @export #' @@ -733,28 +731,35 @@ netClustering <- function(object, slot.name = "netP", type = c("functional","str } else { N <- nrow(data.use) kRange <- seq(2,min(N-1, 10),by = 1) + kN <- length(kRange) + nCores <- as.integer(nCores) if (do.parallel) { - future::plan("multiprocess", workers = nCores) + if (.Platform$OS.type == "windows") { + future::plan("multisession", workers = nCores) + } else { + future::plan("multicore", workers = nCores) + } options(future.globals.maxSize = 1000 * 1024^2) + } else { + future::plan("sequential") } - my.sapply <- ifelse( - test = future::nbrOfWorkers() == 1, - yes = pbapply::pbsapply, - no = future.apply::future_sapply - ) - results = my.sapply( - X = 1:length(kRange), + message(sprintf("future plan is '%s'", as.character(attr(future::plan(), "call"))[2])) + p <- progressr::progressor(kN) + results <- future.apply::future_sapply( + X = 1:kN, FUN = function(x) { - idents <- kmeans(data.use,kRange[x],nstart=10)$cluster + Sys.sleep(1/kN) + p(sprintf("%g of %g", x, kN)) # Use with_progress() to see progress bar in client-side + idents <- kmeans(data.use, kRange[x], nstart = 10)$cluster clusIndex <- idents - #adjMat0 <- as.numeric(outer(clusIndex, clusIndex, FUN = "==")) - outer(1:N, 1:N, "==") adjMat0 <- Matrix::Matrix(as.numeric(outer(clusIndex, clusIndex, FUN = "==")), nrow = N, ncol = N) - return(list(adjMat = adjMat0, ncluster = length(unique(idents)))) + list(adjMat = adjMat0, ncluster = length(unique(idents))) }, + future.seed = TRUE, simplify = FALSE ) adjMat <- lapply(results, "[[", 1) - CM <- Reduce('+', adjMat)/length(kRange) + CM <- Reduce('+', adjMat)/kN res <- computeEigengap(as.matrix(CM)) numCluster <- res$upper_bound clusters = kmeans(data.use,numCluster,nstart=10)$cluster diff --git a/R/modeling.R b/R/modeling.R index 38b75e1..c6e056e 100644 --- a/R/modeling.R +++ b/R/modeling.R @@ -30,9 +30,8 @@ #' @param n parameter in Hill function #' #' -#' @importFrom future nbrOfWorkers #' @importFrom future.apply future_sapply -#' @importFrom pbapply pbsapply +#' @importFrom progressr progressor #' @importFrom stats aggregate #' @importFrom Matrix crossprod #' @importFrom utils txtProgressBar setTxtProgressBar @@ -69,11 +68,6 @@ computeCommunProb <- function(object, type = c("triMean", "truncatedMean","thres } complex_input <- object@DB$complex cofactor_input <- object@DB$cofactor - my.sapply <- ifelse( - test = future::nbrOfWorkers() == 1, - yes = sapply, - no = future.apply::future_sapply - ) ptm = Sys.time() @@ -154,14 +148,17 @@ computeCommunProb <- function(object, type = c("triMean", "truncatedMean","thres set.seed(seed.use) permutation <- replicate(nboot, sample.int(nC, size = nC)) - data.use.avg.boot <- my.sapply( + p <- progressr::progressor(nboot) + data.use.avg.boot <- future.apply::future_sapply( X = 1:nboot, FUN = function(nE) { + p() groupboot <- group[permutation[, nE]] data.use.avgB <- aggregate(t(data.use), list(groupboot), FUN = FunMean) data.use.avgB <- t(data.use.avgB[,-1]) - return(data.use.avgB) + data.use.avgB }, + future.seed = TRUE, simplify = FALSE ) pb <- txtProgressBar(min = 0, max = nLR, style = 3, file = stderr()) @@ -203,10 +200,11 @@ computeCommunProb <- function(object, type = c("triMean", "truncatedMean","thres Pnull <- as.vector(Pnull) - #Pboot <- foreach(nE = 1:nboot) %dopar% { - Pboot <- sapply( + p <- progressr::progressor(nboot) + Pboot <- future.apply::future_sapply( X = 1:nboot, FUN = function(nE) { + p() data.use.avgB <- data.use.avg.boot[[nE]] dataLavgB <- computeExpr_LR(geneL[i], data.use.avgB, complex_input) dataRavgB <- computeExpr_LR(geneR[i], data.use.avgB, complex_input) @@ -458,23 +456,20 @@ computeAveExpr <- function(object, features = NULL, group.by = NULL, type = c("t #' @param complex the names of complex #' @return #' @importFrom dplyr select starts_with -#' @importFrom future nbrOfWorkers #' @importFrom future.apply future_sapply -#' @importFrom pbapply pbsapply +#' @importFrom progressr progressor #' @export computeExpr_complex <- function(complex_input, data.use, complex) { Rsubunits <- complex_input[complex,] %>% dplyr::select(starts_with("subunit")) - my.sapply <- ifelse( - test = future::nbrOfWorkers() == 1, - yes = sapply, - no = future.apply::future_sapply - ) - data.complex = my.sapply( - X = 1:nrow(Rsubunits), + nrun <- nrow(Rsubunits) + p <- progressr::progressor(nrun) + data.complex <- future.apply::future_sapply( + X = 1:nrun, FUN = function(x) { + p() RsubunitsV <- unlist(Rsubunits[x,], use.names = F) RsubunitsV <- RsubunitsV[RsubunitsV != ""] - return(geometricMean(data.use[RsubunitsV,])) + geometricMean(data.use[RsubunitsV,]) } ) data.complex <- t(data.complex) @@ -489,20 +484,17 @@ computeExpr_complex <- function(complex_input, data.use, complex) { # @param FunMean the function for computing mean expression per group # @return # @importFrom dplyr select starts_with -# @importFrom future nbrOfWorkers # @importFrom future.apply future_sapply -# @importFrom pbapply pbsapply -# #' @export +# @importFrom progressr progressor +# @export .computeExprGroup_complex <- function(complex_input, data.use, complex, group, FunMean) { Rsubunits <- complex_input[complex,] %>% dplyr::select(starts_with("subunit")) - my.sapply <- ifelse( - test = future::nbrOfWorkers() == 1, - yes = pbapply::pbsapply, - no = future.apply::future_sapply - ) - data.complex = my.sapply( - X = 1:nrow(Rsubunits), + nrun <- nrow(Rsubunits) + p <- progressr::progressor(nrun) + data.complex <- future.apply::future_sapply( + X = 1:nrun, FUN = function(x) { + p() RsubunitsV <- unlist(Rsubunits[x,], use.names = F) RsubunitsV <- RsubunitsV[RsubunitsV != ""] RsubunitsV <- intersect(RsubunitsV, rownames(data.use)) @@ -515,7 +507,7 @@ computeExpr_complex <- function(complex_input, data.use, complex) { } else { data.avg = matrix(0, nrow = 1, ncol = length(unique(group))) } - return(geometricMean(data.avg)) + geometricMean(data.avg) } ) data.complex <- t(data.complex) @@ -554,9 +546,8 @@ computeExpr_LR <- function(geneLR, data.use, complex_input){ #' @param pairLRsig a data frame giving ligand-receptor interactions #' @param type when type == "A", computing expression of co-activation receptor; when type == "I", computing expression of co-inhibition receptor. #' @return -#' @importFrom future nbrOfWorkers #' @importFrom future.apply future_sapply -#' @importFrom pbapply pbsapply +#' @importFrom progressr progressor #' @export computeExpr_coreceptor <- function(cofactor_input, data.use, pairLRsig, type = c("A", "I")) { type <- match.arg(type) @@ -567,25 +558,23 @@ computeExpr_coreceptor <- function(cofactor_input, data.use, pairLRsig, type = c } index.coreceptor <- which(!is.na(coreceptor.all) & coreceptor.all != "") if (length(index.coreceptor) > 0) { - my.sapply <- ifelse( - test = future::nbrOfWorkers() == 1, - yes = sapply, - no = future.apply::future_sapply - ) coreceptor <- coreceptor.all[index.coreceptor] coreceptor.ind <- cofactor_input[coreceptor, grepl("cofactor" , colnames(cofactor_input) )] - data.coreceptor.ind = my.sapply( - X = 1:nrow(coreceptor.ind), + nrun <- nrow(coreceptor.ind) + p <- progressr::progressor(nrun) + data.coreceptor.ind <- future.apply::future_sapply( + X = 1:nrun, FUN = function(x) { + p() coreceptor.indV <- unlist(coreceptor.ind[x,], use.names = F) coreceptor.indV <- coreceptor.indV[coreceptor.indV != ""] coreceptor.indV <- intersect(coreceptor.indV, rownames(data.use)) if (length(coreceptor.indV) == 1) { - return(1 + data.use[coreceptor.indV, ]) + 1 + data.use[coreceptor.indV, ] } else if (length(coreceptor.indV) > 1) { - return(apply(1 + data.use[coreceptor.indV, ], 2, prod)) + apply(1 + data.use[coreceptor.indV, ], 2, prod) } else { - return(matrix(1, nrow = 1, ncol = ncol(data.use))) + matrix(1, nrow = 1, ncol = ncol(data.use)) } } ) @@ -607,9 +596,8 @@ computeExpr_coreceptor <- function(cofactor_input, data.use, pairLRsig, type = c # @param group a factor defining the cell groups # @param FunMean the function for computing mean expression per group # @return -# @importFrom future nbrOfWorkers # @importFrom future.apply future_sapply -# @importFrom pbapply pbsapply +#' @importFrom progressr progressor # #' @export .computeExprGroup_coreceptor <- function(cofactor_input, data.use, pairLRsig, type = c("A", "I"), group, FunMean) { type <- match.arg(type) @@ -620,30 +608,27 @@ computeExpr_coreceptor <- function(cofactor_input, data.use, pairLRsig, type = c } index.coreceptor <- which(!is.na(coreceptor.all) & coreceptor.all != "") if (length(index.coreceptor) > 0) { - my.sapply <- ifelse( - test = future::nbrOfWorkers() == 1, - yes = pbapply::pbsapply, - no = future.apply::future_sapply - ) coreceptor <- coreceptor.all[index.coreceptor] coreceptor.ind <- cofactor_input[coreceptor, grepl("cofactor" , colnames(cofactor_input) )] - data.coreceptor.ind = my.sapply( - X = 1:nrow(coreceptor.ind), + nrun <- nrow(coreceptor.ind) + p <- progressr::progressor(nrun) + data.coreceptor.ind <- future.apply::future_sapply( + X = 1:nrun, FUN = function(x) { + p() coreceptor.indV <- unlist(coreceptor.ind[x,], use.names = F) coreceptor.indV <- coreceptor.indV[coreceptor.indV != ""] coreceptor.indV <- intersect(coreceptor.indV, rownames(data.use)) if (length(coreceptor.indV) > 1) { data.avg <- aggregate(t(data.use[coreceptor.indV,]), list(group), FUN = FunMean) data.avg <- t(data.avg[,-1]) - return(apply(1 + data.avg, 2, prod)) - # return(1 + apply(data.avg, 2, mean)) + apply(1 + data.avg, 2, prod) } else if (length(coreceptor.indV) == 1) { data.avg <- aggregate(matrix(data.use[coreceptor.indV,], ncol = 1), list(group), FUN = FunMean) data.avg <- t(data.avg[,-1]) - return(1 + data.avg) + 1 + data.avg } else { - return(matrix(1, nrow = 1, ncol = length(unique(group)))) + matrix(1, nrow = 1, ncol = length(unique(group))) } } ) diff --git a/R/utilities.R b/R/utilities.R index 88241da..441b7f4 100644 --- a/R/utilities.R +++ b/R/utilities.R @@ -274,9 +274,8 @@ subsetData <- function(object, features = NULL) { #' @param thresh.pc Threshold of the percent of cells expressed in one cluster #' @param thresh.fc Threshold of Log Fold Change #' @param thresh.p Threshold of p-values -#' @importFrom future nbrOfWorkers -#' @importFrom pbapply pbsapply #' @importFrom future.apply future_sapply +#' @importFrom progressr progressor #' @importFrom stats sd wilcox.test #' @importFrom stats p.adjust #' @@ -335,12 +334,6 @@ identifyOverExpressedGenes <- function(object, data.use = NULL, group.by = NULL, } } - my.sapply <- ifelse( - test = future::nbrOfWorkers() == 1, - yes = pbapply::pbsapply, - no = future.apply::future_sapply - ) - mean.fxn <- function(x) { return(log(x = mean(x = expm1(x = x)) + 1)) } @@ -397,13 +390,16 @@ identifyOverExpressedGenes <- function(object, data.use = NULL, group.by = NULL, data1 <- data.use[features, cell.use1, drop = FALSE] data2 <- data.use[features, cell.use2, drop = FALSE] + nrun <- nrow(x = data1) + p <- progressr::progressor(nrun) pvalues <- unlist( - x = my.sapply( - X = 1:nrow(x = data1), + x <- future.apply::future_sapply( + X = 1:nrun, FUN = function(x) { - # return(wilcox.test(data1[x, ], data2[x, ], alternative = "greater")$p.value) - return(wilcox.test(data1[x, ], data2[x, ])$p.value) - } + Sys.sleep(1/nrun) + p(sprintf("%g of %g in %s", x, nrun, level.use[i])) # Use with_progress() to see progress bar in client-side + wilcox.test(data1[x, ], data2[x, ])$p.value + } ) ) @@ -458,9 +454,8 @@ identifyOverExpressedGenes <- function(object, data.use = NULL, group.by = NULL, #' @param features.name a char name used for storing the over-expressed ligands and receptors in `object@var.features[[paste0(features.name, ".LR")]]` #' @param features a vector of features to use. default use all over-expressed genes in `object@var.features[[features.name]]` #' @param return.object whether returning a CellChat object. If FALSE, it will return a data frame containing over-expressed ligands and (complex) receptors associated with each cell group -#' @importFrom future nbrOfWorkers #' @importFrom future.apply future_sapply -#' @importFrom pbapply pbsapply +#' @importFrom progressr progressor #' @importFrom dplyr select #' #' @return A CellChat object or a data frame. If returning a CellChat object, a new element named paste0(features.name, ".LR") will be added into the list `object@var.features` @@ -488,29 +483,29 @@ identifyOverExpressedLigandReceptor <- function(object, features.name = "feature markers.all <- subset(markers.all, subset = features %in% features.use) } - my.sapply <- ifelse( - test = future::nbrOfWorkers() == 1, - yes = pbapply::pbsapply, - no = future.apply::future_sapply - ) complexSubunits <- complex_input[, grepl("subunit" , colnames(complex_input))] markers.all.new <- data.frame() for (i in 1:nrow(markers.all)) { - if (markers.all$features[i] %in% LR.use) { + features <- markers.all$features[i] + if (features %in% LR.use) { markers.all.new <- rbind(markers.all.new, markers.all[i, , drop = FALSE]) } else { + nrun <- nrow(complexSubunits) + p <- progressr::progressor(nrun) index.sig <- unlist( - x = my.sapply( - X = 1:nrow(complexSubunits), + x <- future.apply::future_sapply( + X = 1:nrun, FUN = function(x) { + Sys.sleep(1/nrun) + p(sprintf("%g of %g in %s", x, nrun, features)) # Use with_progress() to see progress bar in client-side complexsubunitsV <- unlist(complexSubunits[x,], use.names = F) complexsubunitsV <- complexsubunitsV[complexsubunitsV != ""] - if (markers.all$features[i] %in% complexsubunitsV) { + if (features %in% complexsubunitsV) { return(x) - } + } } - ) + ) ) complexSubunits.sig <- rownames(complexSubunits[index.sig,]) markers.all.complex <- data.frame() @@ -539,9 +534,8 @@ identifyOverExpressedLigandReceptor <- function(object, features.name = "feature #' @param features.name a char name used for assess the results in `object@var.features[[features.name]]` #' @param features a vector of features to use. default use all over-expressed genes in `object@var.features[[features.name]]` #' @param return.object whether returning a CellChat object. If FALSE, it will return a data frame containing the over-expressed ligand-receptor pairs -#' @importFrom future nbrOfWorkers #' @importFrom future.apply future_sapply -#' @importFrom pbapply pbsapply +#' @importFrom progressr progressor #' @importFrom dplyr select #' #' @return A CellChat object or a data frame. If returning a CellChat object, a new element named 'LRsig' will be added into the list `object@LR` @@ -563,21 +557,20 @@ identifyOverExpressedInteractions <- function(object, features.name = "features" interaction_input <- DB$interaction complex_input <- DB$complex - my.sapply <- ifelse( - test = future::nbrOfWorkers() == 1, - yes = pbapply::pbsapply, - no = future.apply::future_sapply - ) complexSubunits <- complex_input[, grepl("subunit" , colnames(complex_input))] + nrun <- nrow(complexSubunits) + p <- progressr::progressor(nrun) index.sig <- unlist( - x = my.sapply( - X = 1:nrow(complexSubunits), + x <- future.apply::future_sapply( + X = 1:nrun, FUN = function(x) { + Sys.sleep(1/nrun) + p(sprintf("(Step 1) %g of %g", x, nrun)) # Use with_progress() to see progress bar in client-side complexsubunitsV <- unlist(complexSubunits[x,], use.names = F) complexsubunitsV <- complexsubunitsV[complexsubunitsV != ""] if (length(intersect(complexsubunitsV, features.sig)) > 0 & all(complexsubunitsV %in% gene.use)) { return(x) - } + } } ) ) @@ -585,10 +578,14 @@ identifyOverExpressedInteractions <- function(object, features.name = "features" pairLR <- select(interaction_input, ligand, receptor) + nrun <- nrow(pairLR) + p <- progressr::progressor(nrun) index.sig <- unlist( - x = my.sapply( - X = 1:nrow(pairLR), + x <- future.apply::future_sapply( + X = 1:nrun, FUN = function(x) { + Sys.sleep(1/nrun) + p(sprintf("(Step 2) %g of %g", x, nrun)) # Use with_progress() to see progress bar in client-side if (all(unlist(pairLR[x,], use.names = F) %in% c(features.sig, rownames(complexSubunits.sig)))) { return(x) }