Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronous Outlet for zero-copying socket writing #153

Closed
wants to merge 5 commits into from

Conversation

cboulay
Copy link
Collaborator

@cboulay cboulay commented Oct 4, 2021

This implements a code path that allows for synchronous (blocked) calls to push_* by setting an outlet constructor argument do_async to false or 0. In this path, liblsl performs 0 (zero) copies on the data, compared to the 2 copies performed wen doing asynchronous writing. This can have dramatic speed / CPU usage improvements when using high bandwidth data (18%->5% in one of my tests).

Note that this contains some commits from the double_buflen branch. Currently the only relevant commit to examine is c64aeb9.

Some concerns:

  • The time spent in push_* depends on how many consumers are added. This can lead to unpredictable user experience unless the user application calls push_* from a non-interactive thread.
  • tcp_server -- I tried to skip launching the transfer_samples_thread but this did not work out. So now we have a thread doing nothing. I will try again.
  • I only enabled the do_async flag on the very new lsl_create_outlet_ex C method, and preserved the API of the existing lsl_create_outlet method.
  • stream_outlet_impl.h now has #include <asio/buffer.hpp> for private member std::vector<asio::const_buffer> sync_buffs_. A future revision should change this to std::vector<const_buffer_p> defined elsewhere.
  • I would like to implement an outlet method to push_buffers_directly(std::vector<buffer_type> buffers, double timestamp, std::vector<std::size_t> frame_indices) where frame_indices gives the indices in buffers that start a new frame (frame := multi-channel sample). The goal is to enable the user to push discontiguous frames (e.g., disabled channels still allocated in device buffer) without first copying to a contiguous frame. I'm not sure I actually need this.

@cboulay cboulay changed the base branch from master to cboulay/double_buflen October 4, 2021 19:54
@cboulay cboulay changed the base branch from cboulay/double_buflen to master October 4, 2021 19:54
@cboulay cboulay changed the base branch from master to cboulay/double_buflen October 4, 2021 19:57
@cboulay
Copy link
Collaborator Author

cboulay commented Oct 4, 2021

Disabling the transfer_samples_thread leads to errors on the inlet at chunk boundaries on line https://github.com/sccn/liblsl/blob/cboulay/outlet_sync/src/data_receiver.cpp#L317-L318 . If I leave the do-nothing thread in place then it seems to work fine. 🤷

@cboulay
Copy link
Collaborator Author

cboulay commented Oct 5, 2021

Note to self: disable synchronous-write-mode when data format is string or when byte order is not default.

@cboulay
Copy link
Collaborator Author

cboulay commented Oct 6, 2021

Thanks @tstenner for pointing out that the transfer_samples_thread becomes the sole owner of the client_session. Without the thread we would need the client_session to exist elsewhere. Maybe in a container with the socket? I don't know yet.

@cboulay cboulay force-pushed the cboulay/double_buflen branch from bad96d6 to 08024cc Compare October 8, 2021 14:53
@cboulay cboulay force-pushed the cboulay/outlet_sync branch from d66ef0c to 6cf0600 Compare October 8, 2021 14:53
@cboulay
Copy link
Collaborator Author

cboulay commented Oct 8, 2021

Rebased on #146 which was recently rebased on #155. Still a little work to do here.

@cboulay cboulay force-pushed the cboulay/outlet_sync branch 3 times, most recently from e59ffc3 to d40eacd Compare October 11, 2021 03:03
@cboulay cboulay force-pushed the cboulay/outlet_sync branch from d40eacd to 6b14a1f Compare October 11, 2021 03:10
examples/SendData.cpp Outdated Show resolved Hide resolved
examples/SendData.cpp Outdated Show resolved Hide resolved
examples/SendData.cpp Show resolved Hide resolved
else {
// Advanced: Push set of discontiguous buffers.
array<float *, 2> bufs = {sample.data(), extra.data()};
outlet.push_numeric_bufs(reinterpret_cast<const char **>(const_cast<const float**>(bufs.data())),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new push_numeric_bufs should be part of a separate PR; as it is it's easy to misuse the API / provoke undefined behavior and the casts should be handled more safely by the C++ API.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't disagree, but unfortunately I need this functionality from pure C. So it was either this, or add 14 functions to the C API. We can do that eventually, after all the inner workings are settled on.

include/lsl/outlet.h Outdated Show resolved Hide resolved
src/stream_outlet_impl.cpp Outdated Show resolved Hide resolved
@@ -170,4 +207,10 @@ template void stream_outlet_impl::enqueue<float>(const float *data, double, bool
template void stream_outlet_impl::enqueue<double>(const double *data, double, bool);
template void stream_outlet_impl::enqueue<std::string>(const std::string *data, double, bool);

void stream_outlet_impl::enqueue_sync_multi(std::vector<asio::const_buffer> buffs, double timestamp, bool pushthrough) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: remove enqueue_sync and make this a template that takes a pair of asio::const_buffer iterators/pointers

void tcp_server::write_all_blocking(std::vector<asio::const_buffer> buffs) {
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
std::size_t bytes_sent = 0;
for (const auto &x : inflight_ready_) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This waits for each preceding outlet to acknowledge that the complete data has been received before transmitting data to the next outlet. Suggestion: keep this code when there's only one inlet, start async writes to all inlets and wait for all writes to succeed before returning.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know of an easy way to do this without semaphores (C++20). Do you?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For sync outlets you don't have a data transfer thread, so the calling thread is responsible for performing the IO, i.e. call send() or send_async()+io_context::run(). In pseudocode:

if consumers.size == 0:
    return
if consumers.size() == 1:
    res = consumers[0].send(data)
    if res != ok:
        handle_error()
else:
    io_ctx.restart()
    for consumer in consumers:
        consumer.send_async(data, [](err_t err) {
            if res != ok:
                handle_error()
            })
    steady_timer timer;
    timer.wait(3s, [](err_t err) { if err != ok: … });
    io_ctx.run();

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming here that consumers is a tcp::socket, unfortunately socket.send(...) often fails to send the entire set of buffers. I was getting garbled data until I switched to asio::write(socket, bufs).

I don't know about socket.send_async vs asio::async_write. I hope the latter can successfully send everything (eventually) without having to monitor bytes_sent and re-entering the send condition.
I don't know if that changes the interaction with io_ctx. I'll give it a try when I get back to this. Thanks!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right about socket.send_async(), so that would need to be asio::async_write() and also asio::write(). It doesn't change anything for the io_ctx, as asio::(async_)write just keeps track of the data already sent and starts a new send operation if it isn't finished yet.

src/tcp_server.cpp Outdated Show resolved Hide resolved
src/tcp_server.cpp Outdated Show resolved Hide resolved
incorporate other suggestions from tstenner.
Currently not working properly!
@cboulay
Copy link
Collaborator Author

cboulay commented Nov 5, 2021

I made some changes based on your suggestions. However, the multiple-client code is probably still broken. I had to deprioritize that due to time constraints. I hope to get back to this soon.

@tstenner tstenner self-assigned this Nov 12, 2021
@cboulay
Copy link
Collaborator Author

cboulay commented Dec 22, 2021

I discovered that outlet.have_consumers doesn't work properly in sync mode.

@cboulay
Copy link
Collaborator Author

cboulay commented Jun 12, 2022

Closing in favour of #170

@cboulay cboulay closed this Jun 12, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants