Eliminate Sys.sleep() in streaming tests #683

Open
hadley opened this issue Feb 12, 2025 · 9 comments · May be fixed by #703

@hadley
Member

hadley commented Feb 12, 2025

This could be done by using nanonext to synchronise the test with the app:

test_that("can join lines across multiple reads", {
  speaker <- nanonext::socket("req", dial = "ipc://httr2-test")
  withr::defer(unlink("httr2-test"))
  
  req <- local_app_request(function(req, res) {
    listener <- nanonext::socket("rep", listen = "ipc://httr2-test")

    res$send_chunk("This is a ")
    nanonext::recv(listener, block = TRUE)
    res$send_chunk("complete sentence.\n")
  })

  # Non-blocking returns NULL until data is ready
  resp1 <- req_perform_connection(req, blocking = FALSE)
  withr::defer(close(resp1))

  out <- resp_stream_lines(resp1)
  expect_equal(out, character())
  expect_equal(resp1$cache$push_back, charToRaw("This is a "))

  nanonext::send(speaker, "next", block = TRUE)
  out <- resp_stream_lines(resp1)
  expect_equal(out, "This is a complete sentence.")
})

We'd need to figure out how to make this a bit easier to reuse, so we don't need so much boilerplate in each function.

@shikokuchuo
Member

To synchronise two processes, the typical pattern is one req/rep cycle: send, recv, ack(nowledgement).

So in the version below, I've added an ack from the app once it has sent the next chunk, and the test waits for that ack before trying to read the chunk.

Some other small cleanups to make it CRAN-ready:

  • Skip if nanonext is not installed.
  • Use tempfile(). Technically the unlink() is unnecessary, as for Unix domain sockets it's just a temporary file, but it's safer to keep it for CRAN "black swan" testing. On Windows, this is just a name for a pipe and is not created on the filesystem.
  • Use a maximum block time (again, CRAN paranoia; it shouldn't be an issue when using IPC).
test_that("can join lines across multiple reads", {
  skip_if_not_installed("nanonext")
  tmp <- tempfile()
  url <- sprintf("ipc://%s", tmp)
  speaker <- nanonext::socket("req", dial = url)
  withr::defer(unlink(tmp))
  
  req <- local_app_request(function(req, res) {
    listener <- nanonext::socket("rep", listen = url)
    
    res$send_chunk("This is a ")
    nanonext::recv(listener, block = 1000)
    res$send_chunk("complete sentence.\n")
    nanonext::send(listener, NULL, block = 1000)
  })
  
  # Non-blocking returns NULL until data is ready
  resp1 <- req_perform_connection(req, blocking = FALSE)
  withr::defer(close(resp1))
  
  out <- resp_stream_lines(resp1)
  expect_equal(out, character())
  expect_equal(resp1$cache$push_back, charToRaw("This is a "))
  
  nanonext::send(speaker, "next", block = 1000)
  nanonext::recv(speaker, block = 1000)
  out <- resp_stream_lines(resp1)
  expect_equal(out, "This is a complete sentence.")
})
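The send, recv, ack cycle can also be seen in isolation. This is a minimal sketch, assuming nanonext is installed; the inproc:// address and the message payloads are illustrative choices, and both ends share one R session only for demonstration (in the test above, the "rep" end runs inside the app's handler):

```r
# Minimal sketch of one req/rep synchronisation cycle (send, recv, ack).
# Both ends live in one R session purely for illustration; in the test
# above, the "rep" end runs inside the app's handler instead.
url <- "inproc://sync-demo"  # hypothetical in-process address

listener <- nanonext::socket("rep", listen = url)  # app side
speaker <- nanonext::socket("req", dial = url)     # test side

# 1. The test signals the app, blocking for at most 1000 ms.
nanonext::send(speaker, "next", block = 1000)

# 2. The app receives the signal (this is where it would send its next chunk).
signal <- nanonext::recv(listener, block = 1000)

# 3. The app acknowledges that the chunk has been sent.
nanonext::send(listener, "ack", block = 1000)

# 4. The test waits for the ack before reading from the stream.
ack <- nanonext::recv(speaker, block = 1000)

close(speaker)
close(listener)
```

Because the "req" end waits for the ack in step 4, the test cannot read from the stream until the app has confirmed that the chunk went out, a guarantee a fixed Sys.sleep() can only approximate.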

@shikokuchuo
Member

And this is an equivalent, simplified version that uses nanonext::request() and nanonext::reply(), which implement the logic of the RPC model in a systematic way:

test_that("can join lines across multiple reads", {
  skip_if_not_installed("nanonext")
  tmp <- tempfile()
  url <- sprintf("ipc://%s", tmp)
  speaker <- nanonext::socket("req", dial = url)
  withr::defer(unlink(tmp))
  
  req <- local_app_request(function(req, res) {
    listener <- nanonext::socket("rep", listen = url)
    
    res$send_chunk("This is a ")
    nanonext::reply(nanonext::.context(listener), res$send_chunk, timeout = 1000)
  })
  
  # Non-blocking returns NULL until data is ready
  resp1 <- req_perform_connection(req, blocking = FALSE)
  withr::defer(close(resp1))
  
  out <- resp_stream_lines(resp1)
  expect_equal(out, character())
  expect_equal(resp1$cache$push_back, charToRaw("This is a "))
  
  nanonext::request(nanonext::.context(speaker), "complete sentence.\n", timeout = 1000)[]
  out <- resp_stream_lines(resp1)
  expect_equal(out, "This is a complete sentence.")
})

Some details:

  • .context() creates a context, which ensures replies are received for the correct request. This isn't relevant here, but will be for anything a bit more complex.
  • request() is async, so you wait for its result using the [] method, which is just syntactic sugar for collect_aio().
  • reply() executes a function with what is received as the argument, and sends back the return value.
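To see what request() and reply() each do on their own, here is a single-session sketch, assuming nanonext is installed. The inproc:// address is hypothetical, toupper() stands in for res$send_chunk, and nanonext::context() is used to create the contexts:

```r
# Sketch of the request()/reply() RPC pattern in a single R session.
url <- "inproc://rpc-demo"  # hypothetical in-process address

rep_sock <- nanonext::socket("rep", listen = url)
req_sock <- nanonext::socket("req", dial = url)
rep_ctx <- nanonext::context(rep_sock)
req_ctx <- nanonext::context(req_sock)

# request() is async: it starts sending and returns an Aio whose value
# becomes available once the reply arrives (or the timeout expires).
aio <- nanonext::request(req_ctx, "complete sentence.\n", timeout = 1000)

# reply() waits for a request, calls `execute` on the received data and
# sends the return value back.
nanonext::reply(rep_ctx, execute = toupper, timeout = 1000)

# `[]` waits for the async result; it is sugar for collect_aio().
result <- aio[]

close(req_sock)
close(rep_sock)
```

Issuing the request before calling reply() only works because request() does not block; the blocking step is the final `[]`.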

@hadley
Member Author

hadley commented Feb 13, 2025

I think these tests will just use skip_on_cran(), so we can simplify with that in mind.

@shikokuchuo
Member

shikokuchuo commented Feb 14, 2025

I've had a go at simplification.

The sync_req() and sync_rep() helper functions may even belong in nanonext if there's a general use for this.

test_that("can join lines across multiple reads", {
  skip_if_not_installed("nanonext")
  
  #' The `$sync()` method creates a synchronization point - it sends a request and
  #' cannot proceed until the other side has responded (subject to the timeout).
  #' Executes `expr` after the request is sent and before the ack is received.
  sync_req <- function(name = "default") {
    sock <- nanonext::socket("req", listen = sprintf("ipc:///tmp/%s", name))
    list(
      sync = function(expr = {}, timeout = 1000L) {
        ctx <- nanonext::.context(sock)
        nanonext::send(ctx, 0L, mode = 2L, block = TRUE)
        expr
        nanonext::recv(ctx, mode = 8L, block = timeout)
      },
      close = function() nanonext::reap(sock)
    )
  }
  
  #' The `$sync()` method creates a synchronization point - it waits for a sync
  #' request (subject to the timeout) and sends an acknowledgement. Executes
  #' `expr` after the request is received and before the ack is sent.
  sync_rep <- function(name = "default") {
    sock <- nanonext::socket("rep", dial = sprintf("ipc:///tmp/%s", name))
    list(
      sync = function(expr = {}, timeout = 1000L) {
        ctx <- nanonext::.context(sock)
        nanonext::recv(ctx, mode = 8L, block = timeout)
        expr
        nanonext::send(ctx, 0L, mode = 2L, block = TRUE)
      },
      close = function() nanonext::reap(sock)
    )
  }
  
  speaker <- sync_req()
  withr::defer(speaker$close())

  req <- local_app_request(function(req, res) {
    listener <- sync_rep()
    
    res$send_chunk("This is a ")
    listener$sync(res$send_chunk("complete sentence.\n"))
  })
  
  # Non-blocking returns NULL until data is ready
  resp1 <- req_perform_connection(req, blocking = FALSE)
  withr::defer(close(resp1))
  
  out <- resp_stream_lines(resp1)
  expect_equal(out, character())
  expect_equal(resp1$cache$push_back, charToRaw("This is a "))
  
  speaker$sync()
  out <- resp_stream_lines(resp1)
  expect_equal(out, "This is a complete sentence.")
})
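Both $sync() methods rely on R's lazy evaluation: `expr` is a promise, so it only runs at the point where the body mentions it. A base-R sketch, using a hypothetical sync_like() that logs its steps instead of touching sockets, shows the resulting order on the rep side:

```r
# Hypothetical stand-in for the rep-side $sync() method: each step is
# logged instead of touching a socket, to show when the lazy `expr` runs.
steps <- character()

sync_like <- function(expr = {}) {
  steps <<- c(steps, "receive request")
  expr  # promise forced here: after the request, before the ack
  steps <<- c(steps, "send ack")
  invisible(NULL)
}

# Same call shape as listener$sync(res$send_chunk(...)) in the test.
sync_like(steps <- c(steps, "send chunk"))

steps
# c("receive request", "send chunk", "send ack")
```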

@hadley
Member Author

hadley commented Feb 14, 2025

This might be getting too clever, but you could make them auto-close, and then there's only one function to return:

sync_req <- function(name = "default", .env = parent.frame()) {
  sock <- nanonext::socket("req", listen = sprintf("ipc:///tmp/%s", name))
  withr::defer(nanonext::reap(sock), envir = .env)

  function(expr = {}, timeout = 1000L) {
    ctx <- nanonext::.context(sock)
    nanonext::send(ctx, 0L, mode = 2L, block = TRUE)
    expr
    nanonext::recv(ctx, mode = 8L, block = timeout)
  }
}

Not sure of the implications of this for sync_rep(), since that would tie the socket's scope to a single request. (But that seems likely to be fine?)
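The auto-close idea works by registering cleanup on the caller's frame rather than the helper's own, which is what withr::defer(envir = .env) provides. A dependency-free sketch, assuming a hypothetical local_resource() helper whose "resource" is just an entry in an event log standing in for the socket, gets the same effect by evaluating base R's on.exit() inside the caller's frame:

```r
# Hypothetical helper mirroring the auto-closing sync_req(): "open" a
# resource, register its cleanup on the caller's frame, return one function.
local_resource <- function(tracker, .env = parent.frame()) {
  tracker$events <- c(tracker$events, "open")
  cleanup <- function() tracker$events <- c(tracker$events, "close")
  # Evaluating on.exit() inside .env attaches the cleanup to the caller,
  # so it runs when the caller exits, not when local_resource() returns.
  do.call(on.exit, list(as.call(list(cleanup)), add = TRUE), envir = .env)
  function() tracker$events <- c(tracker$events, "use")
}

tracker <- new.env()
tracker$events <- character()

run_test <- function() {
  use <- local_resource(tracker)
  use()
  tracker$events  # resource still open here
}

during <- run_test()     # c("open", "use")
after <- tracker$events  # c("open", "use", "close")
```

The scoping question for sync_rep() follows directly: its socket would live exactly as long as the function frame it is created in.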

@shikokuchuo
Member

Yeah, that's clever, and I think it makes for a much cleaner design!

You'd mirror it on the rep side, and the full example would then look like this:

test_that("can join lines across multiple reads", {
  skip_if_not_installed("nanonext")

  sync_req <- function(name = "default", .env = parent.frame()) {
    sock <- nanonext::socket("req", listen = sprintf("ipc:///tmp/%s", name))
    withr::defer(nanonext::reap(sock), envir = .env)
    
    function(expr = {}, timeout = 1000L) {
      ctx <- nanonext::.context(sock)
      nanonext::send(ctx, 0L, mode = 2L, block = TRUE)
      expr
      nanonext::recv(ctx, mode = 8L, block = timeout)
    }
  }

  sync_rep <- function(name = "default", .env = parent.frame()) {
    sock <- nanonext::socket("rep", dial = sprintf("ipc:///tmp/%s", name))
    withr::defer(nanonext::reap(sock), envir = .env)
    
    function(expr = {}, timeout = 1000L) {
      ctx <- nanonext::.context(sock)
      nanonext::recv(ctx, mode = 8L, block = timeout)
      expr
      nanonext::send(ctx, 0L, mode = 2L, block = TRUE)
    }
  }

  sync <- sync_req()
  
  req <- local_app_request(function(req, res) {
    sync <- sync_rep()
    
    res$send_chunk("This is a ")
    sync(res$send_chunk("complete sentence.\n"))
  })
  
  # Non-blocking returns NULL until data is ready
  resp1 <- req_perform_connection(req, blocking = FALSE)
  withr::defer(close(resp1))
  
  out <- resp_stream_lines(resp1)
  expect_equal(out, character())
  expect_equal(resp1$cache$push_back, charToRaw("This is a "))
  
  sync()
  out <- resp_stream_lines(resp1)
  expect_equal(out, "This is a complete sentence.")
})

hadley added this to the v1.1.1 milestone Feb 14, 2025
@hadley
Member Author

hadley commented Feb 14, 2025

One small problem: if I call sync_req()(), I can't interrupt the wait with Ctrl + C.

@shikokuchuo
Member

We can just switch it to async with an interruptible wait. I'll update it when I'm back at a screen.
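A minimal sketch of the async approach, assuming nanonext is installed and using a hypothetical inproc:// address: recv_aio() returns immediately, and `[]` then waits for the result (the interruptible wait mentioned above). Since nothing is ever sent here, the wait ends at the timeout with an errorValue instead of hanging:

```r
# Sketch of an async receive collected with `[]`. Nothing is ever sent to
# this socket, so the wait ends when the timeout expires.
sock <- nanonext::socket("rep", listen = "inproc://timeout-demo")  # hypothetical address
ctx <- nanonext::context(sock)

aio <- nanonext::recv_aio(ctx, mode = "raw", timeout = 200)
res <- aio[]  # waits for completion or timeout

timed_out <- nanonext::is_error_value(res)  # TRUE: res is errorValue 5 (timed out)
close(sock)
```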

@shikokuchuo
Member

These should be robust enough.

  sync_req <- function(name = "default", .env = parent.frame()) {
    sock <- nanonext::socket("req", listen = sprintf("ipc:///tmp/nanonext%s", name))
    withr::defer(nanonext::reap(sock), envir = .env)
    
    function(expr = {}, timeout = 1000L) {
      ctx <- nanonext::.context(sock)
      saio <- nanonext::send_aio(ctx, 0L, mode = 2L)
      expr
      nanonext::recv_aio(ctx, mode = 8L, timeout = timeout)[]
    }
  }

  sync_rep <- function(name = "default", .env = parent.frame()) {
    sock <- nanonext::socket("rep", dial = sprintf("ipc:///tmp/nanonext%s", name))
    withr::defer(nanonext::reap(sock), envir = .env)
    
    function(expr = {}, timeout = 1000L) {
      ctx <- nanonext::.context(sock)
      nanonext::recv_aio(ctx, mode = 8L, timeout = timeout)[]
      expr
      nanonext::send(ctx, 0L, mode = 2L, block = TRUE)
    }
  }

I keep the final send() synchronous and blocking in sync_rep() to ensure it fully completes, in case it's running in an ephemeral session that's exiting. It's safe to do this because, if the previous receive timed out, the send will fail with an 'incorrect state' errorValue, as a 'rep' socket can only respond after receiving a request.
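The 'incorrect state' behaviour can be checked directly. A minimal sketch, assuming nanonext is installed and a hypothetical inproc:// address: a fresh 'rep' socket that has never received a request cannot send, so even a blocking send() returns an errorValue straight away rather than waiting:

```r
# Sketch: a 'rep' socket may only send after it has received a request.
sock <- nanonext::socket("rep", listen = "inproc://estate-demo")  # hypothetical address

# No request has been received, so this fails at once instead of blocking,
# as sync_rep()'s final send() would after a timed-out receive.
res <- nanonext::send(sock, 0L, mode = "raw", block = TRUE)

failed <- nanonext::is_error_value(res)  # TRUE: res is errorValue 11 (incorrect state)
close(sock)
```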

hadley added a commit that referenced this issue Feb 14, 2025
* Use manual syncing instead of sleeping
* Automatically track number of calls

Fixes #683
hadley modified the milestone: v1.1.1 Feb 14, 2025
hadley removed this from the v1.1.1 milestone Mar 5, 2025
shikokuchuo linked a pull request Mar 5, 2025 that will close this issue
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants