Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use 'future.seed' and 'progressr' in the future framework #2

Merged
merged 1 commit into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ LazyData: TRUE
Imports:
future,
future.apply,
pbapply,
progressr,
irlba,
NMF (>= 0.23.0),
ggalluvial,
Expand Down
3 changes: 1 addition & 2 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ importFrom(dplyr,starts_with)
importFrom(dplyr,summarise)
importFrom(dplyr,summarize)
importFrom(dplyr,top_n)
importFrom(future,nbrOfWorkers)
importFrom(future,plan)
importFrom(future.apply,future_sapply)
importFrom(ggnetwork,geom_nodetext_repel)
Expand Down Expand Up @@ -228,7 +227,7 @@ importFrom(methods,setClass)
importFrom(methods,setClassUnion)
importFrom(methods,slot)
importFrom(patchwork,wrap_plots)
importFrom(pbapply,pbsapply)
importFrom(progressr,progressor)
importFrom(plyr,mapvalues)
importFrom(reshape2,melt)
importFrom(reticulate,import)
Expand Down
49 changes: 27 additions & 22 deletions R/analysis.R
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,9 @@ netAnalysis_contribution <- function(object, signaling, signaling.name = NULL, s
#' @param net compute the centrality measures on a specific signaling network given by a 2 or 3 dimemsional array net
#' @param net.name a character vector giving the name of signaling networks
#' @param thresh threshold of the p-value for determining significant interaction
#' @importFrom future nbrOfWorkers
#' @importFrom methods slot
#' @importFrom future.apply future_sapply
#' @importFrom pbapply pbsapply
#' @importFrom progressr progressor
#'
#' @return
#' @export
Expand All @@ -211,17 +210,16 @@ netAnalysis_computeCentrality <- function(object = NULL, slot.name = "netP", net
}
if (length(dim(net)) == 3) {
nrun <- dim(net)[3]
my.sapply <- ifelse(
test = future::nbrOfWorkers() == 1,
yes = pbapply::pbsapply,
no = future.apply::future_sapply
)
centr.all = my.sapply(
p <- progressr::progressor(nrun)
centr.all <- future.apply::future_sapply(
X = 1:nrun,
FUN = function(x) {
Sys.sleep(1/nrun)
p(sprintf("%g of %g", x, nrun)) # Use with_progress() to see progress bar in client-side
net0 <- net[ , , x]
return(computeCentralityLocal(net0))
},
future.seed = TRUE,
simplify = FALSE
)
} else {
Expand Down Expand Up @@ -705,9 +703,9 @@ netEmbedding <- function(object, slot.name = "netP", type = c("functional","stru
#' @param nCores number of workers when doing parallel
#' @param k.eigen the number of eigenvalues used when doing spectral clustering
#' @importFrom methods slot
#' @importFrom future nbrOfWorkers plan
#' @importFrom future plan
#' @importFrom future.apply future_sapply
#' @importFrom pbapply pbsapply
#' @importFrom progressr progressor
#' @return
#' @export
#'
Expand All @@ -733,28 +731,35 @@ netClustering <- function(object, slot.name = "netP", type = c("functional","str
} else {
N <- nrow(data.use)
kRange <- seq(2,min(N-1, 10),by = 1)
nCores <- as.integer(nCores)
if (do.parallel) {
future::plan("multisession", workers = nCores)
if (.Platform$OS.type == "windows") {
future::plan("multisession", workers = nCores)
} else {
future::plan("multicore", workers = nCores)
}
options(future.globals.maxSize = 1000 * 1024^2)
} else {
future::plan("sequential")
}
my.sapply <- ifelse(
test = future::nbrOfWorkers() == 1,
yes = pbapply::pbsapply,
no = future.apply::future_sapply
)
results = my.sapply(
X = 1:length(kRange),
message(sprintf("future plan is '%s'", as.character(attr(future::plan(), "call"))[2]))
kN <- length(kRange)
p <- progressr::progressor(kN)
results <- future.apply::future_sapply(
X = 1:kN,
FUN = function(x) {
idents <- kmeans(data.use,kRange[x],nstart=10)$cluster
Sys.sleep(1/kN)
p(sprintf("%g of %g", x, kN)) # Use with_progress() to see progress bar in client-side
idents <- kmeans(data.use, kRange[x], nstart = 10)$cluster
clusIndex <- idents
#adjMat0 <- as.numeric(outer(clusIndex, clusIndex, FUN = "==")) - outer(1:N, 1:N, "==")
adjMat0 <- Matrix::Matrix(as.numeric(outer(clusIndex, clusIndex, FUN = "==")), nrow = N, ncol = N)
return(list(adjMat = adjMat0, ncluster = length(unique(idents))))
},
future.seed = TRUE,
simplify = FALSE
)
adjMat <- lapply(results, "[[", 1)
CM <- Reduce('+', adjMat)/length(kRange)
CM <- Reduce('+', adjMat)/kN
res <- computeEigengap(as.matrix(CM))
numCluster <- res$upper_bound
clusters = kmeans(data.use,numCluster,nstart=10)$cluster
Expand Down Expand Up @@ -1114,7 +1119,7 @@ rankNet <- function(object, slot.name = "netP", measure = c("weight","count"), m
}

gg <- ggplot(df, aes(x=name, y=contribution.scaled)) + geom_bar(stat="identity",width = bar.w) +
theme_classic() + theme(axis.text=element_text(size=font.size),axis.text.x = element_blank(), axis.ticks.x = element_blank(), axis.title.y = element_text(size=10)) +
theme_classic() + theme(axis.text=element_text(size=font.size),axis.text.x = element_blank(), axis.ticks.x = element_blank(), axis.title.y = element_text(size=font.size)) +
xlab("") + ylab(ylabel) + coord_flip()#+
if (!is.null(title)) {
gg <- gg + ggtitle(title)+ theme(plot.title = element_text(hjust = 0.5))
Expand Down
100 changes: 43 additions & 57 deletions R/modeling.R
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@
#' @param n Parameter in Hill function
#'
#'
#' @importFrom future nbrOfWorkers
#' @importFrom future.apply future_sapply
#' @importFrom pbapply pbsapply
#' @importFrom progressr progressor
#' @importFrom stats aggregate
#' @importFrom Matrix crossprod
#' @importFrom utils txtProgressBar setTxtProgressBar
Expand All @@ -61,7 +60,8 @@
#' @export
#'
computeCommunProb <- function(object, type = c("triMean", "truncatedMean","thresholdedMean", "median"), trim = 0.1, LR.use = NULL, raw.use = TRUE, population.size = FALSE,
distance.use = TRUE, interaction.range = 250, scale.distance = 0.01, k.min = 10, contact.dependent = TRUE, contact.range = NULL, contact.knn.k = NULL, contact.dependent.forced = FALSE, do.symmetric = TRUE,
distance.use = TRUE, interaction.range = 250, scale.distance = 0.01, k.min = 10,
contact.dependent = TRUE, contact.range = NULL, contact.knn.k = NULL, contact.dependent.forced = FALSE, do.symmetric = TRUE,
nboot = 100, seed.use = 1L, Kh = 0.5, n = 1) {
type <- match.arg(type)
cat(type, "is used for calculating the average gene expression per cell group.", "\n")
Expand All @@ -88,11 +88,6 @@ computeCommunProb <- function(object, type = c("triMean", "truncatedMean","thres
}
complex_input <- object@DB$complex
cofactor_input <- object@DB$cofactor
my.sapply <- ifelse(
test = future::nbrOfWorkers() == 1,
yes = sapply,
no = future.apply::future_sapply
)

ptm = Sys.time()

Expand Down Expand Up @@ -205,14 +200,17 @@ computeCommunProb <- function(object, type = c("triMean", "truncatedMean","thres

set.seed(seed.use)
permutation <- replicate(nboot, sample.int(nC, size = nC))
data.use.avg.boot <- my.sapply(
p <- progressr::progressor(nboot)
data.use.avg.boot <- future.apply::future_sapply(
X = 1:nboot,
FUN = function(nE) {
p()
groupboot <- group[permutation[, nE]]
data.use.avgB <- aggregate(t(data.use), list(groupboot), FUN = FunMean)
data.use.avgB <- t(data.use.avgB[,-1])
return(data.use.avgB)
},
future.seed = TRUE,
simplify = FALSE
)
pb <- txtProgressBar(min = 0, max = nLR, style = 3, file = stderr())
Expand Down Expand Up @@ -257,10 +255,11 @@ computeCommunProb <- function(object, type = c("triMean", "truncatedMean","thres

Pnull <- as.vector(Pnull)

#Pboot <- foreach(nE = 1:nboot) %dopar% {
Pboot <- sapply(
p <- progressr::progressor(nboot)
Pboot <- future.apply::future_sapply(
X = 1:nboot,
FUN = function(nE) {
p()
data.use.avgB <- data.use.avg.boot[[nE]]
dataLavgB <- computeExpr_LR(geneL[i], data.use.avgB, complex_input)
dataRavgB <- computeExpr_LR(geneR[i], data.use.avgB, complex_input)
Expand Down Expand Up @@ -518,20 +517,17 @@ computeAveExpr <- function(object, features = NULL, group.by = NULL, type = c("t
#' @param complex the names of complex
#' @return
#' @importFrom dplyr select starts_with
#' @importFrom future nbrOfWorkers
#' @importFrom future.apply future_sapply
#' @importFrom pbapply pbsapply
#' @importFrom progressr progressor
#' @export
computeExpr_complex <- function(complex_input, data.use, complex) {
Rsubunits <- complex_input[complex,] %>% dplyr::select(starts_with("subunit"))
my.sapply <- ifelse(
test = future::nbrOfWorkers() == 1,
yes = sapply,
no = future.apply::future_sapply
)
data.complex = my.sapply(
X = 1:nrow(Rsubunits),
nrun <- nrow(Rsubunits)
p <- progressr::progressor(nrun)
data.complex <- future.apply::future_sapply(
X = 1:nrun,
FUN = function(x) {
p()
RsubunitsV <- unlist(Rsubunits[x,], use.names = F)
RsubunitsV <- RsubunitsV[RsubunitsV != ""]
return(geometricMean(data.use[RsubunitsV, , drop = FALSE]))
Expand All @@ -549,20 +545,17 @@ computeExpr_complex <- function(complex_input, data.use, complex) {
# @param FunMean the function for computing mean expression per group
# @return
# @importFrom dplyr select starts_with
# @importFrom future nbrOfWorkers
# @importFrom future.apply future_sapply
# @importFrom pbapply pbsapply
# #' @export
# @importFrom progressr progressor
# @export
.computeExprGroup_complex <- function(complex_input, data.use, complex, group, FunMean) {
Rsubunits <- complex_input[complex,] %>% dplyr::select(starts_with("subunit"))
my.sapply <- ifelse(
test = future::nbrOfWorkers() == 1,
yes = pbapply::pbsapply,
no = future.apply::future_sapply
)
data.complex = my.sapply(
X = 1:nrow(Rsubunits),
nrun <- nrow(Rsubunits)
p <- progressr::progressor(nrun)
data.complex <- future.apply::future_sapply(
X = 1:nrun,
FUN = function(x) {
p()
RsubunitsV <- unlist(Rsubunits[x,], use.names = F)
RsubunitsV <- RsubunitsV[RsubunitsV != ""]
RsubunitsV <- intersect(RsubunitsV, rownames(data.use))
Expand All @@ -586,8 +579,8 @@ computeExpr_complex <- function(complex_input, data.use, complex) {
#' @param geneLR a char vector giving a set of ligands or receptors
#' @param data.use data matrix (row are genes and columns are cells or cell groups)
#' @param complex_input the complex_input from CellChatDB
# #' @param group a factor defining the cell groups; If NULL, compute the expression of ligands or receptors in individual cells; otherwise, compute the average expression of ligands or receptors per cell group
# #' @param FunMean the function for computing average expression per cell group
#' @param group a factor defining the cell groups; If NULL, compute the expression of ligands or receptors in individual cells; otherwise, compute the average expression of ligands or receptors per cell group
#' @param FunMean the function for computing average expression per cell group
#' @return
#' @export
computeExpr_LR <- function(geneLR, data.use, complex_input){
Expand All @@ -614,9 +607,8 @@ computeExpr_LR <- function(geneLR, data.use, complex_input){
#' @param pairLRsig a data frame giving ligand-receptor interactions
#' @param type when type == "A", computing expression of co-activation receptor; when type == "I", computing expression of co-inhibition receptor.
#' @return
#' @importFrom future nbrOfWorkers
#' @importFrom future.apply future_sapply
#' @importFrom pbapply pbsapply
#' @importFrom progressr progressor
#' @export
computeExpr_coreceptor <- function(cofactor_input, data.use, pairLRsig, type = c("A", "I")) {
type <- match.arg(type)
Expand All @@ -627,16 +619,14 @@ computeExpr_coreceptor <- function(cofactor_input, data.use, pairLRsig, type = c
}
index.coreceptor <- which(!is.na(coreceptor.all) & coreceptor.all != "")
if (length(index.coreceptor) > 0) {
my.sapply <- ifelse(
test = future::nbrOfWorkers() == 1,
yes = sapply,
no = future.apply::future_sapply
)
coreceptor <- coreceptor.all[index.coreceptor]
coreceptor.ind <- cofactor_input[coreceptor, grepl("cofactor" , colnames(cofactor_input) )]
data.coreceptor.ind = my.sapply(
X = 1:nrow(coreceptor.ind),
nrun <- nrow(coreceptor.ind)
p <- progressr::progressor(nrun)
data.coreceptor.ind <- future.apply::future_sapply(
X = 1:nrun,
FUN = function(x) {
p()
coreceptor.indV <- unlist(coreceptor.ind[x,], use.names = F)
coreceptor.indV <- coreceptor.indV[coreceptor.indV != ""]
coreceptor.indV <- intersect(coreceptor.indV, rownames(data.use))
Expand Down Expand Up @@ -667,10 +657,9 @@ computeExpr_coreceptor <- function(cofactor_input, data.use, pairLRsig, type = c
# @param group a factor defining the cell groups
# @param FunMean the function for computing mean expression per group
# @return
# @importFrom future nbrOfWorkers
# @importFrom future.apply future_sapply
# @importFrom pbapply pbsapply
# #' @export
# @importFrom progressr progressor
# @export
.computeExprGroup_coreceptor <- function(cofactor_input, data.use, pairLRsig, type = c("A", "I"), group, FunMean) {
type <- match.arg(type)
if (type == "A") {
Expand All @@ -680,24 +669,21 @@ computeExpr_coreceptor <- function(cofactor_input, data.use, pairLRsig, type = c
}
index.coreceptor <- which(!is.na(coreceptor.all) & coreceptor.all != "")
if (length(index.coreceptor) > 0) {
my.sapply <- ifelse(
test = future::nbrOfWorkers() == 1,
yes = pbapply::pbsapply,
no = future.apply::future_sapply
)
coreceptor <- coreceptor.all[index.coreceptor]
coreceptor.ind <- cofactor_input[coreceptor, grepl("cofactor" , colnames(cofactor_input) )]
data.coreceptor.ind = my.sapply(
X = 1:nrow(coreceptor.ind),
nrun <- nrow(coreceptor.ind)
p <- progressr::progressor(nrun)
data.coreceptor.ind <- future.apply::future_sapply(
X = 1:nrun,
FUN = function(x) {
p()
coreceptor.indV <- unlist(coreceptor.ind[x,], use.names = F)
coreceptor.indV <- coreceptor.indV[coreceptor.indV != ""]
coreceptor.indV <- intersect(coreceptor.indV, rownames(data.use))
if (length(coreceptor.indV) > 1) {
data.avg <- aggregate(t(data.use[coreceptor.indV,]), list(group), FUN = FunMean)
data.avg <- t(data.avg[,-1])
return(apply(1 + data.avg, 2, prod))
# return(1 + apply(data.avg, 2, mean))
} else if (length(coreceptor.indV) == 1) {
data.avg <- aggregate(matrix(data.use[coreceptor.indV,], ncol = 1), list(group), FUN = FunMean)
data.avg <- t(data.avg[,-1])
Expand Down Expand Up @@ -787,10 +773,10 @@ computeExprGroup_antagonist <- function(data.use, pairLRsig, cofactor_input, gro
#' @param data.use data matrix
#' @param cofactor_input the cofactor_input from CellChatDB
#' @param pairLRsig the L-R interactions
# #' @param group a factor defining the cell groups
#' @param group a factor defining the cell groups
#' @param index.agonist the index of agonist in the database
#' @param Kh a parameter in Hill function
# #' @param FunMean the function for computing mean expression per group
#' @param FunMean the function for computing mean expression per group
#' @param n Hill coefficient
#' @return
#' @export
Expand Down Expand Up @@ -823,11 +809,11 @@ computeExpr_agonist <- function(data.use, pairLRsig, cofactor_input, index.agoni
#' @param data.use data matrix
#' @param cofactor_input the cofactor_input from CellChatDB
#' @param pairLRsig the L-R interactions
# #' @param group a factor defining the cell groups
#' @param group a factor defining the cell groups
#' @param index.antagonist the index of antagonist in the database
#' @param Kh a parameter in Hill function
#' @param n Hill coefficient
# #' @param FunMean the function for computing mean expression per group
#' @param FunMean the function for computing mean expression per group
#' @return
#' @export
#' @importFrom stats aggregate
Expand Down Expand Up @@ -885,7 +871,7 @@ triMean <- function(x, na.rm = TRUE) {
#' @param na.rm whether remove na
#' @return
#' @importFrom Matrix nnzero
# #' @export
#' @export
thresholdedMean <- function(x, trim = 0.1, na.rm = TRUE) {
percent <- Matrix::nnzero(x)/length(x)
if (percent < trim) {
Expand Down
Loading