Skip to content

Commit

Permalink
Merge pull request #18 from dipterix/lapply_async-fix
Browse files Browse the repository at this point in the history
lapply_async2 now evaluates the globals size before running parallel …
  • Loading branch information
dipterix authored Nov 22, 2024
2 parents 65c0540 + ad285bc commit 9dbf108
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 106 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: dipsaus
Type: Package
Title: A Dipping Sauce for Data Analysis and Visualizations
Version: 0.2.9.9000
Version: 0.2.9.9001
Authors@R: c(
person("Zhengjia", "Wang", email = "[email protected]", role = c("aut", "cre")),
person("John", "Magnotti", email = "[email protected]", role = c("ctb"),
Expand Down
239 changes: 134 additions & 105 deletions R/parallels-future.R
Original file line number Diff line number Diff line change
Expand Up @@ -54,35 +54,6 @@
lapply_async2 <- function(x, FUN, FUN.args = list(),
callback = NULL, plan = TRUE,
future.chunk.size = NULL, future.seed = sample.int(1, n = 1e5 - 1), ...){
if(length(plan) && !isFALSE(plan)){
if(isTRUE(plan)){
make_forked_clusters(..., clean = TRUE)
} else {
current_plan <- future::plan("list")
on.exit({
# Restore plan might encounter errors, use try-catch
tryCatch({
future::plan(
current_plan,
substitute = FALSE,
.call = NULL,
.cleanup = FALSE,
.init = FALSE
)
}, error = function(e){
future::plan('sequential')
})
}, add = TRUE, after = TRUE)
if(is.character(plan) && plan == 'callr'){
if (!requireNamespace("future.callr", quietly = TRUE)) {
stop("Package \"future.callr\" is needed to set plan as 'callr'. Please install it.", call. = FALSE)
}
future::plan(future.callr::callr, ..., .call = NULL, .cleanup = FALSE, .init = FALSE)
}else{
future::plan(plan, ..., .call = NULL, .cleanup = FALSE, .init = FALSE)
}
}
}

if(inherits(callback, "formula")) {
penv <- parent.frame()
Expand All @@ -109,10 +80,67 @@ lapply_async2 <- function(x, FUN, FUN.args = list(),
call <- as_call(quote(FUN), quote(el), .list = FUN.args)
if(is.null(callback_call)){
f <- dipsaus::new_function2(alist(el = ), body = call, quote_type = 'quote', env = environment())
fs <- future.apply::future_lapply(
x, f, future.seed = future.seed,
future.scheduling = TRUE,
future.chunk.size = future.chunk.size)
globals_and_packages <- future::getGlobalsAndPackages(list(f), maxSize = 1e12)

parallel <- TRUE
tryCatch(
{
max_globals_size <- getOption("future.globals.maxSize", default = 524288000) # 524288000 = 500MB
globals_size <- attr(globals_and_packages$globals, "total_size")
if( isTRUE(globals_size >= max_globals_size) ) {
future::plan('sequential')
parallel <- FALSE
}
},
error = function(e) {
# Nothing
}
)

if( parallel ) {

if(length(plan) && !isFALSE(plan)){
if(isTRUE(plan)){
make_forked_clusters(..., clean = TRUE)
} else {
current_plan <- future::plan("list")
on.exit({
# Restore plan might encounter errors, use try-catch
tryCatch({
future::plan(
current_plan,
substitute = FALSE,
.call = NULL,
.cleanup = FALSE,
.init = FALSE
)
}, error = function(e){
future::plan('sequential')
})
}, add = TRUE, after = TRUE)
if(is.character(plan) && plan == 'callr'){
if (!requireNamespace("future.callr", quietly = TRUE)) {
stop("Package \"future.callr\" is needed to set plan as 'callr'. Please install it.", call. = FALSE)
}
future::plan(future.callr::callr, ..., .call = NULL, .cleanup = FALSE, .init = FALSE)
}else{
future::plan(plan, ..., .call = NULL, .cleanup = FALSE, .init = FALSE)
}
}
}

fs <- future.apply::future_lapply(
x, f,
future.scheduling = TRUE,
future.chunk.size = future.chunk.size,
future.seed = future.seed,
future.globals = globals_and_packages$globals,
future.packages = globals_and_packages$packages
)
} else {
message("Using single thread due to large global objects")
fs <- lapply(x, f)
}
} else {

old.handlers <- progressr::handlers(handler_dipsaus_progress())
Expand All @@ -125,80 +153,33 @@ lapply_async2 <- function(x, FUN, FUN.args = list(),
# if(is.null(shiny::getDefaultReactiveDomain())){

progressr::with_progress({
...p <- progressr::progressor(steps = 2 * length(x) + 1)

# f <- function(el, ...) {
# msg <- ""
# if( is.function(callback) ){
# callback_formals <- formals(callback)
# if (length(callback_formals)){
# msg <- callback(el)
# }else{
# msg <- callback()
# }
# }
# if(is.character(msg)) {
# msg <- paste(msg, collapse = "")
# } else {
# msg <- deparse(el, width.cutoff = 30)
# if(length(msg) > 1){
# msg <- msg[[1]]
# }
# if(nchar(msg) >= 10){
# msg <- sprintf("%s...", substr(msg, stop = 7, start = 1))
# }
# }
#
# p(message = sprintf("%s (started)", msg))
# on.exit({
# p(message = sprintf("%s (end)", msg))
# }, add = TRUE, after = TRUE)
#
# return(FUN(el, ...))
# }

# f <- dipsaus::new_function2(alist(el = ), body = bquote({
# ...msg... <- .(callback_call)
# if(is.character(...msg...)) {
# ...msg... <- paste(...msg..., collapse = "")
# } else {
# ...msg... <- deparse(el, width.cutoff = 30)
# if(length(...msg...) > 1){
# ...msg... <- ...msg...[[1]]
# }
# if(nchar(...msg...) >= 10){
# ...msg... <- sprintf("%s...", substr(...msg..., stop = 7, start = 1))
# }
# }
# p(message = sprintf("%s (started)", ...msg...), )
# on.exit({
# p(message = sprintf("%s (end)", ...msg...))
# }, add = TRUE, after = TRUE)
#
# .(call)
#
# }), quote_type = "quote", env = environment())

# manually parse globals
...p <- progressr::progressor(steps = 2 * length(x) + 2)
...FUN2 <- dipsaus::new_function2(args = formals(FUN), env = baseenv())
...callback2 <- dipsaus::new_function2(args = formals(callback), env = baseenv())
...FUN.args <- FUN.args
...FUN_globals <- future::getGlobalsAndPackages(FUN, envir = environment(FUN))$globals
...callback_globals <- future::getGlobalsAndPackages(callback, envir = environment(callback))$globals
...FUN_globals <- future::getGlobalsAndPackages(FUN, envir = environment(FUN), maxSize = 1e12)$globals
...callback_globals <- future::getGlobalsAndPackages(callback, envir = environment(callback), maxSize = 1e12)$globals
store_env <- new.env(parent = globalenv())
store_env$...p <- ...p
store_env$...FUN2 <- ...FUN2
store_env$...callback2 <- ...callback2
store_env$...FUN.args <- ...FUN.args
store_env$...FUN_globals <- ...FUN_globals
store_env$...callback_globals <- ...callback_globals

# print(...FUN_globals)

ff <- function() {
ff <- new_function2(body = quote({
...p
...FUN.args
...FUN2
...FUN_globals
...callback_globals
...callback2
}
globals_and_packages <- future::getGlobalsAndPackages(list(FUN, callback, ff))
}), env = store_env, quote_type = "quote")

# print(names(globals_and_packages$globals))
globals_and_packages <- future::getGlobalsAndPackages(list(FUN, callback, ff), maxSize = 1e12)

f <- dipsaus::new_function2(alist(el = ), body = bquote({

Expand Down Expand Up @@ -227,20 +208,68 @@ lapply_async2 <- function(x, FUN, FUN.args = list(),
re <- do.call(...FUN2, c(list(el), ...FUN.args))
return(re)

}), quote_type = "quote", env = new.env(parent = globalenv()))
}), quote_type = "quote", env = new.env(parent = store_env))

fs <- future.apply::future_lapply(
x, f,
future.scheduling = TRUE,
future.chunk.size = future.chunk.size,
future.seed = future.seed,
future.globals = globals_and_packages$globals,
future.packages = globals_and_packages$packages
parallel <- TRUE
tryCatch(
{
max_globals_size <- getOption("future.globals.maxSize", default = 524288000) # 524288000 = 500MB
if( isTRUE(attr(globals_and_packages$globals, "total_size") >= max_globals_size) ) {
future::plan('sequential')
parallel <- FALSE
}
},
error = function(e) {
# Nothing
}
)
# fs <- future.apply::future_lapply(x, f,
# future.scheduling = TRUE,
# future.chunk.size = future.chunk.size,
# future.seed = future.seed)

if( parallel ) {

...p(message = "Preparing for parallel workers...")

if(length(plan) && !isFALSE(plan)){
if(isTRUE(plan)){
make_forked_clusters(..., clean = TRUE)
} else {
current_plan <- future::plan("list")
on.exit({
# Restore plan might encounter errors, use try-catch
tryCatch({
future::plan(
current_plan,
substitute = FALSE,
.call = NULL,
.cleanup = FALSE,
.init = FALSE
)
}, error = function(e){
future::plan('sequential')
})
}, add = TRUE, after = TRUE)
if(is.character(plan) && plan == 'callr'){
if (!requireNamespace("future.callr", quietly = TRUE)) {
stop("Package \"future.callr\" is needed to set plan as 'callr'. Please install it.", call. = FALSE)
}
future::plan(future.callr::callr, ..., .call = NULL, .cleanup = FALSE, .init = FALSE)
}else{
future::plan(plan, ..., .call = NULL, .cleanup = FALSE, .init = FALSE)
}
}
}

fs <- future.apply::future_lapply(
x, f,
future.scheduling = TRUE,
future.chunk.size = future.chunk.size,
future.seed = future.seed,
future.globals = globals_and_packages$globals,
future.packages = globals_and_packages$packages
)
} else {
...p(message = "Using single thread due to large global objects\n")
fs <- lapply(x, f)
}

p("Results collected\n")

Expand Down

0 comments on commit 9dbf108

Please sign in to comment.