Skip to content

Commit

Permalink
Implement synchronous outlet for zero-copy writes.
Browse files Browse the repository at this point in the history
  • Loading branch information
cboulay committed Oct 8, 2021
1 parent 08024cc commit 6cf0600
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 42 deletions.
8 changes: 6 additions & 2 deletions examples/ReceiveDataInChunks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@
int main(int argc, char **argv) {
std::cout << "ReceiveDataInChunks" << std::endl;
std::cout << "ReceiveDataInChunks StreamName max_buflen flush" << std::endl;
std::cout << "- max_buffered -- duration in msec to buffer" << std::endl;
std::cout << "- flush -- set non-zero to flush data instead of pulling; useful for testing throughput" << std::endl;

try {

std::string name{argc > 1 ? argv[1] : "MyAudioStream"};
int32_t max_buflen = argc > 2 ? std::stol(argv[2]) : 360;
double max_buflen = argc > 2 ? std::stod(argv[2]) : 360.;
bool flush = argc > 3;
// resolve the stream of interest & make an inlet
lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0), max_buflen);
int32_t buf_samples = (int32_t)(max_buflen * 1000);
lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0), max_buflen,
transp_bufsize_thousandths);

// Use set_postprocessing to get the timestamps in a common base clock.
// Do not use if this application will record timestamps to disk -- it is better to
Expand Down
35 changes: 24 additions & 11 deletions examples/SendDataInChunks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ struct fake_device {
pattern.reserve(pattern_samples * n_channels);
for (auto sample_ix = 0; sample_ix < pattern_samples; ++sample_ix) {
for (auto chan_ix = 0; chan_ix < n_channels; ++chan_ix) {
// sin(2*pi*f*t), where f cycles from 1 Hz to Nyquist: srate / 2
double f = (chan_ix + 1) % (int)(srate / 2);
pattern.emplace_back(
offset_0 + chan_ix * offset_step +
magnitude * static_cast<int16_t>(sin(M_PI * chan_ix * sample_ix / n_channels)));
magnitude * static_cast<int16_t>(sin(2 * M_PI * f * sample_ix / srate)));
}
}
last_time = std::chrono::steady_clock::now();
Expand All @@ -64,15 +66,15 @@ struct fake_device {
return output;
}

std::size_t get_data(std::vector<int16_t> &buffer) {
std::size_t get_data(std::vector<int16_t> &buffer, bool nodata = false) {
auto now = std::chrono::steady_clock::now();
auto elapsed_nano =
std::chrono::duration_cast<std::chrono::nanoseconds>(now - last_time).count();
int64_t elapsed_samples = std::size_t(elapsed_nano * srate * 1e-9); // truncate OK.
elapsed_samples = std::min(elapsed_samples, (int64_t)(buffer.size() / n_channels));
if (false) {
if (nodata) {
// The fastest but no patterns.
memset(&buffer[0], 23, buffer.size() * sizeof buffer[0]);
// memset(&buffer[0], 23, buffer.size() * sizeof buffer[0]);
} else {
std::size_t end_sample = head + elapsed_samples;
std::size_t nowrap_samples = std::min(pattern_samples - head, elapsed_samples);
Expand All @@ -89,22 +91,26 @@ struct fake_device {

int main(int argc, char **argv) {
std::cout << "SendDataInChunks" << std::endl;
std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate" << std::endl;
std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate nodata use_sync" << std::endl;
std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer for each outlet" << std::endl;
std::cout << "- chunk_rate -- number of chunks pushed per second. For this example, make it a common factor of samplingrate and 1000." << std::endl;

std::cout << "- nodata -- Set non-zero to cause the fake device to not copy pattern data into the buffer." << std::endl;
std::cout << "- use_sync -- Set to non-zero to use blocking send." << std::endl;

std::string name{argc > 1 ? argv[1] : "MyAudioStream"}, type{argc > 2 ? argv[2] : "Audio"};
int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100; // Here we specify srate, but typically this would come from the device.
int n_channels = argc > 4 ? std::stol(argv[4]) : 2; // Here we specify n_chans, but typically this would come from theh device.
int32_t max_buffered = argc > 5 ? std::stol(argv[5]) : 360;
double max_buffered = argc > 5 ? std::stod(argv[5]) : 360.;
int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second.
bool nodata = argc > 7;
bool do_sync = argc > 8 ? (bool)std::stol(argv[8]) : true;

int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk.
int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk

try {
// Prepare the LSL stream.
lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_int16);
lsl::stream_outlet outlet(info, 0, max_buffered);
lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_int16, "example-SendDataInChunks");
lsl::xml_element desc = info.desc();
desc.append_child_value("manufacturer", "LSL");
lsl::xml_element chns = desc.append_child("channels");
Expand All @@ -114,13 +120,19 @@ int main(int argc, char **argv) {
chn.append_child_value("unit", "microvolts");
chn.append_child_value("type", "EEG");
}
int32_t buf_samples = max_buffered * samplingrate;
lsl::stream_outlet outlet(info, chunk_samples, buf_samples,
transp_bufsize_samples | (do_sync ? transp_sync_blocking: transp_default));
info = outlet.info(); // Refresh info with whatever the outlet captured.
std::cout << "Stream UID: " << info.uid() << std::endl;

// Create a connection to our device.
fake_device my_device(n_channels, (float)samplingrate);

// Prepare buffer to get data from 'device'.
// The buffer should be larger than you think you need. Here we make it 4x as large.
std::vector<int16_t> chunk_buffer(4 * chunk_samples * n_channels);
std::fill(chunk_buffer.begin(), chunk_buffer.end(), 0);

std::cout << "Now sending data..." << std::endl;

Expand All @@ -133,11 +145,12 @@ int main(int argc, char **argv) {
std::this_thread::sleep_until(next_chunk_time);

// Get data from device
std::size_t returned_samples = my_device.get_data(chunk_buffer);
std::size_t returned_samples = my_device.get_data(chunk_buffer, nodata);

// send it to the outlet. push_chunk_multiplexed is one of the more complicated approaches.
// other push_chunk methods are easier but slightly slower.
outlet.push_chunk_multiplexed(chunk_buffer.data(), returned_samples * n_channels, 0.0, true);
double ts = lsl::local_clock();
outlet.push_chunk_multiplexed(chunk_buffer.data(), returned_samples * n_channels, ts, true);
}

} catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; }
Expand Down
3 changes: 3 additions & 0 deletions include/lsl/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ typedef enum {
/// The supplied max_buf should be scaled by 0.001.
transp_bufsize_thousandths = 2,

/// The outlet will use synchronous (blocking) calls to asio to push data
transp_sync_blocking = 4,

// prevent compilers from assuming an instance fits in a single byte
_lsl_transport_options_maxval = 0x7f000000
} lsl_transport_options_t;
Expand Down
50 changes: 44 additions & 6 deletions src/stream_outlet_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,22 @@
namespace lsl {

stream_outlet_impl::stream_outlet_impl(
const stream_info_impl &info, int32_t chunk_size, int32_t max_capacity)
const stream_info_impl &info, int32_t chunk_size, int32_t max_capacity, uint32_t flags)
: sample_factory_(std::make_shared<factory>(info.channel_format(), info.channel_count(),
static_cast<uint32_t>(
info.nominal_srate()
? info.nominal_srate() * api_config::get_instance()->outlet_buffer_reserve_ms() /
1000
: api_config::get_instance()->outlet_buffer_reserve_samples()))),
chunk_size_(chunk_size), info_(std::make_shared<stream_info_impl>(info)),
send_buffer_(std::make_shared<send_buffer>(max_capacity)) {
send_buffer_(std::make_shared<send_buffer>(max_capacity)),
do_sync_(flags & transp_sync_blocking) {

if ((info.channel_format() == cft_string) && (flags & transp_sync_blocking)) {
LOG_F(WARNING, "sync push not supported for string-formatted streams. Reverting to async.");
do_sync_ = false;
}

ensure_lsl_initialized();
const api_config *cfg = api_config::get_instance();

Expand Down Expand Up @@ -143,8 +150,24 @@ void stream_outlet_impl::push_numeric_raw(const void *data, double timestamp, bo
if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0;
sample_p smp(
sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough));
smp->assign_untyped(data);
send_buffer_->push_sample(smp);
if (!do_sync_) {
smp->assign_untyped(data); // Note: Makes a copy!
send_buffer_->push_sample(smp);
} else {
if (timestamp == DEDUCED_TIMESTAMP) {
sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1));
} else {
sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1));
sync_buffs_.push_back(asio::buffer(&timestamp, sizeof(timestamp)));
}
sync_buffs_.push_back(asio::buffer(data, smp->datasize()));
if (pushthrough) {
for (auto &tcp_server : tcp_servers_)
tcp_server->write_all_blocking(sync_buffs_);
sync_buffs_.clear();
}
}

}

bool stream_outlet_impl::have_consumers() { return send_buffer_->have_consumers(); }
Expand All @@ -158,8 +181,23 @@ void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrou
if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0;
sample_p smp(
sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough));
smp->assign_typed(data);
send_buffer_->push_sample(smp);
if (!do_sync_) {
smp->assign_typed(data);
send_buffer_->push_sample(smp);
} else {
if (timestamp == DEDUCED_TIMESTAMP) {
sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1));
} else {
sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1));
sync_buffs_.push_back(asio::buffer(&timestamp, sizeof(timestamp)));
}
sync_buffs_.push_back(asio::buffer(data, smp->datasize()));
if (pushthrough) {
for (auto &tcp_server : tcp_servers_)
tcp_server->write_all_blocking(sync_buffs_);
sync_buffs_.clear();
}
}
}

template void stream_outlet_impl::enqueue<char>(const char *data, double, bool);
Expand Down
9 changes: 8 additions & 1 deletion src/stream_outlet_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <string>
#include <thread>
#include <vector>
#include <asio/buffer.hpp>

using asio::ip::tcp;
using asio::ip::udp;
Expand All @@ -35,9 +36,11 @@ class stream_outlet_impl {
* @param max_capacity The maximum number of samples buffered for unresponsive receivers. If
* more samples get pushed, the oldest will be dropped. The default is sufficient to hold a bit
* more than 15 minutes of data at 512Hz, while consuming not more than ca. 512MB of RAM.
* @param flags Bitwise-OR'd flags from lsl_transport_options_t
*/
stream_outlet_impl(
const stream_info_impl &info, int32_t chunk_size = 0, int32_t max_capacity = 512000);
const stream_info_impl &info, int32_t chunk_size = 0, int32_t max_capacity = 512000,
uint32_t flags = transp_default);

/**
* Destructor.
Expand Down Expand Up @@ -317,6 +320,8 @@ class stream_outlet_impl {
stream_info_impl_p info_;
/// the single-producer, multiple-receiver send buffer
send_buffer_p send_buffer_;
/// Flag to indicate that push_* operations should be blocking synchronous. false by default.
bool do_sync_;
/// the IO service objects (two per stack: one for UDP and one for TCP)
std::vector<io_context_p> ios_;

Expand All @@ -329,6 +334,8 @@ class stream_outlet_impl {
std::vector<udp_server_p> responders_;
/// threads that handle the I/O operations (two per stack: one for UDP and one for TCP)
std::vector<thread_p> io_threads_;
/// buffers used in synchronous call to gather-write data directly to the socket.
std::vector<asio::const_buffer> sync_buffs_;
};

} // namespace lsl
Expand Down
72 changes: 54 additions & 18 deletions src/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,36 +222,68 @@ void tcp_server::handle_accept_outcome(std::shared_ptr<client_session> newsessio
accept_next_connection();
}

// === synchronous transfer

/**
 * Synchronously send a buffer sequence to every client socket that is flagged ready.
 *
 * inflight_mut_ is held for the whole call. A socket that reports a broken pipe or a
 * connection reset is flagged as not ready and its shutdown is posted to io_; any other
 * error is only logged and the socket is left untouched.
 *
 * @param buffs Buffer sequence handed to each ready socket's send().
 */
void tcp_server::write_all_blocking(std::vector<asio::const_buffer> buffs) {
	std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
	asio::error_code ec;
	for (auto &entry : inflight_ready_) {
		// Skip sockets that have not finished their handshake or are already closed.
		if (!entry.second || !entry.first->is_open()) continue;

		// NOTE(review): send() is documented as possibly transmitting only part of the
		// data; consider asio::write() if every byte must go out before returning.
		entry.first->send(buffs, 0, ec);
		if (!ec) continue;

		// Compare the error_code itself (value and category) rather than the raw value.
		if (ec == asio::error::broken_pipe || ec == asio::error::connection_reset) {
			LOG_F(WARNING, "Broken Pipe / Connection Reset detected. Closing socket.");
			// Stop writing to this socket; the entry itself is not erased here.
			entry.second = false;
			post(*io_, [entry]() { close_inflight_socket(entry); });
		} else {
			LOG_F(WARNING, "Unhandled write_all_blocking error: %s.", ec.message().c_str());
		}
	}
}

// === graceful cancellation of in-flight sockets ===

/// Start tracking a socket: it is added to inflight_ and recorded in inflight_ready_
/// with the value false. Both updates happen while holding inflight_mut_.
void tcp_server::register_inflight_socket(const tcp_socket_p &sock) {
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
inflight_.insert(sock);
inflight_ready_.insert({sock, false});
}

/// Stop tracking a socket: it is removed from inflight_ and from inflight_ready_ while
/// holding inflight_mut_.
void tcp_server::unregister_inflight_socket(const tcp_socket_p &sock) {
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
inflight_.erase(sock);
// Flag the socket as not ready before its entry is dropped.
inflight_ready_[sock] = false;
inflight_ready_.erase(sock);
}

void tcp_server::close_inflight_sockets() {
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
for (const auto &sock : inflight_)
post(*io_, [sock]() {
void tcp_server::close_inflight_socket(std::pair<tcp_socket_p, bool> x) {
try {
if (x.first->is_open()) {
try {
if (sock->is_open()) {
try {
// (in some cases shutdown may fail)
sock->shutdown(sock->shutdown_both);
} catch (...) {}
sock->close();
}
} catch (std::exception &e) {
LOG_F(WARNING, "Error during shutdown_and_close: %s", e.what());
}
});
// (in some cases shutdown may fail)
x.first->shutdown(x.first->shutdown_both);
} catch (...) {}
x.first->close();
}
} catch (std::exception &e) {
LOG_F(WARNING, "Error during shutdown_and_close: %s", e.what());
}
}

void tcp_server::close_inflight_sockets() {
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
for (const auto &x : inflight_ready_) {
inflight_ready_[x.first] = false;
post(*io_, [x]() { close_inflight_socket(x); });
}
}

// === implementation of the client_session class ===

Expand Down Expand Up @@ -511,7 +543,11 @@ void client_session::handle_send_feedheader_outcome(err_t err, std::size_t n) {
feedbuf_.consume(n);
// register outstanding work at the server (will be unregistered at session destruction)
work_ = std::make_shared<work_p::element_type>(serv_->io_->get_executor());
// spawn a sample transfer thread
serv_->inflight_ready_[sock_] = true;
if (serv_->transfer_is_sync_)
LOG_F(WARNING, "Using synchronous blocking transfers for new client session.");
// spawn a sample transfer thread.
// TODO: only spawn thread in async, but then we need `this` to belong to something else.
std::thread(&client_session::transfer_samples_thread, this, shared_from_this()).detach();
} catch (std::exception &e) {
LOG_F(WARNING, "Unexpected error while handling the feedheader send outcome: %s", e.what());
Expand Down
Loading

0 comments on commit 6cf0600

Please sign in to comment.