diff --git a/CMakeLists.txt b/CMakeLists.txt index 1db6a5e1..27e8f81c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required (VERSION 3.12) project (liblsl - VERSION 1.16.1 + VERSION 1.16.2 LANGUAGES C CXX DESCRIPTION "Labstreaminglayer C/C++ library" HOMEPAGE_URL "https://github.com/sccn/liblsl" @@ -345,6 +345,12 @@ if(NOT WIN32 AND LSL_TOOLS) target_link_libraries(blackhole PRIVATE Threads::Threads) target_include_directories(blackhole PRIVATE "thirdparty/asio/") installLSLApp(blackhole) + add_executable(spike testing/spike.cpp) + target_link_libraries(spike PRIVATE lsl) + installLSLApp(spike) + add_executable(flood testing/flood.c) + target_link_libraries(flood PRIVATE lsl) + installLSLApp(flood) endif() set(LSL_INSTALL_ROOT ${CMAKE_CURRENT_BINARY_DIR}) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index eca4d200..c6e47e9d 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -47,3 +47,7 @@ addlslexample(TestSyncWithoutData cpp) target_link_libraries(TestSyncWithoutData PRIVATE Threads::Threads) +if(NOT WIN32) + addlslexample(SendDataCBlocking c) + target_link_libraries(SendDataCBlocking PRIVATE Threads::Threads) +endif() diff --git a/examples/ReceiveDataInChunks.cpp b/examples/ReceiveDataInChunks.cpp index 139f6cfd..1e28785d 100644 --- a/examples/ReceiveDataInChunks.cpp +++ b/examples/ReceiveDataInChunks.cpp @@ -21,8 +21,9 @@ int main(int argc, char **argv) { double max_buffered = argc > 2 ? std::stod(argv[2]) : 360.; bool flush = argc > 3; // resolve the stream of interest & make an inlet + int32_t buf_samples = (int32_t)(max_buffered * 1000); lsl::stream_info inlet_info = lsl::resolve_stream("name", name).at(0); - lsl::stream_inlet inlet(inlet_info, (int32_t)max_buffered); + lsl::stream_inlet inlet(inlet_info, buf_samples, transp_bufsize_thousandths); // Use set_postprocessing to get the timestamps in a common base clock. // Do not use if this application will record timestamps to disk -- it is better to diff --git a/examples/SendData.cpp b/examples/SendData.cpp index f81ce9c4..a66a88ba 100644 --- a/examples/SendData.cpp +++ b/examples/SendData.cpp @@ -1,89 +1,115 @@ -#include "lsl_cpp.h" -#include -#include -#include -#include -#include - -/** - * This example program offers an 8-channel stream, float-formatted, that resembles EEG data. - * The example demonstrates also how per-channel meta-data can be specified using the .desc() field - * of the stream information object. - * - * Note that the timer used in the send loop of this program is not particularly accurate. - */ - - -const char *channels[] = {"C3", "C4", "Cz", "FPz", "POz", "CPz", "O1", "O2"}; - -int main(int argc, char *argv[]) { - std::string name, type; - if (argc < 3) { - std::cout - << "This opens a stream under some user-defined name and with a user-defined content " - "type." - << std::endl; - std::cout << "SendData Name Type [n_channels=8] [srate=100] [max_buffered=360]" - << std::endl; - std::cout - << "Please enter the stream name and the stream type (e.g. \"BioSemi EEG\" (without " - "the quotes)):" - << std::endl; - std::cin >> name >> type; - } else { - name = argv[1]; - type = argv[2]; - } - int n_channels = argc > 3 ? std::stol(argv[3]) : 8; - n_channels = n_channels < 8 ? 8 : n_channels; - int samplingrate = argc > 4 ? std::stol(argv[4]) : 100; - int max_buffered = argc > 5 ? std::stol(argv[5]) : 360; - - try { - - // make a new stream_info (100 Hz) - lsl::stream_info info( - name, type, n_channels, samplingrate, lsl::cf_float32, std::string(name) += type); - - // add some description fields - info.desc().append_child_value("manufacturer", "LSL"); - lsl::xml_element chns = info.desc().append_child("channels"); - for (int k = 0; k < n_channels; k++) - chns.append_child("channel") - .append_child_value("label", k < 8 ? channels[k] : "Chan-" + std::to_string(k + 1)) - .append_child_value("unit", "microvolts") - .append_child_value("type", type); - - // make a new outlet - lsl::stream_outlet outlet(info, 0, max_buffered); - std::vector sample(n_channels, 0.0); - - // Your device might have its own timer. Or you can decide how often to poll - // your device, as we do here. - int32_t sample_dur_us = 1000000 / (samplingrate > 0 ? samplingrate : 100); - auto t_start = std::chrono::high_resolution_clock::now(); - auto next_sample_time = t_start; - - // send data forever - std::cout << "Now sending data... " << std::endl; - double starttime = ((double)clock()) / CLOCKS_PER_SEC; - for (unsigned t = 0;; t++) { - // Create random data for the first 8 channels. - for (int c = 0; c < 8; c++) sample[c] = (float)((rand() % 1500) / 500.0 - 1.5); - // For the remaining channels, fill them with a sample counter (wraps at 1M). - std::fill(sample.begin() + 8, sample.end(), (float)(t % 1000000)); - - // Wait until the next expected sample time. - next_sample_time += std::chrono::microseconds(sample_dur_us); - std::this_thread::sleep_until(next_sample_time); - - // send the sample - std::cout << sample[0] << "\t" << sample[n_channels-1] << std::endl; - outlet.push_sample(sample); - } - - } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } - std::cout << "Press any key to exit. " << std::endl; - std::cin.get(); - return 0; -} +#include "lsl_cpp.h" +#include +#include +#include +#include +#include + +/** + * This example program offers an 8-channel stream, float-formatted, that resembles EEG data. + * The example demonstrates also how per-channel meta-data can be specified using the .desc() field + * of the stream information object. + * + * Note that the timer used in the send loop of this program is not particularly accurate. + */ + + +const char *channels[] = {"C3", "C4", "Cz", "FPz", "POz", "CPz", "O1", "O2"}; + +int main(int argc, char *argv[]) { + std::string name, type; + if (argc < 3) { + std::cout + << "This opens a stream under some user-defined name and with a user-defined content " + "type." + << std::endl; + std::cout << "SendData Name Type n_channels[8] srate[100] max_buffered[360] sync[false] " + "contig[true]" + << std::endl; + std::cout + << "Please enter the stream name and the stream type (e.g. \"BioSemi EEG\" (without " + "the quotes)):" + << std::endl; + std::cin >> name >> type; + } else { + name = argv[1]; + type = argv[2]; + } + int n_channels = argc > 3 ? std::stol(argv[3]) : 8; + n_channels = n_channels < 8 ? 8 : n_channels; + int samplingrate = argc > 4 ? std::stol(argv[4]) : 100; + int max_buffered = argc > 5 ? std::stol(argv[5]) : 360; + bool sync = argc > 6 ? std::stol(argv[6]) > 0 : false; + bool contig = argc > 7 ? std::stol(argv[7]) > 0 : true; + + try { + // if (!sync && !contig) { + // throw std::invalid_argument( "async is incompatible with discontig + //push_numeric_bufs (except for strings, not used here)." ); + // } + + // make a new stream_info (100 Hz) + lsl::stream_info info( + name, type, n_channels, samplingrate, lsl::cf_float32, std::string(name) += type); + + // add some description fields + info.desc().append_child_value("manufacturer", "LSL"); + lsl::xml_element chns = info.desc().append_child("channels"); + for (int k = 0; k < n_channels; k++) + chns.append_child("channel") + .append_child_value("label", k < 8 ? channels[k] : "Chan-" + std::to_string(k + 1)) + .append_child_value("unit", "microvolts") + .append_child_value("type", type); + + // make a new outlet + lsl::stream_outlet outlet( + info, 0, max_buffered, sync ? transp_sync_blocking : transp_default); + + // Initialize 2 discontiguous data arrays. + std::vector sample(8, 0.0); + std::vector extra(n_channels - 8, 0.0); + // If this is contiguous mode (default) then we combine the arrays. + if (contig) sample.insert(sample.end(), extra.begin(), extra.end()); + + // bytes is used in !contig mode because we need to know how big each buffer is. + std::array bytes = { + 8 * sizeof(float), static_cast((n_channels - 8) * sizeof(float))}; + + // Your device might have its own timer. Or you can decide how often to poll + // your device, as we do here. + int32_t sample_dur_us = 1000000 / (samplingrate > 0 ? samplingrate : 100); + auto t_start = std::chrono::high_resolution_clock::now(); + auto next_sample_time = t_start; + + // send data forever + std::cout << "Now sending data... " << std::endl; + for (unsigned t = 0;; t++) { + // Create random data for the first 8 channels. + for (int c = 0; c < 8; c++) sample[c] = (float)((rand() % 1500) / 500.0 - 1.5); + // For the remaining channels, fill them with a sample counter (wraps at 1M). + if (contig) + std::fill(sample.begin() + 8, sample.end(), (float)(t % 1000000)); + else + std::fill(extra.begin(), extra.end(), (float)(t % 1000000)); + + // Wait until the next expected sample time. + next_sample_time += std::chrono::microseconds(sample_dur_us); + std::this_thread::sleep_until(next_sample_time); + + // send the sample + if (contig) { + std::cout << sample[0] << "\t" << sample[n_channels-1] << std::endl; + outlet.push_sample(sample); + } else { + // Advanced: Push set of discontiguous buffers. + std::array bufs = {sample.data(), extra.data()}; + outlet.push_numeric_bufs( + (void **)bufs.data(), bytes.data(), 2, lsl::local_clock(), true); + } + } + + } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } + std::cout << "Press any key to exit. " << std::endl; + std::cin.get(); + return 0; +} diff --git a/examples/SendDataCBlocking.c b/examples/SendDataCBlocking.c new file mode 100644 index 00000000..56d7f37b --- /dev/null +++ b/examples/SendDataCBlocking.c @@ -0,0 +1,239 @@ +#include +#include +#include +#include +#include +#include +#include + +/** + * This example program pushes a 16-bit stream with a constant value across all channels. The + * channel number and rate are configurable via command line arguments. The data are not copied! + * This requires an inconvenient application design using threads and synchronization, but the + * this kind of design is necessary when frequent copying of large data chunks is too expensive. + * The main thread takes data from the device and puts it into a transmit buffer, and the secondary + * thread pushes the data from the transmit buffer into the lsl outlet then releases the data from + * the buffer. This is a relatively new and unusual design pattern for lsl. It uses a new BLOCKING + * function `push_chunk_wait`. Presently this only supports int16 types. This function passes + * the data pointer directly to asio to write data to each consumer and waits for asio to return. + * Note: One would usually prefer to use C++ niceities and queueing libraries (e.g., + * moodycamel::concurrentqueue) when working with big data across threads. However, the high- + * throughput data demands might be come from embedded devices with compiler limitations, + * so we use C here. Any input on how to speed up this C code is greatly appreciated. + */ + +#define DEVICE_BUFFER_SIZE 4194304 + +typedef struct { + char name[100]; + char serial[100]; + double last_timestamp; + int srate; + int nchans; + int write_idx; + int read_idx; + int min_frames_per_chunk; + int16_t channel_data[DEVICE_BUFFER_SIZE / 2]; // (channels * samples) < 2 million +} fake_device; + +typedef struct { + int thread_status; + int chunk_size; + double buffer_dur; + int do_async; +} thread_params; + +// Linked-list queue +typedef struct chunk_info { + int16_t *buf; + int n_frames; + double timestamp; +} chunk_info; +typedef struct node { + chunk_info info; + struct node *next; +} node; +typedef struct queue { + int count; + node *front; + node *rear; +} queue; +void initialize(queue *q) { + q->count = 0; + q->front = NULL; + q->rear = NULL; +} +int isempty(queue *q) { return (q->front == NULL); } +void enqueue(queue *q, int16_t *data, int frames, double ts) { + node *tmp; + tmp = malloc(sizeof(node)); + tmp->info.buf = data; + tmp->info.n_frames = frames; + tmp->info.timestamp = ts; + tmp->next = NULL; + if (isempty(q)) + q->front = q->rear = tmp; + else { + q->rear->next = tmp; + q->rear = tmp; + } + q->count++; +} +chunk_info dequeue(queue *q) { + chunk_info info = q->front->info; + node *tmp = q->front; + free(tmp); + q->front = q->front->next; + q->count--; + return (info); +} + +// Globals +fake_device *device = 0; +sem_t sem; +queue *q; + +// fetch_data -- Normally something provided by Device SDK +uint64_t fetch_data(int16_t **buffer) { + static int buf_samples = sizeof(device->channel_data) / sizeof(device->channel_data[0]); + + if (device->last_timestamp < 0) device->last_timestamp = lsl_local_clock(); + double now = lsl_local_clock(); + // Calculate how many frames/timestamps have elapsed since the last call. + uint64_t elapsed_frames = (uint64_t)((now - device->last_timestamp) * device->srate); + if (elapsed_frames < device->min_frames_per_chunk) elapsed_frames = 0; + // Cut this fetch short if it would go past the buffer. Next fetch will start at first idx. + if ((device->write_idx + elapsed_frames * device->nchans) > buf_samples) + elapsed_frames = (buf_samples - device->write_idx) / device->nchans; + // Further limit elapsed_samples to not overtake the read point (tail) + // if ((device->write_idx < device->read_idx) && + // (device->write_idx + (elapsed_frames * device->nchans) >= device->read_idx)) + // elapsed_frames = (device->read_idx - device->write_idx) / device->nchans; + if (elapsed_frames > 0) { + // New elapsed_time after accounting for rounding to integer frames. + device->last_timestamp += (double)(elapsed_frames) / device->srate; + // I assume that the device has its own acquisition buffer and that it copies data + // to a separate data buffer for API purposes. + // We are using a model where the device SDK shares its buffer with the client application. + // This is a bit unusual but allows for fastest throughput. + *buffer = &(device->channel_data[device->write_idx]); + + // And we advance the head for the next data transfer. + device->write_idx = (device->write_idx + elapsed_frames * device->nchans) % buf_samples; + if ((buf_samples - device->write_idx) < device->nchans) device->write_idx = 0; + } + return elapsed_frames; +} + +// transmit_thread -- responsible for popping data off the queue and pushing it to LSL +void transmit_thread(void *vargp) { + // Initialize thread-local variables + thread_params *params = (thread_params *)vargp; + + /* declare a new streaminfo */ + lsl_streaminfo info = lsl_create_streaminfo( + device->name, "TestLSL", device->nchans, device->srate, cft_int16, device->serial); + + /* add some meta-data fields to it */ + /* (for more standard fields, see https://github.com/sccn/xdf/wiki/Meta-Data) */ + lsl_xml_ptr desc = lsl_get_desc(info); + lsl_append_child_value(desc, "manufacturer", "LSL"); + lsl_xml_ptr chns = lsl_append_child(desc, "channels"); + char chanlabel[20]; + for (int c = 0; c < device->nchans; c++) { + lsl_xml_ptr chn = lsl_append_child(chns, "channel"); + snprintf(chanlabel, 20, "Chan-%d", c); + lsl_append_child_value(chn, "label", chanlabel); + lsl_append_child_value(chn, "unit", "microvolts"); + lsl_append_child_value(chn, "type", "EEG"); + } + + /* make a new outlet */ + lsl_outlet outlet = + lsl_create_outlet_ex(info, params->chunk_size, params->buffer_dur, params->do_async ? transp_sync_blocking : 0); + + printf("Now sending data...\n"); + params->thread_status = 1; + int buf_samples = sizeof(device->channel_data) / sizeof(device->channel_data[0]); + while (params->thread_status) { + sem_wait(&sem); + if (!isempty(q)) { + chunk_info chunk = dequeue(q); + int64_t chunk_samples = chunk.n_frames * device->nchans; + lsl_push_chunk_stp(outlet, chunk.buf, chunk_samples, chunk.timestamp, 1); + device->read_idx = (device->read_idx + chunk_samples) % buf_samples; + } + } + lsl_destroy_outlet(outlet); +} + +int main(int argc, char *argv[]) { + printf("SendDataCBlocking example program. Sends int16 data with minimal copies.\n"); + printf("Usage: %s [streamname] [streamuid] [srate] [nchans] [buff_dur] [do_async]\n", argv[0]); + printf("Using lsl %d, lsl_library_info: %s\n", lsl_library_version(), lsl_library_info()); + const char *name = argc > 1 ? argv[1] : "SendDataCBlocking"; + const char *uid = argc > 2 ? argv[2] : "6s45ahas321"; + int srate = argc > 3 ? strtol(argv[3], NULL, 10) : 512; + int n_chans = argc > 4 ? strtol(argv[4], NULL, 10) : 32; + double buff_dur = argc > 5 ? strtod(argv[5], NULL) : 60.; + int do_async = argc > 6 ? strtol(argv[6], NULL, 10) : 1; + int32_t samps_per_chunk = argc > 6 ? strtol(argv[6], NULL, 10) : 30; + + // Initialize our fake device and set its parameters. This would normally be taken care of + // by the device SDK. + device = (fake_device *)malloc(sizeof(fake_device)); + memset(device, 0, sizeof(fake_device)); + strcpy(device->name, name); + device->srate = srate; + device->nchans = n_chans; + device->last_timestamp = -1.; + device->min_frames_per_chunk = device->srate / 1000; + strcpy(device->serial, uid); + // Give the device buffer data some non-zero value. + memset(device->channel_data, 23, sizeof(device->channel_data)); + // write_idx and read_idx are OK at 0. + + thread_params params; + params.buffer_dur = buff_dur; + params.chunk_size = samps_per_chunk; + params.thread_status = 0; + params.do_async = do_async; + + // Initialize q + q = malloc(sizeof(queue)); + initialize(q); + + sem_init(&sem, 0, 0); + pthread_t thread_id; + if (pthread_create(&thread_id, NULL, (void *)&transmit_thread, ¶ms)) { + fprintf(stderr, "Error creating LSL transmit thread.\n"); + return 1; + } + + int exit_condition = 0; + int16_t *shared_buff; + double last_timestamp_received = -1.; + uint64_t n_frames_received = 0; + while (1) { + if (exit_condition) break; + + // Get data from device + n_frames_received = fetch_data(&shared_buff); + if (n_frames_received > 0) { + enqueue(q, shared_buff, n_frames_received, device->last_timestamp); + sem_post(&sem); + } + } + + if (params.thread_status) // Kill thread + { + params.thread_status = 0; + if (pthread_join(thread_id, NULL)) { + fprintf(stderr, "Error terminating LSL transmit thread.\n"); + } + } + sem_destroy(&sem); + free(device); + + return 0; +} diff --git a/examples/SendDataInChunks.cpp b/examples/SendDataInChunks.cpp index 5e83ce7d..5155138e 100644 --- a/examples/SendDataInChunks.cpp +++ b/examples/SendDataInChunks.cpp @@ -92,15 +92,25 @@ struct fake_device { int main(int argc, char **argv) { std::cout << "SendDataInChunks" << std::endl; - std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate" << std::endl; + std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered " + "chunk_rate nodata use_sync" + << std::endl; std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer for each outlet" << std::endl; std::cout << "- chunk_rate -- number of chunks pushed per second. For this example, make it a common factor of samplingrate and 1000." << std::endl; - + std::cout << "- nodata -- Set non-zero to cause the fake device to not copy pattern data into " + "the buffer." + << std::endl; + std::cout << "- use_sync -- Set to non-zero to use blocking send." << std::endl; + std::string name{argc > 1 ? argv[1] : "MyAudioStream"}, type{argc > 2 ? argv[2] : "Audio"}; int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100; // Here we specify srate, but typically this would come from the device. int n_channels = argc > 4 ? std::stol(argv[4]) : 2; // Here we specify n_chans, but typically this would come from theh device. double max_buffered = argc > 5 ? std::stod(argv[5]) : 360.; int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second. + bool nodata = argc > 7; + bool do_sync = argc > 8 ? (bool)std::stol(argv[8]) : true; + bool b_contig = true && do_sync; // Set true to test gather-write operations. + int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk. int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk @@ -118,16 +128,20 @@ int main(int argc, char **argv) { chn.append_child_value("type", type); } int32_t buf_samples = (int32_t)(max_buffered * samplingrate); - lsl::stream_outlet outlet(info, chunk_samples, buf_samples); + auto flags = static_cast( + (do_sync ? transp_sync_blocking : transp_default) | transp_bufsize_samples); + lsl::stream_outlet outlet(info, chunk_samples, buf_samples, flags); info = outlet.info(); // Refresh info with whatever the outlet captured. std::cout << "Stream UID: " << info.uid() << std::endl; // Create a connection to our device. - fake_device my_device(n_channels, (float)samplingrate); + int dev_chans = b_contig ? n_channels : n_channels + 1; + fake_device my_device(dev_chans, (float)samplingrate); // Prepare buffer to get data from 'device'. // The buffer should be larger than you think you need. Here we make it 4x as large. - std::vector chunk_buffer(4 * chunk_samples * n_channels); + std::vector dev_buffer(4 * chunk_samples * dev_chans); + std::fill(dev_buffer.begin(), dev_buffer.end(), 0); std::cout << "Now sending data..." << std::endl; @@ -141,13 +155,24 @@ int main(int argc, char **argv) { std::this_thread::sleep_until(next_chunk_time); // Get data from device - std::size_t returned_samples = my_device.get_data(chunk_buffer); + std::size_t returned_samples = my_device.get_data(dev_buffer, nodata); // send it to the outlet. push_chunk_multiplexed is one of the more complicated approaches. // other push_chunk methods are easier but slightly slower. double ts = lsl::local_clock(); - outlet.push_chunk_multiplexed( - chunk_buffer.data(), returned_samples * n_channels, ts, true); + if (b_contig) { + // Push a chunk of a contiguous buffer. + outlet.push_chunk_multiplexed( + dev_buffer.data(), returned_samples * n_channels, ts, true); + } else { + std::cout << "Discontiguous push_chunk not yet supported." << std::endl; + std::cout << "See SendData.cpp for discontiguous push_sample, then set " + << std::endl; + std::cout << "timestamps as LSL_DEDUCED_TIMESTAMP and pushtrough as false " + << std::endl; + std::cout << "for all samples except the the first or last in a chunk." + << std::endl; + } } } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } std::cout << "Press any key to exit. " << std::endl; diff --git a/include/lsl/common.h b/include/lsl/common.h index 3c81c798..153df274 100644 --- a/include/lsl/common.h +++ b/include/lsl/common.h @@ -161,6 +161,9 @@ typedef enum { /// The supplied max_buf should be scaled by 0.001. transp_bufsize_thousandths = 2, + /// The outlet will use synchronous (blocking) calls to asio to push data + transp_sync_blocking = 4, + // prevent compilers from assuming an instance fits in a single byte _lsl_transport_options_maxval = 0x7f000000 } lsl_transport_options_t; diff --git a/include/lsl/outlet.h b/include/lsl/outlet.h index 87c32538..b79ebbdd 100644 --- a/include/lsl/outlet.h +++ b/include/lsl/outlet.h @@ -99,9 +99,11 @@ extern LIBLSL_C_API int32_t lsl_push_sample_vtp(lsl_outlet out, const void *data /** @copybrief lsl_push_sample_ftp * @see lsl_push_sample_ftp * @param out The lsl_outlet object through which to push the data. - * @param data A pointer to values to push. The number of values pointed to must be no less than the - * number of channels in the sample. - * @param lengths A pointer the number of elements to push for each channel (string lengths). + * @param data An array of data buffers to push. The number of buffers in the array must be no less + * than the number of channels in the sample. Each entry in data must be longer than the + * corresponding entry in `lengths`. + * @param lengths An array containing the lengths of each buffer in data. Units are string lengths + * or number of bytes. */ extern LIBLSL_C_API int32_t lsl_push_sample_buf(lsl_outlet out, const char **data, const uint32_t *lengths); /** @copydoc lsl_push_sample_buf @@ -111,6 +113,17 @@ extern LIBLSL_C_API int32_t lsl_push_sample_buft(lsl_outlet out, const char **da * @param pushthrough @see lsl_push_sample_ftp */ extern LIBLSL_C_API int32_t lsl_push_sample_buftp(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough); +/** @copydoc lsl_push_sample_buftp + * @param data An array of data buffers to push. The number of buffers in the array must be no less + * than `nbufs`. + * @param bytes An array comprising the number of bytes in each buffer. The number of entries in + * bytes must be no less than `nbufs`. + * @param nbufs The number of values pointed to in `data` and equivalently the number of items in + * `bytes`. + */ +extern LIBLSL_C_API int32_t lsl_push_sample_rawtpn(lsl_outlet out, void **data, + const uint32_t *bytes, double timestamp, int32_t pushthrough, uint32_t nbufs); + /** Push a chunk of multiplexed samples into the outlet. One timestamp per sample is provided. * * @attention Note that the provided buffer size is measured in channel values (e.g. floats) rather diff --git a/include/lsl_cpp.h b/include/lsl_cpp.h index 72d58819..0db1633b 100644 --- a/include/lsl_cpp.h +++ b/include/lsl_cpp.h @@ -513,6 +513,21 @@ class stream_outlet { lsl_push_sample_vtp(obj.get(), (sample), timestamp, pushthrough); } + /** + * Push a pointer to an array of buffers of variable size as one sample into the outlet. + * + * @param bufs A pointer to an array of data buffers. + * @param bytes An array of sizes (number of bytes) of buffers in bufs. + * @param nbufs Total number of buffers. + * @param timestamp Optionally the capture time of the sample, in agreement with local_clock(); + * @param pushthrough Whether to push the sample through to the receivers immediately instead of + * concatenating with subsequent samples. + */ + void push_numeric_bufs(void **bufs, uint32_t *bytes, uint32_t nbufs, double timestamp = 0.0, + bool pushthrough = true) { + lsl_push_sample_rawtpn(obj.get(), bufs, bytes, timestamp, pushthrough, nbufs); + } + // =================================================== // === Pushing an chunk of samples into the outlet === diff --git a/src/api_config.cpp b/src/api_config.cpp index 7c7635ee..f0f93405 100644 --- a/src/api_config.cpp +++ b/src/api_config.cpp @@ -105,6 +105,12 @@ void api_config::load_from_file(const std::string &filename) { } else loguru::g_stderr_verbosity = log_level; + // log config filename only after setting the verbosity level + if (!filename.empty()) + LOG_F(INFO, "Configuration loaded from %s", filename.c_str()); + else + LOG_F(INFO, "Loaded default config"); + // read out the [ports] parameters multicast_port_ = pt.get("ports.MulticastPort", 16571); base_port_ = pt.get("ports.BasePort", 16572); @@ -263,12 +269,6 @@ void api_config::load_from_file(const std::string &filename) { smoothing_halftime_ = pt.get("tuning.SmoothingHalftime", 90.0F); force_default_timestamps_ = pt.get("tuning.ForceDefaultTimestamps", false); - // log config filename only after setting the verbosity level and all config has been read - if (!filename.empty()) - LOG_F(INFO, "Configuration loaded from %s", filename.c_str()); - else - LOG_F(INFO, "Loaded default config"); - } catch (std::exception &e) { LOG_F(ERROR, "Error parsing config file '%s': '%s', rolling back to defaults", filename.c_str(), e.what()); diff --git a/src/lsl_outlet_c.cpp b/src/lsl_outlet_c.cpp index a8472407..411b1759 100644 --- a/src/lsl_outlet_c.cpp +++ b/src/lsl_outlet_c.cpp @@ -160,14 +160,74 @@ LIBLSL_C_API int32_t lsl_push_sample_buft( LIBLSL_C_API int32_t lsl_push_sample_buftp(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough) { + stream_outlet_impl *outimpl = out; + // As the number of bytes-per-buffer is the same as the number of chars-per-buffer, + // we can pass `lengths` through as `bytes`. + return lsl_push_sample_rawtpn(out, (void **)data, lengths, timestamp, pushthrough, + (uint32_t)outimpl->info().channel_count()); +} + +LIBLSL_C_API int32_t lsl_push_sample_rawtpn(lsl_outlet out, void **data, const uint32_t *bytes, + double timestamp, int32_t pushthrough, uint32_t nbufs) { + stream_outlet_impl *outimpl = out; try { - stream_outlet_impl *outimpl = out; - std::vector tmp; - for (uint32_t k = 0; k < (uint32_t)outimpl->info().channel_count(); k++) - tmp.emplace_back(data[k], lengths[k]); - return outimpl->push_sample_noexcept(&tmp[0], timestamp, pushthrough); + if (outimpl->is_sync_blocking()) { + // Convert input to a vector of asio buffers for a gather-write operation. + std::vector bufs; + bufs.reserve(nbufs); + for (auto buf_ix = 0; buf_ix < nbufs; buf_ix++) { + bufs.push_back(asio::buffer(data[buf_ix], bytes[buf_ix])); + } + return outimpl->push_sample_gather(bufs, timestamp, pushthrough); + } else { + // Make contiguous. + if (outimpl->info().channel_format() == cft_string) { + // For strings we place in std::string vector to make sure they are properly + // terminated. + std::vector tmp; + for (uint32_t k = 0; k < nbufs; k++) + tmp.emplace_back((const char *)data[k], bytes[k]); + return outimpl->push_sample_noexcept(&tmp[0], timestamp, pushthrough); + } else { + // Otherwise we put into new memory block. + uint32_t total_bytes = 0, byte_offset = 0; + for (size_t k = 0; k < nbufs; k++) { total_bytes += bytes[k]; } + char *tmp = (char *)malloc(total_bytes); + for (size_t k = 0; k < nbufs; k++) { + memcpy(&tmp[byte_offset], data[k], bytes[k]); + byte_offset += bytes[k]; + } + // TODO: I tried passing void buffer but eventually fail because the convert + // functions + // become ambiguous. + lsl_error_code_t ec; + switch (outimpl->info().channel_format()) { + case cft_int8: + ec = outimpl->push_sample_noexcept((const char *)tmp, timestamp, pushthrough); + case cft_int16: + ec = + outimpl->push_sample_noexcept((const int16_t *)tmp, timestamp, pushthrough); + case cft_int32: + ec = + outimpl->push_sample_noexcept((const int32_t *)tmp, timestamp, pushthrough); + case cft_int64: + ec = + outimpl->push_sample_noexcept((const int64_t *)tmp, timestamp, pushthrough); + case cft_float32: + ec = outimpl->push_sample_noexcept((const float *)tmp, timestamp, pushthrough); + case cft_double64: + ec = outimpl->push_sample_noexcept((const double *)tmp, timestamp, pushthrough); + case cft_undefined: ec = lsl_internal_error; + } + free(tmp); + return ec; + } + } } catch (std::exception &e) { LOG_F(WARNING, "Unexpected error during push_sample: %s", e.what()); + if (!outimpl->is_sync_blocking() && outimpl->info().channel_format() != cft_string) + LOG_F(ERROR, "lsl_push_sample_buftpn only compatible with string type or when outlet " + "is using sync writes."); return lsl_internal_error; } } diff --git a/src/sample.h b/src/sample.h index b951d7d7..e57c6d10 100644 --- a/src/sample.h +++ b/src/sample.h @@ -54,6 +54,8 @@ class factory { /// Reclaim a sample that's no longer used. void reclaim_sample(sample *s); + std::size_t datasize() const { return format_sizes[fmt_] * static_cast(num_chans_); } + private: /// Pop a sample from the freelist (multi-producer/single-consumer queue by Dmitry Vjukov) sample *pop_freelist(); diff --git a/src/stream_outlet_impl.cpp b/src/stream_outlet_impl.cpp index 299414d1..e00c8a0c 100644 --- a/src/stream_outlet_impl.cpp +++ b/src/stream_outlet_impl.cpp @@ -6,6 +6,7 @@ #include "tcp_server.h" #include "udp_server.h" #include +#include #include #include @@ -22,8 +23,16 @@ stream_outlet_impl::stream_outlet_impl(const stream_info_impl &info, int32_t chu chunk_size_(info.calc_transport_buf_samples(requested_bufsize, flags)), info_(std::make_shared(info)), send_buffer_(std::make_shared(chunk_size_)), - io_ctx_data_(std::make_shared(1)), + do_sync_(flags & transp_sync_blocking), io_ctx_data_(std::make_shared(1)), io_ctx_service_(std::make_shared(1)) { + + if ((info.channel_format() == cft_string) && do_sync_) + throw std::invalid_argument("Synchronous push not supported for string-formatted streams."); + + // reserver space for sync timestamps so `push_back` doesn't caused reallocations + // to invalidate pointers to elements + if(do_sync_) sync_timestamps_.reserve(chunk_size_); + ensure_lsl_initialized(); const api_config *cfg = api_config::get_instance(); @@ -41,7 +50,7 @@ stream_outlet_impl::stream_outlet_impl(const stream_info_impl &info, int32_t chu // create TCP data server tcp_server_ = std::make_shared(info_, io_ctx_data_, send_buffer_, sample_factory_, - chunk_size_, cfg->allow_ipv4(), cfg->allow_ipv6()); + chunk_size_, cfg->allow_ipv4(), cfg->allow_ipv6(), do_sync_); // fail if both stacks failed to instantiate if (udp_servers_.empty()) @@ -53,10 +62,8 @@ stream_outlet_impl::stream_outlet_impl(const stream_info_impl &info, int32_t chu for (auto &responder : responders_) responder->begin_serving(); // and start the IO threads to handle them - const std::string name{"IO_" + this->info().name().substr(0, 11)}; for (const auto &io : {io_ctx_data_, io_ctx_service_}) - io_threads_.emplace_back(std::make_shared([io, name]() { - loguru::set_thread_name(name.c_str()); + io_threads_.emplace_back(std::make_shared([io]() { while (true) { try { io->run(); @@ -66,6 +73,10 @@ stream_outlet_impl::stream_outlet_impl(const stream_info_impl &info, int32_t chu } } })); + + const std::string name{this->info().name().substr(0, 11)}; + asio::post(*io_ctx_data_, [name]() { loguru::set_thread_name(("IO_" + name).c_str()); }); + asio::post(*io_ctx_service_, [name]() { loguru::set_thread_name(("SVC_" + name).c_str()); }); } void stream_outlet_impl::instantiate_stack(udp udp_protocol) { @@ -147,8 +158,12 @@ void stream_outlet_impl::push_numeric_raw(const void *data, double timestamp, bo if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0; sample_p smp( sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough)); - smp->assign_untyped(data); - send_buffer_->push_sample(smp); + if (!do_sync_) { + smp->assign_untyped(data); // Note: Makes a copy! + send_buffer_->push_sample(smp); + } else { + enqueue_sync(asio::buffer(data, smp->datasize()), timestamp, pushthrough); + } } bool stream_outlet_impl::have_consumers() { return send_buffer_->have_consumers(); } @@ -157,13 +172,44 @@ bool stream_outlet_impl::wait_for_consumers(double timeout) { return send_buffer_->wait_for_consumers(timeout); } +void stream_outlet_impl::push_timestamp_sync(const double& timestamp) { + static_assert(TAG_TRANSMITTED_TIMESTAMP == 2, "Unexpected TAG_TRANSMITTED_TIMESTAMP"); + const uint64_t ENDIAN_SAFE_TAG_TRANSMITTED = (2LL << 28) | 2LL; + if (timestamp == DEDUCED_TIMESTAMP) { + sync_buffs_.emplace_back(&TAG_DEDUCED_TIMESTAMP, 1); + } else { + sync_timestamps_.emplace_back(ENDIAN_SAFE_TAG_TRANSMITTED, timestamp); + // add a pointer to the memory region containing |TAG_TRANSMITTED_TIMESTAMP|timestamp + // one byte for the tag, 8 for the timestamp + sync_buffs_.emplace_back(reinterpret_cast(&sync_timestamps_.back()) + 7, 9); + } +} + +void stream_outlet_impl::pushthrough_sync() { + // LOG_F(INFO, "Pushing %u buffers.", sync_buffs_.size()); + tcp_server_->write_all_blocking(sync_buffs_); + sync_buffs_.clear(); + sync_timestamps_.clear(); +} + +void stream_outlet_impl::enqueue_sync( + asio::const_buffer buff, const double& timestamp, bool pushthrough) { + push_timestamp_sync(timestamp); + sync_buffs_.push_back(buff); + if (pushthrough) pushthrough_sync(); +} + template void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrough) { - if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0; - sample_p smp( - sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough)); - smp->assign_typed(data); - send_buffer_->push_sample(smp); + if (timestamp == 0.0 || lsl::api_config::get_instance()->force_default_timestamps()) timestamp = lsl_local_clock(); + if (!do_sync_) { + sample_p smp( + sample_factory_->new_sample(timestamp, pushthrough)); + smp->assign_typed(data); + send_buffer_->push_sample(smp); + } else { + enqueue_sync(asio::buffer(data, sample_factory_->datasize()), timestamp, pushthrough); + } } template void stream_outlet_impl::enqueue(const char *data, double, bool); @@ -174,4 +220,11 @@ template void stream_outlet_impl::enqueue(const float *data, double, bool template void stream_outlet_impl::enqueue(const double *data, double, bool); template void stream_outlet_impl::enqueue(const std::string *data, double, bool); +void stream_outlet_impl::enqueue_sync_multi( + std::vector buffs, const double& timestamp, bool pushthrough) { + push_timestamp_sync(timestamp); + sync_buffs_.insert(sync_buffs_.end(), buffs.begin(), buffs.end()); + if (pushthrough) pushthrough_sync(); +} + } // namespace lsl diff --git a/src/stream_outlet_impl.h b/src/stream_outlet_impl.h index 92488006..54d6c118 100644 --- a/src/stream_outlet_impl.h +++ b/src/stream_outlet_impl.h @@ -4,6 +4,7 @@ #include "common.h" #include "forward.h" #include "stream_info_impl.h" +#include #include #include #include @@ -136,7 +137,11 @@ class stream_outlet_impl { void push_sample(const std::string *data, double timestamp = 0.0, bool pushthrough = true) { enqueue(data, timestamp, pushthrough); } - + lsl_error_code_t push_sample_gather( + std::vector buffs, double timestamp = 0.0, bool pushthrough = true) { + enqueue_sync_multi(buffs, timestamp, pushthrough); + return lsl_no_error; + } template inline lsl_error_code_t push_sample_noexcept( @@ -251,7 +256,6 @@ class stream_outlet_impl { throw std::runtime_error("The number of buffer elements to send is not a multiple of " "the stream's channel count."); if (num_samples > 0) { - if (timestamp == 0.0) timestamp = lsl_clock(); if (info().nominal_srate() != IRREGULAR_RATE) timestamp = timestamp - (num_samples - 1) / info().nominal_srate(); push_sample(buffer, timestamp, pushthrough && (num_samples == 1)); @@ -294,6 +298,9 @@ class stream_outlet_impl { /// Wait until some consumer shows up. bool wait_for_consumers(double timeout = FOREVER); + /// If the outlet is intended to use synchronous blocking transfers + bool is_sync_blocking() { return do_sync_; }; + private: /// Instantiate a new server stack. void instantiate_stack(udp udp_protocol); @@ -301,6 +308,25 @@ class stream_outlet_impl { /// Allocate and enqueue a new sample into the send buffer. template void enqueue(const T *data, double timestamp, bool pushthrough); + /// Append the appropriate timestamp tag and optionally timestamp onto sync_buffs_ for a single + /// timestamp. + void push_timestamp_sync(const double& timestamp); + + /// push sync_buffs_ through each tcp server. + void pushthrough_sync(); + + /// Append a single timestamp and single buffer to sync_buffs and optionally pushthrough the + /// server. + void enqueue_sync(asio::const_buffer buff, const double& timestamp, bool pushthrough); + + /** + * Append a single timestamp and multiple within-sample buffers to sync_buffs_. + * This is useful when a sample is discontiguous in memory. It makes no assumptions about how + * many channels are included in each buffer. + */ + void enqueue_sync_multi( + std::vector buffs, const double& timestamp, bool pushthrough); + /** * Check whether some given number of channels matches the stream's channel_count. * Throws an error if not. @@ -319,6 +345,8 @@ class stream_outlet_impl { stream_info_impl_p info_; /// the single-producer, multiple-receiver send buffer send_buffer_p send_buffer_; + /// Flag to indicate that push_* operations should be blocking synchronous. false by default. + bool do_sync_; /// the IO service objects io_context_p io_ctx_data_, io_ctx_service_; @@ -331,6 +359,10 @@ class stream_outlet_impl { std::vector responders_; /// threads that handle the I/O operations (two per stack: one for UDP and one for TCP) std::vector io_threads_; + /// buffers used in synchronous call to gather-write data directly to the socket. + std::vector sync_buffs_; + /// timestamp buffer for sync transfers + std::vector> sync_timestamps_; }; } // namespace lsl diff --git a/src/tcp_server.cpp b/src/tcp_server.cpp index 90dc9531..1b06cb50 100644 --- a/src/tcp_server.cpp +++ b/src/tcp_server.cpp @@ -65,7 +65,9 @@ class client_session : public std::enable_shared_from_this { public: /// Instantiate a new session & its socket. client_session(const tcp_server_p &serv, tcp_socket &&sock) - : io_(serv->io_), serv_(serv), sock_(std::move(sock)), requeststream_(&requestbuf_) {} + : io_(serv->io_), serv_(serv), sock_(std::move(sock)), requeststream_(&requestbuf_) { + LOG_F(1, "Initialized client session %p", this); + } /// Destructor. ~client_session(); @@ -142,10 +144,31 @@ class client_session : public std::enable_shared_from_this { std::condition_variable completion_cond_; }; +class sync_transfer_handler { + bool transfer_is_sync_; + // sockets that should receive data in sync mode + std::vector sync_sockets_; + // io context for sync mode, app is responsible for running it + asio::io_context io_ctx_; +public: + sync_transfer_handler(): io_ctx_(1) { + + } + + /// schedules a native socket handle to be added the next time a push operation is done + void add_socket(const tcp_socket::native_handle_type handle, tcp_socket::protocol_type protocol) { + asio::post(io_ctx_, [=](){ + sync_sockets_.push_back(std::make_unique(io_ctx_, protocol, handle)); + }); + } + void write_all_blocking(const std::vector &bufs); +}; + tcp_server::tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf, - factory_p factory, int chunk_size, bool allow_v4, bool allow_v6) + factory_p factory, int chunk_size, bool allow_v4, bool allow_v6, bool do_sync) : chunk_size_(chunk_size), info_(std::move(info)), io_(std::move(io)), factory_(std::move(factory)), send_buffer_(std::move(sendbuf)) { + if (do_sync) sync_handler = std::make_unique(); // assign connection-dependent fields info_->session_id(api_config::get_instance()->session_id()); info_->reset_uid(); @@ -178,6 +201,11 @@ tcp_server::tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p s throw std::runtime_error("Failed to instantiate socket acceptors for the TCP server"); } +tcp_server::~tcp_server() noexcept +{ + // defined here so the compiler can generate the destructor for the sync_handler +} + // === externally issued asynchronous commands === @@ -209,12 +237,9 @@ void tcp_server::end_serving() { void tcp_server::accept_next_connection(tcp_acceptor_p &acceptor) { try { - // Select the IO context for handling the socket - auto &sock_io_ctx = *io_; - - // accept a connection on the session's socket - acceptor->async_accept(sock_io_ctx, [shared_this = shared_from_this(), &acceptor]( - err_t err, tcp_socket sock) { + // accept a new connection + acceptor->async_accept(*io_, [shared_this = shared_from_this(), &acceptor]( + err_t err, tcp_socket sock) { if (err == asio::error::operation_aborted || err == asio::error::shut_down) return; // no error: create a new session and start processing @@ -232,7 +257,54 @@ void tcp_server::accept_next_connection(tcp_acceptor_p &acceptor) { } -// === graceful cancellation of in-flight sockets === +// === synchronous transfer + +void tcp_server::write_all_blocking(const std::vector &bufs) +{ + sync_handler->write_all_blocking(bufs); +} + +void sync_transfer_handler::write_all_blocking(const std::vector &bufs) { + bool any_session_broken = false; + + for (auto &sock : sync_sockets_) { + asio::async_write(*sock, bufs, + [this, &sock, &any_session_broken]( + const asio::error_code &ec, size_t bytes_transferred) { + switch (ec.value()) { + case 0: break; // success + case asio::error::broken_pipe: + case asio::error::connection_reset: + LOG_F(WARNING, "Broken Pipe / Connection Reset detected. Closing socket."); + any_session_broken = true; + asio::post(io_ctx_, [sock]() { + asio::error_code close_ec; + sock->close(close_ec); + }); + break; + case asio::error::operation_aborted: + LOG_F(INFO, "Socket wasn't fast enough"); + break; + default: + LOG_F(ERROR, "Unhandled write_all_blocking error: %s.", ec.message().c_str()); + } + }); + } + try { + // prepare the io context for new work + io_ctx_.restart(); + io_ctx_.run(); + + if (any_session_broken) { + // remove sessions whose socket was closed + auto new_end_it = std::remove_if(sync_sockets_.begin(), sync_sockets_.end(), + [](const tcp_socket_p &sock) { + return !sock->is_open(); + }); + sync_sockets_.erase(new_end_it, sync_sockets_.end()); + } + } catch (std::exception &e) { LOG_F(ERROR, "Error during write_all_blocking: %s", e.what()); } +} void tcp_server::register_inflight_session(const std::shared_ptr &session) { std::lock_guard lock(inflight_mut_); @@ -535,6 +607,18 @@ void client_session::handle_send_feedheader_outcome(err_t err, std::size_t n) { // convenient for unit tests if (max_buffered_ <= 0) return; + if (serv->sync_handler) { + LOG_F(INFO, "Using synchronous blocking transfers for new client session."); + auto protocol = sock_.local_endpoint().protocol(); + // move the socket into the sync_transfer_io_ctx by releasing it from this + // io ctx and re-creating it with sync_transfer_io_ctx. + // See https://stackoverflow.com/q/52671836/73299 + // Then schedule the sync_transfer_io_ctx to add it to the list of sync sockets + serv->sync_handler->add_socket(sock_.release(), protocol); + serv->unregister_inflight_session(this); + return; + } + // determine transfer parameters auto queue = serv->send_buffer_->new_consumer(max_buffered_); diff --git a/src/tcp_server.h b/src/tcp_server.h index 37314a15..94795cbf 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -47,9 +47,13 @@ class tcp_server : public std::enable_shared_from_this { * @param protocol The protocol (IPv4 or IPv6) that shall be serviced by this server. * @param chunk_size The preferred chunk size, in samples. If 0, the pushthrough flag determines * the effective chunking. + * @param do_sync Set true to indicate data transfer should happen synchronously in a blocking + * call. Default false -- asynchronous transfer in a thread (copies data). */ tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf, factory_p factory, - int chunk_size, bool allow_v4, bool allow_v6); + int chunk_size, bool allow_v4, bool allow_v6, bool do_sync = false); + + ~tcp_server() noexcept; /** * Begin serving TCP connections. @@ -67,6 +71,12 @@ class tcp_server : public std::enable_shared_from_this { */ void end_serving(); + /** + * Write directly to each socket. This should only be used when server initialized with + * do_sync = true. + */ + void write_all_blocking(const std::vector& bufs); + private: friend class client_session; @@ -95,6 +105,10 @@ class tcp_server : public std::enable_shared_from_this { // acceptor socket tcp_acceptor_p acceptor_v4_, acceptor_v6_; // our server socket + + // optional pointer to a handler class for synchronous transfers + std::unique_ptr sync_handler; + // registry of in-flight asessions (for cancellation) std::map> inflight_; std::recursive_mutex inflight_mut_; // mutex protecting the registry from concurrent access diff --git a/testing/flood.c b/testing/flood.c new file mode 100644 index 00000000..1eb175d4 --- /dev/null +++ b/testing/flood.c @@ -0,0 +1,61 @@ +#include +#include +#include +#include +#include +#include + +static double start_t; +static int64_t samples_pushed = 0; + +void handle_signal(int signal) { + double time = lsl_local_clock() - start_t; + printf("%ld samples pushed in %fs, %d samples/s\n", samples_pushed, time, + (int)(samples_pushed / time)); + if (signal == SIGTERM || signal == SIGINT) exit(0); + start_t = lsl_local_clock(); + samples_pushed = 0; +} + +int main(int argc, char *argv[]) { + signal(SIGTERM, handle_signal); + signal(SIGINT, handle_signal); +#ifdef SIGUSR1 + signal(SIGUSR1, handle_signal); +#endif +#ifdef SIGCONT + signal(SIGCONT, handle_signal); +#endif + printf("LSL inlet stress tester. Sends [nchan] uint16 channels as fast as possible.\n"); + printf("Usage: %s [streamname] [nchan=56] [chunksize=500]\n", argv[0]); + printf("Using lsl %d, lsl_library_info: %s\n\n", lsl_library_version(), lsl_library_info()); + + const char *name = argc > 1 ? argv[1] : "Flood"; + const int nchan = argc > 2 ? strtol(argv[2], NULL, 10) : 56; + const int samples_per_chunk = argc > 3 ? strtol(argv[3], NULL, 10) : 500; + const char uid[] = "325wqer4354"; + + /* declare a new streaminfo (name: SendDataC / argument 1, content type: EEG, 8 channels, 500 + * Hz, float values, some made-up device id (can also be empty) */ + lsl_streaminfo info = lsl_create_streaminfo(name, "Bench", nchan, 50000, cft_int16, uid); + + /* make a new outlet (chunking: default, buffering: 360 seconds) */ + lsl_outlet outlet = lsl_create_outlet_ex(info, 0, 360, transp_sync_blocking); + char *infoxml = lsl_get_xml(lsl_get_info(outlet)); + printf("Streaminfo: %s\n", infoxml); + lsl_destroy_string(infoxml); + + const int buf_elements = nchan * samples_per_chunk; + int16_t *buf = malloc(buf_elements * 2); + memset(buf, 0xab, buf_elements * sizeof(buf[0])); + + printf("Now sending data...\n"); + start_t = lsl_local_clock(); + + for (int t = 0;; t++) { + lsl_push_chunk_s(outlet, buf, buf_elements); + samples_pushed += samples_per_chunk; + } + lsl_destroy_outlet(outlet); + return 0; +} diff --git a/testing/spike.cpp b/testing/spike.cpp new file mode 100644 index 00000000..9d78b4db --- /dev/null +++ b/testing/spike.cpp @@ -0,0 +1,50 @@ +#include "lsl_cpp.h" +#include +#include +#include +#include +#include + +using namespace std::chrono_literals; + +const char *channels[] = {"C3", "C4", "Cz", "FPz", "POz", "CPz", "O1", "O2"}; + +int main(int argc, char *argv[]) { + int n_channels = argc > 1 ? std::stoi(argv[1]) : 200; + int samplingrate = argc > 2 ? std::stoi(argv[2]) : 50000; + int max_buffered = argc > 3 ? std::stoi(argv[3]) : 360; + bool sync = argc > 4 ? std::stoi(argv[4]) > 0 : true; + + try { + lsl::stream_info info("Spike", "bench", n_channels, samplingrate, lsl::cf_int16, "Spike"); + lsl::stream_outlet outlet( + info, 0, max_buffered, sync ? transp_sync_blocking : transp_default); + + std::vector chunk(n_channels * samplingrate / 1000, 5); + + const auto t_start = std::chrono::high_resolution_clock::now(); + auto next_sample_time = t_start; + const auto time_per_chunk = std::chrono::microseconds(8 / samplingrate); + + // send data forever + std::cout << "Now sending data... " << std::endl; + for (unsigned t = 0;; t++) { + if (t % 3 == 0) { + for (int s = 0; s < 5; ++s) + outlet.push_chunk_multiplexed(chunk); + // Wait until the next expected sample time. + } else { + for (int s = 0; s < 10; ++s) { + outlet.push_sample(chunk.data()); + std::this_thread::sleep_for(100us); + } + } + next_sample_time += time_per_chunk; + std::this_thread::sleep_until(next_sample_time); + } + + } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } + std::cout << "Press any key to exit. " << std::endl; + std::cin.get(); + return 0; +}