Skip to content

Commit

Permalink
Queue improvements (#695)
Browse files Browse the repository at this point in the history
* Introduce new waiting state
* Display the state in the progress bar (and drop the ETA)
* Record the last response/request when there's an error
* Ignore max retries
* Show retries in progress bar, and document it
  • Loading branch information
hadley authored Feb 21, 2025
1 parent a11740f commit 0a2caa0
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 44 deletions.
75 changes: 49 additions & 26 deletions R/req-perform-parallel.R
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
#' parallel. Never use it without [req_throttle()]; otherwise it's too easy to
#' pummel a server with a very large number of simultaneous requests.
#'
#' While running, you'll get a progress bar that looks like:
#' `[working] (1 + 4) -> 5 -> 5`. The string tells you the current status of
#' the queue (e.g. working, waiting, errored, finishing) followed by (the
#' number of pending requests + pending retried requests) -> the number of
#' active requests -> the number of complete requests.
#'
#' ## Limitations
#'
#' The main limitation of `req_perform_parallel()` is that it assumes applies
Expand All @@ -16,6 +22,10 @@
#' these limitations, but it's enough work that I'm unlikely to do it unless
#' I know that people would find it useful: so please let me know!
#'
#' Additionally, it does not respect the `max_tries` argument to `req_retry()`
#' because if you have five requests in flight and the first one gets rate
#' limited, it's likely that all the others will be rate limited too.
#'
#' @inherit req_perform_sequential params return
#' @param pool `r lifecycle::badge("deprecated")`. No longer supported;
#' to control the maximum number of concurrent requests, set `max_active`.
Expand Down Expand Up @@ -89,9 +99,12 @@ req_perform_parallel <- function(
)

if (on_error == "stop") {
errors <- keep(queue$resps, is_error)
if (length(errors) > 0) {
cnd_signal(errors[[1]])
is_error <- map_lgl(queue$resps, is_error)
if (any(is_error)) {
i <- which(is_error)[[1]]
the$last_response <- queue$resps[[i]]$resp %||% queue$resps[[i]]
the$last_request <- queue$reqs[[i]]
cnd_signal(queue$resps[[i]])
}
}

Expand All @@ -103,14 +116,15 @@ RequestQueue <- R6::R6Class(
public = list(
pool = NULL,
rate_limit_deadline = 0,
token_deadline = 0,
max_active = NULL,

# Overall status for the queue
queue_status = NULL,
deadline = Inf,
n_pending = 0,
n_active = 0,
n_complete = 0,
n_retries = 0,
on_error = "stop",
progress = NULL,

Expand All @@ -122,7 +136,7 @@ RequestQueue <- R6::R6Class(
tries = integer(),

# Requests that have failed due to OAuth expiration; used to ensure that we
# don't retry repeatedly, but still allow all active requests to retry one
# don't retry repeatedly, but still allow all active requests to retry once
oauth_failed = integer(),

initialize = function(
Expand All @@ -139,8 +153,9 @@ RequestQueue <- R6::R6Class(
self$progress <- cli::cli_progress_bar(
total = n,
format = paste0(
"{self$n_pending} -> {self$n_active} -> {self$n_complete} | ",
"{cli::pb_bar} {cli::pb_percent} | ETA: {cli::pb_eta}"
"[{self$queue_status}] ",
# The retry counter is the `n_retries` field declared on the class; the
# name used here must match it or the count will not be shown.
"({self$n_pending} + {self$n_retries}) -> {self$n_active} -> {self$n_complete} | ",
"{cli::pb_bar} {cli::pb_percent}"
),
.envir = error_call
)
Expand Down Expand Up @@ -195,19 +210,34 @@ RequestQueue <- R6::R6Class(
process1 = function(deadline = Inf) {
if (self$queue_status == "done") {
FALSE
} else if (self$queue_status == "waiting") {
request_deadline <- max(self$token_deadline, self$rate_limit_deadline)
if (request_deadline <= deadline) {
# Assume we're done waiting; done_failure() will reset if needed
self$queue_status <- "working"
pool_wait_for_deadline(self$pool, request_deadline)
NULL
} else {
pool_wait_for_deadline(self$pool, deadline)
TRUE
}
} else if (self$queue_status == "working") {
if (self$n_pending == 0) {
self$queue_status <- "finishing"
} else if (self$n_active < self$max_active) {
self$submit_next(deadline)
if (!self$submit_next(deadline)) {
self$queue_status <- "waiting"
}
} else {
pool_wait_for_one(self$pool, deadline)
}
NULL
} else if (self$queue_status == "finishing") {
pool_wait_for_one(self$pool, deadline)

if (self$n_pending > 0) {
if (self$rate_limit_deadline > unix_time()) {
self$queue_status <- "waiting"
} else if (self$n_pending > 0) {
# we had to retry
self$queue_status <- "working"
} else if (self$n_active > 0) {
Expand All @@ -228,23 +258,12 @@ RequestQueue <- R6::R6Class(
submit_next = function(deadline) {
next_i <- which(self$status == "pending")[[1]]

# Need to wait for a token from the bucket AND for any rate limits.
# The ordering is important here because requests will complete
# while we wait and that might change the rate_limit_deadline
token_deadline <- throttle_deadline(self$reqs[[next_i]])
pool_wait_for_deadline(self$pool, min(token_deadline, deadline))
if (token_deadline >= deadline) {
self$token_deadline <- throttle_deadline(self$reqs[[next_i]])
if (self$token_deadline > unix_time()) {
throttle_return_token(self$reqs[[next_i]])
return()
return(FALSE)
}

while (unix_time() < self$rate_limit_deadline) {
pool_wait_for_deadline(self$pool, min(self$rate_limit_deadline, deadline))
if (self$rate_limit_deadline >= deadline) {
throttle_return_token(self$reqs[[next_i]])
return()
}
}
self$submit(next_i)
},

Expand All @@ -256,6 +275,7 @@ RequestQueue <- R6::R6Class(
self$tries[[i]] <- self$tries[[i]] + 1

self$pooled_reqs[[i]]$submit(self$pool)
TRUE
},

done_success = function(i, resp) {
Expand All @@ -280,11 +300,11 @@ RequestQueue <- R6::R6Class(
tries <- self$tries[[i]]

if (retry_is_transient(req, resp) && self$can_retry(i)) {
# Do we need to somehow expose this to the user? Because if they're
# hitting it a bunch, it's a sign that the throttling is too low
delay <- retry_after(req, resp, tries)
self$rate_limit_deadline <- unix_time() + delay

self$set_status(i, "pending")
self$n_retries <- self$n_retries + 1
} else if (resp_is_invalid_oauth_token(req, resp) && self$can_reauth(i)) {
# This isn't quite right, because if there are (e.g.) four requests in
# the queue and the first one fails, we'll clear the cache for all four,
Expand All @@ -293,6 +313,7 @@ RequestQueue <- R6::R6Class(
self$oauth_failed <- c(self$oauth_failed, i)
req_auth_clear_cache(self$reqs[[i]])
self$set_status(i, "pending")
self$n_retries <- self$n_retries + 1
} else {
self$set_status(i, "complete")
if (self$on_error != "continue") {
Expand Down Expand Up @@ -322,7 +343,8 @@ RequestQueue <- R6::R6Class(
},

can_retry = function(i) {
self$tries[[i]] < retry_max_tries(self$reqs[[i]])
TRUE
# self$tries[[i]] < retry_max_tries(self$reqs[[i]])
},
can_reauth = function(i) {
!i %in% self$oauth_failed
Expand All @@ -347,6 +369,7 @@ pool_wait_for_deadline <- function(pool, deadline) {
# pool might finish early; we still want to wait out the full time
remaining <- timeout - (unix_time() - now)
if (remaining > 0) {
# cat("Sleeping for ", remaining, " seconds\n", sep = "")
Sys.sleep(remaining)
}

Expand Down
10 changes: 10 additions & 0 deletions man/req_perform_parallel.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 8 additions & 18 deletions tests/testthat/test-req-perform-parallel.R
Original file line number Diff line number Diff line change
Expand Up @@ -141,22 +141,6 @@ test_that("req_perform_parallel respects http_error() body message", {
expect_snapshot(req_perform_parallel(reqs), error = TRUE)
})

test_that("respects max retries", {
req <- local_app_request(function(req, res) {
i <- res$app$locals$i %||% 1
res$
set_status(429)$
set_header("retry-after", 0)$
send_json(list(status = "waiting"), auto_unbox = TRUE)
})
req <- req_retry(req, max_tries = 3)
queue <- RequestQueue$new(list(req), progress = FALSE)

queue$process()
expect_s3_class(queue$resps[[1]], "httr2_http_429")
expect_equal(queue$tries[1], 3)
})

test_that("requests are throttled", {
withr::defer(throttle_reset())

Expand Down Expand Up @@ -252,7 +236,7 @@ test_that("can retry a transient error", {

# Now we process the request and capture the retry
expect_null(queue$process1())
expect_equal(queue$queue_status, "working")
expect_equal(queue$queue_status, "waiting")
expect_equal(queue$rate_limit_deadline, mock_time + 2)
expect_equal(queue$n_pending, 1)
expect_s3_class(queue$resps[[1]], "httr2_http_429")
Expand All @@ -263,6 +247,10 @@ test_that("can retry a transient error", {
expect_equal(queue$queue_status, "working")
expect_equal(mock_time, 3)

# Now we go back to working
expect_null(queue$process1())
expect_equal(queue$queue_status, "working")

# Then resume finishing again
expect_null(queue$process1())
expect_equal(queue$queue_status, "finishing")
Expand Down Expand Up @@ -290,7 +278,9 @@ test_that("throttling is limited by deadline", {

# Check time only advances by one second, and token is returned to bucket
local_mocked_bindings(throttle_deadline = function(...) mock_time + 2)
queue$submit_next(1)
queue$process1(1)
expect_equal(queue$queue_status, "waiting")
queue$process1(1)
expect_equal(mock_time, 1)
expect_equal(the$throttle[["test"]]$tokens, 1)

Expand Down

0 comments on commit 0a2caa0

Please sign in to comment.