Skip to content

Commit

Permalink
Merge branch 'donotuseRequire' into development
Browse files Browse the repository at this point in the history
  • Loading branch information
ianmseddy committed Jul 16, 2024
2 parents da8cf26 + 5bc8c45 commit dcc8208
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 109 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Type: Package
Title: Utilities for Working With the 'fireSense' Group of 'SpaDES' Modules
Description: Utilities for working with the 'fireSense' group of 'SpaDES' modules.
Date: 2024-07-10
Version: 0.0.5.9070
Version: 0.0.5.9071
Authors@R: c(
person("Jean", "Marchal", email = "[email protected]",
role = c("aut")),
Expand Down
208 changes: 100 additions & 108 deletions R/DEoptim_fns.R
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ runDEoptim <- function(landscape,
"_pid", Sys.getpid(), ".log"
)
)
message(crayon::blurred(paste0(
message(paste0(
"Starting parallel model fitting for ",
"fireSense_SpreadFit. Log: ", logPath
)))
))

# Make sure logPath can be written in the workers -- need to create the dir

Expand All @@ -154,148 +154,140 @@ runDEoptim <- function(landscape,
# multiple times per machine, if not all 'localhost'
revtunnel <- FALSE
neededPkgs <- c("kSamples", "magrittr", "raster", "data.table",
"SpaDES.tools", "fireSenseUtils")
"SpaDES.tools", "fireSenseUtils", "sf")

# browser()
if (!identical("localhost", unique(cores))) {
repos <- c("https://predictiveecology.r-universe.dev", getOption("repos"))

aa <- Require::pkgDep(unique(c("dqrng", "PredictiveEcology/SpaDES.tools@development",
"PredictiveEcology/fireSenseUtils@development", "qs", "RCurl", neededPkgs)), recursive = TRUE)
pkgsNeeded <- unique(Require::extractPkgName(unname(unlist(aa))))

revtunnel <- ifelse(all(cores == "localhost"), FALSE, TRUE)

coresUnique <- setdiff(unique(cores), "localhost")
message(
"Making sure packages with sufficient versions installed and loaded on: ",
"copying packages to: ",
paste(coresUnique, collapse = ", ")
)
st <- system.time({
cl <- parallelly::makeClusterPSOCK(coresUnique, revtunnel = revtunnel, rscript_libs = libPath)
})
packageVersionFSU <- packageVersion("fireSenseUtils")
packageVersionST <- packageVersion("SpaDES.tools")
clusterExport(cl, list("libPath", "logPath", "packageVersionFSU", "packageVersionST", "repos"),
clusterExport(cl, list("libPath", "logPath", "repos", "pkgsNeeded"),
envir = environment())

parallel::clusterEvalQ(
cl,
{
# If this is first time that packages need to be installed for this user on this machine
# there won't be a folder present that is writable
if (!dir.exists(libPath)) {
dir.create(libPath, recursive = TRUE)
if (!"Require" %in% rownames(utils::installed.packages())) {
repos <- c("predictiveecology.r-universe.dev", getOption("repos"))
install.packages("Require", repos = repos)
} else if (packageVersion("Require") < "0.3.1.9098") {
repos <- c("predictiveecology.r-universe.dev", getOption("repos"))
install.packages("Require", repos = repos)
}

if (FALSE) {
## This will install the versions of SpaDES.tools and fireSenseUtils that are on the main machine
Require::Require(c("dqrng", "SpaDES.tools", "fireSenseUtils"), repos = repos)
}
}
parallel::clusterEvalQ(cl, {
# If this is first time that packages need to be installed for this user on this machine
# there won't be a folder present that is writable
if (!dir.exists(libPath)) {
dir.create(libPath, recursive = TRUE)
}
)
pkgsNeeded <- unique(Require::extractPkgName(unname(unlist(aa))))
})

out <- lapply(setdiff(unique(cores), "localhost"), function(ip) {
system(paste0("rsync -aruv --update ", paste(file.path(libPath, pkgsNeeded), collapse = " "),

rsync <- Sys.which("rsync")
if (!nzchar(rsync)) stop()
system(paste0(rsync, " -aruv --update ", paste(file.path(libPath, pkgsNeeded), collapse = " "),
" ", ip, ":", libPath))
})

GDALversions <- parallel::clusterEvalQ(cl, {
return(sf::sf_extSoftVersion()["GDAL"])
})

stopifnot(length(unique(sf::sf_extSoftVersion()["GDAL"], GDALversions)) == 1)
parallel::stopCluster(cl)
}

## Now make full cluster with one worker per core listed in "cores"
message("Starting ", paste(paste(names(table(cores))), "x", table(cores),
collapse = ", "
), " clusters")
message("Starting main parallel cluster ...")
st <- system.time({
cl <- parallelly::makeClusterPSOCK(cores,
revtunnel = revtunnel,
outfile = logPath, rscript_libs = libPath
)
})
## Now make full cluster with one worker per core listed in "cores"
message("Starting ", paste(paste(names(table(cores))), "x", table(cores),
collapse = ", "
), " clusters")
message("Starting main parallel cluster ...")
st <- system.time({
cl <- parallelly::makeClusterPSOCK(cores,
revtunnel = revtunnel,
outfile = logPath, rscript_libs = libPath
)
})

on.exit(stopCluster(cl))
message(
"it took ", round(st[3], 2), "s to start ",
paste(paste(names(table(cores))), "x", table(cores), collapse = ", "), " threads"
)
message("Moving objects to each node in cluster")

stMoveObjects <- try({
system.time({
objsToCopy <- mget(unlist(objsNeeded))
objsToCopy <- lapply(objsToCopy, FUN = function(x) {
if (inherits(x, "SpatRaster")) {
x <- terra::wrap(x)
} else {
x
}
x
})
filenameForTransfer <- normalizePath(tempfile(fileext = ".qs"), mustWork = FALSE, winslash = "/")
dir.create(dirname(filenameForTransfer), recursive = TRUE, showWarnings = FALSE) # during development, this was deleted accidentally
qs::qsave(objsToCopy, file = filenameForTransfer)
stExport <- system.time({
outExp <- clusterExport(cl, varlist = "filenameForTransfer", envir = environment())
})
out11 <- clusterEvalQ(cl, {
dir.create(dirname(filenameForTransfer), recursive = TRUE, showWarnings = FALSE)
})
out <- lapply(setdiff(unique(cores), "localhost"), function(ip) {
st1 <- system.time(system(paste0("rsync -a ", filenameForTransfer, " ", ip, ":", filenameForTransfer)))
})
out <- clusterEvalQ(cl, {
out <- qs::qread(file = filenameForTransfer)
out <- lapply(out, FUN = function(x) {
if (inherits(x, "PackedSpatRaster")) {
x <- terra::unwrap(x)
on.exit(stopCluster(cl))
message(
"it took ", round(st[3], 2), "s to start ",
paste(paste(names(table(cores))), "x", table(cores), collapse = ", "), " threads"
)
message("Moving objects to each node in cluster")

stMoveObjects <- try({
system.time({
objsToCopy <- mget(unlist(objsNeeded))
objsToCopy <- lapply(objsToCopy, FUN = function(x) {
if (inherits(x, "SpatRaster")) {
x <- terra::wrap(x)
} else {
x
}
x
})
list2env(out, envir = .GlobalEnv)
})
# Delete the file
out <- clusterEvalQ(cl, {
if (dir.exists(dirname(filenameForTransfer))) {
try(unlink(dirname(filenameForTransfer), recursive = TRUE), silent = TRUE)
}
filenameForTransfer <- normalizePath(tempfile(fileext = ".qs"), mustWork = FALSE, winslash = "/")
dir.create(dirname(filenameForTransfer), recursive = TRUE, showWarnings = FALSE) # during development, this was deleted accidentally
qs::qsave(objsToCopy, file = filenameForTransfer)
stExport <- system.time({
outExp <- clusterExport(cl, varlist = "filenameForTransfer", envir = environment())
})
out11 <- clusterEvalQ(cl, {
dir.create(dirname(filenameForTransfer), recursive = TRUE, showWarnings = FALSE)
})
out <- lapply(setdiff(unique(cores), "localhost"), function(ip) {
rsync <- Sys.which("rsync")
st1 <- system.time(system(paste0(rsync, " -a ", filenameForTransfer, " ", ip, ":", filenameForTransfer)))
})
out <- clusterEvalQ(cl, {
out <- qs::qread(file = filenameForTransfer)
out <- lapply(out, FUN = function(x) {
if (inherits(x, "PackedSpatRaster")) {
x <- terra::unwrap(x)
} else {
x
}
x
})
list2env(out, envir = .GlobalEnv)
})
# Delete the file
out <- clusterEvalQ(cl, {
if (dir.exists(dirname(filenameForTransfer))) {
try(unlink(dirname(filenameForTransfer), recursive = TRUE), silent = TRUE)
}
})
})
})
})

if (is(stMoveObjects, "try-error")) {
message("The attempt to move objects to cluster using rsync and qs failed; trying clusterExport")
stMoveObjects <- system.time(clusterExport(cl, objsNeeded, envir = environment()))
list2env(mget(unlist(objsNeeded), envir = environment()), envir = .GlobalEnv)
}
message("it took ", round(stMoveObjects[3], 2), "s to move objects to nodes")
message("loading packages in cluster nodes")

clusterExport(cl, "neededPkgs", envir = environment())
stPackages <- system.time(parallel::clusterEvalQ(
cl,
{
for (i in neededPkgs) {
library(i, character.only = TRUE)
}
message("loading ", i, " at ", Sys.time())
if (is(stMoveObjects, "try-error")) {
message("The attempt to move objects to cluster using rsync and qs failed; trying clusterExport")
stMoveObjects <- system.time(clusterExport(cl, objsNeeded, envir = environment()))
list2env(mget(unlist(objsNeeded), envir = environment()), envir = .GlobalEnv)
}
))
message("it took ", round(stPackages[3], 2), "s to load packages")
message("it took ", round(stMoveObjects[3], 2), "s to move objects to nodes")
message("loading packages in cluster nodes")

control$cluster <- cl
} else {
list2env(mget(unlist(objsNeeded), envir = environment()), envir = .GlobalEnv)
}
clusterExport(cl, "neededPkgs", envir = environment())
stPackages <- system.time(parallel::clusterEvalQ(
cl,
{
for (i in neededPkgs) {
library(i, character.only = TRUE)
}
message("loading ", i, " at ", Sys.time())
}
))
message("it took ", round(stPackages[3], 2), "s to load packages")

control$cluster <- cl
} else {
list2env(mget(unlist(objsNeeded), envir = environment()), envir = .GlobalEnv)
}
}
#####################################################################
# DEOptim call
#####################################################################
Expand Down

0 comments on commit dcc8208

Please sign in to comment.