From e24da3422c74d45fc2d0cc998cb067e21b742959 Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Tue, 16 Nov 2021 12:20:33 +0100 Subject: [PATCH 01/13] Implement synchronous outlet for zero-copy writes. --- examples/ReceiveDataInChunks.cpp | 5 +-- examples/SendDataInChunks.cpp | 20 +++++++++--- include/lsl/common.h | 3 ++ src/stream_outlet_impl.cpp | 44 ++++++++++++++++++++++--- src/stream_outlet_impl.h | 5 +++ src/tcp_server.cpp | 56 ++++++++++++++++++++++++++++++-- src/tcp_server.h | 20 +++++++++++- 7 files changed, 138 insertions(+), 15 deletions(-) diff --git a/examples/ReceiveDataInChunks.cpp b/examples/ReceiveDataInChunks.cpp index 139f6cfd..9b58f98c 100644 --- a/examples/ReceiveDataInChunks.cpp +++ b/examples/ReceiveDataInChunks.cpp @@ -18,11 +18,12 @@ int main(int argc, char **argv) { try { std::string name{argc > 1 ? argv[1] : "MyAudioStream"}; - double max_buffered = argc > 2 ? std::stod(argv[2]) : 360.; + double max_buflen = argc > 2 ? std::stod(argv[2]) : 360.; bool flush = argc > 3; // resolve the stream of interest & make an inlet + int32_t buf_samples = (int32_t)(max_buflen * 1000); lsl::stream_info inlet_info = lsl::resolve_stream("name", name).at(0); - lsl::stream_inlet inlet(inlet_info, (int32_t)max_buffered); + lsl::stream_inlet inlet(inlet_info, max_buflen, transp_bufsize_thousandths); // Use set_postprocessing to get the timestamps in a common base clock. // Do not use if this application will record timestamps to disk -- it is better to diff --git a/examples/SendDataInChunks.cpp b/examples/SendDataInChunks.cpp index 5e83ce7d..03c1b8c7 100644 --- a/examples/SendDataInChunks.cpp +++ b/examples/SendDataInChunks.cpp @@ -92,15 +92,24 @@ struct fake_device { int main(int argc, char **argv) { std::cout << "SendDataInChunks" << std::endl; - std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate" << std::endl; + std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered " + "chunk_rate nodata use_sync" + << std::endl; std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer for each outlet" << std::endl; std::cout << "- chunk_rate -- number of chunks pushed per second. For this example, make it a common factor of samplingrate and 1000." << std::endl; - + std::cout << "- nodata -- Set non-zero to cause the fake device to not copy pattern data into " + "the buffer." + << std::endl; + std::cout << "- use_sync -- Set to non-zero to use blocking send." << std::endl; + std::string name{argc > 1 ? argv[1] : "MyAudioStream"}, type{argc > 2 ? argv[2] : "Audio"}; int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100; // Here we specify srate, but typically this would come from the device. int n_channels = argc > 4 ? std::stol(argv[4]) : 2; // Here we specify n_chans, but typically this would come from theh device. double max_buffered = argc > 5 ? std::stod(argv[5]) : 360.; int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second. + bool nodata = argc > 7; + bool do_sync = argc > 8 ? (bool)std::stol(argv[8]) : true; + int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk. int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk @@ -118,7 +127,9 @@ int main(int argc, char **argv) { chn.append_child_value("type", type); } int32_t buf_samples = (int32_t)(max_buffered * samplingrate); - lsl::stream_outlet outlet(info, chunk_samples, buf_samples); + auto flags = static_cast( + (do_sync ? transp_sync_blocking : transp_default) | transp_bufsize_samples); + lsl::stream_outlet outlet(info, chunk_samples, buf_samples, flags); info = outlet.info(); // Refresh info with whatever the outlet captured. std::cout << "Stream UID: " << info.uid() << std::endl; @@ -128,6 +139,7 @@ int main(int argc, char **argv) { // Prepare buffer to get data from 'device'. // The buffer should be larger than you think you need. Here we make it 4x as large. std::vector chunk_buffer(4 * chunk_samples * n_channels); + std::fill(chunk_buffer.begin(), chunk_buffer.end(), 0); std::cout << "Now sending data..." << std::endl; @@ -141,7 +153,7 @@ int main(int argc, char **argv) { std::this_thread::sleep_until(next_chunk_time); // Get data from device - std::size_t returned_samples = my_device.get_data(chunk_buffer); + std::size_t returned_samples = my_device.get_data(chunk_buffer, nodata); // send it to the outlet. push_chunk_multiplexed is one of the more complicated approaches. // other push_chunk methods are easier but slightly slower. diff --git a/include/lsl/common.h b/include/lsl/common.h index 3c81c798..153df274 100644 --- a/include/lsl/common.h +++ b/include/lsl/common.h @@ -161,6 +161,9 @@ typedef enum { /// The supplied max_buf should be scaled by 0.001. transp_bufsize_thousandths = 2, + /// The outlet will use synchronous (blocking) calls to asio to push data + transp_sync_blocking = 4, + // prevent compilers from assuming an instance fits in a single byte _lsl_transport_options_maxval = 0x7f000000 } lsl_transport_options_t; diff --git a/src/stream_outlet_impl.cpp b/src/stream_outlet_impl.cpp index 299414d1..ceb9f9b7 100644 --- a/src/stream_outlet_impl.cpp +++ b/src/stream_outlet_impl.cpp @@ -22,8 +22,14 @@ stream_outlet_impl::stream_outlet_impl(const stream_info_impl &info, int32_t chu chunk_size_(info.calc_transport_buf_samples(requested_bufsize, flags)), info_(std::make_shared(info)), send_buffer_(std::make_shared(chunk_size_)), - io_ctx_data_(std::make_shared(1)), + do_sync_(flags & transp_sync_blocking), io_ctx_data_(std::make_shared(1)), io_ctx_service_(std::make_shared(1)) { + + if ((info.channel_format() == cft_string) && (flags & transp_sync_blocking)) { + LOG_F(WARNING, "sync push not supported for string-formatted streams. Reverting to async."); + do_sync_ = false; + } + ensure_lsl_initialized(); const api_config *cfg = api_config::get_instance(); @@ -147,8 +153,22 @@ void stream_outlet_impl::push_numeric_raw(const void *data, double timestamp, bo if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0; sample_p smp( sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough)); - smp->assign_untyped(data); - send_buffer_->push_sample(smp); + if (!do_sync_) { + smp->assign_untyped(data); // Note: Makes a copy! + send_buffer_->push_sample(smp); + } else { + if (timestamp == DEDUCED_TIMESTAMP) { + sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1)); + } else { + sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1)); + sync_buffs_.push_back(asio::buffer(×tamp, sizeof(timestamp))); + } + sync_buffs_.push_back(asio::buffer(data, smp->datasize())); + if (pushthrough) { + tcp_server_->write_all_blocking(sync_buffs_); + sync_buffs_.clear(); + } + } } bool stream_outlet_impl::have_consumers() { return send_buffer_->have_consumers(); } @@ -162,8 +182,22 @@ void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrou if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0; sample_p smp( sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough)); - smp->assign_typed(data); - send_buffer_->push_sample(smp); + if (!do_sync_) { + smp->assign_typed(data); + send_buffer_->push_sample(smp); + } else { + if (timestamp == DEDUCED_TIMESTAMP) { + sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1)); + } else { + sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1)); + sync_buffs_.push_back(asio::buffer(×tamp, sizeof(timestamp))); + } + sync_buffs_.push_back(asio::buffer(data, smp->datasize())); + if (pushthrough) { + tcp_server_->write_all_blocking(sync_buffs_); + sync_buffs_.clear(); + } + } } template void stream_outlet_impl::enqueue(const char *data, double, bool); diff --git a/src/stream_outlet_impl.h b/src/stream_outlet_impl.h index 92488006..ec3d49c6 100644 --- a/src/stream_outlet_impl.h +++ b/src/stream_outlet_impl.h @@ -4,6 +4,7 @@ #include "common.h" #include "forward.h" #include "stream_info_impl.h" +#include #include #include #include @@ -319,6 +320,8 @@ class stream_outlet_impl { stream_info_impl_p info_; /// the single-producer, multiple-receiver send buffer send_buffer_p send_buffer_; + /// Flag to indicate that push_* operations should be blocking synchronous. false by default. + bool do_sync_; /// the IO service objects io_context_p io_ctx_data_, io_ctx_service_; @@ -331,6 +334,8 @@ class stream_outlet_impl { std::vector responders_; /// threads that handle the I/O operations (two per stack: one for UDP and one for TCP) std::vector io_threads_; + /// buffers used in synchronous call to gather-write data directly to the socket. + std::vector sync_buffs_; }; } // namespace lsl diff --git a/src/tcp_server.cpp b/src/tcp_server.cpp index 90dc9531..4d449014 100644 --- a/src/tcp_server.cpp +++ b/src/tcp_server.cpp @@ -143,9 +143,9 @@ class client_session : public std::enable_shared_from_this { }; tcp_server::tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf, - factory_p factory, int chunk_size, bool allow_v4, bool allow_v6) + factory_p factory, int chunk_size, bool allow_v4, bool allow_v6, bool do_sync) : chunk_size_(chunk_size), info_(std::move(info)), io_(std::move(io)), - factory_(std::move(factory)), send_buffer_(std::move(sendbuf)) { + factory_(std::move(factory)), send_buffer_(std::move(sendbuf)), transfer_is_sync_(do_sync) { // assign connection-dependent fields info_->session_id(api_config::get_instance()->session_id()); info_->reset_uid(); @@ -232,7 +232,47 @@ void tcp_server::accept_next_connection(tcp_acceptor_p &acceptor) { } -// === graceful cancellation of in-flight sockets === +// === synchronous transfer + +void tcp_server::write_all_blocking(std::vector bufs) { + int writes_outstanding = 0; + bool any_session_broken = false; + + for (auto &sock : sync_sockets_) { + asio::async_write(*sock, bufs, + [this, sock, &writes_outstanding]( + const asio::error_code &ec, size_t bytes_transferred) { + writes_outstanding--; + switch (ec.value()) { + case 0: break; // success + case asio::error::broken_pipe: + case asio::error::connection_reset: + LOG_F(WARNING, "Broken Pipe / Connection Reset detected. Closing socket."); + { + asio::error_code close_ec; + sock->close(close_ec); + } + break; + default: + LOG_F(WARNING, "Unhandled write_all_blocking error: %s.", ec.message().c_str()); + } + }); + writes_outstanding++; + } + try { + assert(sync_transfer_io_ctx_); + sync_transfer_io_ctx_->restart(); + while (writes_outstanding) sync_transfer_io_ctx_->run_one(); + if (any_session_broken) { + // remove sessions whose socket was closed + auto new_end_it = std::remove_if(sync_sockets_.begin(), sync_sockets_.end(), + [](const tcp_socket_p &sock) { + return !sock->is_open(); + }); + sync_sockets_.erase(new_end_it, sync_sockets_.end()); + } + } catch (std::exception &e) { LOG_F(ERROR, "Error during write_all_blocking: %s", e.what()); } +} void tcp_server::register_inflight_session(const std::shared_ptr &session) { std::lock_guard lock(inflight_mut_); @@ -535,6 +575,16 @@ void client_session::handle_send_feedheader_outcome(err_t err, std::size_t n) { // convenient for unit tests if (max_buffered_ <= 0) return; + if (serv->transfer_is_sync_) { + LOG_F(INFO, "Using synchronous blocking transfers for new client session."); + asio::post(*serv->sync_transfer_io_ctx_, + [serv, sock_p = std::make_shared(std::move(sock_))]() { + serv->sync_sockets_.emplace_back(std::move(sock_p)); + }); + serv->unregister_inflight_session(this); + return; + } + // determine transfer parameters auto queue = serv->send_buffer_->new_consumer(max_buffered_); diff --git a/src/tcp_server.h b/src/tcp_server.h index 37314a15..e973440e 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -47,9 +47,11 @@ class tcp_server : public std::enable_shared_from_this { * @param protocol The protocol (IPv4 or IPv6) that shall be serviced by this server. * @param chunk_size The preferred chunk size, in samples. If 0, the pushthrough flag determines * the effective chunking. + * @param do_sync Set true to indicate data transfer should happen synchronously in a blocking + * call. Default false -- asynchronous transfer in a thread (copies data). */ tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf, factory_p factory, - int chunk_size, bool allow_v4, bool allow_v6); + int chunk_size, bool allow_v4, bool allow_v6, bool do_sync = false); /** * Begin serving TCP connections. @@ -67,6 +69,12 @@ class tcp_server : public std::enable_shared_from_this { */ void end_serving(); + /** + * Write directly to each socket. This should only be used when server initialized with + * do_async = false. + */ + void write_all_blocking(std::vector buffs); + private: friend class client_session; @@ -95,6 +103,16 @@ class tcp_server : public std::enable_shared_from_this { // acceptor socket tcp_acceptor_p acceptor_v4_, acceptor_v6_; // our server socket + + // sync mode fields + + // Flag to indicate that new client_sessions should use synchronous blocking data transfer. + bool transfer_is_sync_; + // sockets that should receive data in sync mode + std::vector sync_sockets_; + // io context for sync mode, app is responsible for running it + std::unique_ptr sync_transfer_io_ctx_; + // registry of in-flight asessions (for cancellation) std::map> inflight_; std::recursive_mutex inflight_mut_; // mutex protecting the registry from concurrent access From fd2a85fef5f56a56c3581363f87aad982ea01f5d Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Sat, 9 Oct 2021 11:07:58 -0400 Subject: [PATCH 02/13] gather-write directly to asio when using sync mode and pushing a sample of buffers. raw buffs use void**; incorporate other suggestions from tstenner. Currently not working properly! --- examples/ReceiveDataInChunks.cpp | 6 +- examples/SendData.cpp | 205 ++++++++++++++------------ examples/SendDataCBlocking.c | 239 +++++++++++++++++++++++++++++++ examples/SendDataInChunks.cpp | 25 +++- include/lsl/outlet.h | 19 ++- include/lsl_cpp.h | 15 ++ src/lsl_outlet_c.cpp | 70 ++++++++- src/stream_outlet_impl.cpp | 61 ++++---- src/stream_outlet_impl.h | 28 +++- src/tcp_server.cpp | 9 +- src/tcp_server.h | 4 +- 11 files changed, 543 insertions(+), 138 deletions(-) create mode 100644 examples/SendDataCBlocking.c diff --git a/examples/ReceiveDataInChunks.cpp b/examples/ReceiveDataInChunks.cpp index 9b58f98c..1e28785d 100644 --- a/examples/ReceiveDataInChunks.cpp +++ b/examples/ReceiveDataInChunks.cpp @@ -18,12 +18,12 @@ int main(int argc, char **argv) { try { std::string name{argc > 1 ? argv[1] : "MyAudioStream"}; - double max_buflen = argc > 2 ? std::stod(argv[2]) : 360.; + double max_buffered = argc > 2 ? std::stod(argv[2]) : 360.; bool flush = argc > 3; // resolve the stream of interest & make an inlet - int32_t buf_samples = (int32_t)(max_buflen * 1000); + int32_t buf_samples = (int32_t)(max_buffered * 1000); lsl::stream_info inlet_info = lsl::resolve_stream("name", name).at(0); - lsl::stream_inlet inlet(inlet_info, max_buflen, transp_bufsize_thousandths); + lsl::stream_inlet inlet(inlet_info, buf_samples, transp_bufsize_thousandths); // Use set_postprocessing to get the timestamps in a common base clock. // Do not use if this application will record timestamps to disk -- it is better to diff --git a/examples/SendData.cpp b/examples/SendData.cpp index f81ce9c4..b0f1a350 100644 --- a/examples/SendData.cpp +++ b/examples/SendData.cpp @@ -1,89 +1,116 @@ -#include "lsl_cpp.h" -#include -#include -#include -#include -#include - -/** - * This example program offers an 8-channel stream, float-formatted, that resembles EEG data. - * The example demonstrates also how per-channel meta-data can be specified using the .desc() field - * of the stream information object. - * - * Note that the timer used in the send loop of this program is not particularly accurate. - */ - - -const char *channels[] = {"C3", "C4", "Cz", "FPz", "POz", "CPz", "O1", "O2"}; - -int main(int argc, char *argv[]) { - std::string name, type; - if (argc < 3) { - std::cout - << "This opens a stream under some user-defined name and with a user-defined content " - "type." - << std::endl; - std::cout << "SendData Name Type [n_channels=8] [srate=100] [max_buffered=360]" - << std::endl; - std::cout - << "Please enter the stream name and the stream type (e.g. \"BioSemi EEG\" (without " - "the quotes)):" - << std::endl; - std::cin >> name >> type; - } else { - name = argv[1]; - type = argv[2]; - } - int n_channels = argc > 3 ? std::stol(argv[3]) : 8; - n_channels = n_channels < 8 ? 8 : n_channels; - int samplingrate = argc > 4 ? std::stol(argv[4]) : 100; - int max_buffered = argc > 5 ? std::stol(argv[5]) : 360; - - try { - - // make a new stream_info (100 Hz) - lsl::stream_info info( - name, type, n_channels, samplingrate, lsl::cf_float32, std::string(name) += type); - - // add some description fields - info.desc().append_child_value("manufacturer", "LSL"); - lsl::xml_element chns = info.desc().append_child("channels"); - for (int k = 0; k < n_channels; k++) - chns.append_child("channel") - .append_child_value("label", k < 8 ? channels[k] : "Chan-" + std::to_string(k + 1)) - .append_child_value("unit", "microvolts") - .append_child_value("type", type); - - // make a new outlet - lsl::stream_outlet outlet(info, 0, max_buffered); - std::vector sample(n_channels, 0.0); - - // Your device might have its own timer. Or you can decide how often to poll - // your device, as we do here. - int32_t sample_dur_us = 1000000 / (samplingrate > 0 ? samplingrate : 100); - auto t_start = std::chrono::high_resolution_clock::now(); - auto next_sample_time = t_start; - - // send data forever - std::cout << "Now sending data... " << std::endl; - double starttime = ((double)clock()) / CLOCKS_PER_SEC; - for (unsigned t = 0;; t++) { - // Create random data for the first 8 channels. - for (int c = 0; c < 8; c++) sample[c] = (float)((rand() % 1500) / 500.0 - 1.5); - // For the remaining channels, fill them with a sample counter (wraps at 1M). - std::fill(sample.begin() + 8, sample.end(), (float)(t % 1000000)); - - // Wait until the next expected sample time. - next_sample_time += std::chrono::microseconds(sample_dur_us); - std::this_thread::sleep_until(next_sample_time); - - // send the sample - std::cout << sample[0] << "\t" << sample[n_channels-1] << std::endl; - outlet.push_sample(sample); - } - - } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } - std::cout << "Press any key to exit. " << std::endl; - std::cin.get(); - return 0; -} +#include "lsl_cpp.h" +#include +#include +#include +#include +#include + +/** + * This example program offers an 8-channel stream, float-formatted, that resembles EEG data. + * The example demonstrates also how per-channel meta-data can be specified using the .desc() field + * of the stream information object. + * + * Note that the timer used in the send loop of this program is not particularly accurate. + */ + + +const char *channels[] = {"C3", "C4", "Cz", "FPz", "POz", "CPz", "O1", "O2"}; + +int main(int argc, char *argv[]) { + std::string name, type; + if (argc < 3) { + std::cout + << "This opens a stream under some user-defined name and with a user-defined content " + "type." + << std::endl; + std::cout << "SendData Name Type n_channels[8] srate[100] max_buffered[360] sync[false] " + "contig[true]" + << std::endl; + std::cout + << "Please enter the stream name and the stream type (e.g. \"BioSemi EEG\" (without " + "the quotes)):" + << std::endl; + std::cin >> name >> type; + } else { + name = argv[1]; + type = argv[2]; + } + int n_channels = argc > 3 ? std::stol(argv[3]) : 8; + n_channels = n_channels < 8 ? 8 : n_channels; + int samplingrate = argc > 4 ? std::stol(argv[4]) : 100; + int max_buffered = argc > 5 ? std::stol(argv[5]) : 360; + bool sync = argc > 6 ? std::stol(argv[6]) > 0 : false; + bool contig = argc > 7 ? std::stol(argv[7]) > 0 : true; + + try { + // if (!sync && !contig) { + // throw std::invalid_argument( "async is incompatible with discontig + //push_numeric_bufs (except for strings, not used here)." ); + // } + + // make a new stream_info (100 Hz) + lsl::stream_info info( + name, type, n_channels, samplingrate, lsl::cf_float32, std::string(name) += type); + + // add some description fields + info.desc().append_child_value("manufacturer", "LSL"); + lsl::xml_element chns = info.desc().append_child("channels"); + for (int k = 0; k < n_channels; k++) + chns.append_child("channel") + .append_child_value("label", k < 8 ? channels[k] : "Chan-" + std::to_string(k + 1)) + .append_child_value("unit", "microvolts") + .append_child_value("type", type); + + // make a new outlet + lsl::stream_outlet outlet( + info, 0, max_buffered, sync ? transp_sync_blocking : transp_default); + + // Initialize 2 discontiguous data arrays. + std::vector sample(8, 0.0); + std::vector extra(n_channels - 8, 0.0); + // If this is contiguous mode (default) then we combine the arrays. + if (contig) sample.insert(sample.end(), extra.begin(), extra.end()); + + // bytes is used in !contig mode because we need to know how big each buffer is. + std::array bytes = { + 8 * sizeof(float), static_cast((n_channels - 8) * sizeof(float))}; + + // Your device might have its own timer. Or you can decide how often to poll + // your device, as we do here. + int32_t sample_dur_us = 1000000 / (samplingrate > 0 ? samplingrate : 100); + auto t_start = std::chrono::high_resolution_clock::now(); + auto next_sample_time = t_start; + + // send data forever + std::cout << "Now sending data... " << std::endl; + for (unsigned t = 0;; t++) { + + // Create random data for the first 8 channels. + for (int c = 0; c < 8; c++) sample[c] = (float)((rand() % 1500) / 500.0 - 1.5); + // For the remaining channels, fill them with a sample counter (wraps at 1M). + if (contig) + std::fill(sample.begin() + 8, sample.end(), (float)(t % 1000000)); + else + std::fill(extra.begin(), extra.end(), (float)(t % 1000000)); + + // Wait until the next expected sample time. + next_sample_time += std::chrono::microseconds(sample_dur_us); + std::this_thread::sleep_until(next_sample_time); + + // send the sample + if (contig) { + std::cout << sample[0] << "\t" << sample[n_channels-1] << std::endl; + outlet.push_sample(sample); + } else { + // Advanced: Push set of discontiguous buffers. + std::array bufs = {sample.data(), extra.data()}; + outlet.push_numeric_bufs( + (void **)bufs.data(), bytes.data(), 2, lsl::local_clock(), true); + } + } + + } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } + std::cout << "Press any key to exit. " << std::endl; + std::cin.get(); + return 0; +} diff --git a/examples/SendDataCBlocking.c b/examples/SendDataCBlocking.c new file mode 100644 index 00000000..355e9c02 --- /dev/null +++ b/examples/SendDataCBlocking.c @@ -0,0 +1,239 @@ +#include +#include +#include +#include +#include +#include +#include + +/** + * This example program pushes a 16-bit stream with a constant value across all channels. The + * channel number and rate are configurable via command line arguments. The data are not copied! + * This requires an inconvenient application design using threads and synchronization, but the + * this kind of design is necessary when frequent copying of large data chunks is too expensive. + * The main thread takes data from the device and puts it into a transmit buffer, and the secondary + * thread pushes the data from the transmit buffer into the lsl outlet then releases the data from + * the buffer. This is a relatively new and unusual design pattern for lsl. It uses a new BLOCKING + * function `push_chunk_wait`. Presently this only supports int16 types. This function passes + * the data pointer directly to asio to write data to each consumer and waits for asio to return. + * Note: One would usually prefer to use C++ niceities and queueing libraries (e.g., + * moodycamel::concurrentqueue) when working with big data across threads. However, the high- + * throughput data demands might be come from embedded devices with compiler limitations, + * so we use C here. Any input on how to speed up this C code is greatly appreciated. + */ + +#define DEVICE_BUFFER_SIZE 4194304 + +typedef struct { + char name[100]; + char serial[100]; + double last_timestamp; + int srate; + int nchans; + int write_idx; + int read_idx; + int min_frames_per_chunk; + int16_t channel_data[DEVICE_BUFFER_SIZE / 2]; // (channels * samples) < 2 million +} fake_device; + +typedef struct { + int thread_status; + int chunk_size; + double buffer_dur; + int do_async; +} thread_params; + +// Linked-list queue +typedef struct chunk_info { + int16_t *buf; + int n_frames; + double timestamp; +} chunk_info; +typedef struct node { + chunk_info info; + struct node *next; +} node; +typedef struct queue { + int count; + node *front; + node *rear; +} queue; +void initialize(queue *q) { + q->count = 0; + q->front = NULL; + q->rear = NULL; +} +int isempty(queue *q) { return (q->front == NULL); } +void enqueue(queue *q, int16_t *data, int frames, double ts) { + node *tmp; + tmp = malloc(sizeof(node)); + tmp->info.buf = data; + tmp->info.n_frames = frames; + tmp->info.timestamp = ts; + tmp->next = NULL; + if (isempty(q)) + q->front = q->rear = tmp; + else { + q->rear->next = tmp; + q->rear = tmp; + } + q->count++; +} +chunk_info dequeue(queue *q) { + chunk_info info = q->front->info; + node *tmp = q->front; + free(tmp); + q->front = q->front->next; + q->count--; + return (info); +} + +// Globals +fake_device *device = 0; +sem_t sem; +queue *q; + +// fetch_data -- Normally something provided by Device SDK +uint64_t fetch_data(int16_t **buffer) { + static int buf_samples = sizeof(device->channel_data) / sizeof(device->channel_data[0]); + + if (device->last_timestamp < 0) device->last_timestamp = lsl_local_clock(); + double now = lsl_local_clock(); + // Calculate how many frames/timestamps have elapsed since the last call. + uint64_t elapsed_frames = (uint64_t)((now - device->last_timestamp) * device->srate); + if (elapsed_frames < device->min_frames_per_chunk) elapsed_frames = 0; + // Cut this fetch short if it would go past the buffer. Next fetch will start at first idx. + if ((device->write_idx + elapsed_frames * device->nchans) > buf_samples) + elapsed_frames = (buf_samples - device->write_idx) / device->nchans; + // Further limit elapsed_samples to not overtake the read point (tail) + // if ((device->write_idx < device->read_idx) && + // (device->write_idx + (elapsed_frames * device->nchans) >= device->read_idx)) + // elapsed_frames = (device->read_idx - device->write_idx) / device->nchans; + if (elapsed_frames > 0) { + // New elapsed_time after accounting for rounding to integer frames. + device->last_timestamp += (double)(elapsed_frames) / device->srate; + // I assume that the device has its own acquisition buffer and that it copies data + // to a separate data buffer for API purposes. + // We are using a model where the device SDK shares its buffer with the client application. + // This is a bit unusual but allows for fastest throughput. + *buffer = &(device->channel_data[device->write_idx]); + + // And we advance the head for the next data transfer. + device->write_idx = (device->write_idx + elapsed_frames * device->nchans) % buf_samples; + if ((buf_samples - device->write_idx) < device->nchans) device->write_idx = 0; + } + return elapsed_frames; +} + +// transmit_thread -- responsible for popping data off the queue and pushing it to LSL +void transmit_thread(void *vargp) { + // Initialize thread-local variables + thread_params *params = (thread_params *)vargp; + + /* declare a new streaminfo */ + lsl_streaminfo info = lsl_create_streaminfo( + device->name, "TestLSL", device->nchans, device->srate, cft_int16, device->serial); + + /* add some meta-data fields to it */ + /* (for more standard fields, see https://github.com/sccn/xdf/wiki/Meta-Data) */ + lsl_xml_ptr desc = lsl_get_desc(info); + lsl_append_child_value(desc, "manufacturer", "LSL"); + lsl_xml_ptr chns = lsl_append_child(desc, "channels"); + char chanlabel[12]; + for (int c = 0; c < device->nchans; c++) { + lsl_xml_ptr chn = lsl_append_child(chns, "channel"); + snprintf(chanlabel, 20, "Chan-%d", c); + lsl_append_child_value(chn, "label", chanlabel); + lsl_append_child_value(chn, "unit", "microvolts"); + lsl_append_child_value(chn, "type", "EEG"); + } + + /* make a new outlet */ + lsl_outlet outlet = + lsl_create_outlet_d(info, params->chunk_size, params->buffer_dur, params->do_async); + + printf("Now sending data...\n"); + params->thread_status = 1; + int buf_samples = sizeof(device->channel_data) / sizeof(device->channel_data[0]); + while (params->thread_status) { + sem_wait(&sem); + if (!isempty(q)) { + chunk_info chunk = dequeue(q); + int64_t chunk_samples = chunk.n_frames * device->nchans; + lsl_push_chunk_stp(outlet, chunk.buf, chunk_samples, chunk.timestamp, 1); + device->read_idx = (device->read_idx + chunk_samples) % buf_samples; + } + } + lsl_destroy_outlet(outlet); +} + +int main(int argc, char *argv[]) { + printf("SendDataCBlocking example program. Sends int16 data with minimal copies.\n"); + printf("Usage: %s [streamname] [streamuid] [srate] [nchans] [buff_dur] [do_async]\n", argv[0]); + printf("Using lsl %d, lsl_library_info: %s\n", lsl_library_version(), lsl_library_info()); + const char *name = argc > 1 ? argv[1] : "SendDataCBlocking"; + const char *uid = argc > 2 ? argv[2] : "6s45ahas321"; + int srate = argc > 3 ? strtol(argv[3], NULL, 10) : 512; + int n_chans = argc > 4 ? strtol(argv[4], NULL, 10) : 32; + double buff_dur = argc > 5 ? strtod(argv[5], NULL) : 60.; + int do_async = argc > 6 ? strtol(argv[6], NULL, 10) : 1; + int32_t samps_per_chunk = argc > 6 ? strtol(argv[6], NULL, 10) : 30; + + // Initialize our fake device and set its parameters. This would normally be taken care of + // by the device SDK. + device = (fake_device *)malloc(sizeof(fake_device)); + memset(device, 0, sizeof(fake_device)); + strcpy(device->name, name); + device->srate = srate; + device->nchans = n_chans; + device->last_timestamp = -1.; + device->min_frames_per_chunk = device->srate / 1000; + strcpy(device->serial, uid); + // Give the device buffer data some non-zero value. + memset(device->channel_data, 23, sizeof(device->channel_data)); + // write_idx and read_idx are OK at 0. + + thread_params params; + params.buffer_dur = buff_dur; + params.chunk_size = samps_per_chunk; + params.thread_status = 0; + params.do_async = do_async; + + // Initialize q + q = malloc(sizeof(queue)); + initialize(q); + + sem_init(&sem, 0, 0); + pthread_t thread_id; + if (pthread_create(&thread_id, NULL, (void *)&transmit_thread, ¶ms)) { + fprintf(stderr, "Error creating LSL transmit thread.\n"); + return 1; + } + + int exit_condition = 0; + int16_t *shared_buff; + double last_timestamp_received = -1.; + uint64_t n_frames_received = 0; + while (1) { + if (exit_condition) break; + + // Get data from device + n_frames_received = fetch_data(&shared_buff); + if (n_frames_received > 0) { + enqueue(q, shared_buff, n_frames_received, device->last_timestamp); + sem_post(&sem); + } + } + + if (params.thread_status) // Kill thread + { + params.thread_status = 0; + if (pthread_join(thread_id, NULL)) { + fprintf(stderr, "Error terminating LSL transmit thread.\n"); + } + } + sem_destroy(&sem); + free(device); + + return 0; +} diff --git a/examples/SendDataInChunks.cpp b/examples/SendDataInChunks.cpp index 03c1b8c7..5155138e 100644 --- a/examples/SendDataInChunks.cpp +++ b/examples/SendDataInChunks.cpp @@ -109,6 +109,7 @@ int main(int argc, char **argv) { int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second. bool nodata = argc > 7; bool do_sync = argc > 8 ? (bool)std::stol(argv[8]) : true; + bool b_contig = true && do_sync; // Set true to test gather-write operations. int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk. int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk @@ -134,12 +135,13 @@ int main(int argc, char **argv) { std::cout << "Stream UID: " << info.uid() << std::endl; // Create a connection to our device. - fake_device my_device(n_channels, (float)samplingrate); + int dev_chans = b_contig ? n_channels : n_channels + 1; + fake_device my_device(dev_chans, (float)samplingrate); // Prepare buffer to get data from 'device'. // The buffer should be larger than you think you need. Here we make it 4x as large. - std::vector chunk_buffer(4 * chunk_samples * n_channels); - std::fill(chunk_buffer.begin(), chunk_buffer.end(), 0); + std::vector dev_buffer(4 * chunk_samples * dev_chans); + std::fill(dev_buffer.begin(), dev_buffer.end(), 0); std::cout << "Now sending data..." << std::endl; @@ -153,13 +155,24 @@ int main(int argc, char **argv) { std::this_thread::sleep_until(next_chunk_time); // Get data from device - std::size_t returned_samples = my_device.get_data(chunk_buffer, nodata); + std::size_t returned_samples = my_device.get_data(dev_buffer, nodata); // send it to the outlet. push_chunk_multiplexed is one of the more complicated approaches. // other push_chunk methods are easier but slightly slower. double ts = lsl::local_clock(); - outlet.push_chunk_multiplexed( - chunk_buffer.data(), returned_samples * n_channels, ts, true); + if (b_contig) { + // Push a chunk of a contiguous buffer. + outlet.push_chunk_multiplexed( + dev_buffer.data(), returned_samples * n_channels, ts, true); + } else { + std::cout << "Discontiguous push_chunk not yet supported." << std::endl; + std::cout << "See SendData.cpp for discontiguous push_sample, then set " + << std::endl; + std::cout << "timestamps as LSL_DEDUCED_TIMESTAMP and pushtrough as false " + << std::endl; + std::cout << "for all samples except the the first or last in a chunk." + << std::endl; + } } } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } std::cout << "Press any key to exit. " << std::endl; diff --git a/include/lsl/outlet.h b/include/lsl/outlet.h index 87c32538..b79ebbdd 100644 --- a/include/lsl/outlet.h +++ b/include/lsl/outlet.h @@ -99,9 +99,11 @@ extern LIBLSL_C_API int32_t lsl_push_sample_vtp(lsl_outlet out, const void *data /** @copybrief lsl_push_sample_ftp * @see lsl_push_sample_ftp * @param out The lsl_outlet object through which to push the data. - * @param data A pointer to values to push. The number of values pointed to must be no less than the - * number of channels in the sample. - * @param lengths A pointer the number of elements to push for each channel (string lengths). + * @param data An array of data buffers to push. The number of buffers in the array must be no less + * than the number of channels in the sample. Each entry in data must be longer than the + * corresponding entry in `lengths`. + * @param lengths An array containing the lengths of each buffer in data. Units are string lengths + * or number of bytes. */ extern LIBLSL_C_API int32_t lsl_push_sample_buf(lsl_outlet out, const char **data, const uint32_t *lengths); /** @copydoc lsl_push_sample_buf @@ -111,6 +113,17 @@ extern LIBLSL_C_API int32_t lsl_push_sample_buft(lsl_outlet out, const char **da * @param pushthrough @see lsl_push_sample_ftp */ extern LIBLSL_C_API int32_t lsl_push_sample_buftp(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough); +/** @copydoc lsl_push_sample_buftp + * @param data An array of data buffers to push. The number of buffers in the array must be no less + * than `nbufs`. + * @param bytes An array comprising the number of bytes in each buffer. The number of entries in + * bytes must be no less than `nbufs`. + * @param nbufs The number of values pointed to in `data` and equivalently the number of items in + * `bytes`. + */ +extern LIBLSL_C_API int32_t lsl_push_sample_rawtpn(lsl_outlet out, void **data, + const uint32_t *bytes, double timestamp, int32_t pushthrough, uint32_t nbufs); + /** Push a chunk of multiplexed samples into the outlet. One timestamp per sample is provided. * * @attention Note that the provided buffer size is measured in channel values (e.g. floats) rather diff --git a/include/lsl_cpp.h b/include/lsl_cpp.h index 72d58819..0db1633b 100644 --- a/include/lsl_cpp.h +++ b/include/lsl_cpp.h @@ -513,6 +513,21 @@ class stream_outlet { lsl_push_sample_vtp(obj.get(), (sample), timestamp, pushthrough); } + /** + * Push a pointer to an array of buffers of variable size as one sample into the outlet. + * + * @param bufs A pointer to an array of data buffers. + * @param bytes An array of sizes (number of bytes) of buffers in bufs. + * @param nbufs Total number of buffers. + * @param timestamp Optionally the capture time of the sample, in agreement with local_clock(); + * @param pushthrough Whether to push the sample through to the receivers immediately instead of + * concatenating with subsequent samples. + */ + void push_numeric_bufs(void **bufs, uint32_t *bytes, uint32_t nbufs, double timestamp = 0.0, + bool pushthrough = true) { + lsl_push_sample_rawtpn(obj.get(), bufs, bytes, timestamp, pushthrough, nbufs); + } + // =================================================== // === Pushing an chunk of samples into the outlet === diff --git a/src/lsl_outlet_c.cpp b/src/lsl_outlet_c.cpp index a8472407..411b1759 100644 --- a/src/lsl_outlet_c.cpp +++ b/src/lsl_outlet_c.cpp @@ -160,14 +160,74 @@ LIBLSL_C_API int32_t lsl_push_sample_buft( LIBLSL_C_API int32_t lsl_push_sample_buftp(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough) { + stream_outlet_impl *outimpl = out; + // As the number of bytes-per-buffer is the same as the number of chars-per-buffer, + // we can pass `lengths` through as `bytes`. + return lsl_push_sample_rawtpn(out, (void **)data, lengths, timestamp, pushthrough, + (uint32_t)outimpl->info().channel_count()); +} + +LIBLSL_C_API int32_t lsl_push_sample_rawtpn(lsl_outlet out, void **data, const uint32_t *bytes, + double timestamp, int32_t pushthrough, uint32_t nbufs) { + stream_outlet_impl *outimpl = out; try { - stream_outlet_impl *outimpl = out; - std::vector tmp; - for (uint32_t k = 0; k < (uint32_t)outimpl->info().channel_count(); k++) - tmp.emplace_back(data[k], lengths[k]); - return outimpl->push_sample_noexcept(&tmp[0], timestamp, pushthrough); + if (outimpl->is_sync_blocking()) { + // Convert input to a vector of asio buffers for a gather-write operation. + std::vector bufs; + bufs.reserve(nbufs); + for (auto buf_ix = 0; buf_ix < nbufs; buf_ix++) { + bufs.push_back(asio::buffer(data[buf_ix], bytes[buf_ix])); + } + return outimpl->push_sample_gather(bufs, timestamp, pushthrough); + } else { + // Make contiguous. + if (outimpl->info().channel_format() == cft_string) { + // For strings we place in std::string vector to make sure they are properly + // terminated. + std::vector tmp; + for (uint32_t k = 0; k < nbufs; k++) + tmp.emplace_back((const char *)data[k], bytes[k]); + return outimpl->push_sample_noexcept(&tmp[0], timestamp, pushthrough); + } else { + // Otherwise we put into new memory block. + uint32_t total_bytes = 0, byte_offset = 0; + for (size_t k = 0; k < nbufs; k++) { total_bytes += bytes[k]; } + char *tmp = (char *)malloc(total_bytes); + for (size_t k = 0; k < nbufs; k++) { + memcpy(&tmp[byte_offset], data[k], bytes[k]); + byte_offset += bytes[k]; + } + // TODO: I tried passing void buffer but eventually fail because the convert + // functions + // become ambiguous. + lsl_error_code_t ec; + switch (outimpl->info().channel_format()) { + case cft_int8: + ec = outimpl->push_sample_noexcept((const char *)tmp, timestamp, pushthrough); + case cft_int16: + ec = + outimpl->push_sample_noexcept((const int16_t *)tmp, timestamp, pushthrough); + case cft_int32: + ec = + outimpl->push_sample_noexcept((const int32_t *)tmp, timestamp, pushthrough); + case cft_int64: + ec = + outimpl->push_sample_noexcept((const int64_t *)tmp, timestamp, pushthrough); + case cft_float32: + ec = outimpl->push_sample_noexcept((const float *)tmp, timestamp, pushthrough); + case cft_double64: + ec = outimpl->push_sample_noexcept((const double *)tmp, timestamp, pushthrough); + case cft_undefined: ec = lsl_internal_error; + } + free(tmp); + return ec; + } + } } catch (std::exception &e) { LOG_F(WARNING, "Unexpected error during push_sample: %s", e.what()); + if (!outimpl->is_sync_blocking() && outimpl->info().channel_format() != cft_string) + LOG_F(ERROR, "lsl_push_sample_buftpn only compatible with string type or when outlet " + "is using sync writes."); return lsl_internal_error; } } diff --git a/src/stream_outlet_impl.cpp b/src/stream_outlet_impl.cpp index ceb9f9b7..d63d4282 100644 --- a/src/stream_outlet_impl.cpp +++ b/src/stream_outlet_impl.cpp @@ -25,10 +25,8 @@ stream_outlet_impl::stream_outlet_impl(const stream_info_impl &info, int32_t chu do_sync_(flags & transp_sync_blocking), io_ctx_data_(std::make_shared(1)), io_ctx_service_(std::make_shared(1)) { - if ((info.channel_format() == cft_string) && (flags & transp_sync_blocking)) { - LOG_F(WARNING, "sync push not supported for string-formatted streams. Reverting to async."); - do_sync_ = false; - } + if ((info.channel_format() == cft_string) && do_sync_) + throw std::invalid_argument("Synchronous push not supported for string-formatted streams."); ensure_lsl_initialized(); const api_config *cfg = api_config::get_instance(); @@ -47,7 +45,7 @@ stream_outlet_impl::stream_outlet_impl(const stream_info_impl &info, int32_t chu // create TCP data server tcp_server_ = std::make_shared(info_, io_ctx_data_, send_buffer_, sample_factory_, - chunk_size_, cfg->allow_ipv4(), cfg->allow_ipv6()); + chunk_size_, cfg->allow_ipv4(), cfg->allow_ipv6(), do_sync_); // fail if both stacks failed to instantiate if (udp_servers_.empty()) @@ -157,17 +155,7 @@ void stream_outlet_impl::push_numeric_raw(const void *data, double timestamp, bo smp->assign_untyped(data); // Note: Makes a copy! send_buffer_->push_sample(smp); } else { - if (timestamp == DEDUCED_TIMESTAMP) { - sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1)); - } else { - sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1)); - sync_buffs_.push_back(asio::buffer(×tamp, sizeof(timestamp))); - } - sync_buffs_.push_back(asio::buffer(data, smp->datasize())); - if (pushthrough) { - tcp_server_->write_all_blocking(sync_buffs_); - sync_buffs_.clear(); - } + enqueue_sync(asio::buffer(data, smp->datasize()), timestamp, pushthrough); } } @@ -177,6 +165,28 @@ bool stream_outlet_impl::wait_for_consumers(double timeout) { return send_buffer_->wait_for_consumers(timeout); } +void stream_outlet_impl::push_timestamp_sync(const double ×tamp) { + if (timestamp == DEDUCED_TIMESTAMP) { + sync_buffs_.emplace_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1)); + } else { + sync_buffs_.emplace_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1)); + sync_buffs_.emplace_back(asio::buffer(×tamp, sizeof(timestamp))); + } +} + +void stream_outlet_impl::pushthrough_sync() { + // LOG_F(INFO, "Pushing %u buffers.", sync_buffs_.size()); + tcp_server_->write_all_blocking(sync_buffs_); + sync_buffs_.clear(); +} + +void stream_outlet_impl::enqueue_sync( + asio::const_buffer buff, const double ×tamp, bool pushthrough) { + push_timestamp_sync(timestamp); + sync_buffs_.push_back(buff); + if (pushthrough) pushthrough_sync(); +} + template void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrough) { if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0; @@ -186,17 +196,7 @@ void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrou smp->assign_typed(data); send_buffer_->push_sample(smp); } else { - if (timestamp == DEDUCED_TIMESTAMP) { - sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1)); - } else { - sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1)); - sync_buffs_.push_back(asio::buffer(×tamp, sizeof(timestamp))); - } - sync_buffs_.push_back(asio::buffer(data, smp->datasize())); - if (pushthrough) { - tcp_server_->write_all_blocking(sync_buffs_); - sync_buffs_.clear(); - } + enqueue_sync(asio::buffer(data, smp->datasize()), smp->timestamp, smp->pushthrough); } } @@ -208,4 +208,11 @@ template void stream_outlet_impl::enqueue(const float *data, double, bool template void stream_outlet_impl::enqueue(const double *data, double, bool); template void stream_outlet_impl::enqueue(const std::string *data, double, bool); +void stream_outlet_impl::enqueue_sync_multi( + std::vector buffs, double timestamp, bool pushthrough) { + push_timestamp_sync(timestamp); + sync_buffs_.insert(sync_buffs_.end(), buffs.begin(), buffs.end()); + if (pushthrough) pushthrough_sync(); +} + } // namespace lsl diff --git a/src/stream_outlet_impl.h b/src/stream_outlet_impl.h index ec3d49c6..19e16eb1 100644 --- a/src/stream_outlet_impl.h +++ b/src/stream_outlet_impl.h @@ -137,7 +137,11 @@ class stream_outlet_impl { void push_sample(const std::string *data, double timestamp = 0.0, bool pushthrough = true) { enqueue(data, timestamp, pushthrough); } - + lsl_error_code_t push_sample_gather( + std::vector buffs, double timestamp = 0.0, bool pushthrough = true) { + enqueue_sync_multi(buffs, timestamp, pushthrough); + return lsl_no_error; + } template inline lsl_error_code_t push_sample_noexcept( @@ -295,6 +299,9 @@ class stream_outlet_impl { /// Wait until some consumer shows up. bool wait_for_consumers(double timeout = FOREVER); + /// If the outlet is intended to use synchronous blocking transfers + bool is_sync_blocking() { return do_sync_; }; + private: /// Instantiate a new server stack. void instantiate_stack(udp udp_protocol); @@ -302,6 +309,25 @@ class stream_outlet_impl { /// Allocate and enqueue a new sample into the send buffer. template void enqueue(const T *data, double timestamp, bool pushthrough); + /// Append the appropriate timestamp tag and optionally timestamp onto sync_buffs_ for a single + /// timestamp. + void push_timestamp_sync(const double ×tamp); + + /// push sync_buffs_ through each tcp server. + void pushthrough_sync(); + + /// Append a single timestamp and single buffer to sync_buffs and optionally pushthrough the + /// server. + void enqueue_sync(asio::const_buffer buff, const double ×tamp, bool pushthrough); + + /** + * Append a single timestamp and multiple within-sample buffers to sync_buffs_. + * This is useful when a sample is discontiguous in memory. It makes no assumptions about how + * many channels are included in each buffer. + */ + void enqueue_sync_multi( + std::vector buffs, double timestamp, bool pushthrough); + /** * Check whether some given number of channels matches the stream's channel_count. * Throws an error if not. diff --git a/src/tcp_server.cpp b/src/tcp_server.cpp index 4d449014..70b5d655 100644 --- a/src/tcp_server.cpp +++ b/src/tcp_server.cpp @@ -65,7 +65,9 @@ class client_session : public std::enable_shared_from_this { public: /// Instantiate a new session & its socket. client_session(const tcp_server_p &serv, tcp_socket &&sock) - : io_(serv->io_), serv_(serv), sock_(std::move(sock)), requeststream_(&requestbuf_) {} + : io_(serv->io_), serv_(serv), sock_(std::move(sock)), requeststream_(&requestbuf_) { + LOG_F(1, "Initialized client session %p", this); + } /// Destructor. ~client_session(); @@ -146,6 +148,7 @@ tcp_server::tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p s factory_p factory, int chunk_size, bool allow_v4, bool allow_v6, bool do_sync) : chunk_size_(chunk_size), info_(std::move(info)), io_(std::move(io)), factory_(std::move(factory)), send_buffer_(std::move(sendbuf)), transfer_is_sync_(do_sync) { + if (transfer_is_sync_) sync_transfer_io_ctx_ = std::make_unique(1); // assign connection-dependent fields info_->session_id(api_config::get_instance()->session_id()); info_->reset_uid(); @@ -210,7 +213,9 @@ void tcp_server::end_serving() { void tcp_server::accept_next_connection(tcp_acceptor_p &acceptor) { try { // Select the IO context for handling the socket - auto &sock_io_ctx = *io_; + // for `transfer_is_sync`, IO is done in the thread calling `push_sample`, + // otherwise in the outlet's IO thread / IO context + auto &sock_io_ctx = transfer_is_sync_ ? *sync_transfer_io_ctx_ : *io_; // accept a connection on the session's socket acceptor->async_accept(sock_io_ctx, [shared_this = shared_from_this(), &acceptor]( diff --git a/src/tcp_server.h b/src/tcp_server.h index e973440e..3af5a0c2 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -71,9 +71,9 @@ class tcp_server : public std::enable_shared_from_this { /** * Write directly to each socket. This should only be used when server initialized with - * do_async = false. + * do_sync = true. */ - void write_all_blocking(std::vector buffs); + void write_all_blocking(std::vector bufs); private: friend class client_session; From 2e8221aed7099f6f38ce02ed2833e468ea4b77f4 Mon Sep 17 00:00:00 2001 From: Tristan Stenner Date: Tue, 16 Nov 2021 15:24:25 +0100 Subject: [PATCH 03/13] Add some testing programs --- CMakeLists.txt | 6 +++++ testing/flood.c | 61 +++++++++++++++++++++++++++++++++++++++++++++++ testing/spike.cpp | 50 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 117 insertions(+) create mode 100644 testing/flood.c create mode 100644 testing/spike.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 1db6a5e1..95ec5d0a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -345,6 +345,12 @@ if(NOT WIN32 AND LSL_TOOLS) target_link_libraries(blackhole PRIVATE Threads::Threads) target_include_directories(blackhole PRIVATE "thirdparty/asio/") installLSLApp(blackhole) + add_executable(spike testing/spike.cpp) + target_link_libraries(spike PRIVATE lsl) + installLSLApp(spike) + add_executable(flood testing/flood.c) + target_link_libraries(flood PRIVATE lsl) + installLSLApp(flood) endif() set(LSL_INSTALL_ROOT ${CMAKE_CURRENT_BINARY_DIR}) diff --git a/testing/flood.c b/testing/flood.c new file mode 100644 index 00000000..1eb175d4 --- /dev/null +++ b/testing/flood.c @@ -0,0 +1,61 @@ +#include +#include +#include +#include +#include +#include + +static double start_t; +static int64_t samples_pushed = 0; + +void handle_signal(int signal) { + double time = lsl_local_clock() - start_t; + printf("%ld samples pushed in %fs, %d samples/s\n", samples_pushed, time, + (int)(samples_pushed / time)); + if (signal == SIGTERM || signal == SIGINT) exit(0); + start_t = lsl_local_clock(); + samples_pushed = 0; +} + +int main(int argc, char *argv[]) { + signal(SIGTERM, handle_signal); + signal(SIGINT, handle_signal); +#ifdef SIGUSR1 + signal(SIGUSR1, handle_signal); +#endif +#ifdef SIGCONT + signal(SIGCONT, handle_signal); +#endif + printf("LSL inlet stress tester. Sends [nchan] uint16 channels as fast as possible.\n"); + printf("Usage: %s [streamname] [nchan=56] [chunksize=500]\n", argv[0]); + printf("Using lsl %d, lsl_library_info: %s\n\n", lsl_library_version(), lsl_library_info()); + + const char *name = argc > 1 ? argv[1] : "Flood"; + const int nchan = argc > 2 ? strtol(argv[2], NULL, 10) : 56; + const int samples_per_chunk = argc > 3 ? strtol(argv[3], NULL, 10) : 500; + const char uid[] = "325wqer4354"; + + /* declare a new streaminfo (name: SendDataC / argument 1, content type: EEG, 8 channels, 500 + * Hz, float values, some made-up device id (can also be empty) */ + lsl_streaminfo info = lsl_create_streaminfo(name, "Bench", nchan, 50000, cft_int16, uid); + + /* make a new outlet (chunking: default, buffering: 360 seconds) */ + lsl_outlet outlet = lsl_create_outlet_ex(info, 0, 360, transp_sync_blocking); + char *infoxml = lsl_get_xml(lsl_get_info(outlet)); + printf("Streaminfo: %s\n", infoxml); + lsl_destroy_string(infoxml); + + const int buf_elements = nchan * samples_per_chunk; + int16_t *buf = malloc(buf_elements * 2); + memset(buf, 0xab, buf_elements * sizeof(buf[0])); + + printf("Now sending data...\n"); + start_t = lsl_local_clock(); + + for (int t = 0;; t++) { + lsl_push_chunk_s(outlet, buf, buf_elements); + samples_pushed += samples_per_chunk; + } + lsl_destroy_outlet(outlet); + return 0; +} diff --git a/testing/spike.cpp b/testing/spike.cpp new file mode 100644 index 00000000..9d78b4db --- /dev/null +++ b/testing/spike.cpp @@ -0,0 +1,50 @@ +#include "lsl_cpp.h" +#include +#include +#include +#include +#include + +using namespace std::chrono_literals; + +const char *channels[] = {"C3", "C4", "Cz", "FPz", "POz", "CPz", "O1", "O2"}; + +int main(int argc, char *argv[]) { + int n_channels = argc > 1 ? std::stoi(argv[1]) : 200; + int samplingrate = argc > 2 ? std::stoi(argv[2]) : 50000; + int max_buffered = argc > 3 ? std::stoi(argv[3]) : 360; + bool sync = argc > 4 ? std::stoi(argv[4]) > 0 : true; + + try { + lsl::stream_info info("Spike", "bench", n_channels, samplingrate, lsl::cf_int16, "Spike"); + lsl::stream_outlet outlet( + info, 0, max_buffered, sync ? transp_sync_blocking : transp_default); + + std::vector chunk(n_channels * samplingrate / 1000, 5); + + const auto t_start = std::chrono::high_resolution_clock::now(); + auto next_sample_time = t_start; + const auto time_per_chunk = std::chrono::microseconds(8 / samplingrate); + + // send data forever + std::cout << "Now sending data... " << std::endl; + for (unsigned t = 0;; t++) { + if (t % 3 == 0) { + for (int s = 0; s < 5; ++s) + outlet.push_chunk_multiplexed(chunk); + // Wait until the next expected sample time. + } else { + for (int s = 0; s < 10; ++s) { + outlet.push_sample(chunk.data()); + std::this_thread::sleep_for(100us); + } + } + next_sample_time += time_per_chunk; + std::this_thread::sleep_until(next_sample_time); + } + + } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } + std::cout << "Press any key to exit. " << std::endl; + std::cin.get(); + return 0; +} From 147874af93b8fc14af1a9735e999e7999a682a5b Mon Sep 17 00:00:00 2001 From: Tristan Stenner Date: Tue, 16 Nov 2021 21:20:12 +0100 Subject: [PATCH 04/13] Name service threads properly --- src/stream_outlet_impl.cpp | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/stream_outlet_impl.cpp b/src/stream_outlet_impl.cpp index d63d4282..5a0464b8 100644 --- a/src/stream_outlet_impl.cpp +++ b/src/stream_outlet_impl.cpp @@ -6,6 +6,7 @@ #include "tcp_server.h" #include "udp_server.h" #include +#include #include #include @@ -57,10 +58,8 @@ stream_outlet_impl::stream_outlet_impl(const stream_info_impl &info, int32_t chu for (auto &responder : responders_) responder->begin_serving(); // and start the IO threads to handle them - const std::string name{"IO_" + this->info().name().substr(0, 11)}; for (const auto &io : {io_ctx_data_, io_ctx_service_}) - io_threads_.emplace_back(std::make_shared([io, name]() { - loguru::set_thread_name(name.c_str()); + io_threads_.emplace_back(std::make_shared([io]() { while (true) { try { io->run(); @@ -70,6 +69,10 @@ stream_outlet_impl::stream_outlet_impl(const stream_info_impl &info, int32_t chu } } })); + + const std::string name{this->info().name().substr(0, 11)}; + asio::post(*io_ctx_data_, [name]() { loguru::set_thread_name(("IO_" + name).c_str()); }); + asio::post(*io_ctx_service_, [name]() { loguru::set_thread_name(("SVC_" + name).c_str()); }); } void stream_outlet_impl::instantiate_stack(udp udp_protocol) { From fe05ca4411f6386008c53cf5961b0ca4b5a64d2a Mon Sep 17 00:00:00 2001 From: Tristan Stenner Date: Tue, 16 Nov 2021 21:26:05 +0100 Subject: [PATCH 05/13] Run handshake in general IO thread, move socket to sync io ctx when done --- src/tcp_server.cpp | 48 ++++++++++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/src/tcp_server.cpp b/src/tcp_server.cpp index 70b5d655..78b15832 100644 --- a/src/tcp_server.cpp +++ b/src/tcp_server.cpp @@ -212,14 +212,9 @@ void tcp_server::end_serving() { void tcp_server::accept_next_connection(tcp_acceptor_p &acceptor) { try { - // Select the IO context for handling the socket - // for `transfer_is_sync`, IO is done in the thread calling `push_sample`, - // otherwise in the outlet's IO thread / IO context - auto &sock_io_ctx = transfer_is_sync_ ? *sync_transfer_io_ctx_ : *io_; - - // accept a connection on the session's socket - acceptor->async_accept(sock_io_ctx, [shared_this = shared_from_this(), &acceptor]( - err_t err, tcp_socket sock) { + // accept a new connection + acceptor->async_accept(*io_, [shared_this = shared_from_this(), &acceptor]( + err_t err, tcp_socket sock) { if (err == asio::error::operation_aborted || err == asio::error::shut_down) return; // no error: create a new session and start processing @@ -240,34 +235,36 @@ void tcp_server::accept_next_connection(tcp_acceptor_p &acceptor) { // === synchronous transfer void tcp_server::write_all_blocking(std::vector bufs) { - int writes_outstanding = 0; bool any_session_broken = false; for (auto &sock : sync_sockets_) { asio::async_write(*sock, bufs, - [this, sock, &writes_outstanding]( + [this, &sock, &any_session_broken]( const asio::error_code &ec, size_t bytes_transferred) { - writes_outstanding--; switch (ec.value()) { case 0: break; // success case asio::error::broken_pipe: case asio::error::connection_reset: LOG_F(WARNING, "Broken Pipe / Connection Reset detected. Closing socket."); - { + any_session_broken = true; + asio::post(*sync_transfer_io_ctx_, [sock]() { asio::error_code close_ec; sock->close(close_ec); - } + }); + break; + case asio::error::operation_aborted: + LOG_F(INFO, "Socket wasn't fast enough"); break; default: - LOG_F(WARNING, "Unhandled write_all_blocking error: %s.", ec.message().c_str()); + LOG_F(ERROR, "Unhandled write_all_blocking error: %s.", ec.message().c_str()); } }); - writes_outstanding++; } try { - assert(sync_transfer_io_ctx_); + // prepare the io context for new work sync_transfer_io_ctx_->restart(); - while (writes_outstanding) sync_transfer_io_ctx_->run_one(); + sync_transfer_io_ctx_->run(); + if (any_session_broken) { // remove sessions whose socket was closed auto new_end_it = std::remove_if(sync_sockets_.begin(), sync_sockets_.end(), @@ -582,10 +579,19 @@ void client_session::handle_send_feedheader_outcome(err_t err, std::size_t n) { if (serv->transfer_is_sync_) { LOG_F(INFO, "Using synchronous blocking transfers for new client session."); - asio::post(*serv->sync_transfer_io_ctx_, - [serv, sock_p = std::make_shared(std::move(sock_))]() { - serv->sync_sockets_.emplace_back(std::move(sock_p)); - }); + auto &sock_io_ctx = *serv->sync_transfer_io_ctx_; + + // move the socket into the sync_transfer_io_ctx by releasing it from this + // io ctx and re-creating it with sync_transfer_io_ctx. + // See https://stackoverflow.com/q/52671836/73299 + // Then schedule the sync_transfer_io_ctx to add it to the list of sync sockets + auto protocol = sock_.local_endpoint().protocol(); + auto new_sock = std::make_shared(sock_io_ctx, protocol, sock_.release()); + + asio::post(sock_io_ctx, [serv, sock_p = std::move(new_sock)]() { + LOG_F(1, "Moved socket to new io_ctx"); + serv->sync_sockets_.emplace_back(std::move(sock_p)); + }); serv->unregister_inflight_session(this); return; } From ca5a4dc9b234c6ac831d954309ef36a7ec983d01 Mon Sep 17 00:00:00 2001 From: Tristan Stenner Date: Wed, 17 Nov 2021 13:11:22 +0100 Subject: [PATCH 06/13] Refactor sync tcp server; add sync_transfer_handler --- src/tcp_server.cpp | 55 ++++++++++++++++++++++++++++++++-------------- src/tcp_server.h | 14 +++++------- 2 files changed, 44 insertions(+), 25 deletions(-) diff --git a/src/tcp_server.cpp b/src/tcp_server.cpp index 78b15832..1b06cb50 100644 --- a/src/tcp_server.cpp +++ b/src/tcp_server.cpp @@ -144,11 +144,31 @@ class client_session : public std::enable_shared_from_this { std::condition_variable completion_cond_; }; +class sync_transfer_handler { + bool transfer_is_sync_; + // sockets that should receive data in sync mode + std::vector sync_sockets_; + // io context for sync mode, app is responsible for running it + asio::io_context io_ctx_; +public: + sync_transfer_handler(): io_ctx_(1) { + + } + + /// schedules a native socket handle to be added the next time a push operation is done + void add_socket(const tcp_socket::native_handle_type handle, tcp_socket::protocol_type protocol) { + asio::post(io_ctx_, [=](){ + sync_sockets_.push_back(std::make_unique(io_ctx_, protocol, handle)); + }); + } + void write_all_blocking(const std::vector &bufs); +}; + tcp_server::tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf, factory_p factory, int chunk_size, bool allow_v4, bool allow_v6, bool do_sync) : chunk_size_(chunk_size), info_(std::move(info)), io_(std::move(io)), - factory_(std::move(factory)), send_buffer_(std::move(sendbuf)), transfer_is_sync_(do_sync) { - if (transfer_is_sync_) sync_transfer_io_ctx_ = std::make_unique(1); + factory_(std::move(factory)), send_buffer_(std::move(sendbuf)) { + if (do_sync) sync_handler = std::make_unique(); // assign connection-dependent fields info_->session_id(api_config::get_instance()->session_id()); info_->reset_uid(); @@ -181,6 +201,11 @@ tcp_server::tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p s throw std::runtime_error("Failed to instantiate socket acceptors for the TCP server"); } +tcp_server::~tcp_server() noexcept +{ + // defined here so the compiler can generate the destructor for the sync_handler +} + // === externally issued asynchronous commands === @@ -234,7 +259,12 @@ void tcp_server::accept_next_connection(tcp_acceptor_p &acceptor) { // === synchronous transfer -void tcp_server::write_all_blocking(std::vector bufs) { +void tcp_server::write_all_blocking(const std::vector &bufs) +{ + sync_handler->write_all_blocking(bufs); +} + +void sync_transfer_handler::write_all_blocking(const std::vector &bufs) { bool any_session_broken = false; for (auto &sock : sync_sockets_) { @@ -247,7 +277,7 @@ void tcp_server::write_all_blocking(std::vector bufs) { case asio::error::connection_reset: LOG_F(WARNING, "Broken Pipe / Connection Reset detected. Closing socket."); any_session_broken = true; - asio::post(*sync_transfer_io_ctx_, [sock]() { + asio::post(io_ctx_, [sock]() { asio::error_code close_ec; sock->close(close_ec); }); @@ -262,8 +292,8 @@ void tcp_server::write_all_blocking(std::vector bufs) { } try { // prepare the io context for new work - sync_transfer_io_ctx_->restart(); - sync_transfer_io_ctx_->run(); + io_ctx_.restart(); + io_ctx_.run(); if (any_session_broken) { // remove sessions whose socket was closed @@ -577,21 +607,14 @@ void client_session::handle_send_feedheader_outcome(err_t err, std::size_t n) { // convenient for unit tests if (max_buffered_ <= 0) return; - if (serv->transfer_is_sync_) { + if (serv->sync_handler) { LOG_F(INFO, "Using synchronous blocking transfers for new client session."); - auto &sock_io_ctx = *serv->sync_transfer_io_ctx_; - + auto protocol = sock_.local_endpoint().protocol(); // move the socket into the sync_transfer_io_ctx by releasing it from this // io ctx and re-creating it with sync_transfer_io_ctx. // See https://stackoverflow.com/q/52671836/73299 // Then schedule the sync_transfer_io_ctx to add it to the list of sync sockets - auto protocol = sock_.local_endpoint().protocol(); - auto new_sock = std::make_shared(sock_io_ctx, protocol, sock_.release()); - - asio::post(sock_io_ctx, [serv, sock_p = std::move(new_sock)]() { - LOG_F(1, "Moved socket to new io_ctx"); - serv->sync_sockets_.emplace_back(std::move(sock_p)); - }); + serv->sync_handler->add_socket(sock_.release(), protocol); serv->unregister_inflight_session(this); return; } diff --git a/src/tcp_server.h b/src/tcp_server.h index 3af5a0c2..94795cbf 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -53,6 +53,8 @@ class tcp_server : public std::enable_shared_from_this { tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf, factory_p factory, int chunk_size, bool allow_v4, bool allow_v6, bool do_sync = false); + ~tcp_server() noexcept; + /** * Begin serving TCP connections. * @@ -73,7 +75,7 @@ class tcp_server : public std::enable_shared_from_this { * Write directly to each socket. This should only be used when server initialized with * do_sync = true. */ - void write_all_blocking(std::vector bufs); + void write_all_blocking(const std::vector& bufs); private: friend class client_session; @@ -104,14 +106,8 @@ class tcp_server : public std::enable_shared_from_this { tcp_acceptor_p acceptor_v4_, acceptor_v6_; // our server socket - // sync mode fields - - // Flag to indicate that new client_sessions should use synchronous blocking data transfer. - bool transfer_is_sync_; - // sockets that should receive data in sync mode - std::vector sync_sockets_; - // io context for sync mode, app is responsible for running it - std::unique_ptr sync_transfer_io_ctx_; + // optional pointer to a handler class for synchronous transfers + std::unique_ptr sync_handler; // registry of in-flight asessions (for cancellation) std::map> inflight_; From 14c13e676bf4f344f5d8a0782ddbce7b37bb7f6e Mon Sep 17 00:00:00 2001 From: Tristan Stenner Date: Wed, 17 Nov 2021 13:12:34 +0100 Subject: [PATCH 07/13] Reduce enqueue overhead --- src/sample.h | 2 ++ src/stream_outlet_impl.cpp | 23 ++++++++++++++--------- src/stream_outlet_impl.h | 7 ++++--- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/sample.h b/src/sample.h index b951d7d7..e57c6d10 100644 --- a/src/sample.h +++ b/src/sample.h @@ -54,6 +54,8 @@ class factory { /// Reclaim a sample that's no longer used. void reclaim_sample(sample *s); + std::size_t datasize() const { return format_sizes[fmt_] * static_cast(num_chans_); } + private: /// Pop a sample from the freelist (multi-producer/single-consumer queue by Dmitry Vjukov) sample *pop_freelist(); diff --git a/src/stream_outlet_impl.cpp b/src/stream_outlet_impl.cpp index 5a0464b8..4c08af89 100644 --- a/src/stream_outlet_impl.cpp +++ b/src/stream_outlet_impl.cpp @@ -168,12 +168,16 @@ bool stream_outlet_impl::wait_for_consumers(double timeout) { return send_buffer_->wait_for_consumers(timeout); } -void stream_outlet_impl::push_timestamp_sync(const double ×tamp) { +void stream_outlet_impl::push_timestamp_sync(double timestamp) { + static_assert(TAG_TRANSMITTED_TIMESTAMP == 2, "Unexpected TAG_TRANSMITTED_TIMESTAMP"); + const uint64_t ENDIAN_SAFE_TAG_TRANSMITTED = (2LL << 28) | 2LL; if (timestamp == DEDUCED_TIMESTAMP) { - sync_buffs_.emplace_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1)); + sync_buffs_.emplace_back(&TAG_DEDUCED_TIMESTAMP, 1); } else { - sync_buffs_.emplace_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1)); - sync_buffs_.emplace_back(asio::buffer(×tamp, sizeof(timestamp))); + sync_timestamps_.emplace_back(ENDIAN_SAFE_TAG_TRANSMITTED, timestamp); + // add a pointer to the memory region containing |TAG_TRANSMITTED_TIMESTAMP|timestamp + // one byte for the tag, 8 for the timestamp + sync_buffs_.emplace_back(reinterpret_cast(&sync_timestamps_.back()) + 7, 9); } } @@ -181,10 +185,11 @@ void stream_outlet_impl::pushthrough_sync() { // LOG_F(INFO, "Pushing %u buffers.", sync_buffs_.size()); tcp_server_->write_all_blocking(sync_buffs_); sync_buffs_.clear(); + sync_timestamps_.clear(); } void stream_outlet_impl::enqueue_sync( - asio::const_buffer buff, const double ×tamp, bool pushthrough) { + asio::const_buffer buff, double timestamp, bool pushthrough) { push_timestamp_sync(timestamp); sync_buffs_.push_back(buff); if (pushthrough) pushthrough_sync(); @@ -192,14 +197,14 @@ void stream_outlet_impl::enqueue_sync( template void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrough) { - if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0; - sample_p smp( - sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough)); + if (timestamp == 0.0 || lsl::api_config::get_instance()->force_default_timestamps()) timestamp = lsl_local_clock(); if (!do_sync_) { + sample_p smp( + sample_factory_->new_sample(timestamp, pushthrough)); smp->assign_typed(data); send_buffer_->push_sample(smp); } else { - enqueue_sync(asio::buffer(data, smp->datasize()), smp->timestamp, smp->pushthrough); + enqueue_sync(asio::buffer(data, sample_factory_->datasize()), timestamp, pushthrough); } } diff --git a/src/stream_outlet_impl.h b/src/stream_outlet_impl.h index 19e16eb1..0d772bdb 100644 --- a/src/stream_outlet_impl.h +++ b/src/stream_outlet_impl.h @@ -256,7 +256,6 @@ class stream_outlet_impl { throw std::runtime_error("The number of buffer elements to send is not a multiple of " "the stream's channel count."); if (num_samples > 0) { - if (timestamp == 0.0) timestamp = lsl_clock(); if (info().nominal_srate() != IRREGULAR_RATE) timestamp = timestamp - (num_samples - 1) / info().nominal_srate(); push_sample(buffer, timestamp, pushthrough && (num_samples == 1)); @@ -311,14 +310,14 @@ class stream_outlet_impl { /// Append the appropriate timestamp tag and optionally timestamp onto sync_buffs_ for a single /// timestamp. - void push_timestamp_sync(const double ×tamp); + void push_timestamp_sync(double timestamp); /// push sync_buffs_ through each tcp server. void pushthrough_sync(); /// Append a single timestamp and single buffer to sync_buffs and optionally pushthrough the /// server. - void enqueue_sync(asio::const_buffer buff, const double ×tamp, bool pushthrough); + void enqueue_sync(asio::const_buffer buff, double timestamp, bool pushthrough); /** * Append a single timestamp and multiple within-sample buffers to sync_buffs_. @@ -362,6 +361,8 @@ class stream_outlet_impl { std::vector io_threads_; /// buffers used in synchronous call to gather-write data directly to the socket. std::vector sync_buffs_; + /// timestamp buffer for sync transfers + std::vector> sync_timestamps_; }; } // namespace lsl From f34d65af987bac110f8191125f32cf63d656363c Mon Sep 17 00:00:00 2001 From: Tristan Stenner Date: Wed, 17 Nov 2021 13:34:39 +0100 Subject: [PATCH 08/13] fixup! Reduce enqueue overhead --- src/stream_outlet_impl.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/stream_outlet_impl.cpp b/src/stream_outlet_impl.cpp index 4c08af89..2206c6c9 100644 --- a/src/stream_outlet_impl.cpp +++ b/src/stream_outlet_impl.cpp @@ -29,6 +29,10 @@ stream_outlet_impl::stream_outlet_impl(const stream_info_impl &info, int32_t chu if ((info.channel_format() == cft_string) && do_sync_) throw std::invalid_argument("Synchronous push not supported for string-formatted streams."); + // reserver space for sync timestamps so `push_back` doesn't caused reallocations + // to invalidate pointers to elements + if(do_sync_) sync_timestamps_.reserve(chunk_size_); + ensure_lsl_initialized(); const api_config *cfg = api_config::get_instance(); From b44114d5b7eccbbdfb05abd956defed8e799902c Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Sun, 12 Jun 2022 21:00:17 -0400 Subject: [PATCH 09/13] Arguments to synchronous (blocking) methods should be passed by const-ref instead of by val. --- src/stream_outlet_impl.cpp | 6 +++--- src/stream_outlet_impl.h | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/stream_outlet_impl.cpp b/src/stream_outlet_impl.cpp index 2206c6c9..e00c8a0c 100644 --- a/src/stream_outlet_impl.cpp +++ b/src/stream_outlet_impl.cpp @@ -172,7 +172,7 @@ bool stream_outlet_impl::wait_for_consumers(double timeout) { return send_buffer_->wait_for_consumers(timeout); } -void stream_outlet_impl::push_timestamp_sync(double timestamp) { +void stream_outlet_impl::push_timestamp_sync(const double& timestamp) { static_assert(TAG_TRANSMITTED_TIMESTAMP == 2, "Unexpected TAG_TRANSMITTED_TIMESTAMP"); const uint64_t ENDIAN_SAFE_TAG_TRANSMITTED = (2LL << 28) | 2LL; if (timestamp == DEDUCED_TIMESTAMP) { @@ -193,7 +193,7 @@ void stream_outlet_impl::pushthrough_sync() { } void stream_outlet_impl::enqueue_sync( - asio::const_buffer buff, double timestamp, bool pushthrough) { + asio::const_buffer buff, const double& timestamp, bool pushthrough) { push_timestamp_sync(timestamp); sync_buffs_.push_back(buff); if (pushthrough) pushthrough_sync(); @@ -221,7 +221,7 @@ template void stream_outlet_impl::enqueue(const double *data, double, bo template void stream_outlet_impl::enqueue(const std::string *data, double, bool); void stream_outlet_impl::enqueue_sync_multi( - std::vector buffs, double timestamp, bool pushthrough) { + std::vector buffs, const double& timestamp, bool pushthrough) { push_timestamp_sync(timestamp); sync_buffs_.insert(sync_buffs_.end(), buffs.begin(), buffs.end()); if (pushthrough) pushthrough_sync(); diff --git a/src/stream_outlet_impl.h b/src/stream_outlet_impl.h index 0d772bdb..54d6c118 100644 --- a/src/stream_outlet_impl.h +++ b/src/stream_outlet_impl.h @@ -310,14 +310,14 @@ class stream_outlet_impl { /// Append the appropriate timestamp tag and optionally timestamp onto sync_buffs_ for a single /// timestamp. - void push_timestamp_sync(double timestamp); + void push_timestamp_sync(const double& timestamp); /// push sync_buffs_ through each tcp server. void pushthrough_sync(); /// Append a single timestamp and single buffer to sync_buffs and optionally pushthrough the /// server. - void enqueue_sync(asio::const_buffer buff, double timestamp, bool pushthrough); + void enqueue_sync(asio::const_buffer buff, const double& timestamp, bool pushthrough); /** * Append a single timestamp and multiple within-sample buffers to sync_buffs_. @@ -325,7 +325,7 @@ class stream_outlet_impl { * many channels are included in each buffer. */ void enqueue_sync_multi( - std::vector buffs, double timestamp, bool pushthrough); + std::vector buffs, const double& timestamp, bool pushthrough); /** * Check whether some given number of channels matches the stream's channel_count. From 55be9835b78f9dece916e9549e156d3e1be8773d Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Sun, 12 Jun 2022 21:00:56 -0400 Subject: [PATCH 10/13] SendDataCBlocking fixes and add to examples list. --- examples/CMakeLists.txt | 4 ++++ examples/SendDataCBlocking.c | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index eca4d200..c6e47e9d 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -47,3 +47,7 @@ addlslexample(TestSyncWithoutData cpp) target_link_libraries(TestSyncWithoutData PRIVATE Threads::Threads) +if(NOT WIN32) + addlslexample(SendDataCBlocking c) + target_link_libraries(SendDataCBlocking PRIVATE Threads::Threads) +endif() diff --git a/examples/SendDataCBlocking.c b/examples/SendDataCBlocking.c index 355e9c02..56d7f37b 100644 --- a/examples/SendDataCBlocking.c +++ b/examples/SendDataCBlocking.c @@ -139,7 +139,7 @@ void transmit_thread(void *vargp) { lsl_xml_ptr desc = lsl_get_desc(info); lsl_append_child_value(desc, "manufacturer", "LSL"); lsl_xml_ptr chns = lsl_append_child(desc, "channels"); - char chanlabel[12]; + char chanlabel[20]; for (int c = 0; c < device->nchans; c++) { lsl_xml_ptr chn = lsl_append_child(chns, "channel"); snprintf(chanlabel, 20, "Chan-%d", c); @@ -150,7 +150,7 @@ void transmit_thread(void *vargp) { /* make a new outlet */ lsl_outlet outlet = - lsl_create_outlet_d(info, params->chunk_size, params->buffer_dur, params->do_async); + lsl_create_outlet_ex(info, params->chunk_size, params->buffer_dur, params->do_async ? transp_sync_blocking : 0); printf("Now sending data...\n"); params->thread_status = 1; From 5eb09491d00261fec59bfa7ce72fc2f0c255a7ba Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Sat, 5 Nov 2022 22:45:02 -0400 Subject: [PATCH 11/13] Fix index bug in SendData.cpp --- examples/SendData.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/SendData.cpp b/examples/SendData.cpp index b0f1a350..a66a88ba 100644 --- a/examples/SendData.cpp +++ b/examples/SendData.cpp @@ -84,7 +84,6 @@ int main(int argc, char *argv[]) { // send data forever std::cout << "Now sending data... " << std::endl; for (unsigned t = 0;; t++) { - // Create random data for the first 8 channels. for (int c = 0; c < 8; c++) sample[c] = (float)((rand() % 1500) / 500.0 - 1.5); // For the remaining channels, fill them with a sample counter (wraps at 1M). From 9f6211ea1c38e5837c797134202ce2adb9152644 Mon Sep 17 00:00:00 2001 From: Paul Maanen Date: Tue, 16 Aug 2022 15:35:24 +0200 Subject: [PATCH 12/13] Load log settings before all other settings. Loading the log settings last causes heaps of INFO messages to be displayed by lsl::get_local_interfaces() even if the user has set log level to -3. Changing the order causes less ui clutter and respects the users choices. --- src/api_config.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/api_config.cpp b/src/api_config.cpp index 7c7635ee..f0f93405 100644 --- a/src/api_config.cpp +++ b/src/api_config.cpp @@ -105,6 +105,12 @@ void api_config::load_from_file(const std::string &filename) { } else loguru::g_stderr_verbosity = log_level; + // log config filename only after setting the verbosity level + if (!filename.empty()) + LOG_F(INFO, "Configuration loaded from %s", filename.c_str()); + else + LOG_F(INFO, "Loaded default config"); + // read out the [ports] parameters multicast_port_ = pt.get("ports.MulticastPort", 16571); base_port_ = pt.get("ports.BasePort", 16572); @@ -263,12 +269,6 @@ void api_config::load_from_file(const std::string &filename) { smoothing_halftime_ = pt.get("tuning.SmoothingHalftime", 90.0F); force_default_timestamps_ = pt.get("tuning.ForceDefaultTimestamps", false); - // log config filename only after setting the verbosity level and all config has been read - if (!filename.empty()) - LOG_F(INFO, "Configuration loaded from %s", filename.c_str()); - else - LOG_F(INFO, "Loaded default config"); - } catch (std::exception &e) { LOG_F(ERROR, "Error parsing config file '%s': '%s', rolling back to defaults", filename.c_str(), e.what()); From 2c836d63c48b70bc782770f6cfc4bc1fbccc8c4c Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Mon, 13 Feb 2023 17:05:27 -0500 Subject: [PATCH 13/13] version bump --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 95ec5d0a..27e8f81c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required (VERSION 3.12) project (liblsl - VERSION 1.16.1 + VERSION 1.16.2 LANGUAGES C CXX DESCRIPTION "Labstreaminglayer C/C++ library" HOMEPAGE_URL "https://github.com/sccn/liblsl"