From be38592536c090715eef4897f34f67db6dd45b96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Wed, 13 Aug 2025 14:02:43 +0200 Subject: [PATCH 1/5] Handle errors in parallel teardown files I also changed the exit status of the subprocesses to 0 (normal exit), from 1. I didn't find any reason why it was 1, hopefully there wasn't any. It is a normal exit, so 0 makes more senset to me (now). Closes #1165. --- R/parallel.R | 19 ++++++++++++++++--- tests/testthat/test-parallel-teardown.R | 13 ++++++++++--- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/R/parallel.R b/R/parallel.R index 56bffe3e1..d5ec5b140 100644 --- a/R/parallel.R +++ b/R/parallel.R @@ -333,7 +333,7 @@ queue_teardown <- function(queue) { clean_fn <- function() { withr::deferred_run(.GlobalEnv) - quit(save = "no", status = 1L, runLast = TRUE) + quit(save = "no", status = 0L, runLast = TRUE) } topoll <- list() @@ -344,7 +344,7 @@ queue_teardown <- function(queue) { tryCatch( { tasks$worker[[i]]$call(clean_fn) - topoll <- c(topoll, tasks$worker[[i]]$get_poll_connection()) + topoll <- c(topoll, tasks$worker[[i]]) }, error = function(e) tasks$worker[i] <- list(NULL) ) @@ -357,10 +357,16 @@ queue_teardown <- function(queue) { } else { grace <- 3L } + first_error <- NULL limit <- Sys.time() + grace while (length(topoll) > 0 && (timeout <- limit - Sys.time()) > 0) { timeout <- as.double(timeout, units = "secs") * 1000 - pr <- processx::poll(topoll, as.integer(timeout)) + conns <- lapply(topoll, function(x) x$get_poll_connection()) + pr <- unlist(processx::poll(conns, as.integer(timeout))) + for (i in which(pr == "ready")) { + msg <- topoll[[i]]$read() + first_error <- first_error %||% msg$error + } topoll <- topoll[pr != "ready"] } @@ -377,6 +383,13 @@ queue_teardown <- function(queue) { } } } + + if (!is.null(first_error)) { + cli::cli_abort( + "At least one parallel worker failed to run teardown", + parent = first_error + ) + } } # Reporter that just forwards events in the subprocess back to the main process diff --git a/tests/testthat/test-parallel-teardown.R b/tests/testthat/test-parallel-teardown.R index b3a2dd765..fecbd826b 100644 --- a/tests/testthat/test-parallel-teardown.R +++ b/tests/testthat/test-parallel-teardown.R @@ -1,5 +1,4 @@ test_that("teardown error", { - skip("teardown errors are ignored") withr::local_envvar(TESTTHAT_PARALLEL = "TRUE") err <- tryCatch( capture.output(suppressMessages(testthat::test_local( @@ -8,6 +7,14 @@ test_that("teardown error", { ))), error = function(e) e ) - expect_s3_class(err, "testthat_process_error") - expect_match(err$message, "Error in teardown", fixed = TRUE) + expect_s3_class(err$parent, "callr_error") + expect_match( + err$message, + "At least one parallel worker failed to run teardown" + ) + expect_match( + err$parent$parent$parent$message, + "Error in teardown", + fixed = TRUE + ) }) From f7d6b596874ee7f4180af2d0c6d32e92b6cb6709 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sat, 11 Oct 2025 11:05:10 +0200 Subject: [PATCH 2/5] Only call teardown on idle workers It is an error to call it on busy ones. This might happen on an interrupt. --- R/parallel.R | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/R/parallel.R b/R/parallel.R index 88a0a66b1..86ea3a473 100644 --- a/R/parallel.R +++ b/R/parallel.R @@ -320,7 +320,9 @@ queue_teardown <- function(queue) { topoll <- list() for (i in seq_len(num)) { - if (!is.null(tasks$worker[[i]])) { + if ( + !is.null(tasks$worker[[i]]) && tasks$worker[[i]]$get_state() == "idle" + ) { # The worker might have crashed or exited, so this might fail. # If it does then we'll just ignore that worker tryCatch( From a5b90c1536fdb57c33495d06448f56dd2119c2ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sat, 11 Oct 2025 11:06:25 +0200 Subject: [PATCH 3/5] Need to tryCatch() worker cleanup In case the process has terminated already. --- R/parallel.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/R/parallel.R b/R/parallel.R index 86ea3a473..22a163bb7 100644 --- a/R/parallel.R +++ b/R/parallel.R @@ -361,9 +361,9 @@ queue_teardown <- function(queue) { error = function(e) NULL ) if (ps::ps_is_supported()) { - tasks$worker[[i]]$kill_tree() + tryCatch(tasks$worker[[i]]$kill_tree(), error = function(e) NULL) } else { - tasks$worker[[i]]$kill() + tryCatch(tasks$worker[[i]]$kill(), error = function(e) NULL) } } } From 337139e02d8606f07ff04efc6a5c75fee6e8fdec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sat, 11 Oct 2025 11:06:48 +0200 Subject: [PATCH 4/5] Do not call quit() in worker teardown Because it creates a race condition and the output of the teardown function might be lost. This means that covr probably does not work for these workers, because there is no time for it to write out the coverage counters. --- R/parallel.R | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/R/parallel.R b/R/parallel.R index 22a163bb7..20036f355 100644 --- a/R/parallel.R +++ b/R/parallel.R @@ -313,9 +313,10 @@ queue_teardown <- function(queue) { tasks <- queue$list_tasks() num <- nrow(tasks) + # calling quit() here creates a race condition, and the output of + # the deferred_run() might be lost. clean_fn <- function() { withr::deferred_run(.GlobalEnv) - quit(save = "no", status = 0L, runLast = TRUE) } topoll <- list() From 92acd3dfc6924d8950bd9293fa8d1dbb87c29604 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sat, 11 Oct 2025 11:20:10 +0200 Subject: [PATCH 5/5] Improve parallel teardown Close the input connection first, and then give the workers some time to quit. This helps covr to write out the coverage counters. --- R/parallel.R | 40 ++++++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/R/parallel.R b/R/parallel.R index 20036f355..ec0497327 100644 --- a/R/parallel.R +++ b/R/parallel.R @@ -314,12 +314,13 @@ queue_teardown <- function(queue) { num <- nrow(tasks) # calling quit() here creates a race condition, and the output of - # the deferred_run() might be lost. + # the deferred_run() might be lost. Instead we close the input + # connection in a separate task. clean_fn <- function() { withr::deferred_run(.GlobalEnv) } - topoll <- list() + topoll <- integer() for (i in seq_len(num)) { if ( !is.null(tasks$worker[[i]]) && tasks$worker[[i]]$get_state() == "idle" @@ -329,38 +330,57 @@ queue_teardown <- function(queue) { tryCatch( { tasks$worker[[i]]$call(clean_fn) - topoll <- c(topoll, tasks$worker[[i]]) + topoll <- c(topoll, i) }, - error = function(e) tasks$worker[i] <- list(NULL) + error = function(e) NULL ) } } - # Give covr time to write out the coverage files + # Give covr a bit more time if (in_covr()) { grace <- 30L } else { - grace <- 3L + grace <- 1L } first_error <- NULL limit <- Sys.time() + grace while (length(topoll) > 0 && (timeout <- limit - Sys.time()) > 0) { timeout <- as.double(timeout, units = "secs") * 1000 - conns <- lapply(topoll, function(x) x$get_poll_connection()) + conns <- lapply(tasks$worker[topoll], function(x) x$get_poll_connection()) pr <- unlist(processx::poll(conns, as.integer(timeout))) for (i in which(pr == "ready")) { - msg <- topoll[[i]]$read() + msg <- tasks$worker[[topoll[i]]]$read() first_error <- first_error %||% msg$error } topoll <- topoll[pr != "ready"] } + topoll <- integer() for (i in seq_len(num)) { - if (!is.null(tasks$worker[[i]])) { + if ( + !is.null(tasks$worker[[i]]) && tasks$worker[[i]]$get_state() == "idle" + ) { tryCatch( - close(tasks$worker[[i]]$get_input_connection()), + { + close(tasks$worker[[i]]$get_input_connection()) + topoll <- c(topoll, i) + }, error = function(e) NULL ) + } + } + + limit <- Sys.time() + grace + while (length(topoll) > 0 && (timeout <- limit - Sys.time()) > 0) { + timeout <- as.double(timeout, units = "secs") * 1000 + conns <- lapply(tasks$worker[topoll], function(x) x$get_poll_connection()) + pr <- unlist(processx::poll(conns, as.integer(timeout))) + topoll <- topoll[pr != "ready"] + } + + for (i in seq_len(num)) { + if (!is.null(tasks$worker[[i]])) { if (ps::ps_is_supported()) { tryCatch(tasks$worker[[i]]$kill_tree(), error = function(e) NULL) } else {