From c81fe7b1f914963409076e53b7456584941d133f Mon Sep 17 00:00:00 2001 From: Ian Eddy Date: Fri, 12 Jul 2024 22:25:18 +0000 Subject: [PATCH 1/2] rming require --- R/DEoptim_fns.R | 232 +++++++++++++++++++++++------------------------- 1 file changed, 112 insertions(+), 120 deletions(-) diff --git a/R/DEoptim_fns.R b/R/DEoptim_fns.R index c838221..eac5c10 100644 --- a/R/DEoptim_fns.R +++ b/R/DEoptim_fns.R @@ -141,10 +141,10 @@ runDEoptim <- function(landscape, "_pid", Sys.getpid(), ".log" ) ) - message(crayon::blurred(paste0( + message(paste0( "Starting parallel model fitting for ", "fireSense_SpreadFit. Log: ", logPath - ))) + )) # Make sure logPath can be written in the workers -- need to create the dir @@ -154,148 +154,140 @@ runDEoptim <- function(landscape, # multiple times per machine, if not all 'localhost' revtunnel <- FALSE neededPkgs <- c("kSamples", "magrittr", "raster", "data.table", - "SpaDES.tools", "fireSenseUtils") + "SpaDES.tools", "fireSenseUtils", "sf") - # browser() if (!identical("localhost", unique(cores))) { repos <- c("https://predictiveecology.r-universe.dev", getOption("repos")) aa <- Require::pkgDep(unique(c("dqrng", "PredictiveEcology/SpaDES.tools@development", "PredictiveEcology/fireSenseUtils@development", "qs", "RCurl", neededPkgs)), recursive = TRUE) + pkgsNeeded <- unique(Require::extractPkgName(unname(unlist(aa)))) + revtunnel <- ifelse(all(cores == "localhost"), FALSE, TRUE) - coresUnique <- setdiff(unique(cores), "localhost") - message( - "Making sure packages with sufficient versions installed and loaded on: ", - paste(coresUnique, collapse = ", ") - ) - st <- system.time({ - cl <- parallelly::makeClusterPSOCK(coresUnique, revtunnel = revtunnel, rscript_libs = libPath) - }) - packageVersionFSU <- packageVersion("fireSenseUtils") - packageVersionST <- packageVersion("SpaDES.tools") - clusterExport(cl, list("libPath", "logPath", "packageVersionFSU", "packageVersionST", "repos"), - envir = environment()) - - parallel::clusterEvalQ( - cl, - { - # If this is first time that packages need to be installed for this user on this machine - # there won't be a folder present that is writable - if (!dir.exists(libPath)) { - dir.create(libPath, recursive = TRUE) - if (!"Require" %in% rownames(utils::installed.packages())) { - repos <- c("predictiveecology.r-universe.dev", getOption("repos")) - install.packages("Require", repos = repos) - } else if (packageVersion("Require") < "0.3.1.9098") { - repos <- c("predictiveecology.r-universe.dev", getOption("repos")) - install.packages("Require", repos = repos) - } + coresUnique <- setdiff(unique(cores), "localhost") + message( + "copying packages to: ", + paste(coresUnique, collapse = ", ") + ) + st <- system.time({ + cl <- parallelly::makeClusterPSOCK(coresUnique, revtunnel = revtunnel, rscript_libs = libPath) + }) + clusterExport(cl, list("libPath", "logPath", "repos", "pkgsNeeded"), + envir = environment()) + + parallel::clusterEvalQ(cl, { + # If this is first time that packages need to be installed for this user on this machine + # there won't be a folder present that is writable + if (!dir.exists(libPath)) { + dir.create(libPath, recursive = TRUE) + } + }) - if (FALSE) { - ## This will install the versions of SpaDES.tools and fireSenseUtils that are on the main machine - Require::Require(c("dqrng", "SpaDES.tools", "fireSenseUtils"), repos = repos) - } + out <- lapply(setdiff(unique(cores), "localhost"), function(ip) { - } - ) - pkgsNeeded <- unique(Require::extractPkgName(unname(unlist(aa)))) - out <- lapply(setdiff(unique(cores), "localhost"), function(ip) { - system(paste0("rsync -aruv --update ", paste(file.path(libPath, pkgsNeeded), collapse = " "), - " ", ip, ":", libPath)) - }) + rsync <- Sys.which("rsync") + if (!nzchar(rsync)) stop() + system(paste0(rsync, " -aruv --update ", paste(file.path(libPath, pkgsNeeded), collapse = " "), + " ", ip, ":", libPath)) + }) - parallel::stopCluster(cl) - } + GDALversions <- parallel::clusterEvalQ(cl, { + return(sf::sf_extSoftVersion()["GDAL"]) + }) - ## Now make full cluster with one worker per core listed in "cores" - message("Starting ", paste(paste(names(table(cores))), "x", table(cores), - collapse = ", " - ), " clusters") - message("Starting main parallel cluster ...") - st <- system.time({ - cl <- parallelly::makeClusterPSOCK(cores, - revtunnel = revtunnel, - outfile = logPath, rscript_libs = libPath - ) - }) + stopifnot(length(unique(sf::sf_extSoftVersion()["GDAL"], GDALversions)) == 1) + parallel::stopCluster(cl) + + ## Now make full cluster with one worker per core listed in "cores" + message("Starting ", paste(paste(names(table(cores))), "x", table(cores), + collapse = ", " + ), " clusters") + message("Starting main parallel cluster ...") + st <- system.time({ + cl <- parallelly::makeClusterPSOCK(cores, + revtunnel = revtunnel, + outfile = logPath, rscript_libs = libPath + ) + }) - on.exit(stopCluster(cl)) - message( - "it took ", round(st[3], 2), "s to start ", - paste(paste(names(table(cores))), "x", table(cores), collapse = ", "), " threads" - ) - message("Moving objects to each node in cluster") - - stMoveObjects <- try({ - system.time({ - objsToCopy <- mget(unlist(objsNeeded)) - objsToCopy <- lapply(objsToCopy, FUN = function(x) { - if (inherits(x, "SpatRaster")) { - x <- terra::wrap(x) - } else { - x - } - x - }) - filenameForTransfer <- normalizePath(tempfile(fileext = ".qs"), mustWork = FALSE, winslash = "/") - dir.create(dirname(filenameForTransfer), recursive = TRUE, showWarnings = FALSE) # during development, this was deleted accidentally - qs::qsave(objsToCopy, file = filenameForTransfer) - stExport <- system.time({ - outExp <- clusterExport(cl, varlist = "filenameForTransfer", envir = environment()) - }) - out11 <- clusterEvalQ(cl, { - dir.create(dirname(filenameForTransfer), recursive = TRUE, showWarnings = FALSE) - }) - out <- lapply(setdiff(unique(cores), "localhost"), function(ip) { - st1 <- system.time(system(paste0("rsync -a ", filenameForTransfer, " ", ip, ":", filenameForTransfer))) - }) - out <- clusterEvalQ(cl, { - out <- qs::qread(file = filenameForTransfer) - out <- lapply(out, FUN = function(x) { - if (inherits(x, "PackedSpatRaster")) { - x <- terra::unwrap(x) + on.exit(stopCluster(cl)) + message( + "it took ", round(st[3], 2), "s to start ", + paste(paste(names(table(cores))), "x", table(cores), collapse = ", "), " threads" + ) + message("Moving objects to each node in cluster") + + stMoveObjects <- try({ + system.time({ + objsToCopy <- mget(unlist(objsNeeded)) + objsToCopy <- lapply(objsToCopy, FUN = function(x) { + if (inherits(x, "SpatRaster")) { + x <- terra::wrap(x) } else { x } x }) - list2env(out, envir = .GlobalEnv) - }) - # Delete the file - out <- clusterEvalQ(cl, { - if (dir.exists(dirname(filenameForTransfer))) { - try(unlink(dirname(filenameForTransfer), recursive = TRUE), silent = TRUE) - } + filenameForTransfer <- normalizePath(tempfile(fileext = ".qs"), mustWork = FALSE, winslash = "/") + dir.create(dirname(filenameForTransfer), recursive = TRUE, showWarnings = FALSE) # during development, this was deleted accidentally + qs::qsave(objsToCopy, file = filenameForTransfer) + stExport <- system.time({ + outExp <- clusterExport(cl, varlist = "filenameForTransfer", envir = environment()) + }) + out11 <- clusterEvalQ(cl, { + dir.create(dirname(filenameForTransfer), recursive = TRUE, showWarnings = FALSE) + }) + out <- lapply(setdiff(unique(cores), "localhost"), function(ip) { + rsync <- Sys.which("rsync") + st1 <- system.time(system(paste0(rsync, " -a ", filenameForTransfer, " ", ip, ":", filenameForTransfer))) + }) + out <- clusterEvalQ(cl, { + out <- qs::qread(file = filenameForTransfer) + out <- lapply(out, FUN = function(x) { + if (inherits(x, "PackedSpatRaster")) { + x <- terra::unwrap(x) + } else { + x + } + x + }) + list2env(out, envir = .GlobalEnv) + }) + # Delete the file + out <- clusterEvalQ(cl, { + if (dir.exists(dirname(filenameForTransfer))) { + try(unlink(dirname(filenameForTransfer), recursive = TRUE), silent = TRUE) + } + }) }) }) - }) - if (is(stMoveObjects, "try-error")) { - message("The attempt to move objects to cluster using rsync and qs failed; trying clusterExport") - stMoveObjects <- system.time(clusterExport(cl, objsNeeded, envir = environment())) - list2env(mget(unlist(objsNeeded), envir = environment()), envir = .GlobalEnv) - } - message("it took ", round(stMoveObjects[3], 2), "s to move objects to nodes") - message("loading packages in cluster nodes") - - clusterExport(cl, "neededPkgs", envir = environment()) - stPackages <- system.time(parallel::clusterEvalQ( - cl, - { - for (i in neededPkgs) { - library(i, character.only = TRUE) - } - message("loading ", i, " at ", Sys.time()) + if (is(stMoveObjects, "try-error")) { + message("The attempt to move objects to cluster using rsync and qs failed; trying clusterExport") + stMoveObjects <- system.time(clusterExport(cl, objsNeeded, envir = environment())) + list2env(mget(unlist(objsNeeded), envir = environment()), envir = .GlobalEnv) } - )) - message("it took ", round(stPackages[3], 2), "s to load packages") + message("it took ", round(stMoveObjects[3], 2), "s to move objects to nodes") + message("loading packages in cluster nodes") + + clusterExport(cl, "neededPkgs", envir = environment()) + stPackages <- system.time(parallel::clusterEvalQ( + cl, + { + for (i in neededPkgs) { + library(i, character.only = TRUE) + } + message("loading ", i, " at ", Sys.time()) + } + )) + message("it took ", round(stPackages[3], 2), "s to load packages") - control$cluster <- cl - } else { - list2env(mget(unlist(objsNeeded), envir = environment()), envir = .GlobalEnv) + control$cluster <- cl + } else { + list2env(mget(unlist(objsNeeded), envir = environment()), envir = .GlobalEnv) + } } - ##################################################################### # DEOptim call ##################################################################### From 5bc8c459cd9382b261e6d6fbeb7396ad13855b04 Mon Sep 17 00:00:00 2001 From: ianmseddy Date: Tue, 16 Jul 2024 14:13:29 -0700 Subject: [PATCH 2/2] v bump for the new version --- DESCRIPTION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DESCRIPTION b/DESCRIPTION index d9c3203..af55ec3 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -3,7 +3,7 @@ Type: Package Title: Utilities for Working With the 'fireSense' Group of 'SpaDES' Modules Description: Utilities for working with the 'fireSense' group of 'SpaDES' modules. Date: 2024-07-10 -Version: 0.0.5.9070 +Version: 0.0.5.9071 Authors@R: c( person("Jean", "Marchal", email = "jean.d.marchal@gmail.com", role = c("aut")),