More queue improvements #699

Merged 4 commits on Feb 24, 2025
83 changes: 37 additions & 46 deletions R/req-perform-parallel.R
@@ -7,7 +7,7 @@
#'
#' While running, you'll get a progress bar that looks like:
#' `[working] (1 + 4) -> 5 -> 5`. The string tells you the current status of
#' the queue (e.g. working, waiting, errored, finishing) followed by (the
#' the queue (e.g. working, waiting, errored) followed by (the
#' number of pending requests + pending retried requests) -> the number of
#' active requests -> the number of complete requests.
#'
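(For orientation, not part of the diff: a minimal sketch of a call that produces a progress line like the one described above, using a made-up placeholder URL.)

library(httr2)

# Ten requests against a placeholder endpoint
reqs <- lapply(1:10, function(i) {
  request("https://example.com/api") |> req_url_path_append(i)
})

# With max_active = 5 the progress line starts out roughly as
# [working] (5 + 0) -> 5 -> 0
resps <- req_perform_parallel(reqs, max_active = 5, progress = TRUE)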
@@ -24,7 +24,8 @@
#'
#' Additionally, it does not respect the `max_tries` argument to `req_retry()`
#' because if you have five requests in flight and the first one gets rate
#' limited, it's likely that all the others do too.
#' limited, it's likely that all the others do too. This also means that
#' the circuit breaker is never triggered.
#'
#' @inherit req_perform_sequential params return
#' @param pool `r lifecycle::badge("deprecated")`. No longer supported;
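(Also not part of the diff: a sketch of the caveat above, again with a placeholder URL. Even with req_retry() configured, the parallel queue reacts to a rate-limited response by pausing the whole queue until the retry deadline passes rather than counting tries per request.)

library(httr2)

req <- request("https://example.com/api") |> req_retry(max_tries = 3)

# Five copies in flight: one 429 pauses all of them, and max_tries is not
# enforced per request by the parallel queue.
resps <- req_perform_parallel(rep(list(req), 5), on_error = "continue")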
@@ -154,7 +155,7 @@
total = n,
format = paste0(
"[{self$queue_status}] ",
"({self$n_pending} + {self$n_retried}) -> {self$n_active} -> {self$n_complete} | ",
"({self$n_pending} + {self$n_retries}) -> {self$n_active} -> {self$n_complete} | ",
"{cli::pb_bar} {cli::pb_percent}"
),
.envir = error_call
@@ -209,67 +210,58 @@
# Exposed for testing, so we can manually work through one step at a time
process1 = function(deadline = Inf) {
if (self$queue_status == "done") {
FALSE
} else if (self$queue_status == "waiting") {
return(FALSE)
}

if (!is.null(self$progress)) {
cli::cli_progress_update(id = self$progress, set = self$n_complete)
}

if (self$queue_status == "waiting") {
request_deadline <- max(self$token_deadline, self$rate_limit_deadline)
if (request_deadline <= deadline) {
# Assume we're done waiting; done_failure() will reset if needed
if (request_deadline <= unix_time()) {
self$queue_status <- "working"
pool_wait_for_deadline(self$pool, request_deadline)
NULL
return()
}

if (self$rate_limit_deadline > self$token_deadline) {
waiting <- "for rate limit"

Check warning on line 228 in R/req-perform-parallel.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-parallel.R#L227-L228

Added lines #L227 - L228 were not covered by tests
} else {
pool_wait_for_deadline(self$pool, deadline)
TRUE
waiting <- "for throttling"

Check warning on line 230 in R/req-perform-parallel.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-parallel.R#L230

Added line #L230 was not covered by tests
}
pool_wait_for_deadline(self$pool, min(request_deadline, deadline), waiting)
NULL
} else if (self$queue_status == "working") {
if (self$n_pending == 0) {
self$queue_status <- "finishing"
} else if (self$n_active < self$max_active) {
if (self$n_pending == 0 && self$n_active == 0) {
self$queue_status <- "done"
} else if (self$n_pending > 0 && self$n_active <= self$max_active) {
if (!self$submit_next(deadline)) {
self$queue_status <- "waiting"
}
} else {
pool_wait_for_one(self$pool, deadline)
}
NULL
} else if (self$queue_status == "finishing") {
pool_wait_for_one(self$pool, deadline)

if (self$rate_limit_deadline > unix_time()) {
self$queue_status <- "waiting"
} else if (self$n_pending > 0) {
# we had to retry
self$queue_status <- "working"
} else if (self$n_active > 0) {
# keep going
self$queue_status <- "finishing"
} else if (self$queue_status == "errored") {

Check warning on line 245 in R/req-perform-parallel.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-parallel.R#L245

Added line #L245 was not covered by tests
# Finish out any active requests but don't add any more
if (self$n_active > 0) {
pool_wait_for_one(self$pool, deadline)
} else {
self$queue_status <- "done"
}
NULL
} else if (self$queue_status == "errored") {
# Finish out any active request but don't add any more
pool_wait_for_one(self$pool, deadline)
self$queue_status <- if (self$n_active > 0) "errored" else "done"
NULL
}
},

submit_next = function(deadline) {
next_i <- which(self$status == "pending")[[1]]
i <- which(self$status == "pending")[[1]]

self$token_deadline <- throttle_deadline(self$reqs[[next_i]])
self$token_deadline <- throttle_deadline(self$reqs[[i]])
if (self$token_deadline > unix_time()) {
throttle_return_token(self$reqs[[next_i]])
throttle_return_token(self$reqs[[i]])
return(FALSE)
}

self$submit(next_i)
},

submit = function(i) {
retry_check_breaker(self$reqs[[i]], self$tries[[i]], error_call = error_call)

self$set_status(i, "active")
self$resps[i] <- list(NULL)
self$tries[[i]] <- self$tries[[i]] + 1
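(Reading aid, not httr2 code: a toy model of the state machine that process1() above now walks (working, waiting, errored, done), with the old finishing state folded into working. The curl pool, real deadlines, and response handling are stubbed out.)

toy_step <- function(q) {
  if (q$status == "done") {
    return(q)
  }
  if (q$status == "waiting") {
    # Waiting ends once the later of the throttle / rate-limit deadlines passes
    if (max(q$token_deadline, q$rate_limit_deadline) <= q$now) {
      q$status <- "working"
    }
  } else if (q$status == "working") {
    if (q$n_pending == 0 && q$n_active == 0) {
      q$status <- "done"               # nothing pending, nothing active
    } else if (q$n_pending > 0 && q$n_active <= q$max_active) {
      q$n_pending <- q$n_pending - 1   # submit the next request
      q$n_active <- q$n_active + 1
    } else {
      q$n_active <- q$n_active - 1     # wait for one response to complete
    }
  } else if (q$status == "errored") {
    # Finish out active requests but don't add any more
    if (q$n_active > 0) {
      q$n_active <- q$n_active - 1
    } else {
      q$status <- "done"
    }
  }
  q
}

q <- list(status = "working", n_pending = 3, n_active = 0, max_active = 2,
          token_deadline = 0, rate_limit_deadline = 0, now = 0)
while (q$status != "done") q <- toy_step(q)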
@@ -305,6 +297,7 @@

self$set_status(i, "pending")
self$n_retries <- self$n_retries + 1
self$queue_status <- "waiting"

Check warning on line 300 in R/req-perform-parallel.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-parallel.R#L300

Added line #L300 was not covered by tests
} else if (resp_is_invalid_oauth_token(req, resp) && self$can_reauth(i)) {
# This isn't quite right, because if there are (e.g.) four requests in
# the queue and the first one fails, we'll clear the cache for all four,
@@ -336,10 +329,6 @@
)

self$status[[i]] <- status

if (!is.null(self$progress)) {
cli::cli_progress_update(id = self$progress, set = self$n_complete)
}
},

can_retry = function(i) {
@@ -357,7 +346,7 @@
pool_wait(pool, poll = TRUE, timeout = timeout)
}

pool_wait_for_deadline <- function(pool, deadline) {
pool_wait_for_deadline <- function(pool, deadline, waiting_for) {
now <- unix_time()
timeout <- deadline - now
if (timeout <= 0) {
@@ -368,8 +357,10 @@

# pool might finish early; we still want to wait out the full time
remaining <- timeout - (unix_time() - now)
if (remaining > 0) {
# cat("Sleeping for ", remaining, " seconds\n", sep = "")
if (remaining > 2) {
# Use a progress bar
sys_sleep(remaining, waiting_for)
} else if (remaining > 0) {
Sys.sleep(remaining)
}

5 changes: 3 additions & 2 deletions man/req_perform_parallel.Rd

Some generated files are not rendered by default.

42 changes: 26 additions & 16 deletions tests/testthat/test-req-perform-parallel.R
@@ -116,11 +116,14 @@ test_that("both curl and HTTP errors become errors on continue", {
test_that("errors can cancel outstanding requests", {
reqs <- list2(
request_test("/status/:status", status = 404),
request_test("/delay/:secs", secs = 2),
request_test("/delay/:secs", secs = 1),
request_test("/delay/:secs", secs = 1),
)
out <- req_perform_parallel(reqs, on_error = "return", max_active = 1)
expect_s3_class(out[[1]], "httr2_http_404")
expect_null(out[[2]])
# second request might succeed or fail depending on the timing, but the
# third request should definitely fail
expect_null(out[[3]])
})

test_that("req_perform_parallel resspects http_error() error override", {
@@ -224,43 +227,50 @@ test_that("can retry a transient error", {

queue <- RequestQueue$new(list(req), progress = FALSE)

# Start processing
# submit the request
expect_null(queue$process1())
expect_equal(queue$queue_status, "working")
expect_equal(queue$n_active, 1)
expect_equal(queue$n_pending, 0)
expect_equal(queue$status[[1]], "active")

# No pending, so switch to finishing
expect_null(queue$process1())
expect_equal(queue$queue_status, "finishing")

# Now we process the request and capture the retry
# process the response and capture the retry
expect_null(queue$process1())
expect_equal(queue$queue_status, "waiting")
expect_equal(queue$rate_limit_deadline, mock_time + 2)
expect_equal(queue$n_pending, 1)
expect_s3_class(queue$resps[[1]], "httr2_http_429")
expect_equal(resp_body_json(queue$resps[[1]]$resp), list(status = "waiting"))

# Now we "wait" 2 seconds
# Starting waiting
expect_null(queue$process1())
expect_equal(queue$queue_status, "working")
expect_equal(queue$queue_status, "waiting")
expect_equal(mock_time, 3)

# Now we go back to working
# Finishing waiting
expect_null(queue$process1())
expect_equal(queue$queue_status, "working")
expect_equal(queue$n_active, 0)
expect_equal(queue$n_pending, 1)

# Then resume finishing again
# Resubmit
expect_null(queue$process1())
expect_equal(queue$queue_status, "finishing")
expect_equal(queue$queue_status, "working")
expect_equal(queue$n_active, 1)
expect_equal(queue$n_pending, 0)

# And we're finally done
# Process the response
expect_null(queue$process1())
expect_equal(queue$queue_status, "working")
expect_equal(queue$n_active, 0)
expect_equal(queue$n_pending, 0)
expect_s3_class(queue$resps[[1]], "httr2_response")
expect_equal(resp_body_json(queue$resps[[1]]), list(status = "done"))

# So we're finally done
expect_null(queue$process1())
expect_equal(queue$queue_status, "done")
expect_false(queue$process1())

expect_equal(resp_body_json(queue$resps[[1]]), list(status = "done"))
})

test_that("throttling is limited by deadline", {