Skip to content

Commit

Permalink
Implement resp_stream_aws() (#571)
Browse files Browse the repository at this point in the history
  • Loading branch information
hadley authored Oct 23, 2024
1 parent 22fd5bd commit 44b5067
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 14 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ export(resp_raw)
export(resp_retry_after)
export(resp_status)
export(resp_status_desc)
export(resp_stream_aws)
export(resp_stream_lines)
export(resp_stream_raw)
export(resp_stream_sse)
Expand Down
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

# httr2 1.0.5

* New `resp_stream_aws()` to retrieve AWS's special streaming format. With thanks to <https://github.com/lifion/lifion-aws-event-stream/> for a simple reference implementation.
* New `req_auth_aws_v4()` signs request using AWS's special format (#562, #566).
* `req_perform_parallel()` and `req_perform_promise()` now correctly set up the method and body (#549).

Expand Down
135 changes: 135 additions & 0 deletions R/resp-stream-aws.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#' @export
#' @rdname resp_stream_raw
#' @order 2
resp_stream_aws <- function(resp, max_size = Inf) {
event_bytes <- resp_boundary_pushback(
resp = resp,
max_size = max_size,
boundary_func = find_aws_event_boundary,
include_trailer = FALSE
)

if (!is.null(event_bytes)) {
parse_aws_event(event_bytes)
} else {
return(NULL)
}
}

find_aws_event_boundary <- function(buffer) {
# No valid AWS event message is less than 16 bytes
if (length(buffer) < 16) {
return(NULL)
}

# Read first 4 bytes as a big endian number
event_size <- parse_int(buffer[1:4])
if (event_size > length(buffer)) {
return(NULL)
}

event_size + 1
}

# Implementation from https://github.com/lifion/lifion-aws-event-stream/blob/develop/lib/index.js
# This is technically buggy because it takes the header_length as a lower bound
# but this shouldn't cause problems in practive
parse_aws_event <- function(bytes) {
i <- 1
read_bytes <- function(n) {
if (n == 0) {
return(raw())
}
out <- bytes[i:(i + n - 1)]
i <<- i + n
out
}

# prelude
total_length <- parse_int(read_bytes(4))
if (total_length != length(bytes)) {
cli::cli_abort("AWS event metadata doesn't match supplied bytes", .internal = TRUE)
}

header_length <- parse_int(read_bytes(4))
prelude_crc <- read_bytes(4)
# TODO: use this value to check prelude lengths

# headers
headers <- list()
while(i <= 12 + header_length) {
name_length <- as.integer(read_bytes(1))
name <- rawToChar(read_bytes(name_length))
type <- as.integer(read_bytes(1))

delayedAssign("length", parse_int(read_bytes(2)))
value <- switch(type_enum(type),
'TRUE' = TRUE,
'FALSE' = FALSE,
BYTE = parse_int(read_bytes(1)),
SHORT = parse_int(read_bytes(2)),
INTEGER = parse_int(read_bytes(4)),
LONG = parse_int64(read_bytes(8)),
BYTE_ARRAY = read_bytes(length),
CHARACTER = rawToChar(read_bytes(length)),
TIMESTAMP = parse_int64(read_bytes(8)),
UUID = raw_to_hex(read_bytes(16)),
)
headers[[name]] <- value
}

# body
body_raw <- read_bytes(total_length - i - 4 + 1)
crc_raw <- read_bytes(4)
# TODO: use this value to check data

body <- rawToChar(body_raw)
if (identical(headers$`:content-type`, "application/json")) {
body <- jsonlite::parse_json(body)
}

list(headers = headers, body = body)
}


# Helpers ----------------------------------------------------------------

parse_int <- function(x) {
sum(as.integer(x) * 256 ^ rev(seq_along(x) - 1))
}

parse_int64 <- function(x) {
y <- readBin(x, "double", n = 1, size = length(x), endian = "big")
class(y) <- "integer64"
y
}

type_enum <- function(value) {
if (value < 0 || value > 10) {
cli::cli_abort("Unsupported type {value}.", .internal = TRUE)
}

switch(value + 1,
"TRUE",
"FALSE",
"BYTE",
"SHORT",
"INTEGER",
"LONG",
"BYTE_ARRAY",
"CHARACTER",
"TIMESTAMP",
"UUID",
)
}

hex_to_raw <- function(x) {
x <- gsub("(\\s|\n)+", "", x)

pairs <- substring(x, seq(1, nchar(x), by = 2), seq(2, nchar(x), by = 2))
as.raw(strtoi(pairs, 16L))
}

raw_to_hex <- function(x) {
paste(as.character(x), collapse = "")
}
23 changes: 16 additions & 7 deletions R/resp-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,26 @@
#'
#' * `resp_stream_raw()` retrieves bytes (`raw` vectors).
#' * `resp_stream_lines()` retrieves lines of text (`character` vectors).
#' * `resp_stream_sse()` retrieves [server-sent
#' events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
#' from the stream. It currently only works with text mode connections so when calling
#' `req_perform_connection()` you must use `mode = "text"`.
#' * `resp_stream_sse()` retrieves a single [server-sent
#' event](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events).
#' * `resp_stream_aws()` retrieves a single event from an AWS stream
#' (i.e. mime type `application/vnd.amazon.eventstream``).
#'
#' @returns
#' * `resp_stream_raw()`: a raw vector.
#' * `resp_stream_lines()`: a character vector.
#' * `resp_stream_sse()`: a list with components `type`, `data`, and `id`; or
#' `NULL`, signifying that the end of the stream has been reached or--if in
#' nonblocking mode--that no event is currently available.
#' * `resp_stream_sse()`: a list with components `type`, `data`, and `id`
#' * `resp_stream_aws()`: a list with components `headers` and `body`.
#' `body` will be automatically parsed if the event contents a `:content-type`
#' header with `application/json`.
#'
#' `resp_stream_sse()` and `resp_stream_aws()` will return `NULL` to signal that
#' the end of the stream has been reached or, if in nonblocking mode, that
#' no event is currently available.
#' @export
#' @param resp,con A streaming [response] created by [req_perform_connection()].
#' @param kb How many kilobytes (1024 bytes) of data to read.
#' @order 1
resp_stream_raw <- function(resp, kb = 32) {
check_streaming_response(resp)
conn <- resp$body
Expand All @@ -31,6 +37,7 @@ resp_stream_raw <- function(resp, kb = 32) {
#' @param lines The maximum number of lines to return at once.
#' @param warn Like [readLines()]: warn if the connection ends without a final
#' EOL.
#' @order 1
resp_stream_lines <- function(resp, lines = 1, max_size = Inf, warn = TRUE) {
check_streaming_response(resp)
check_number_whole(lines, min = 0, allow_infinite = TRUE)
Expand Down Expand Up @@ -63,6 +70,7 @@ resp_stream_lines <- function(resp, lines = 1, max_size = Inf, warn = TRUE) {
#' bytes has been exceeded without a line/event boundary, an error is thrown.
#' @export
#' @rdname resp_stream_raw
#' @order 1
resp_stream_sse <- function(resp, max_size = Inf) {
event_bytes <- resp_boundary_pushback(resp, max_size, find_event_boundary, include_trailer = FALSE)
if (!is.null(event_bytes)) {
Expand All @@ -75,6 +83,7 @@ resp_stream_sse <- function(resp, max_size = Inf) {
#' @export
#' @param ... Not used; included for compatibility with generic.
#' @rdname resp_stream_raw
#' @order 3
close.httr2_response <- function(con, ...) {
check_response(con)

Expand Down
22 changes: 15 additions & 7 deletions man/resp_stream_raw.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions tests/testthat/_snaps/resp-stream-aws.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# unknown header triggers error

Code
parse_aws_event(bytes)
Condition
Error in `type_enum()`:
! Unsupported type 255.
i This is an internal error that was detected in the httr2 package.
Please report it at <https://github.com/r-lib/httr2/issues> with a reprex (<https://tidyverse.org/help/>) and the full backtrace.

75 changes: 75 additions & 0 deletions tests/testthat/test-resp-stream-aws.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Tests copied from
# https://github.com/lifion/lifion-aws-event-stream/blob/develop/lib/index.test.js
# https://github.com/lifion/lifion-aws-event-stream/blob/develop/lib/index.test.json

test_that("can parse empty object", {
bytes <- hex_to_raw("000000100000000005c248eb7d98c8ff")
expect_equal(
parse_aws_event(bytes),
list(headers = list(), body = "")
)
})

test_that("can return various types of header", {
bytes <- hex_to_raw("0000001500000001ba25f70d03666f6f013aa3e0d6")
expect_equal(parse_aws_event(bytes)$headers, list(foo = FALSE))

bytes <- hex_to_raw("0000001500000001ba25f70d03666f6f004da4d040")
expect_equal(parse_aws_event(bytes)$headers, list(foo = TRUE))

# byte
bytes <- hex_to_raw("0000001600000001fd858ddd03666f6f02ffa44bfd93")
expect_equal(parse_aws_event(bytes)$headers, list(foo = 255))

# short
bytes <- hex_to_raw("0000001700000001c0e5a46d03666f6f03fffff3b59291")
expect_equal(parse_aws_event(bytes)$headers, list(foo = 65535))

# integer
bytes <- hex_to_raw("00000019000000017fd51a0c03666f6f04ffffffff853b65dd")
expect_equal(parse_aws_event(bytes)$headers, list(foo = 4294967295))

# long
bytes <- hex_to_raw("0000001d000000018a55bccc03666f6f050000ffffffffffff6b03c255")
expected <- structure(1.390671161567e-309, class = "integer64")
expect_equal(parse_aws_event(bytes)$headers, list(foo = expected))

# byte array
bytes <- hex_to_raw("0000001c00000001b735957c03666f6f0600050102030405cdda4038")
expect_equal(parse_aws_event(bytes)$headers, list(foo = as.raw(1:5)))

# character
bytes <- hex_to_raw("0000001a00000001387560dc03666f6f0700036261725bb3cecf")
expect_equal(parse_aws_event(bytes)$headers, list(foo = "bar"))

# UUID
bytes <- hex_to_raw("00000025000000011b044f8b03666f6f093bfdac5cfe6c402983bfc1de7819f5316056148a")
expect_equal(parse_aws_event(
bytes
)$headers, list(foo = "3bfdac5cfe6c402983bfc1de7819f531"))

})

test_that("unknown header triggers error", {
bytes <- hex_to_raw("0000001500000001ba25f70d03666f6fff60a63fcd")
expect_snapshot(parse_aws_event(bytes), error = TRUE)
})

test_that("json content type automatically parsed", {
bytes <- hex_to_raw("
000001c20000005bc1123f0b0b3a6576656e742d74797065070015537562736372696265546f
53686172644576656e740d3a636f6e74656e742d747970650700106170706c69636174696f6e
2f6a736f6e0d3a6d6573736167652d747970650700056576656e747b22436f6e74696e756174
696f6e53657175656e63654e756d626572223a22343935383836333037393634323435313235
3936363136333437353239313133373435393934373336323937343734373039373832353330
222c224d696c6c6973426568696e644c6174657374223a302c225265636f726473223a5b7b22
417070726f78696d6174654172726976616c54696d657374616d70223a312e35333831363032
313936333645392c2244617461223a225632567a62475635222c22456e6372797074696f6e54
797065223a6e756c6c2c22506172746974696f6e4b6579223a2231306463633930322d633839
632d343036372d623433362d303566383863306662356566222c2253657175656e63654e756d
626572223a223439353838363330373936343234353132353936363136333437353239313133
373435393934373336323937343734373039373832353330227d5d7dd84c02f3
")
parsed <- parse_aws_event(bytes)
expect_type(parsed$body, "list")
})

0 comments on commit 44b5067

Please sign in to comment.