Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronous Outlet for zero-copying socket writing #153

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ addlslexample(ReceiveDataInChunks cpp)
addlslexample(ReceiveDataSimple cpp)
addlslexample(ReceiveStringMarkers cpp)
addlslexample(ReceiveStringMarkersC c)
addlslexample(SendData cpp)
addlslexample(SendDataC c)
addlslexample(SendDataInChunks cpp)
addlslexample(SendDataSimple cpp)
Expand Down
8 changes: 6 additions & 2 deletions examples/ReceiveDataInChunks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@
int main(int argc, char **argv) {
std::cout << "ReceiveDataInChunks" << std::endl;
std::cout << "ReceiveDataInChunks StreamName max_buflen flush" << std::endl;
std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer in the receiver" << std::endl;
std::cout << "- flush -- set non-zero to flush data instead of pulling; useful for testing throughput" << std::endl;

try {

std::string name{argc > 1 ? argv[1] : "MyAudioStream"};
int32_t max_buflen = argc > 2 ? std::stol(argv[2]) : 360;
double max_buffered = argc > 2 ? std::stod(argv[2]) : 360.;
bool flush = argc > 3;
// resolve the stream of interest & make an inlet
lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0), max_buflen);
lsl::stream_info inlet_info = lsl::resolve_stream("name", name).at(0);
lsl::stream_inlet inlet(inlet_info,(int32_t)(max_buffered * 1000),
0, true, transp_bufsize_thousandths);

// Use set_postprocessing to get the timestamps in a common base clock.
// Do not use if this application will record timestamps to disk -- it is better to
Expand Down
164 changes: 95 additions & 69 deletions examples/SendData.cpp
Original file line number Diff line number Diff line change
@@ -1,69 +1,95 @@
#include "lsl_cpp.h"
#include <iostream>
#include <stdlib.h>
#include <time.h>
using namespace std;

/**
* This example program offers an 8-channel stream, float-formatted, that resembles EEG data.
* The example demonstrates also how per-channel meta-data can be specified using the .desc() field
* of the stream information object.
*
* Note that the timer used in the send loop of this program is not particularly accurate.
*/


const char *channels[] = {"C3", "C4", "Cz", "FPz", "POz", "CPz", "O1", "O2"};

int main(int argc, char *argv[]) {
string name, type;
if (argc != 3) {
cout << "This opens a stream under some user-defined name and with a user-defined content "
"type."
<< endl;
cout << "Please enter the stream name and the stream type (e.g. \"BioSemi EEG\" (without "
"the quotes)):"
<< endl;
cin >> name >> type;
} else {
name = argv[1];
type = argv[2];
}

try {

// make a new stream_info (100 Hz)
lsl::stream_info info(name, type, 8, 100, lsl::cf_float32, string(name) += type);

// add some description fields
info.desc().append_child_value("manufacturer", "BioSemi");
lsl::xml_element chns = info.desc().append_child("channels");
for (int k = 0; k < 8; k++)
chns.append_child("channel")
.append_child_value("label", channels[k])
.append_child_value("unit", "microvolts")
.append_child_value("type", "EEG");

// make a new outlet
lsl::stream_outlet outlet(info);

// send data forever
cout << "Now sending data... " << endl;
double starttime = ((double)clock()) / CLOCKS_PER_SEC;
for (unsigned t = 0;; t++) {

// wait a bit and create random data
while (((double)clock()) / CLOCKS_PER_SEC < starttime + t * 0.01)
;
float sample[8];
for (int c = 0; c < 8; c++) sample[c] = (float)((rand() % 1500) / 500.0 - 1.5);

// send the sample
outlet.push_sample(sample);
}

} catch (std::exception &e) { cerr << "Got an exception: " << e.what() << endl; }
cout << "Press any key to exit. " << endl;
cin.get();
return 0;
}
#include "lsl_cpp.h"
#include <iostream>
#include <stdlib.h>
#include <time.h>
#include <array>
using namespace std;

/**
* This example program offers an 8-channel stream, float-formatted, that resembles EEG data.
* The example demonstrates also how per-channel meta-data can be specified using the .desc() field
* of the stream information object.
*
* Note that the timer used in the send loop of this program is not particularly accurate.
*/


const char *channels[] = {"C3", "C4", "Cz", "FPz", "POz", "CPz", "O1", "O2"};

int main(int argc, char *argv[]) {
string name, type;
if (argc < 3) {
cout << "This opens a stream under some user-defined name and with a user-defined content "
"type." << endl;
cout << "SendData Name Type n_channels[8] srate[100] max_buffered[360] sync[false] contig[true]" << endl;
cout << "Please enter the stream name and the stream type (e.g. \"BioSemi EEG\" (without "
"the quotes)):"
<< endl;
cin >> name >> type;
} else {
name = argv[1];
type = argv[2];
}
int n_channels = argc > 3 ? std::stol(argv[3]) : 8;
n_channels = n_channels < 8 ? 8 : n_channels;
int samplingrate = argc > 4 ? std::stol(argv[4]) : 100;
int max_buffered = argc > 5 ? std::stol(argv[5]) : 360;
bool sync = argc > 6 ? std::stol(argv[6]) > 0 : false;
bool contig = argc > 7 ? std::stol(argv[7]) > 0 : true;

try {

// make a new stream_info (100 Hz)
lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_float32, string(name) += type);

// add some description fields
info.desc().append_child_value("manufacturer", "LSL");
lsl::xml_element chns = info.desc().append_child("channels");
for (int k = 0; k < n_channels; k++)
chns.append_child("channel")
.append_child_value("label", k < 8 ? channels[k] : "Chan-" + std::to_string(k+1))
.append_child_value("unit", "microvolts")
.append_child_value("type", type);

// make a new outlet
lsl::stream_outlet outlet(info, 0, max_buffered, sync ? transp_sync_blocking : transp_default);

// send data forever
cout << "Now sending data... " << endl;
double starttime = ((double)clock()) / CLOCKS_PER_SEC;

// Initialize 2 discontiguous data arrays.
vector<float> sample(8, 0.0);
vector<float> extra(n_channels - 8, 0.0);
if (contig) {
// If this is contiguous mode (default) then we combine the arrays.
sample.insert(
sample.end(),
make_move_iterator(extra.begin()),
cboulay marked this conversation as resolved.
Show resolved Hide resolved
make_move_iterator(extra.end()));
}
// bytes is used in !contig mode because we need to know how big each buffer is.
array<uint32_t, 2> bytes = {8 * sizeof(float), static_cast<uint32_t>((n_channels - 8) * sizeof(float))};
for (unsigned t = 0;; t++) {

// wait a bit and create random data
while (((double)clock()) / CLOCKS_PER_SEC < starttime + t * 0.01)
cboulay marked this conversation as resolved.
Show resolved Hide resolved
;
for (int c = 0; c < 8; c++) sample[c] = (float)((rand() % 1500) / 500.0 - 1.5);
tstenner marked this conversation as resolved.
Show resolved Hide resolved

// send the sample
if (contig)
outlet.push_sample(sample);
else {
// Advanced: Push set of discontiguous buffers.
array<float *, 2> bufs = {sample.data(), extra.data()};
outlet.push_numeric_bufs(reinterpret_cast<const char **>(const_cast<const float**>(bufs.data())),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new push_numeric_bufs should be part of a separate PR; as it is it's easy to misuse the API / provoke undefined behavior and the casts should be handled more safely by the C++ API.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't disagree, but unfortunately I need this functionality from pure C. So it was either this, or add 14 functions to the C API. We can do that eventually, after all the inner workings are settled on.

bytes.data(), 2, lsl::local_clock(), true);
}
}

} catch (exception &e) { cerr << "Got an exception: " << e.what() << endl; }
cout << "Press any key to exit. " << endl;
cin.get();
return 0;
}
55 changes: 39 additions & 16 deletions examples/SendDataInChunks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ struct fake_device {
pattern.reserve(pattern_samples * n_channels);
for (auto sample_ix = 0; sample_ix < pattern_samples; ++sample_ix) {
for (auto chan_ix = 0; chan_ix < n_channels; ++chan_ix) {
// sin(2*pi*f*t), where f cycles from 1 Hz to Nyquist: srate / 2
double f = (chan_ix + 1) % (int)(srate / 2);
pattern.emplace_back(
offset_0 + chan_ix * offset_step +
magnitude * static_cast<int16_t>(sin(M_PI * chan_ix * sample_ix / n_channels)));
magnitude * static_cast<int16_t>(sin(2 * M_PI * f * sample_ix / srate)));
}
}
last_time = std::chrono::steady_clock::now();
Expand All @@ -64,15 +66,15 @@ struct fake_device {
return output;
}

std::size_t get_data(std::vector<int16_t> &buffer) {
std::size_t get_data(std::vector<int16_t> &buffer, bool nodata = false) {
auto now = std::chrono::steady_clock::now();
auto elapsed_nano =
std::chrono::duration_cast<std::chrono::nanoseconds>(now - last_time).count();
int64_t elapsed_samples = std::size_t(elapsed_nano * srate * 1e-9); // truncate OK.
elapsed_samples = std::min(elapsed_samples, (int64_t)(buffer.size() / n_channels));
if (false) {
if (nodata) {
// The fastest but no patterns.
memset(&buffer[0], 23, buffer.size() * sizeof buffer[0]);
// memset(&buffer[0], 23, buffer.size() * sizeof buffer[0]);
} else {
std::size_t end_sample = head + elapsed_samples;
std::size_t nowrap_samples = std::min(pattern_samples - head, elapsed_samples);
Expand All @@ -89,57 +91,78 @@ struct fake_device {

int main(int argc, char **argv) {
std::cout << "SendDataInChunks" << std::endl;
std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate" << std::endl;
std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate nodata use_sync" << std::endl;
std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer for each outlet" << std::endl;
std::cout << "- chunk_rate -- number of chunks pushed per second. For this example, make it a common factor of samplingrate and 1000." << std::endl;

std::cout << "- nodata -- Set non-zero to cause the fake device to not copy pattern data into the buffer." << std::endl;
std::cout << "- use_sync -- Set to non-zero to use blocking send." << std::endl;

std::string name{argc > 1 ? argv[1] : "MyAudioStream"}, type{argc > 2 ? argv[2] : "Audio"};
int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100; // Here we specify srate, but typically this would come from the device.
int n_channels = argc > 4 ? std::stol(argv[4]) : 2; // Here we specify n_chans, but typically this would come from theh device.
int32_t max_buffered = argc > 5 ? std::stol(argv[5]) : 360;
double max_buffered = argc > 5 ? std::stod(argv[5]) : 360.;
int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second.
bool nodata = argc > 7;
bool do_sync = argc > 8 ? (bool)std::stol(argv[8]) : true;
bool b_contig = true && do_sync; // Set true to test gather-write operations.

int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk.
int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk

try {
// Prepare the LSL stream.
lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_int16);
lsl::stream_outlet outlet(info, 0, max_buffered);
lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_int16, "example-SendDataInChunks");
lsl::xml_element desc = info.desc();
desc.append_child_value("manufacturer", "LSL");
lsl::xml_element chns = desc.append_child("channels");
for (int c = 0; c < n_channels; c++) {
lsl::xml_element chn = chns.append_child("channel");
chn.append_child_value("label", "Chan-" + std::to_string(c));
chn.append_child_value("unit", "microvolts");
chn.append_child_value("type", "EEG");
chn.append_child_value("type", type);
}
int32_t buf_samples = max_buffered * samplingrate;
lsl::stream_outlet outlet(info, chunk_samples, buf_samples,
transp_bufsize_samples | (do_sync ? transp_sync_blocking: transp_default));
info = outlet.info(); // Refresh info with whatever the outlet captured.
std::cout << "Stream UID: " << info.uid() << std::endl;

// Create a connection to our device.
fake_device my_device(n_channels, (float)samplingrate);
int dev_chans = b_contig ? n_channels : n_channels + 1;
fake_device my_device(dev_chans, (float)samplingrate);

// Prepare buffer to get data from 'device'.
// The buffer should be larger than you think you need. Here we make it 4x as large.
std::vector<int16_t> chunk_buffer(4 * chunk_samples * n_channels);
std::vector<int16_t> dev_buffer(4 * chunk_samples * dev_chans);
std::fill(dev_buffer.begin(), dev_buffer.end(), 0);

std::cout << "Now sending data..." << std::endl;

// Your device might have its own timer. Or you can decide how often to poll
// your device, as we do here.
auto next_chunk_time = std::chrono::high_resolution_clock::now();
auto t_start = std::chrono::high_resolution_clock::now();
auto next_chunk_time = t_start;
for (unsigned c = 0;; c++) {
// wait a bit
next_chunk_time += std::chrono::milliseconds(chunk_duration);
std::this_thread::sleep_until(next_chunk_time);

// Get data from device
std::size_t returned_samples = my_device.get_data(chunk_buffer);
std::size_t returned_samples = my_device.get_data(dev_buffer, nodata);

// send it to the outlet. push_chunk_multiplexed is one of the more complicated approaches.
// other push_chunk methods are easier but slightly slower.
outlet.push_chunk_multiplexed(chunk_buffer.data(), returned_samples * n_channels, 0.0, true);
double ts = lsl::local_clock();
if (b_contig) {
// Push a chunk of a contiguous buffer.
outlet.push_chunk_multiplexed(dev_buffer.data(), returned_samples * n_channels, ts, true);
} else {
std::cout << "Discontiguous push_chunk not yet supported." << std::endl;
std::cout << "See SendData.cpp for discontiguous push_sample, then set " << std::endl;
std::cout << "timestamps as LSL_DEDUCED_TIMESTAMP and pushtrough as false " << std::endl;
std::cout << "for all samples except the the first or last in a chunk." << std::endl;
}
}

} catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; }
std::cout << "Press any key to exit. " << std::endl;
std::cin.get();
Expand Down
3 changes: 3 additions & 0 deletions include/lsl/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ typedef enum {
/// The supplied max_buf should be scaled by 0.001.
transp_bufsize_thousandths = 2,

/// The outlet will use synchronous (blocking) calls to asio to push data
transp_sync_blocking = 4,

// prevent compilers from assuming an instance fits in a single byte
_lsl_transport_options_maxval = 0x7f000000
} lsl_transport_options_t;
Expand Down
7 changes: 6 additions & 1 deletion include/lsl/outlet.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ extern LIBLSL_C_API int32_t lsl_push_sample_vtp(lsl_outlet out, const void *data
* @see lsl_push_sample_ftp
* @param out The lsl_outlet object through which to push the data.
* @param data A pointer to values to push. The number of values pointed to must be no less than the number of channels in the sample.
* @param lengths A pointer the number of elements to push for each channel (string lengths).
* @param lengths A pointer the number of elements to push for each channel (string lengths, or number of bytes).
cboulay marked this conversation as resolved.
Show resolved Hide resolved
*/
extern LIBLSL_C_API int32_t lsl_push_sample_buf(lsl_outlet out, const char **data, const uint32_t *lengths);
/** @copydoc lsl_push_sample_buf
Expand All @@ -108,6 +108,11 @@ extern LIBLSL_C_API int32_t lsl_push_sample_buft(lsl_outlet out, const char **da
/** @copydoc lsl_push_sample_buft
* @param pushthrough @see lsl_push_sample_ftp */
extern LIBLSL_C_API int32_t lsl_push_sample_buftp(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough);
/** @copydoc lsl_push_sample_buftp
* @param nbufs Number of values pointed to in `data` and number of items in `lengths` -- doesn't assume one buffer
* per channel but each array in data must be longer than each item in lengths.
*/
extern LIBLSL_C_API int32_t lsl_push_sample_buftpn(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough, uint32_t nbufs);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is hard to understand without an example. The n suffix implies that it's just like lsl_push_sample_buftp with an extra flag, even though it does something different (i.e., lsl_push_sample_buftp is for pushing variable-sized elements (one per channel) to (hopefully) a string outlet, while lsl_push_sample_buftpn is for pushing multiple groups of (fixed sized) channels). Also, the type of data should be const void ** to communicate that any kind of data is meant to be pushed


/** Push a chunk of multiplexed samples into the outlet. One timestamp per sample is provided.
*
Expand Down
16 changes: 15 additions & 1 deletion include/lsl_cpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ class stream_outlet {
}

/** Push a pointer to raw numeric data as one sample into the outlet.
* This is the lowest-level function; performns no checking whatsoever. Can not be used for
* This is the lowest-level function; performs no checking whatsoever. Cannot be used for
* variable-size / string-formatted channels.
* @param sample A pointer to the raw sample data to push.
* @param timestamp Optionally the capture time of the sample, in agreement with local_clock();
Expand All @@ -512,6 +512,20 @@ class stream_outlet {
lsl_push_sample_vtp(obj.get(), (sample), timestamp, pushthrough);
}

/**
* Push a pointer to an array of buffers of variable size as one sample into the outlet.
*
* @param bufs A pointer to an array of data buffers.
* @param bytes An array of sizes (number of bytes) of buffers in bufs.
* @param nbufs Total number of buffers.
* @param timestamp Optionally the capture time of the sample, in agreement with local_clock();
* @param pushthrough Whether to push the sample through to the receivers immediately instead of
* concatenating with subsequent samples.
*/
void push_numeric_bufs(const char **bufs, uint32_t *bytes, uint32_t nbufs, double timestamp = 0.0, bool pushthrough = true) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bufs should be const void**

lsl_push_sample_buftpn(obj.get(), bufs, bytes, timestamp, pushthrough, nbufs);
}


// ===================================================
// === Pushing an chunk of samples into the outlet ===
Expand Down
Loading