From e59ffc33616fdaf9c2cb7f581f911e7e1ccceaf8 Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Sat, 9 Oct 2021 11:07:58 -0400 Subject: [PATCH] gather-write directly to asio when using sync mode and pushing a sample of buffers. --- examples/ReceiveDataInChunks.cpp | 10 +++--- examples/SendDataInChunks.cpp | 38 +++++++++++++++++---- include/lsl/outlet.h | 7 +++- include/lsl_cpp.h | 16 ++++++++- src/lsl_outlet_c.cpp | 30 +++++++++++++---- src/stream_outlet_impl.cpp | 57 +++++++++++++++++--------------- src/stream_outlet_impl.h | 24 +++++++++++++- src/tcp_server.cpp | 13 +++++--- 8 files changed, 143 insertions(+), 52 deletions(-) diff --git a/examples/ReceiveDataInChunks.cpp b/examples/ReceiveDataInChunks.cpp index 5b5c4d5f..fb1ed5cf 100644 --- a/examples/ReceiveDataInChunks.cpp +++ b/examples/ReceiveDataInChunks.cpp @@ -8,18 +8,18 @@ int main(int argc, char **argv) { std::cout << "ReceiveDataInChunks" << std::endl; std::cout << "ReceiveDataInChunks StreamName max_buflen flush" << std::endl; - std::cout << "- max_buffered -- duration in msec to buffer" << std::endl; + std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer in the receiver" << std::endl; std::cout << "- flush -- set non-zero to flush data instead of pulling; useful for testing throughput" << std::endl; try { std::string name{argc > 1 ? argv[1] : "MyAudioStream"}; - double max_buflen = argc > 2 ? std::stod(argv[2]) : 360.; + double max_buffered = argc > 2 ? std::stod(argv[2]) : 360.; bool flush = argc > 3; // resolve the stream of interest & make an inlet - int32_t buf_samples = (int32_t)(max_buflen * 1000); - lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0), max_buflen, - transp_bufsize_thousandths); + lsl::stream_info inlet_info = lsl::resolve_stream("name", name).at(0); + lsl::stream_inlet inlet(inlet_info,(int32_t)(max_buffered * 1000), + 0, true, transp_bufsize_thousandths); // Use set_postprocessing to get the timestamps in a common base clock. // Do not use if this application will record timestamps to disk -- it is better to diff --git a/examples/SendDataInChunks.cpp b/examples/SendDataInChunks.cpp index 846fe676..a3807c1b 100644 --- a/examples/SendDataInChunks.cpp +++ b/examples/SendDataInChunks.cpp @@ -104,6 +104,7 @@ int main(int argc, char **argv) { int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second. bool nodata = argc > 7; bool do_sync = argc > 8 ? (bool)std::stol(argv[8]) : true; + bool b_discontig = true && do_sync; // Set true to test gather-write operations. int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk. int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk @@ -127,32 +128,55 @@ int main(int argc, char **argv) { std::cout << "Stream UID: " << info.uid() << std::endl; // Create a connection to our device. - fake_device my_device(n_channels, (float)samplingrate); + int dev_chans = b_discontig ? n_channels + 1 : n_channels; + fake_device my_device(dev_chans, (float)samplingrate); // Prepare buffer to get data from 'device'. // The buffer should be larger than you think you need. Here we make it 4x as large. - std::vector chunk_buffer(4 * chunk_samples * n_channels); - std::fill(chunk_buffer.begin(), chunk_buffer.end(), 0); + std::vector dev_buffer(4 * chunk_samples * dev_chans); + std::fill(dev_buffer.begin(), dev_buffer.end(), 0); std::cout << "Now sending data..." << std::endl; // Your device might have its own timer. Or you can decide how often to poll // your device, as we do here. - auto next_chunk_time = std::chrono::high_resolution_clock::now(); + auto t_start = std::chrono::high_resolution_clock::now(); + auto next_chunk_time = t_start; + int64_t samples_pushed = 0; for (unsigned c = 0;; c++) { // wait a bit next_chunk_time += std::chrono::milliseconds(chunk_duration); std::this_thread::sleep_until(next_chunk_time); // Get data from device - std::size_t returned_samples = my_device.get_data(chunk_buffer, nodata); + std::size_t returned_samples = my_device.get_data(dev_buffer, nodata); // send it to the outlet. push_chunk_multiplexed is one of the more complicated approaches. // other push_chunk methods are easier but slightly slower. double ts = lsl::local_clock(); - outlet.push_chunk_multiplexed(chunk_buffer.data(), returned_samples * n_channels, ts, true); + if (!b_discontig) { + outlet.push_chunk_multiplexed(dev_buffer.data(), returned_samples * n_channels, ts, true); + } else { + // The data on the device are discontiguous. Grab a buffer to the first channel, + // skip the second, then a buffer to the reset of the samples. + int16_t *discont[2]; + uint32_t bytes[2]; + for (int samp_ix = 0; samp_ix < returned_samples; samp_ix++) { + discont[0] = &dev_buffer[dev_chans * samp_ix]; + bytes[0] = 1*sizeof(int16_t); + discont[1] = &dev_buffer[dev_chans * samp_ix + 2]; + bytes[1] = (dev_chans - 2)*sizeof(int16_t); + bool pushthrough = samp_ix == (returned_samples - 1); + outlet.push_numeric_bufs( + reinterpret_cast(const_cast(&discont[0])), + &bytes[0], 2, pushthrough ? ts : LSL_DEDUCED_TIMESTAMP, pushthrough); + samples_pushed++; + } + // std::cout << "Pushed " << returned_samples << " samples." << std::endl; + // std::chrono::seconds elapsed = std::chrono::duration_cast(next_chunk_time - t_start); + // std::cout << double(samples_pushed) / elapsed.count() << std::endl; + } } - } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } std::cout << "Press any key to exit. " << std::endl; std::cin.get(); diff --git a/include/lsl/outlet.h b/include/lsl/outlet.h index eba0a90b..c03fc72f 100644 --- a/include/lsl/outlet.h +++ b/include/lsl/outlet.h @@ -99,7 +99,7 @@ extern LIBLSL_C_API int32_t lsl_push_sample_vtp(lsl_outlet out, const void *data * @see lsl_push_sample_ftp * @param out The lsl_outlet object through which to push the data. * @param data A pointer to values to push. The number of values pointed to must be no less than the number of channels in the sample. - * @param lengths A pointer the number of elements to push for each channel (string lengths). + * @param lengths A pointer the number of elements to push for each channel (string lengths, or number of bytes). */ extern LIBLSL_C_API int32_t lsl_push_sample_buf(lsl_outlet out, const char **data, const uint32_t *lengths); /** @copydoc lsl_push_sample_buf @@ -108,6 +108,11 @@ extern LIBLSL_C_API int32_t lsl_push_sample_buft(lsl_outlet out, const char **da /** @copydoc lsl_push_sample_buft * @param pushthrough @see lsl_push_sample_ftp */ extern LIBLSL_C_API int32_t lsl_push_sample_buftp(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough); +/** @copydoc lsl_push_sample_buftp + * @param nbufs Number of values pointed to in `data` and number of items in `lengths` -- doesn't assume one buffer + * per channel but each array in data must be longer than each item in lengths. + */ +extern LIBLSL_C_API int32_t lsl_push_sample_buftpn(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough, uint32_t nbufs); /** Push a chunk of multiplexed samples into the outlet. One timestamp per sample is provided. * diff --git a/include/lsl_cpp.h b/include/lsl_cpp.h index a0711639..faf583cf 100644 --- a/include/lsl_cpp.h +++ b/include/lsl_cpp.h @@ -499,7 +499,7 @@ class stream_outlet { } /** Push a pointer to raw numeric data as one sample into the outlet. - * This is the lowest-level function; performns no checking whatsoever. Can not be used for + * This is the lowest-level function; performs no checking whatsoever. Cannot be used for * variable-size / string-formatted channels. * @param sample A pointer to the raw sample data to push. * @param timestamp Optionally the capture time of the sample, in agreement with local_clock(); @@ -512,6 +512,20 @@ class stream_outlet { lsl_push_sample_vtp(obj.get(), (sample), timestamp, pushthrough); } + /** + * Push a pointer to an array of buffers of variable size as one sample into the outlet. + * + * @param bufs A pointer to an array of data buffers. + * @param bytes An array of sizes (number of bytes) of buffers in bufs. + * @param nbufs Total number of buffers. + * @param timestamp Optionally the capture time of the sample, in agreement with local_clock(); + * @param pushthrough Whether to push the sample through to the receivers immediately instead of + * concatenating with subsequent samples. + */ + void push_numeric_bufs(const char **bufs, uint32_t *bytes, uint32_t nbufs, double timestamp = 0.0, bool pushthrough = true) { + lsl_push_sample_buftpn(obj.get(), bufs, bytes, timestamp, pushthrough, nbufs); + } + // =================================================== // === Pushing an chunk of samples into the outlet === diff --git a/src/lsl_outlet_c.cpp b/src/lsl_outlet_c.cpp index b652c53b..45621470 100644 --- a/src/lsl_outlet_c.cpp +++ b/src/lsl_outlet_c.cpp @@ -28,7 +28,7 @@ LIBLSL_C_API lsl_outlet lsl_create_outlet_ex( buf_samples /= 1000; buf_samples = (buf_samples > 0) ? buf_samples : 1; return create_object_noexcept( - *info, chunk_size, buf_samples); + *info, chunk_size, buf_samples, flags); } LIBLSL_C_API lsl_outlet lsl_create_outlet( @@ -171,14 +171,32 @@ LIBLSL_C_API int32_t lsl_push_sample_buft( LIBLSL_C_API int32_t lsl_push_sample_buftp(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough) { + stream_outlet_impl *outimpl = out; + return lsl_push_sample_buftpn(out, data, lengths, timestamp, pushthrough, + (uint32_t)outimpl->info().channel_count()); +} + +LIBLSL_C_API int32_t lsl_push_sample_buftpn(lsl_outlet out, const char **data, + const uint32_t *lengths, double timestamp, int32_t pushthrough, uint32_t nbufs) { + stream_outlet_impl *outimpl = out; try { - stream_outlet_impl *outimpl = out; - std::vector tmp; - for (uint32_t k = 0; k < (uint32_t)outimpl->info().channel_count(); k++) - tmp.emplace_back(data[k], lengths[k]); - return outimpl->push_sample_noexcept(&tmp[0], timestamp, pushthrough); + if (outimpl->is_sync_blocking()) { + // Convert input to a vector of asio buffers + std::vector buffs; + for (auto buf_ix = 0; buf_ix < nbufs; buf_ix++) { + buffs.push_back(asio::buffer(data[buf_ix], lengths[buf_ix])); + } + return outimpl->push_sample_gather(buffs, timestamp, pushthrough); + } else { + std::vector tmp; + for (uint32_t k = 0; k < nbufs; k++) + tmp.emplace_back(data[k], lengths[k]); + return outimpl->push_sample_noexcept(&tmp[0], timestamp, pushthrough); + } } catch (std::exception &e) { LOG_F(WARNING, "Unexpected error during push_sample: %s", e.what()); + if (!outimpl->is_sync_blocking() && outimpl->info().channel_format() != cft_string) + LOG_F(ERROR, "lsl_push_sample_buftpn only compatible with string type or when outlet is using sync writes."); return lsl_internal_error; } } diff --git a/src/stream_outlet_impl.cpp b/src/stream_outlet_impl.cpp index b1986ae9..149898c2 100644 --- a/src/stream_outlet_impl.cpp +++ b/src/stream_outlet_impl.cpp @@ -23,7 +23,7 @@ stream_outlet_impl::stream_outlet_impl( send_buffer_(std::make_shared(max_capacity)), do_sync_(flags & transp_sync_blocking) { - if ((info.channel_format() == cft_string) && (flags & transp_sync_blocking)) { + if ((info.channel_format() == cft_string) && do_sync_) { LOG_F(WARNING, "sync push not supported for string-formatted streams. Reverting to async."); do_sync_ = false; } @@ -154,20 +154,8 @@ void stream_outlet_impl::push_numeric_raw(const void *data, double timestamp, bo smp->assign_untyped(data); // Note: Makes a copy! send_buffer_->push_sample(smp); } else { - if (timestamp == DEDUCED_TIMESTAMP) { - sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1)); - } else { - sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1)); - sync_buffs_.push_back(asio::buffer(×tamp, sizeof(timestamp))); - } - sync_buffs_.push_back(asio::buffer(data, smp->datasize())); - if (pushthrough) { - for (auto &tcp_server : tcp_servers_) - tcp_server->write_all_blocking(sync_buffs_); - sync_buffs_.clear(); - } + enqueue_sync(asio::buffer(data, smp->datasize()), timestamp, pushthrough); } - } bool stream_outlet_impl::have_consumers() { return send_buffer_->have_consumers(); } @@ -176,6 +164,28 @@ bool stream_outlet_impl::wait_for_consumers(double timeout) { return send_buffer_->wait_for_consumers(timeout); } +void stream_outlet_impl::push_timestamp_sync(double timestamp) { + if (timestamp == DEDUCED_TIMESTAMP) { + sync_buffs_.emplace_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1)); + } else { + sync_buffs_.emplace_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1)); + sync_buffs_.emplace_back(asio::buffer(×tamp, sizeof(timestamp))); + } +} + +void stream_outlet_impl::pushthrough_sync() { + // LOG_F(INFO, "Pushing %u buffers.", sync_buffs_.size()); + for (auto &tcp_server : tcp_servers_) + tcp_server->write_all_blocking(sync_buffs_); + sync_buffs_.clear(); +} + +void stream_outlet_impl::enqueue_sync(asio::const_buffer buff, double timestamp, bool pushthrough) { + push_timestamp_sync(timestamp); + sync_buffs_.push_back(buff); + if (pushthrough) pushthrough_sync(); +} + template void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrough) { if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0; @@ -185,18 +195,7 @@ void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrou smp->assign_typed(data); send_buffer_->push_sample(smp); } else { - if (timestamp == DEDUCED_TIMESTAMP) { - sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1)); - } else { - sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1)); - sync_buffs_.push_back(asio::buffer(×tamp, sizeof(timestamp))); - } - sync_buffs_.push_back(asio::buffer(data, smp->datasize())); - if (pushthrough) { - for (auto &tcp_server : tcp_servers_) - tcp_server->write_all_blocking(sync_buffs_); - sync_buffs_.clear(); - } + enqueue_sync(asio::buffer(data, smp->datasize()), timestamp, pushthrough); } } @@ -208,4 +207,10 @@ template void stream_outlet_impl::enqueue(const float *data, double, bool template void stream_outlet_impl::enqueue(const double *data, double, bool); template void stream_outlet_impl::enqueue(const std::string *data, double, bool); +void stream_outlet_impl::enqueue_sync_multi(std::vector buffs, double timestamp, bool pushthrough) { + push_timestamp_sync(timestamp); + sync_buffs_.insert( sync_buffs_.end(), buffs.begin(), buffs.end() ); + if (pushthrough) pushthrough_sync(); +} + } // namespace lsl diff --git a/src/stream_outlet_impl.h b/src/stream_outlet_impl.h index 32b76b8a..c92a3443 100644 --- a/src/stream_outlet_impl.h +++ b/src/stream_outlet_impl.h @@ -137,7 +137,10 @@ class stream_outlet_impl { void push_sample(const std::string *data, double timestamp = 0.0, bool pushthrough = true) { enqueue(data, timestamp, pushthrough); } - + lsl_error_code_t push_sample_gather(std::vector buffs, double timestamp = 0.0, bool pushthrough = true) { + enqueue_sync_multi(buffs, timestamp, pushthrough); + return lsl_no_error; + } template inline lsl_error_code_t push_sample_noexcept( @@ -295,6 +298,9 @@ class stream_outlet_impl { /// Wait until some consumer shows up. bool wait_for_consumers(double timeout = FOREVER); + /// If the outlet is intended to use synchronous blocking transfers + bool is_sync_blocking() { return do_sync_; }; + private: /// Instantiate a new server stack. void instantiate_stack(tcp tcp_protocol, udp udp_protocol); @@ -302,6 +308,22 @@ class stream_outlet_impl { /// Allocate and enqueue a new sample into the send buffer. template void enqueue(const T *data, double timestamp, bool pushthrough); + /// Append the appropriate timestamp tag and optionally timestamp onto sync_buffs_ for a single timestamp. + void push_timestamp_sync(double timestamp); + + /// push sync_buffs_ through each tcp server. + void pushthrough_sync(); + + /// Append a single timestamp and single buffer to sync_buffs and optionally pushthrough the server. + void enqueue_sync(asio::const_buffer buff, double timestamp, bool pushthrough); + + /** + * Append a single timestamp and multiple within-sample buffers to sync_buffs_. + * This is useful when a sample is discontiguous in memory. It makes no assumptions about how + * many channels are included in each buffer. + */ + void enqueue_sync_multi(std::vector buffs, double timestamp, bool pushthrough); + /** * Check whether some given number of channels matches the stream's channel_count. * Throws an error if not. diff --git a/src/tcp_server.cpp b/src/tcp_server.cpp index 21404b7f..83b7b326 100644 --- a/src/tcp_server.cpp +++ b/src/tcp_server.cpp @@ -226,12 +226,15 @@ void tcp_server::handle_accept_outcome(std::shared_ptr newsessio void tcp_server::write_all_blocking(std::vector buffs) { std::lock_guard lock(inflight_mut_); - std::size_t bytes_sent; - asio::error_code ec; + std::size_t bytes_sent = 0; for (const auto &x : inflight_ready_) { if (x.second && x.first->is_open()) { - bytes_sent = x.first->send(buffs, 0, ec); - if (ec) { + try { + // I couldn't figure out how to get the correct overload while providing + // error_code& ec to the write function. So we use try-catch instead. + bytes_sent = asio::write(*x.first, buffs); + } catch (const asio::system_error &err) { // std::exception &e + asio::error_code ec = err.code(); switch(ec.value()) { case asio::error::broken_pipe: case asio::error::connection_reset: @@ -243,7 +246,7 @@ void tcp_server::write_all_blocking(std::vector buffs) { // We leave it up to the client_session destructor to remove the socket. break; default: - LOG_F(WARNING, "Unhandled write_all_blocking error: %s.", ec.message().c_str()); + LOG_F(WARNING, "Unhandled write_all_blocking error: %s.", err.what()); } } }