Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use same package versions on cluster machines as source #19

Merged
merged 6 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 71 additions & 52 deletions R/DEoptim_fns.R
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
.verbose,
visualizeDEoptim,
.plotSize = list(height = 1600, width = 2000)) {
if (isTRUE(is.na(cores))) cores <- NULL

Check warning on line 100 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L100

Added line #L100 was not covered by tests
origBlas <- blas_get_num_procs()
if (origBlas > 1) {
blas_set_num_threads(1)
Expand Down Expand Up @@ -150,61 +151,79 @@
## Make cluster with just one worker per machine --> don't need to do these steps
# multiple times per machine, if not all 'localhost'
revtunnel <- FALSE
neededPkgs <- c("kSamples", "magrittr", "raster", "data.table",
"SpaDES.tools", "fireSenseUtils")

Check warning on line 155 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L154-L155

Added lines #L154 - L155 were not covered by tests

# browser()
if (!identical("localhost", unique(cores))) {
revtunnel <- ifelse(all(cores == "localhost"), FALSE, TRUE)
repos <- c("https://predictiveecology.r-universe.dev", getOption("repos"))

Check warning on line 159 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L159

Added line #L159 was not covered by tests

coresUnique <- setdiff(unique(cores), "localhost")
message(
"Making sure packages with sufficient versions installed and loaded on: ",
paste(coresUnique, collapse = ", ")
)
st <- system.time({
cl <- parallelly::makeClusterPSOCK(coresUnique, revtunnel = revtunnel, rscript_libs = libPath)
})
packageVersionFSU <- packageVersion("fireSenseUtils")
packageVersionST <- packageVersion("SpaDES.tools")
clusterExport(cl, list("libPath", "logPath", "packageVersionFSU", "packageVersionST"), envir = environment())

parallel::clusterEvalQ(
cl,
{
# If this is first time that packages need to be installed for this user on this machine
# there won't be a folder present that is writable
if (!dir.exists(libPath)) {
dir.create(libPath, recursive = TRUE)
aa <- Require::pkgDep(unique(c("dqrng", "PredictiveEcology/SpaDES.tools@development",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to be careful allowing Require to install update anything without specifying exact versions (i.e., those installed on the main node); otherwise we get mismatched packages on each node.

Can you confirm that the versions installed are exactly the same, and that it's not just grabbing the latest versions of everything?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't installing them ... this is just determining which to use. It copies them from the source folder. Previously this was not checking versions. Now it is using the exact version because they are being copied.

"PredictiveEcology/fireSenseUtils@development", "qs", "RCurl", neededPkgs)), recursive = TRUE)
revtunnel <- ifelse(all(cores == "localhost"), FALSE, TRUE)

Check warning on line 163 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L161-L163

Added lines #L161 - L163 were not covered by tests

coresUnique <- setdiff(unique(cores), "localhost")
message(
"Making sure packages with sufficient versions installed and loaded on: ",
paste(coresUnique, collapse = ", ")

Check warning on line 168 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L165-L168

Added lines #L165 - L168 were not covered by tests
)
st <- system.time({
cl <- parallelly::makeClusterPSOCK(coresUnique, revtunnel = revtunnel, rscript_libs = libPath)

Check warning on line 171 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L170-L171

Added lines #L170 - L171 were not covered by tests
})
packageVersionFSU <- packageVersion("fireSenseUtils")
packageVersionST <- packageVersion("SpaDES.tools")
clusterExport(cl, list("libPath", "logPath", "packageVersionFSU", "packageVersionST", "repos"),
envir = environment())

Check warning on line 176 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L173-L176

Added lines #L173 - L176 were not covered by tests

parallel::clusterEvalQ(
cl,

Check warning on line 179 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L178-L179

Added lines #L178 - L179 were not covered by tests
{
# If this is first time that packages need to be installed for this user on this machine
# there won't be a folder present that is writable
if (!dir.exists(libPath)) {
stop("libPath directory creation failed.\n",
"Try creating on each machine manually, using e.g.,\n",
" mkdir -p ", libPath)
dir.create(libPath, recursive = TRUE)

Check warning on line 184 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L184

Added line #L184 was not covered by tests

if (!dir.exists(libPath)) {
stop("libPath directory creation failed.\n",
"Try creating on each machine manually, using e.g.,\n",
" mkdir -p ", libPath)

Check warning on line 189 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L186-L189

Added lines #L186 - L189 were not covered by tests
}
}
}

if (!"Require" %in% rownames(utils::installed.packages())) {
remotes::install_github("PredictiveEcology/Require@development")
} else if (packageVersion("Require") < "0.1.0.9000") {
remotes::install_github("PredictiveEcology/Require@development")
}
# logPath <- Require::checkPath(dirname(logPath), create = TRUE)
logPath <- dir.create(dirname(logPath), showWarnings = FALSE, recursive = TRUE)

Check warning on line 194 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L194

Added line #L194 was not covered by tests

message(Sys.info()[["nodename"]])

Check warning on line 196 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L196

Added line #L196 was not covered by tests

## Use the binary packages for install if Ubuntu & Linux
Require::setLinuxBinaryRepo()
#scp -r /home/emcintir/.local/share/R/Edehzhie/packages/x86_64-pc-linux-gnu/4.3/fireSenseUtils emcintir@10.│^C
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops

#20.0.97:/home/emcintir/.local/share/R/Edehzhie/packages/x86_64-pc-linux-gnu/4.3

logPath <- Require::checkPath(dirname(logPath), create = TRUE)
needRequire <- FALSE
if (!"Require" %in% rownames(utils::installed.packages())) {
needRequire <- TRUE
} else if (packageVersion("Require") < "0.1.0.9000") {
needRequire <- TRUE

Check warning on line 205 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L201-L205

Added lines #L201 - L205 were not covered by tests
}
if (isTRUE(needRequire))
install.packages("Require", repos = repos, lib = libPath)

Check warning on line 208 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L207-L208

Added lines #L207 - L208 were not covered by tests

message(Sys.info()[["nodename"]])
## Use the binary packages for install if Ubuntu & Linux
# Require::setLinuxBinaryRepo()

## This will install the versions of SpaDES.tools and fireSenseUtils that are on the main machine
Require::Require(
c(
"dqrng",
paste0("PredictiveEcology/SpaDES.tools@development (>=", packageVersionST, ")"),
paste0("PredictiveEcology/fireSenseUtils@development (>=", packageVersionFSU, ")")
),
upgrade = FALSE
)
}
)
parallel::stopCluster(cl)
if (FALSE) {

Check warning on line 213 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L213

Added line #L213 was not covered by tests
## This will install the versions of SpaDES.tools and fireSenseUtils that are on the main machine
Require::Require(c("dqrng", "SpaDES.tools", "fireSenseUtils"), repos = repos)

Check warning on line 215 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L215

Added line #L215 was not covered by tests
}

}
)
pkgsNeeded <- unique(Require::extractPkgName(unname(unlist(aa))))
out <- lapply(setdiff(unique(cores), "localhost"), function(ip) {
system(paste0("rsync -aruv --update ", paste(file.path(libPath, pkgsNeeded), collapse = " "),
" ", ip, ":", libPath))

Check warning on line 223 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L220-L223

Added lines #L220 - L223 were not covered by tests
})

parallel::stopCluster(cl)

Check warning on line 226 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L226

Added line #L226 was not covered by tests
}

## Now make full cluster with one worker per core listed in "cores"
Expand Down Expand Up @@ -237,14 +256,14 @@
}
x
})
filenameForTransfer <- Require::normPath(tempfile(fileext = ".qs"))
Require::checkPath(dirname(filenameForTransfer), create = TRUE) # during development, this was deleted accidentally
filenameForTransfer <- normalizePath(tempfile(fileext = ".qs"), mustWork = FALSE, winslash = "/")
dir.create(dirname(filenameForTransfer), recursive = TRUE, showWarnings = FALSE) # during development, this was deleted accidentally

Check warning on line 260 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L259-L260

Added lines #L259 - L260 were not covered by tests
qs::qsave(objsToCopy, file = filenameForTransfer)
stExport <- system.time({
outExp <- clusterExport(cl, varlist = "filenameForTransfer", envir = environment())
})
out11 <- clusterEvalQ(cl, {
Require::checkPath(dirname(filenameForTransfer), create = TRUE)
dir.create(dirname(filenameForTransfer), recursive = TRUE, showWarnings = FALSE)

Check warning on line 266 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L266

Added line #L266 was not covered by tests
})
out <- lapply(setdiff(unique(cores), "localhost"), function(ip) {
st1 <- system.time(system(paste0("rsync -a ", filenameForTransfer, " ", ip, ":", filenameForTransfer)))
Expand Down Expand Up @@ -277,13 +296,12 @@
}
message("it took ", round(stMoveObjects[3], 2), "s to move objects to nodes")
message("loading packages in cluster nodes")

clusterExport(cl, "neededPkgs", envir = environment())

Check warning on line 300 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L300

Added line #L300 was not covered by tests
stPackages <- system.time(parallel::clusterEvalQ(
cl,
{
for (i in c(
"kSamples", "magrittr", "raster", "data.table",
"SpaDES.tools", "fireSenseUtils"
)) {
for (i in neededPkgs) {

Check warning on line 304 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L304

Added line #L304 was not covered by tests
library(i, character.only = TRUE)
}
message("loading ", i, " at ", Sys.time())
Expand Down Expand Up @@ -417,6 +435,7 @@
mutuallyExclusive = mutuallyExclusive,
doAssertions = doObjFunAssertions,
Nreps = Nreps,
plot.it = FALSE,

Check warning on line 438 in R/DEoptim_fns.R

View check run for this annotation

Codecov / codecov/patch

R/DEoptim_fns.R#L438

Added line #L438 was not covered by tests
controlForCache = controlForCache,
objFunCoresInternal = objFunCoresInternal,
thresh = thresh,
Expand Down
2 changes: 2 additions & 0 deletions R/cleanUpSpreadFirePoints.R
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#' @importFrom sf st_as_sf st_crs
#' @importFrom terra extract xyFromCell
cleanUpSpreadFirePoints <- function(firePoints, bufferDT, flammableRTM) {
if (is.data.table(bufferDT))
bufferDT <- as.data.table(bufferDT)

Check warning on line 23 in R/cleanUpSpreadFirePoints.R

View check run for this annotation

Codecov / codecov/patch

R/cleanUpSpreadFirePoints.R#L22-L23

Added lines #L22 - L23 were not covered by tests
FlamPoints <- as.data.table(extract(flammableRTM, firePoints, cells = TRUE))
setnames(FlamPoints, c("ID", "isFlammable", "cells"))
FlamPoints[, isFlammable := as.numeric(as.character(isFlammable))] #otherwise factor = 1 and 2
Expand Down
29 changes: 18 additions & 11 deletions R/objFunSpread.R
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@
medSPRight <- medSP <= maxFireSpread & medSP >= lowerSpreadProb
spreadOutEnough <- sdSP / medSP > 0.025
ret <- list()
minLik <- 1e-19 # min(emp$lik[emp$lik > 0])
minLik <- 1e-29 # min(emp$lik[emp$lik > 0])

Check warning on line 405 in R/objFunSpread.R

View check run for this annotation

Codecov / codecov/patch

R/objFunSpread.R#L405

Added line #L405 was not covered by tests
loci <- annualFires$cells
summ <- summary(nonEdgeValues)
lowSPLowEnough <- summ[2] < lanscape1stQuantileThresh
Expand All @@ -424,8 +424,6 @@
}
}

# att <- try(if (medSPRight && spreadOutEnough) { "hi" })
# if (is(att, "try-error")) browser()
if (medSPRight && spreadOutEnough && lowSPLowEnough) {
if (verbose) {
ww <- if (isTRUE(weighted)) "weighted" else "unweighted"
Expand All @@ -443,6 +441,8 @@
minSize <- 100
if (doAssertions || plot.it) {
tableOfBufferedMaps <- annualFireBufferedDT[, list(numAvailPixels = .N), by = "ids"]
tableOfBufferedMaps <- tableOfBufferedMaps[annualFires, on = "ids"]
setnames(tableOfBufferedMaps, old = "cells", new = "initialPixels")

Check warning on line 445 in R/objFunSpread.R

View check run for this annotation

Codecov / codecov/patch

R/objFunSpread.R#L444-L445

Added lines #L444 - L445 were not covered by tests
minSizes <- tableOfBufferedMaps$numAvailPixels
minSize <- quantile(minSizes, 0.3)
if (minSize < 2000) {
Expand All @@ -463,17 +463,20 @@
maxSizes <- maxSizes[!dups]
loci <- annualFires$cells[!dups]
}
spreadState <- lapply(seq_len(Nreps), function(i) {
SpaDES.tools::spread2(
st <- system.time(spreadState <- lapply(seq_len(Nreps), function(i) {
SpaDES.tools::spread(

Check warning on line 467 in R/objFunSpread.R

View check run for this annotation

Codecov / codecov/patch

R/objFunSpread.R#L466-L467

Added lines #L466 - L467 were not covered by tests
# SpaDES.tools::spread2(
landscape = r,
maxSize = maxSizes,
start = loci,
# start = loci,
loci = loci,

Check warning on line 472 in R/objFunSpread.R

View check run for this annotation

Codecov / codecov/patch

R/objFunSpread.R#L472

Added line #L472 was not covered by tests
spreadProb = cells,
asRaster = FALSE,
# asRaster = FALSE,
returnIndices = TRUE,

Check warning on line 475 in R/objFunSpread.R

View check run for this annotation

Codecov / codecov/patch

R/objFunSpread.R#L475

Added line #L475 was not covered by tests
allowOverlap = FALSE,
skipChecks = TRUE
)
})
}))
if (isTRUE(plot.it)) {
par(
mfrow = c(7, 7), omi = c(0.5, 0, 0, 0),
Expand All @@ -491,11 +494,16 @@
)
}
spreadState <- rbindlist(spreadState, idcol = "rep")
if ("indices" %in% colnames(spreadState))
setnames(spreadState, old = "indices", "pixels")
if ("initialLocus" %in% colnames(spreadState) )
setnames(spreadState, old = "initialLocus", "initialPixels")

Check warning on line 500 in R/objFunSpread.R

View check run for this annotation

Codecov / codecov/patch

R/objFunSpread.R#L497-L500

Added lines #L497 - L500 were not covered by tests
if (isTRUE(doSNLL_FSTest)) {
emp <- spreadState[, list(N = .N), by = c("rep", "initialPixels")]
emp <- emp[annualFires, on = c("initialPixels" = "cells")]
if (plot.it) {
emp <- tableOfBufferedMaps[emp, on = c("initialPixels"), nomatch = NULL]
colsToKeep <- c(setdiff(colnames(tableOfBufferedMaps), colnames(emp)), "initialPixels")
emp <- tableOfBufferedMaps[, ..colsToKeep][emp, on = c("initialPixels"), nomatch = NULL]

Check warning on line 506 in R/objFunSpread.R

View check run for this annotation

Codecov / codecov/patch

R/objFunSpread.R#L505-L506

Added lines #L505 - L506 were not covered by tests
maxX <- log(max(c(annualFires$size, emp$N, emp$numAvailPixels)))
emp <- setorderv(emp, c("size"), order = -1L)
numLargest <- 4
Expand All @@ -511,7 +519,6 @@
} else {
uniqueEmpIds
}
if (is(sam, "try-error")) browser()
emp[ids %in% sam,
{
dat <- round(log(N))
Expand Down Expand Up @@ -627,7 +634,7 @@
predLiklihood <- rast(r)
predLiklihood[out$pixelID] <- predictedLiklihood
predLiklihood <- crop(predLiklihood, ex)
spIgnits <- SpatialPoints(coords = xyFromCell(r, thisFire$cells))
spIgnits <- terra::vect(xyFromCell(r, thisFire$cells))

Check warning on line 637 in R/objFunSpread.R

View check run for this annotation

Codecov / codecov/patch

R/objFunSpread.R#L637

Added line #L637 was not covered by tests
spIgnits <- buffer(spIgnits, width = 5000)
spIgnits <- crop(spIgnits, ex)
list(
Expand Down