diff --git a/R/resp-stream-aws.R b/R/resp-stream-aws.R index 2163e65a..47ca5473 100644 --- a/R/resp-stream-aws.R +++ b/R/resp-stream-aws.R @@ -26,19 +26,24 @@ resp_stream_aws <- function(resp, max_size = Inf) { event } -find_aws_event_boundary <- function(buffer) { +find_aws_event_boundary <- function(rb) { # No valid AWS event message is less than 16 bytes - if (length(buffer) < 16) { + if (rb$size() < 16) { return(NULL) } - # Read first 4 bytes as a big endian number - event_size <- parse_int(buffer[1:4]) - if (event_size > length(buffer)) { + # Read first 4 bytes + event_size_raw <- raw(4) + for (i in 1:4) { + event_size_raw[i] <- rb$peek(i) + } + + event_size <- parse_int(event_size_raw) + if (event_size > rb$size()) { return(NULL) } - event_size + 1 + event_size } # Implementation from https://github.com/lifion/lifion-aws-event-stream/blob/develop/lib/index.js diff --git a/R/resp-stream.R b/R/resp-stream.R index 71606038..38746d6f 100644 --- a/R/resp-stream.R +++ b/R/resp-stream.R @@ -167,7 +167,6 @@ resp_stream_oneline <- function(resp, max_size, warn, encoding) { if (is.null(line_bytes)) { return(character()) } - eat_next_lf <- resp$cache$resp_stream_oneline_eat_next_lf resp$cache$resp_stream_oneline_eat_next_lf <- FALSE @@ -197,77 +196,66 @@ resp_stream_oneline <- function(resp, max_size, warn, encoding) { } } -find_line_boundary <- function(buffer) { - if (length(buffer) == 0) { +find_line_boundary <- function(rb) { + if (rb$is_empty()) { return(NULL) } - # Look left 1 byte - right1 <- c(utils::tail(buffer, -1), 0x00) + cur <- rb$peek(1) + for (i in seq_len(rb$size() - 1)) { + nxt <- rb$peek(i + 1) - crlf <- buffer == 0x0D & right1 == 0x0A - cr <- buffer == 0x0D - lf <- buffer == 0x0A + # Check for CRLF sequence + if (is_crlf(cur, nxt)) { + return(i + 1) + } + # Check for single CR or LF + if (is_cr(cur) || is_lf(cur)) { + return(i) + } - all <- which(crlf | cr | lf) - if (length(all) == 0) { - return(NULL) + cur <- nxt } - first <- all[[1]] - if (crlf[first]) { - return(first + 2) - } else { - return(first + 1) + # Check the last byte + if (is_cr(cur) || is_lf(cur)) { + return(rb$size()) } + + NULL } # Function to find the first double line ending in a buffer, or NULL if no # double line ending is found -# -# Example: -# find_event_boundary(charToRaw("data: 1\n\nid: 12345")) -# Returns: -# list( -# matched = charToRaw("data: 1\n\n"), -# remaining = charToRaw("id: 12345") -# ) -find_event_boundary <- function(buffer) { - if (length(buffer) < 2) { +find_event_boundary <- function(rb) { + if (rb$size() < 2) { return(NULL) } - # leftX means look behind by X bytes. For example, left1[2] equals buffer[1]. - # Any attempt to read past the beginning of the buffer results in 0x00. - left1 <- c(0x00, utils::head(buffer, -1)) - left2 <- c(0x00, utils::head(left1, -1)) - left3 <- c(0x00, utils::head(left2, -1)) + cur <- rb$peek(1) + for (i in 1:(rb$size() - 1)) { + nxt <- rb$peek(i + 1) - boundary_end <- which( - (left1 == 0x0A & buffer == 0x0A) | # \n\n - (left1 == 0x0D & buffer == 0x0D) | # \r\r - (left3 == 0x0D & left2 == 0x0A & left1 == 0x0D & buffer == 0x0A) # \r\n\r\n - ) + # Check for \n\n or \r\r + if ((is_lf(cur) && is_lf(nxt)) || (is_cr(cur) && is_cr(nxt))) { + return(i + 1) + } - if (length(boundary_end) == 0) { - return(NULL) # No event boundary found + # Check for \r\n\r\n sequence + if (i <= rb$size() - 3) { + byte3 <- rb$peek(i + 2) + byte4 <- rb$peek(i + 3) + if (is_crlf(cur, nxt) && is_crlf(byte3, byte4)) { + return(i + 3) + } + } + + cur <- nxt } - boundary_end <- boundary_end[1] # Take the first occurrence - split_at <- boundary_end + 1 # Split at one after the boundary - split_at + NULL } -# Splits a buffer into the part before `split_at`, and the part starting at -# `split_at`. It's possible for either of the returned parts to be zero-length -# (i.e. if `split_at` is 1 or length(buffer)+1). -split_buffer <- function(buffer, split_at) { - # Return a list with the event data and the remaining buffer - list( - matched = slice(buffer, end = split_at), - remaining = slice(buffer, start = split_at) - ) -} # @param max_size Maximum number of bytes to look for a boundary before throwing an error # @param boundary_func A function that takes a raw vector and returns NULL if no @@ -285,11 +273,8 @@ resp_boundary_pushback <- function( check_streaming_response(resp) check_number_whole(max_size, min = 1, allow_infinite = TRUE) - chunk_size <- min(max_size + 1, 1024) - - # Grab data left over from last resp_stream_sse() call (if any) - buffer <- resp$cache$push_back %||% raw() - resp$cache$push_back <- raw() + chunk_size <- if (is.infinite(max_size)) 1024 else max_size + 1 + buffer <- env_cache(resp$cache, "buffer", RingBuffer$new(chunk_size)) if (resp_stream_show_buffer(resp)) { log_stream(cli::rule("Buffer"), prefix = "* ") @@ -309,23 +294,21 @@ resp_boundary_pushback <- function( # Read chunks until we find an event or reach the end of input repeat { # Try to find an event boundary using the data we have - print_buffer(buffer, "Buffer to parse") - split_at <- boundary_func(buffer) - - if (!is.null(split_at)) { - result <- split_buffer(buffer, split_at) - # We found a complete event - print_buffer(result$matched, "Matched data") - print_buffer(result$remaining, "Remaining buffer") - resp$cache$push_back <- result$remaining - return(result$matched) + print_buffer(buffer$peek_all(), "Buffer to parse") + boundary_pos <- boundary_func(buffer) + + if (!is.null(boundary_pos)) { + matched <- buffer$pop(boundary_pos) + + print_buffer(matched, "Matched data") + print_buffer(buffer$peek_all(), "Remaining buffer") + return(matched) } - if (length(buffer) > max_size) { + if (buffer$size() > max_size) { # Keep the buffer in place, so that if the user tries resp_stream_sse # again, they'll get the same error rather than reading the stream # having missed a bunch of bytes. - resp$cache$push_back <- buffer cli::cli_abort( "Streaming read exceeded size limit of {max_size}", class = "httr2_streaming_error" @@ -333,23 +316,21 @@ resp_boundary_pushback <- function( } # We didn't have enough data. Attempt to read more - chunk <- readBin( - resp$body, - raw(), - # Don't let us exceed the max size by more than one byte; we do allow the - # one extra byte so we know to error. - n = min(chunk_size, max_size - length(buffer) + 1) - ) + # Don't let us exceed the max size by more than one byte; we do allow the + # one extra byte so we know to error. + next_size <- min(chunk_size, max_size - buffer$size() + 1) + chunk <- readBin(resp$body, raw(), n = next_size) + buffer$push(chunk) print_buffer(chunk, "Received chunk") if (length(chunk) == 0) { if (!isIncomplete(resp$body)) { # We've truly reached the end of the connection; no more data is coming - if (length(buffer) == 0) { + if (buffer$is_empty()) { return(NULL) } else { if (include_trailer) { - return(buffer) + return(buffer$pop()) } else { cli::cli_warn( "Premature end of input; ignoring final partial chunk" @@ -359,16 +340,14 @@ resp_boundary_pushback <- function( } } else { # More data might come later; store the buffer and return NULL - print_buffer(buffer, "Storing incomplete buffer") - resp$cache$push_back <- buffer + print_buffer(buffer$peek_all(), "Storing incomplete buffer") return(NULL) } } # More data was received; combine it with existing buffer and continue the # loop to try parsing again - buffer <- c(buffer, chunk) - print_buffer(buffer, "Combined buffer") + print_buffer(buffer$peek_all(), "Combined buffer") } } diff --git a/R/ring-buffer.r b/R/ring-buffer.r new file mode 100644 index 00000000..cba7fac3 --- /dev/null +++ b/R/ring-buffer.r @@ -0,0 +1,198 @@ +#' RingBuffer Class +#' +#' An implementation of a ring buffer using a raw vector as the underlying storage. +#' The buffer has a user-specified initial size and can grow when full. +#' +#' @examples +#' rb <- RingBuffer$new(10) +#' rb$push(as.raw(1:5)) +#' data <- rb$pop(3) +#' @noRd +RingBuffer <- R6::R6Class( + "RingBuffer", + private = list( + .buffer = raw(), + + # Next position to write to + .head = 0, + # Next position to read from + .tail = 0, + # Elements currently in the buffer + .count = 0, + # Current capacity of the buffer + .capacity = 0, + + .resize = function(required_size = NULL) { + # If required_size is provided, ensure we grow to at least that size + new_capacity <- max(1, required_size, private$.capacity * 2) + new_buffer <- raw(new_capacity) + + # Copy data from old buffer to new buffer, starting from tail + if (private$.count > 0) { + if (private$.tail < private$.head) { + # Simple case: tail to head is contiguous + idx <- seq(private$.tail, length.out = private$.count) + new_buffer[1:private$.count] <- private$.buffer[idx] + } else { + # Wrapped case: tail to end, then start to head + n_end <- private$.capacity - private$.tail + 1 + idx1 <- seq(private$.tail, private$.capacity) + new_buffer[1:n_end] <- private$.buffer[idx1] + + if (private$.head > 1) { + idx2 <- seq_len(private$.head - 1) + new_buffer[(n_end + 1):private$.count] <- private$.buffer[idx2] + } + } + } + + # Update buffer and pointers + private$.buffer <- new_buffer + private$.tail <- 1 + private$.head <- private$.count + 1 + if (private$.count == 0) private$.head <- 1 + private$.capacity <- new_capacity + } + ), + + public = list( + initialize = function(initial_capacity = 32 * 1024) { + check_number_whole(initial_capacity, min = 1) + + private$.capacity <- as.integer(initial_capacity) + private$.buffer <- raw(private$.capacity) + private$.head <- 1 + private$.tail <- 1 + private$.count <- 0 + }, + + push = function(data) { + data_length <- length(data) + + # Check if we need to resize + if (data_length + private$.count > private$.capacity) { + private$.resize(data_length + private$.count) + } + + # Add data to buffer using vectorized operations where possible + if (private$.head + data_length - 1 <= private$.capacity) { + # Contiguous space available + idx <- seq(private$.head, length.out = data_length) + private$.buffer[idx] <- data + private$.head <- (private$.head + data_length - 1) %% + private$.capacity + + 1 + } else { + # Need to wrap around + space_to_end <- private$.capacity - private$.head + 1 + + # First part - fill to the end + idx1 <- seq(from = private$.head, to = private$.capacity) + private$.buffer[idx1] <- data[1:space_to_end] + + # Second part - start from beginning + remaining <- data_length - space_to_end + if (remaining > 0) { + idx2 <- seq(from = 1, length.out = remaining) + private$.buffer[idx2] <- data[(space_to_end + 1):data_length] + private$.head <- remaining + 1 + } else { + private$.head <- 1 + } + } + + private$.count <- private$.count + data_length + invisible(self) + }, + + pop = function(n = self$size()) { + n <- as.integer(n) + if (n <= 0) return(raw(0)) + + # Limit to available items + n <- min(n, private$.count) + if (n == 0) return(raw(0)) + + # Create result vector + result <- raw(n) + + # Extract data using vectorized operations + if (private$.tail + n - 1 <= private$.capacity) { + # Contiguous read + idx <- seq(from = private$.tail, length.out = n) + result <- private$.buffer[idx] + private$.tail <- (private$.tail + n - 1) %% private$.capacity + 1 + } else { + # Wrapped read + elements_to_end <- private$.capacity - private$.tail + 1 + + # First part - read to the end + idx1 <- seq(from = private$.tail, to = private$.capacity) + result[1:elements_to_end] <- private$.buffer[idx1] + + # Second part - read from beginning + remaining <- n - elements_to_end + if (remaining > 0) { + idx2 <- seq(from = 1, length.out = remaining) + result[(elements_to_end + 1):n] <- private$.buffer[idx2] + private$.tail <- remaining + 1 + } else { + private$.tail <- 1 + } + } + + private$.count <- private$.count - n + result + }, + + # Peek at a single byte + peek = function(i = 1) { + if (i > private$.count) { + return(NULL) # Not enough data + } + + pos <- (private$.tail + i - 2) %% private$.capacity + 1 + private$.buffer[pos] + }, + + peek_all = function() { + if (private$.count == 0) return(raw(0)) + + result <- raw(private$.count) + + if (private$.tail < private$.head) { + # Simple case: data is contiguous + idx <- seq(from = private$.tail, length.out = private$.count) + result <- private$.buffer[idx] + } else { + # Wrapped case: need to read in two parts + elements_to_end <- private$.capacity - private$.tail + 1 + + # First part - read to the end + idx1 <- seq(from = private$.tail, to = private$.capacity) + result[1:elements_to_end] <- private$.buffer[idx1] + + # Second part - read from beginning + remaining <- private$.count - elements_to_end + if (remaining > 0) { + idx2 <- seq(from = 1, length.out = remaining) + result[(elements_to_end + 1):private$.count] <- private$.buffer[idx2] + } + } + + result + }, + + is_empty = function() { + private$.count == 0 + }, + + size = function() { + private$.count + }, + + capacity = function() { + private$.capacity + } + ) +) diff --git a/R/utils.R b/R/utils.R index c589d7df..5b557b71 100644 --- a/R/utils.R +++ b/R/utils.R @@ -345,3 +345,7 @@ log_stream <- function(..., prefix = "<< ") { paste_c <- function(..., collapse = "") { paste0(c(...), collapse = collapse) } + +is_cr <- function(byte) byte == 0x0D +is_lf <- function(byte) byte == 0x0A +is_crlf <- function(byte1, byte2) is_cr(byte1) && is_lf(byte2) diff --git a/tests/testthat/_snaps/resp-stream.md b/tests/testthat/_snaps/resp-stream.md index 51767f03..b4061184 100644 --- a/tests/testthat/_snaps/resp-stream.md +++ b/tests/testthat/_snaps/resp-stream.md @@ -20,7 +20,6 @@ stream_all(req, resp_stream_lines, 1) Output << line 1 - << line 2 Code stream_all(req, resp_stream_raw, 5 / 1024) Output @@ -46,11 +45,6 @@ * Matched data: 6c 69 6e 65 20 31 0a * Remaining buffer: 6c 69 6e 65 20 32 0a << line 1 - * -- Buffer ---------------------------------------------------------------------- - * Buffer to parse: 6c 69 6e 65 20 32 0a - * Matched data: 6c 69 6e 65 20 32 0a - * Remaining buffer: - << line 2 # verbosity = 3 shows raw sse events diff --git a/tests/testthat/test-resp-stream.R b/tests/testthat/test-resp-stream.R index 9e72046b..5a6e1686 100644 --- a/tests/testthat/test-resp-stream.R +++ b/tests/testthat/test-resp-stream.R @@ -15,6 +15,20 @@ test_that("can stream bytes from a connection", { expect_length(out, 0) }) +test_that("can stream lines from a connection", { + resp <- request_test("/stream/10") %>% req_perform_connection() + withr::defer(close(resp)) + + out <- resp_stream_lines(resp, 1) + expect_length(out, 1) + + out <- resp_stream_lines(resp, 10) + expect_length(out, 9) + + out <- resp_stream_lines(resp, 1) + expect_length(out, 0) +}) + test_that("can determine if a stream is complete (blocking)", { resp <- request_test("/stream-bytes/2048") %>% req_perform_connection() withr::defer(close(resp)) @@ -78,7 +92,7 @@ test_that("can join lines across multiple reads", { out <- resp_stream_lines(resp1) expect_equal(out, character()) - expect_equal(resp1$cache$push_back, charToRaw("This is a ")) + expect_equal(resp1$cache$buffer$peek_all(), charToRaw("This is a ")) out <- resp_stream_lines(resp1) expect_equal(out, character()) @@ -231,7 +245,7 @@ test_that("can join sse events across multiple reads", { out <- resp_stream_sse(resp1) expect_equal(out, NULL) - expect_equal(resp1$cache$push_back, charToRaw("data: 1\n")) + expect_equal(resp1$cache$buffer$peek_all(), charToRaw("data: 1\n")) sync() out <- resp_stream_sse(resp1) @@ -240,7 +254,7 @@ test_that("can join sse events across multiple reads", { sync() out <- resp_stream_sse(resp1) expect_equal(out, list(type = "message", data = "1\n2", id = "")) - expect_equal(resp1$cache$push_back, charToRaw("data: 3\n\n")) + expect_equal(resp1$cache$buffer$peek_all(), charToRaw("data: 3\n\n")) out <- resp_stream_sse(resp1) expect_equal(out, list(type = "message", data = "3", id = "")) @@ -275,7 +289,7 @@ test_that("sse always interprets data as UTF-8", { Encoding(s) <- "UTF-8" expect_equal(out, list(type = "message", data = s, id = "")) expect_equal(Encoding(out$data), "UTF-8") - expect_equal(resp1$cache$push_back, raw()) + expect_equal(resp1$cache$buffer$peek_all(), raw()) }) test_that("streaming size limits enforced", { @@ -358,17 +372,26 @@ test_that("verbosity = 3 shows raw sse events", { test_that("has a working find_event_boundary", { boundary_test <- function(x, matched, remaining) { - buffer <- charToRaw(x) - split_at <- find_event_boundary(buffer) - result <- if (is.null(split_at)) { - NULL + if (is.null(matched)) { + exp <- list(matched = NULL, remaining = NULL) + } else { + exp <- list( + matched = charToRaw(matched), + remaining = charToRaw(remaining) + ) + } + + buffer <- RingBuffer$new() + buffer$push(charToRaw(x)) + + loc <- find_event_boundary(buffer) + if (is.null(loc)) { + act <- list(matched = NULL, remaining = NULL) } else { - split_buffer(buffer, split_at) + act <- list(matched = buffer$pop(loc), remaining = buffer$pop()) } - expect_identical( - result, - list(matched = charToRaw(matched), remaining = charToRaw(remaining)) - ) + + expect_equal(act, exp) } # Basic matches @@ -389,12 +412,12 @@ test_that("has a working find_event_boundary", { boundary_test("\r\r\n\n", matched = "\r\r", remaining = "\n\n") # Non-matches - expect_null(find_event_boundary(charToRaw("\n\r\n\r"))) - expect_null(find_event_boundary(charToRaw("hello\ngoodbye\n"))) - expect_null(find_event_boundary(charToRaw(""))) - expect_null(find_event_boundary(charToRaw("1"))) - expect_null(find_event_boundary(charToRaw("12"))) - expect_null(find_event_boundary(charToRaw("\r\n\r"))) + boundary_test("\n\r\n\r", matched = NULL) + boundary_test("hello\ngoodbye\n", matched = NULL) + boundary_test("", matched = NULL) + boundary_test("1", matched = NULL) + boundary_test("12", matched = NULL) + boundary_test("\r\n\r", matched = NULL) }) # parse_event ---------------------------------------------------------------- diff --git a/tests/testthat/test-ring-buffer.R b/tests/testthat/test-ring-buffer.R new file mode 100644 index 00000000..5482fcaf --- /dev/null +++ b/tests/testthat/test-ring-buffer.R @@ -0,0 +1,349 @@ +test_that("RingBuffer initializes correctly", { + # Test with default capacity + rb <- RingBuffer$new() + expect_equal(rb$size(), 0) + expect_true(rb$is_empty()) + + # Test with custom capacity + rb <- RingBuffer$new(32) + expect_equal(rb$capacity(), 32) +}) + +test_that("push and pop operations work correctly", { + rb <- RingBuffer$new(10) + + # Push single byte + rb$push(as.raw(0x01)) + expect_equal(rb$size(), 1) + expect_false(rb$is_empty()) + + # Pop single byte + data <- rb$pop() + expect_equal(data, as.raw(0x01)) + expect_equal(rb$size(), 0) + expect_true(rb$is_empty()) +}) + +test_that("pushing multiple bytes works correctly", { + rb <- RingBuffer$new(10) + + # Push multiple bytes + test_data <- as.raw(c(0x01, 0x02, 0x03, 0x04, 0x05)) + rb$push(test_data) + expect_equal(rb$size(), 5) + + # Pop and verify + result <- rb$pop(5) + expect_equal(result, test_data) + expect_true(rb$is_empty()) +}) + +test_that("popping from empty buffer returns empty raw vector", { + rb <- RingBuffer$new(10) + + # Pop from empty buffer + result <- rb$pop() + expect_equal(result, raw(0)) + + # Pop multiple from empty buffer + result <- rb$pop(5) + expect_equal(result, raw(0)) +}) + +test_that("pop respects available bytes", { + rb <- RingBuffer$new(10) + + # Add 3 bytes + rb$push(as.raw(c(0x01, 0x02, 0x03))) + + # Try to pop 5 bytes (should only get 3) + result <- rb$pop(5) + expect_equal(result, as.raw(c(0x01, 0x02, 0x03))) + expect_equal(rb$size(), 0) +}) + +test_that("buffer wraps around correctly", { + rb <- RingBuffer$new(5) + + # Fill buffer + rb$push(as.raw(c(0x01, 0x02, 0x03))) + + # Pop one byte + rb$pop() + + # Push more to force wrap + rb$push(as.raw(c(0x04, 0x05, 0x06))) + + # Pop all and verify correct order + result <- rb$pop(5) + expect_equal(result, as.raw(c(0x02, 0x03, 0x04, 0x05, 0x06))) +}) + +test_that("buffer grows automatically", { + rb <- RingBuffer$new(4) + + # Initial capacity + expect_equal(rb$capacity(), 4) + + # Push data that fills buffer + rb$push(as.raw(c(0x01, 0x02, 0x03, 0x04))) + + # Verify full but not grown + expect_equal(rb$capacity(), 4) + expect_equal(rb$size(), 4) + + # Push one more byte to force growth + rb$push(as.raw(0x05)) + + # Verify growth + expect_equal(rb$capacity(), 8) # Should double + expect_equal(rb$size(), 5) + + # Verify data integrity + result <- rb$pop(5) + expect_equal(result, as.raw(c(0x01, 0x02, 0x03, 0x04, 0x05))) +}) + +test_that("buffer grows to required size", { + rb <- RingBuffer$new(4) + + # Push large chunk at once + large_data <- as.raw(1:10) + rb$push(large_data) + + # Verify grew enough to accommodate data + expect_true(rb$capacity() >= 10) + expect_equal(rb$size(), 10) + + # Verify data integrity + result <- rb$pop(10) + expect_equal(result, large_data) +}) + +test_that("wrapping with partial reads and writes works", { + rb <- RingBuffer$new(6) + + # Fill buffer partially + rb$push(as.raw(c(0x01, 0x02, 0x03, 0x04))) + + # Read part + data1 <- rb$pop(2) + expect_equal(data1, as.raw(c(0x01, 0x02))) + + # Add more to force wrap + rb$push(as.raw(c(0x05, 0x06, 0x07, 0x08))) + + # Read across wrap boundary + data2 <- rb$pop(4) + expect_equal(data2, as.raw(c(0x03, 0x04, 0x05, 0x06))) + + # Read remaining + data3 <- rb$pop(2) + expect_equal(data3, as.raw(c(0x07, 0x08))) +}) + +test_that("push returns invisibly for method chaining", { + rb <- RingBuffer$new(10) + + # Method chaining + rb$push(as.raw(0x01))$push(as.raw(0x02)) + + expect_equal(rb$size(), 2) + expect_equal(rb$pop(2), as.raw(c(0x01, 0x02))) +}) + +test_that("extreme growth works", { + # Start with tiny buffer + rb <- RingBuffer$new(2) + + # Push large amount of data + large_data <- rep(as.raw(1), 1000) + rb$push(large_data) + + # Verify capacity and data + expect_true(rb$capacity() >= 1000) + expect_equal(rb$size(), 1000) + expect_equal(rb$pop(1000), large_data) +}) + +test_that("buffer doubles in size when growing from capacity 1", { + # Create a ring buffer with capacity 1 to test the max(1, private$.capacity * 2) line + rb <- RingBuffer$new(1) + expect_equal(rb$capacity(), 1) + + # Adding two bytes should trigger resize and double capacity from 1 to 2 + rb$push(as.raw(c(0x01, 0x02))) + expect_equal(rb$capacity(), 2) + + # Data should remain intact + expect_equal(rb$pop(2), as.raw(c(0x01, 0x02))) +}) + +test_that("contiguous data is correctly preserved when resizing", { + rb <- RingBuffer$new(4) + + test_data <- as.raw(c(0x01, 0x02, 0x03, 0x04)) + rb$push(test_data) + expect_equal(rb$size(), 4) + expect_equal(rb$capacity(), 4) + + rb$push(as.raw(0x05)) + expect_equal(rb$capacity(), 8) + expect_equal(rb$size(), 5) + + # Pop all data to verify it was preserved in correct order + expect_equal(rb$pop(5), as.raw(c(0x01, 0x02, 0x03, 0x04, 0x05))) +}) + +test_that("contiguous data with head and tail in middle is preserved when resizing", { + rb <- RingBuffer$new(10) + + # Add some initial data + rb$push(as.raw(c(0x01, 0x02, 0x03, 0x04, 0x05))) + # Remove some from the beginning to move the tail pointer + rb$pop(2) + # Add more data but not enough to wrap + rb$push(as.raw(c(0x06, 0x07))) + + # Current state: [-, -, 0x03, 0x04, 0x05, 0x06, 0x07, -, -, -] + # ^tail ^head + # Internally: .tail = 3, .count = 5 + + # Now add enough data to force resize + rb$push(as.raw(c(0x08, 0x09, 0x0A, 0x0B, 0x0C))) + + # Verify data preserved after resize + result <- rb$pop(10) + expect_equal( + result, + as.raw(c(0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C)) + ) +}) + +test_that("sequence indexing works correctly in contiguous case", { + # This specifically tests the idx <- seq(from = private$.tail, length.out = private$.count) line + rb <- RingBuffer$new(10) + + # Add some data + test_data <- as.raw(c(0xA1, 0xA2, 0xA3, 0xA4, 0xA5)) + rb$push(test_data) + + # Pop some to move tail + rb$pop(2) + + # Add more data that triggers resize and tests sequence indexing + rb$push(as.raw(c(0xB1, 0xB2, 0xB3, 0xB4, 0xB5, 0xB6, 0xB7, 0xB8))) + + # Data should be: [0xA3, 0xA4, 0xA5, 0xB1, 0xB2, 0xB3, 0xB4, 0xB5, 0xB6, 0xB7, 0xB8] + # Verify with partial pops to test sequence indexing worked correctly + + result1 <- rb$pop(3) + expect_equal(result1, as.raw(c(0xA3, 0xA4, 0xA5))) + + result2 <- rb$pop(8) + expect_equal( + result2, + as.raw(c(0xB1, 0xB2, 0xB3, 0xB4, 0xB5, 0xB6, 0xB7, 0xB8)) + ) +}) + +test_that("direct vector assignment in push works correctly", { + # This tests the new_buffer[1:private$.count] <- private$.buffer[idx] line + rb <- RingBuffer$new(5) + + # Fill buffer + rb$push(as.raw(c(0x01, 0x02, 0x03, 0x04, 0x05))) + + # Pop some to move tail + rb$pop(2) + + # Push more to cause resize + rb$push(as.raw(c(0x06, 0x07, 0x08))) + + # Pop everything and check order + result <- rb$pop(6) + expect_equal(result, as.raw(c(0x03, 0x04, 0x05, 0x06, 0x07, 0x08))) + + # Check capacity doubled + expect_equal(rb$capacity(), 10) +}) + +test_that("multiple resize operations maintain data integrity", { + # This tests repeated execution of the resize method + rb <- RingBuffer$new(2) + + # Initial data + rb$push(as.raw(c(0x01, 0x02))) + + # First resize + rb$push(as.raw(0x03)) + expect_equal(rb$capacity(), 4) + + # Second resize + rb$push(as.raw(c(0x04, 0x05))) + expect_equal(rb$capacity(), 8) + + # Third resize + rb$push(as.raw(c(0x06, 0x07, 0x08, 0x09, 0x0A))) + expect_equal(rb$capacity(), 16) + + # Verify all data maintained correctly + result <- rb$pop(10) + expect_equal( + result, + as.raw(c(0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A)) + ) +}) + +test_that("buffer resizes to exact required size", { + # This tests the resize with required_size parameter + rb <- RingBuffer$new(5) + + # Add data until we have 3 elements + rb$push(as.raw(c(0x01, 0x02, 0x03))) + + # Add a chunk that requires specific capacity (15 more elements, total 18) + large_chunk <- as.raw(seq(0x10, 0x1E)) # 15 elements + rb$push(large_chunk) + + # Verify capacity is large enough + expect_true(rb$capacity() >= 18) + + # Verify data integrity + result <- rb$pop(18) + expect_equal(result[1:3], as.raw(c(0x01, 0x02, 0x03))) + expect_equal(result[4:18], large_chunk) +}) + +test_that("peek works correctly", { + rb <- RingBuffer$new() + + # Test empty buffer + expect_null(rb$peek(1)) + + # Test single byte peek + rb$push(as.raw(c(1, 2, 3, 4))) + expect_equal(rb$peek(1), as.raw(1)) + expect_equal(rb$peek(2), as.raw(2)) + expect_equal(rb$peek(4), as.raw(4)) + + # Test peek beyond available data + expect_null(rb$peek(5)) + + # Test peek after some data removed + rb$pop(2) # Remove first two bytes + expect_equal(rb$peek(1), as.raw(3)) + expect_equal(rb$peek(2), as.raw(4)) + expect_null(rb$peek(3)) + + # Test peek with wrapped buffer + rb <- RingBuffer$new(5) + rb$push(as.raw(c(1, 2, 3))) # [1,2,3,_,_] + rb$pop(2) # [_,_,3,_,_] + rb$push(as.raw(c(4, 5, 6))) # [5,6,3,4,_] + + expect_equal(rb$peek(1), as.raw(3)) + expect_equal(rb$peek(2), as.raw(4)) + expect_equal(rb$peek(3), as.raw(5)) + expect_equal(rb$peek(4), as.raw(6)) +})