Skip to content

Commit a524303

Browse files
authored
feat: protocol version 2 and n-dimensional array ingestion support
1 parent 107a3eb commit a524303

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+5987
-1004
lines changed

.bumpversion.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[bumpversion]
2-
current_version = 4.0.4
2+
current_version = 5.0.0-rc1
33
commit = False
44
tag = False
55

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@ cmake-build-release
99
.cache
1010
questdb-rs/Cargo.lock
1111
include/questdb/ingress/line_sender.gen.h
12-
cython/questdb/ingress/line_sender.pxd
12+
cython/questdb/ingress/line_sender.pxd
13+
profile.out

CMakeLists.txt

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
cmake_minimum_required(VERSION 3.15.0)
2-
project(c-questdb-client VERSION 4.0.4)
2+
project(c-questdb-client VERSION 5.0.0)
3+
set(PROJECT_PRE_RELEASE "rc1")
34

45
set(CPACK_PROJECT_NAME ${PROJECT_NAME})
56
set(CPACK_PROJECT_VERSION ${PROJECT_VERSION})
@@ -98,6 +99,14 @@ if (QUESTDB_TESTS_AND_EXAMPLES)
9899
line_sender_c_example
99100
examples/concat.c
100101
examples/line_sender_c_example.c)
102+
compile_example(
103+
line_sender_c_example_array_byte_strides
104+
examples/concat.c
105+
examples/line_sender_c_example_array_byte_strides.c)
106+
compile_example(
107+
line_sender_c_example_array_elem_strides
108+
examples/concat.c
109+
examples/line_sender_c_example_array_elem_strides.c)
101110
compile_example(
102111
line_sender_c_example_auth
103112
examples/concat.c
@@ -123,6 +132,12 @@ if (QUESTDB_TESTS_AND_EXAMPLES)
123132
compile_example(
124133
line_sender_cpp_example
125134
examples/line_sender_cpp_example.cpp)
135+
compile_example(
136+
line_sender_cpp_example_array_byte_strides
137+
examples/line_sender_cpp_example_array_byte_strides.cpp)
138+
compile_example(
139+
line_sender_cpp_example_array_elem_strides
140+
examples/line_sender_cpp_example_array_elem_strides.cpp)
126141
compile_example(
127142
line_sender_cpp_example_auth
128143
examples/line_sender_cpp_example_auth.cpp)

README.md

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@
33

44
This library makes it easy to insert data into [QuestDB](https://questdb.io/).
55

6-
This client library implements the [InfluxDB Line Protocol](
6+
This client library implements the QuestDB's variant of the [InfluxDB Line Protocol](
77
https://questdb.io/docs/reference/api/ilp/overview/) (ILP) over HTTP and TCP.
88

9+
When connecting to QuestDB over HTTP, the library will auto-detect the server's
10+
latest supported version and use it. Version 1 is compatible with
11+
the [InfluxDB Database](https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/).
12+
913
* Implementation is in Rust, with no additional
1014
[run-time or link-time dependencies](doc/BUILD.md#pre-requisites-and-dependencies)
1115
on the C++ standard library or other libraries.
@@ -38,6 +42,21 @@ For an overview and code examples, see the
3842
To understand the protocol in more depth, consult the
3943
[protocol reference docs](https://questdb.io/docs/reference/api/ilp/overview/).
4044

45+
## Protocol Versions
46+
47+
The library supports the following ILP protocol versions.
48+
49+
These protocol versions are supported over both HTTP and TCP.
50+
51+
If you use HTTP, the library will automatically detect the server's
52+
latest supported protocol version and use it. If you use TCP, you can specify the
53+
`protocol_version=N` parameter when constructing the `Sender` object.
54+
55+
| Version | Description | Server Comatibility |
56+
| ------- | ------------------------------------------------------- | --------------------- |
57+
| **1** | Over HTTP it's compatible InfluxDB Line Protocol (ILP) | All QuestDB versions |
58+
| **2** | 64-bit floats sent as binary, adds n-dimentional arrays | 8.4.0+ (2023-10-30) |
59+
4160
## Getting Started
4261

4362
To get started, read the language-specific guides.

ci/compile.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ steps:
44
rustup default $(toolchain)
55
condition: ne(variables['toolchain'], '')
66
displayName: "Update and set Rust toolchain"
7+
- script: |
8+
python -m pip install --upgrade pip
9+
pip install numpy
10+
displayName: 'Install Python Dependencies'
711
- script: cmake -S . -B build -DCMAKE_BUILD_TYPE=Release -DQUESTDB_TESTS_AND_EXAMPLES=ON
812
env:
913
JAVA_HOME: $(JAVA_HOME_11_X64)

ci/run_all_tests.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def main():
4141
build_cxx20_dir.glob(f'**/test_line_sender{exe_suffix}')))
4242

4343
system_test_path = pathlib.Path('system_test') / 'test.py'
44-
qdb_v = '8.2.3' # The version of QuestDB we'll test against.
44+
#qdb_v = '8.2.3' # The version of QuestDB we'll test against.
4545

4646
run_cmd('cargo', 'test',
4747
'--', '--nocapture', cwd='questdb-rs')
@@ -51,9 +51,11 @@ def main():
5151
'--', '--nocapture', cwd='questdb-rs')
5252
run_cmd('cargo', 'test', '--features=almost-all-features',
5353
'--', '--nocapture', cwd='questdb-rs')
54+
run_cmd('cargo', 'test', cwd='questdb-rs-ffi')
5455
run_cmd(str(test_line_sender_path))
5556
run_cmd(str(test_line_sender_path_CXX20))
56-
run_cmd('python3', str(system_test_path), 'run', '--versions', qdb_v, '-v')
57+
#run_cmd('python3', str(system_test_path), 'run', '--versions', qdb_v, '-v')
58+
run_cmd('python3', str(system_test_path), 'run', '--repo', './questdb_nd_arr', '-v')
5759

5860

5961
if __name__ == '__main__':

ci/run_tests_pipeline.yaml

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,25 @@ stages:
5454
cd questdb-rs
5555
cargo build --examples --features almost-all-features
5656
displayName: "Build Rust examples"
57+
############################# temp for test begin #####################
58+
- script: |
59+
git clone -b nd_arr --depth 1 https://github.com/questdb/questdb.git ./questdb_nd_arr
60+
displayName: git clone questdb
61+
- task: Maven@3
62+
displayName: "Compile QuestDB"
63+
inputs:
64+
mavenPOMFile: 'questdb_nd_arr/pom.xml'
65+
jdkVersionOption: '1.11'
66+
options: "-DskipTests -Pbuild-web-console"
67+
############################# temp for test end #####################
5768
- script: python3 ci/run_all_tests.py
5869
env:
5970
JAVA_HOME: $(JAVA_HOME_11_X64)
6071
displayName: "Tests"
61-
- task: PublishBuildArtifacts@1
62-
inputs:
63-
pathToPublish: ./build
64-
displayName: "Publish build directory"
72+
# - task: PublishBuildArtifacts@1
73+
# inputs:
74+
# pathToPublish: ./build
75+
# displayName: "Publish build directory"
6576
- job: FormatAndLinting
6677
displayName: "cargo fmt and clippy"
6778
pool:
@@ -115,7 +126,7 @@ stages:
115126
submodules: false
116127
- template: compile.yaml
117128
- script: |
118-
git clone --depth 1 https://github.com/questdb/questdb.git
129+
git clone -b nd_arr --depth 1 https://github.com/questdb/questdb.git
119130
displayName: git clone questdb
120131
- task: Maven@3
121132
displayName: "Compile QuestDB"

cpp_test/mock_server.cpp

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "mock_server.hpp"
2626

2727
#include <string.h>
28+
#include <string>
2829

2930
#if defined(PLATFORM_UNIX)
3031
# include <fcntl.h>
@@ -190,43 +191,81 @@ bool mock_server::wait_for_data(std::optional<double> wait_timeout_sec)
190191
return !!count;
191192
}
192193

194+
int32_t bytes_to_int32_le(const std::byte* bytes)
195+
{
196+
return static_cast<int32_t>(
197+
(bytes[0] << 0) | (bytes[1] << 8) | (bytes[2] << 16) |
198+
(bytes[3] << 24));
199+
}
200+
193201
size_t mock_server::recv(double wait_timeout_sec)
194202
{
195203
if (!wait_for_data(wait_timeout_sec))
196204
return 0;
197205

198-
char chunk[1024];
206+
std::byte chunk[1024];
199207
size_t chunk_len{sizeof(chunk)};
200-
std::vector<char> accum;
208+
std::vector<std::byte> accum;
201209
for (;;)
202210
{
203211
wait_for_data();
204-
sock_ssize_t count =
205-
::recv(_conn_fd, &chunk[0], static_cast<sock_len_t>(chunk_len), 0);
212+
sock_ssize_t count = ::recv(
213+
_conn_fd,
214+
reinterpret_cast<char*>(&chunk[0]),
215+
static_cast<sock_len_t>(chunk_len),
216+
0);
206217
if (count == -1)
207218
throw std::runtime_error{"Bad `recv()`."};
208219
const size_t u_count = static_cast<size_t>(count);
209220
accum.insert(accum.end(), chunk, chunk + u_count);
210221
if (accum.size() < 2)
211222
continue;
212-
if ((accum[accum.size() - 1] == '\n') &&
213-
(accum[accum.size() - 2] != '\\'))
223+
if ((accum[accum.size() - 1] == std::byte('\n')) &&
224+
(accum[accum.size() - 2] != std::byte('\\')))
214225
break;
215226
}
216227

217228
size_t received_count{0};
218-
const char* head{&accum[0]};
219-
for (size_t index = 1; index < accum.size(); ++index)
229+
const std::byte* head{&accum[0]};
230+
size_t index{1};
231+
while (index < accum.size())
220232
{
221-
const char& last = accum[index];
222-
const char& prev = accum[index - 1];
223-
if ((last == '\n') && (prev != '\\'))
233+
const std::byte& last = accum[index];
234+
const std::byte& prev = accum[index - 1];
235+
if (last == std::byte('=') && prev == std::byte('='))
236+
{
237+
index++;
238+
std::byte& binary_type = accum[index];
239+
if (binary_type == std::byte(16)) // DOUBLE_BINARY_FORMAT_TYPE
240+
index += sizeof(double) + 1;
241+
else if (binary_type == std::byte(14)) // ARRAY_BINARY_FORMAT_TYPE
242+
{
243+
index++;
244+
const std::byte& array_elem_type = accum[index];
245+
if (array_elem_type == std::byte(10))
246+
{
247+
index++;
248+
const size_t dims = size_t(accum[index]);
249+
index++;
250+
size_t data_size{sizeof(double)};
251+
for (size_t i = 0; i < dims; i++)
252+
{
253+
data_size *= bytes_to_int32_le(&accum[index]);
254+
index += sizeof(int32_t);
255+
}
256+
index += data_size;
257+
}
258+
}
259+
continue;
260+
}
261+
else if ((last == std::byte('\n')) && (prev != std::byte('\\')))
224262
{
225-
const char* tail{&last + 1};
226-
_msgs.emplace_back(head, tail - head);
263+
const std::byte* tail{&last + 1};
264+
_msgs.emplace_back(head, tail);
227265
head = tail;
228266
++received_count;
229267
}
268+
index++;
230269
}
231270
return received_count;
232271
}

cpp_test/mock_server.hpp

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,17 @@
2424

2525
#pragma once
2626

27+
#include <cassert>
2728
#include <vector>
28-
#include <string>
2929
#include <cstdint>
3030
#include <optional>
3131
#include <stdexcept>
32-
3332
#include "build_env.h"
33+
#if __cplusplus < 202002L
34+
# include "questdb/ingress/line_sender.hpp"
35+
#else
36+
# include <span>
37+
#endif
3438

3539
#if defined(PLATFORM_UNIX)
3640
typedef int socketfd_t;
@@ -60,9 +64,14 @@ class mock_server
6064

6165
size_t recv(double wait_timeout_sec = 0.1);
6266

63-
const std::vector<std::string>& msgs() const
67+
#if __cplusplus >= 202002L
68+
using buffer_view = std::span<const std::byte>;
69+
#endif
70+
71+
buffer_view msgs(size_t index) const
6472
{
65-
return _msgs;
73+
assert(index < _msgs.size());
74+
return {_msgs[index].data(), _msgs[index].size()};
6675
}
6776

6877
void close();
@@ -75,7 +84,7 @@ class mock_server
7584
socketfd_t _listen_fd;
7685
socketfd_t _conn_fd;
7786
uint16_t _port;
78-
std::vector<std::string> _msgs;
87+
std::vector<std::vector<std::byte>> _msgs;
7988
};
8089

8190
} // namespace questdb::ingress::test

0 commit comments

Comments
 (0)