diff --git a/R/parallel.R b/R/parallel.R index f82be232d..ec0497327 100644 --- a/R/parallel.R +++ b/R/parallel.R @@ -313,52 +313,88 @@ queue_teardown <- function(queue) { tasks <- queue$list_tasks() num <- nrow(tasks) + # calling quit() here creates a race condition, and the output of + # the deferred_run() might be lost. Instead we close the input + # connection in a separate task. clean_fn <- function() { withr::deferred_run(.GlobalEnv) - quit(save = "no", status = 1L, runLast = TRUE) } - topoll <- list() + topoll <- integer() for (i in seq_len(num)) { - if (!is.null(tasks$worker[[i]])) { + if ( + !is.null(tasks$worker[[i]]) && tasks$worker[[i]]$get_state() == "idle" + ) { # The worker might have crashed or exited, so this might fail. # If it does then we'll just ignore that worker tryCatch( { tasks$worker[[i]]$call(clean_fn) - topoll <- c(topoll, tasks$worker[[i]]$get_poll_connection()) + topoll <- c(topoll, i) }, - error = function(e) tasks$worker[i] <- list(NULL) + error = function(e) NULL ) } } - # Give covr time to write out the coverage files + # Give covr a bit more time if (in_covr()) { grace <- 30L } else { - grace <- 3L + grace <- 1L } + first_error <- NULL limit <- Sys.time() + grace while (length(topoll) > 0 && (timeout <- limit - Sys.time()) > 0) { timeout <- as.double(timeout, units = "secs") * 1000 - pr <- processx::poll(topoll, as.integer(timeout)) + conns <- lapply(tasks$worker[topoll], function(x) x$get_poll_connection()) + pr <- unlist(processx::poll(conns, as.integer(timeout))) + for (i in which(pr == "ready")) { + msg <- tasks$worker[[topoll[i]]]$read() + first_error <- first_error %||% msg$error + } topoll <- topoll[pr != "ready"] } + topoll <- integer() for (i in seq_len(num)) { - if (!is.null(tasks$worker[[i]])) { + if ( + !is.null(tasks$worker[[i]]) && tasks$worker[[i]]$get_state() == "idle" + ) { tryCatch( - close(tasks$worker[[i]]$get_input_connection()), + { + close(tasks$worker[[i]]$get_input_connection()) + topoll <- c(topoll, i) + }, error = function(e) NULL ) + } + } + + limit <- Sys.time() + grace + while (length(topoll) > 0 && (timeout <- limit - Sys.time()) > 0) { + timeout <- as.double(timeout, units = "secs") * 1000 + conns <- lapply(tasks$worker[topoll], function(x) x$get_poll_connection()) + pr <- unlist(processx::poll(conns, as.integer(timeout))) + topoll <- topoll[pr != "ready"] + } + + for (i in seq_len(num)) { + if (!is.null(tasks$worker[[i]])) { if (ps::ps_is_supported()) { - tasks$worker[[i]]$kill_tree() + tryCatch(tasks$worker[[i]]$kill_tree(), error = function(e) NULL) } else { - tasks$worker[[i]]$kill() + tryCatch(tasks$worker[[i]]$kill(), error = function(e) NULL) } } } + + if (!is.null(first_error)) { + cli::cli_abort( + "At least one parallel worker failed to run teardown", + parent = first_error + ) + } } # Reporter that just forwards events in the subprocess back to the main process diff --git a/tests/testthat/test-parallel-teardown.R b/tests/testthat/test-parallel-teardown.R index b0eee4338..7634cb7f7 100644 --- a/tests/testthat/test-parallel-teardown.R +++ b/tests/testthat/test-parallel-teardown.R @@ -1,5 +1,4 @@ test_that("teardown error", { - skip("teardown errors are ignored") skip_on_covr() withr::local_envvar(TESTTHAT_PARALLEL = "TRUE") err <- tryCatch( @@ -9,6 +8,14 @@ test_that("teardown error", { ))), error = function(e) e ) - expect_s3_class(err, "testthat_process_error") - expect_match(err$message, "Error in teardown", fixed = TRUE) + expect_s3_class(err$parent, "callr_error") + expect_match( + err$message, + "At least one parallel worker failed to run teardown" + ) + expect_match( + err$parent$parent$parent$message, + "Error in teardown", + fixed = TRUE + ) })