Skip to content

Commit

Permalink
gather-write directly to asio when using sync mode and pushing a samp…
Browse files Browse the repository at this point in the history
…le of buffers.
  • Loading branch information
cboulay committed Oct 11, 2021
1 parent 89252cd commit e59ffc3
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 52 deletions.
10 changes: 5 additions & 5 deletions examples/ReceiveDataInChunks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@
int main(int argc, char **argv) {
std::cout << "ReceiveDataInChunks" << std::endl;
std::cout << "ReceiveDataInChunks StreamName max_buflen flush" << std::endl;
std::cout << "- max_buffered -- duration in msec to buffer" << std::endl;
std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer in the receiver" << std::endl;
std::cout << "- flush -- set non-zero to flush data instead of pulling; useful for testing throughput" << std::endl;

try {

std::string name{argc > 1 ? argv[1] : "MyAudioStream"};
double max_buflen = argc > 2 ? std::stod(argv[2]) : 360.;
double max_buffered = argc > 2 ? std::stod(argv[2]) : 360.;
bool flush = argc > 3;
// resolve the stream of interest & make an inlet
int32_t buf_samples = (int32_t)(max_buflen * 1000);
lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0), max_buflen,
transp_bufsize_thousandths);
lsl::stream_info inlet_info = lsl::resolve_stream("name", name).at(0);
lsl::stream_inlet inlet(inlet_info,(int32_t)(max_buffered * 1000),
0, true, transp_bufsize_thousandths);

// Use set_postprocessing to get the timestamps in a common base clock.
// Do not use if this application will record timestamps to disk -- it is better to
Expand Down
38 changes: 31 additions & 7 deletions examples/SendDataInChunks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ int main(int argc, char **argv) {
int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second.
bool nodata = argc > 7;
bool do_sync = argc > 8 ? (bool)std::stol(argv[8]) : true;
bool b_discontig = true && do_sync; // Set true to test gather-write operations.

int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk.
int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk
Expand All @@ -127,32 +128,55 @@ int main(int argc, char **argv) {
std::cout << "Stream UID: " << info.uid() << std::endl;

// Create a connection to our device.
fake_device my_device(n_channels, (float)samplingrate);
int dev_chans = b_discontig ? n_channels + 1 : n_channels;
fake_device my_device(dev_chans, (float)samplingrate);

// Prepare buffer to get data from 'device'.
// The buffer should be larger than you think you need. Here we make it 4x as large.
std::vector<int16_t> chunk_buffer(4 * chunk_samples * n_channels);
std::fill(chunk_buffer.begin(), chunk_buffer.end(), 0);
std::vector<int16_t> dev_buffer(4 * chunk_samples * dev_chans);
std::fill(dev_buffer.begin(), dev_buffer.end(), 0);

std::cout << "Now sending data..." << std::endl;

// Your device might have its own timer. Or you can decide how often to poll
// your device, as we do here.
auto next_chunk_time = std::chrono::high_resolution_clock::now();
auto t_start = std::chrono::high_resolution_clock::now();
auto next_chunk_time = t_start;
int64_t samples_pushed = 0;
for (unsigned c = 0;; c++) {
// wait a bit
next_chunk_time += std::chrono::milliseconds(chunk_duration);
std::this_thread::sleep_until(next_chunk_time);

// Get data from device
std::size_t returned_samples = my_device.get_data(chunk_buffer, nodata);
std::size_t returned_samples = my_device.get_data(dev_buffer, nodata);

// send it to the outlet. push_chunk_multiplexed is one of the more complicated approaches.
// other push_chunk methods are easier but slightly slower.
double ts = lsl::local_clock();
outlet.push_chunk_multiplexed(chunk_buffer.data(), returned_samples * n_channels, ts, true);
if (!b_discontig) {
outlet.push_chunk_multiplexed(dev_buffer.data(), returned_samples * n_channels, ts, true);
} else {
// The data on the device are discontiguous. Grab a buffer to the first channel,
// skip the second, then a buffer to the reset of the samples.
int16_t *discont[2];
uint32_t bytes[2];
for (int samp_ix = 0; samp_ix < returned_samples; samp_ix++) {
discont[0] = &dev_buffer[dev_chans * samp_ix];
bytes[0] = 1*sizeof(int16_t);
discont[1] = &dev_buffer[dev_chans * samp_ix + 2];
bytes[1] = (dev_chans - 2)*sizeof(int16_t);
bool pushthrough = samp_ix == (returned_samples - 1);
outlet.push_numeric_bufs(
reinterpret_cast<const char **>(const_cast<const int16_t**>(&discont[0])),
&bytes[0], 2, pushthrough ? ts : LSL_DEDUCED_TIMESTAMP, pushthrough);
samples_pushed++;
}
// std::cout << "Pushed " << returned_samples << " samples." << std::endl;
// std::chrono::seconds elapsed = std::chrono::duration_cast<std::chrono::seconds>(next_chunk_time - t_start);
// std::cout << double(samples_pushed) / elapsed.count() << std::endl;
}
}

} catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; }
std::cout << "Press any key to exit. " << std::endl;
std::cin.get();
Expand Down
7 changes: 6 additions & 1 deletion include/lsl/outlet.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ extern LIBLSL_C_API int32_t lsl_push_sample_vtp(lsl_outlet out, const void *data
* @see lsl_push_sample_ftp
* @param out The lsl_outlet object through which to push the data.
* @param data A pointer to values to push. The number of values pointed to must be no less than the number of channels in the sample.
* @param lengths A pointer the number of elements to push for each channel (string lengths).
* @param lengths A pointer the number of elements to push for each channel (string lengths, or number of bytes).
*/
extern LIBLSL_C_API int32_t lsl_push_sample_buf(lsl_outlet out, const char **data, const uint32_t *lengths);
/** @copydoc lsl_push_sample_buf
Expand All @@ -108,6 +108,11 @@ extern LIBLSL_C_API int32_t lsl_push_sample_buft(lsl_outlet out, const char **da
/** @copydoc lsl_push_sample_buft
* @param pushthrough @see lsl_push_sample_ftp */
extern LIBLSL_C_API int32_t lsl_push_sample_buftp(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough);
/** @copydoc lsl_push_sample_buftp
* @param nbufs Number of values pointed to in `data` and number of items in `lengths` -- doesn't assume one buffer
* per channel but each array in data must be longer than each item in lengths.
*/
extern LIBLSL_C_API int32_t lsl_push_sample_buftpn(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough, uint32_t nbufs);

/** Push a chunk of multiplexed samples into the outlet. One timestamp per sample is provided.
*
Expand Down
16 changes: 15 additions & 1 deletion include/lsl_cpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ class stream_outlet {
}

/** Push a pointer to raw numeric data as one sample into the outlet.
* This is the lowest-level function; performns no checking whatsoever. Can not be used for
* This is the lowest-level function; performs no checking whatsoever. Cannot be used for
* variable-size / string-formatted channels.
* @param sample A pointer to the raw sample data to push.
* @param timestamp Optionally the capture time of the sample, in agreement with local_clock();
Expand All @@ -512,6 +512,20 @@ class stream_outlet {
lsl_push_sample_vtp(obj.get(), (sample), timestamp, pushthrough);
}

/**
* Push a pointer to an array of buffers of variable size as one sample into the outlet.
*
* @param bufs A pointer to an array of data buffers.
* @param bytes An array of sizes (number of bytes) of buffers in bufs.
* @param nbufs Total number of buffers.
* @param timestamp Optionally the capture time of the sample, in agreement with local_clock();
* @param pushthrough Whether to push the sample through to the receivers immediately instead of
* concatenating with subsequent samples.
*/
void push_numeric_bufs(const char **bufs, uint32_t *bytes, uint32_t nbufs, double timestamp = 0.0, bool pushthrough = true) {
lsl_push_sample_buftpn(obj.get(), bufs, bytes, timestamp, pushthrough, nbufs);
}


// ===================================================
// === Pushing an chunk of samples into the outlet ===
Expand Down
30 changes: 24 additions & 6 deletions src/lsl_outlet_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ LIBLSL_C_API lsl_outlet lsl_create_outlet_ex(
buf_samples /= 1000;
buf_samples = (buf_samples > 0) ? buf_samples : 1;
return create_object_noexcept<stream_outlet_impl>(
*info, chunk_size, buf_samples);
*info, chunk_size, buf_samples, flags);
}

LIBLSL_C_API lsl_outlet lsl_create_outlet(
Expand Down Expand Up @@ -171,14 +171,32 @@ LIBLSL_C_API int32_t lsl_push_sample_buft(

LIBLSL_C_API int32_t lsl_push_sample_buftp(lsl_outlet out, const char **data,
const uint32_t *lengths, double timestamp, int32_t pushthrough) {
stream_outlet_impl *outimpl = out;
return lsl_push_sample_buftpn(out, data, lengths, timestamp, pushthrough,
(uint32_t)outimpl->info().channel_count());
}

LIBLSL_C_API int32_t lsl_push_sample_buftpn(lsl_outlet out, const char **data,
const uint32_t *lengths, double timestamp, int32_t pushthrough, uint32_t nbufs) {
stream_outlet_impl *outimpl = out;
try {
stream_outlet_impl *outimpl = out;
std::vector<std::string> tmp;
for (uint32_t k = 0; k < (uint32_t)outimpl->info().channel_count(); k++)
tmp.emplace_back(data[k], lengths[k]);
return outimpl->push_sample_noexcept(&tmp[0], timestamp, pushthrough);
if (outimpl->is_sync_blocking()) {
// Convert input to a vector of asio buffers
std::vector<asio::const_buffer> buffs;
for (auto buf_ix = 0; buf_ix < nbufs; buf_ix++) {
buffs.push_back(asio::buffer(data[buf_ix], lengths[buf_ix]));
}
return outimpl->push_sample_gather(buffs, timestamp, pushthrough);
} else {
std::vector<std::string> tmp;
for (uint32_t k = 0; k < nbufs; k++)
tmp.emplace_back(data[k], lengths[k]);
return outimpl->push_sample_noexcept(&tmp[0], timestamp, pushthrough);
}
} catch (std::exception &e) {
LOG_F(WARNING, "Unexpected error during push_sample: %s", e.what());
if (!outimpl->is_sync_blocking() && outimpl->info().channel_format() != cft_string)
LOG_F(ERROR, "lsl_push_sample_buftpn only compatible with string type or when outlet is using sync writes.");
return lsl_internal_error;
}
}
Expand Down
57 changes: 31 additions & 26 deletions src/stream_outlet_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ stream_outlet_impl::stream_outlet_impl(
send_buffer_(std::make_shared<send_buffer>(max_capacity)),
do_sync_(flags & transp_sync_blocking) {

if ((info.channel_format() == cft_string) && (flags & transp_sync_blocking)) {
if ((info.channel_format() == cft_string) && do_sync_) {
LOG_F(WARNING, "sync push not supported for string-formatted streams. Reverting to async.");
do_sync_ = false;
}
Expand Down Expand Up @@ -154,20 +154,8 @@ void stream_outlet_impl::push_numeric_raw(const void *data, double timestamp, bo
smp->assign_untyped(data); // Note: Makes a copy!
send_buffer_->push_sample(smp);
} else {
if (timestamp == DEDUCED_TIMESTAMP) {
sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1));
} else {
sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1));
sync_buffs_.push_back(asio::buffer(&timestamp, sizeof(timestamp)));
}
sync_buffs_.push_back(asio::buffer(data, smp->datasize()));
if (pushthrough) {
for (auto &tcp_server : tcp_servers_)
tcp_server->write_all_blocking(sync_buffs_);
sync_buffs_.clear();
}
enqueue_sync(asio::buffer(data, smp->datasize()), timestamp, pushthrough);
}

}

bool stream_outlet_impl::have_consumers() { return send_buffer_->have_consumers(); }
Expand All @@ -176,6 +164,28 @@ bool stream_outlet_impl::wait_for_consumers(double timeout) {
return send_buffer_->wait_for_consumers(timeout);
}

void stream_outlet_impl::push_timestamp_sync(double timestamp) {
if (timestamp == DEDUCED_TIMESTAMP) {
sync_buffs_.emplace_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1));
} else {
sync_buffs_.emplace_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1));
sync_buffs_.emplace_back(asio::buffer(&timestamp, sizeof(timestamp)));
}
}

void stream_outlet_impl::pushthrough_sync() {
// LOG_F(INFO, "Pushing %u buffers.", sync_buffs_.size());
for (auto &tcp_server : tcp_servers_)
tcp_server->write_all_blocking(sync_buffs_);
sync_buffs_.clear();
}

void stream_outlet_impl::enqueue_sync(asio::const_buffer buff, double timestamp, bool pushthrough) {
push_timestamp_sync(timestamp);
sync_buffs_.push_back(buff);
if (pushthrough) pushthrough_sync();
}

template <class T>
void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrough) {
if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0;
Expand All @@ -185,18 +195,7 @@ void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrou
smp->assign_typed(data);
send_buffer_->push_sample(smp);
} else {
if (timestamp == DEDUCED_TIMESTAMP) {
sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1));
} else {
sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1));
sync_buffs_.push_back(asio::buffer(&timestamp, sizeof(timestamp)));
}
sync_buffs_.push_back(asio::buffer(data, smp->datasize()));
if (pushthrough) {
for (auto &tcp_server : tcp_servers_)
tcp_server->write_all_blocking(sync_buffs_);
sync_buffs_.clear();
}
enqueue_sync(asio::buffer(data, smp->datasize()), timestamp, pushthrough);
}
}

Expand All @@ -208,4 +207,10 @@ template void stream_outlet_impl::enqueue<float>(const float *data, double, bool
template void stream_outlet_impl::enqueue<double>(const double *data, double, bool);
template void stream_outlet_impl::enqueue<std::string>(const std::string *data, double, bool);

void stream_outlet_impl::enqueue_sync_multi(std::vector<asio::const_buffer> buffs, double timestamp, bool pushthrough) {
push_timestamp_sync(timestamp);
sync_buffs_.insert( sync_buffs_.end(), buffs.begin(), buffs.end() );
if (pushthrough) pushthrough_sync();
}

} // namespace lsl
24 changes: 23 additions & 1 deletion src/stream_outlet_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,10 @@ class stream_outlet_impl {
void push_sample(const std::string *data, double timestamp = 0.0, bool pushthrough = true) {
enqueue(data, timestamp, pushthrough);
}

lsl_error_code_t push_sample_gather(std::vector<asio::const_buffer> buffs, double timestamp = 0.0, bool pushthrough = true) {
enqueue_sync_multi(buffs, timestamp, pushthrough);
return lsl_no_error;
}

template <typename T>
inline lsl_error_code_t push_sample_noexcept(
Expand Down Expand Up @@ -295,13 +298,32 @@ class stream_outlet_impl {
/// Wait until some consumer shows up.
bool wait_for_consumers(double timeout = FOREVER);

/// If the outlet is intended to use synchronous blocking transfers
bool is_sync_blocking() { return do_sync_; };

private:
/// Instantiate a new server stack.
void instantiate_stack(tcp tcp_protocol, udp udp_protocol);

/// Allocate and enqueue a new sample into the send buffer.
template <class T> void enqueue(const T *data, double timestamp, bool pushthrough);

/// Append the appropriate timestamp tag and optionally timestamp onto sync_buffs_ for a single timestamp.
void push_timestamp_sync(double timestamp);

/// push sync_buffs_ through each tcp server.
void pushthrough_sync();

/// Append a single timestamp and single buffer to sync_buffs and optionally pushthrough the server.
void enqueue_sync(asio::const_buffer buff, double timestamp, bool pushthrough);

/**
* Append a single timestamp and multiple within-sample buffers to sync_buffs_.
* This is useful when a sample is discontiguous in memory. It makes no assumptions about how
* many channels are included in each buffer.
*/
void enqueue_sync_multi(std::vector<asio::const_buffer> buffs, double timestamp, bool pushthrough);

/**
* Check whether some given number of channels matches the stream's channel_count.
* Throws an error if not.
Expand Down
13 changes: 8 additions & 5 deletions src/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,15 @@ void tcp_server::handle_accept_outcome(std::shared_ptr<client_session> newsessio

void tcp_server::write_all_blocking(std::vector<asio::const_buffer> buffs) {
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
std::size_t bytes_sent;
asio::error_code ec;
std::size_t bytes_sent = 0;
for (const auto &x : inflight_ready_) {
if (x.second && x.first->is_open()) {
bytes_sent = x.first->send(buffs, 0, ec);
if (ec) {
try {
// I couldn't figure out how to get the correct overload while providing
// error_code& ec to the write function. So we use try-catch instead.
bytes_sent = asio::write(*x.first, buffs);
} catch (const asio::system_error &err) { // std::exception &e
asio::error_code ec = err.code();
switch(ec.value()) {
case asio::error::broken_pipe:
case asio::error::connection_reset:
Expand All @@ -243,7 +246,7 @@ void tcp_server::write_all_blocking(std::vector<asio::const_buffer> buffs) {
// We leave it up to the client_session destructor to remove the socket.
break;
default:
LOG_F(WARNING, "Unhandled write_all_blocking error: %s.", ec.message().c_str());
LOG_F(WARNING, "Unhandled write_all_blocking error: %s.", err.what());
}
}
}
Expand Down

0 comments on commit e59ffc3

Please sign in to comment.