Skip to content

Commit 6805917

Browse files
committed
[TASK-389] Idempotent writes
1 parent 2ac6efa commit 6805917

17 files changed

Lines changed: 2535 additions & 105 deletions

File tree

bindings/cpp/include/fluss.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -998,6 +998,14 @@ struct Configuration {
998998
// Maximum number of records returned in a single call to Poll() for LogScanner
999999
size_t scanner_log_max_poll_records{500};
10001000
int64_t writer_batch_timeout_ms{100};
1001+
// Whether to enable idempotent writes
1002+
bool writer_enable_idempotence{true};
1003+
// Maximum number of in-flight requests per bucket for idempotent writes
1004+
size_t writer_max_inflight_requests_per_bucket{5};
1005+
// Total memory available for buffering write batches (default 64MB)
1006+
size_t writer_buffer_memory_size{64 * 1024 * 1024};
1007+
// Maximum time in milliseconds to block waiting for buffer memory
1008+
uint64_t writer_buffer_wait_timeout_ms{60000};
10011009
// Connect timeout in milliseconds for TCP transport connect
10021010
uint64_t connect_timeout_ms{120000};
10031011
// Security protocol: "PLAINTEXT" (default, no auth) or "sasl" (SASL auth)

bindings/cpp/src/ffi_converter.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
5757
ffi_config.scanner_remote_log_read_concurrency = config.scanner_remote_log_read_concurrency;
5858
ffi_config.scanner_log_max_poll_records = config.scanner_log_max_poll_records;
5959
ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;
60+
ffi_config.writer_enable_idempotence = config.writer_enable_idempotence;
61+
ffi_config.writer_max_inflight_requests_per_bucket =
62+
config.writer_max_inflight_requests_per_bucket;
63+
ffi_config.writer_buffer_memory_size = config.writer_buffer_memory_size;
64+
ffi_config.writer_buffer_wait_timeout_ms = config.writer_buffer_wait_timeout_ms;
6065
ffi_config.connect_timeout_ms = config.connect_timeout_ms;
6166
ffi_config.security_protocol = rust::String(config.security_protocol);
6267
ffi_config.security_sasl_mechanism = rust::String(config.security_sasl_mechanism);

bindings/cpp/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ mod ffi {
4949
scanner_remote_log_read_concurrency: usize,
5050
scanner_log_max_poll_records: usize,
5151
writer_batch_timeout_ms: i64,
52+
writer_enable_idempotence: bool,
53+
writer_max_inflight_requests_per_bucket: usize,
54+
writer_buffer_memory_size: usize,
55+
writer_buffer_wait_timeout_ms: u64,
5256
connect_timeout_ms: u64,
5357
security_protocol: String,
5458
security_sasl_mechanism: String,
@@ -653,6 +657,10 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> {
653657
remote_file_download_thread_num: config.remote_file_download_thread_num,
654658
scanner_remote_log_read_concurrency: config.scanner_remote_log_read_concurrency,
655659
scanner_log_max_poll_records: config.scanner_log_max_poll_records,
660+
writer_enable_idempotence: config.writer_enable_idempotence,
661+
writer_max_inflight_requests_per_bucket: config.writer_max_inflight_requests_per_bucket,
662+
writer_buffer_memory_size: config.writer_buffer_memory_size,
663+
writer_buffer_wait_timeout_ms: config.writer_buffer_wait_timeout_ms,
656664
connect_timeout_ms: config.connect_timeout_ms,
657665
security_protocol: config.security_protocol.to_string(),
658666
security_sasl_mechanism: config.security_sasl_mechanism.to_string(),

bindings/python/fluss/__init__.pyi

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,22 @@ class Config:
174174
@writer_batch_timeout_ms.setter
175175
def writer_batch_timeout_ms(self, timeout: int) -> None: ...
176176
@property
177+
def writer_enable_idempotence(self) -> bool: ...
178+
@writer_enable_idempotence.setter
179+
def writer_enable_idempotence(self, enabled: bool) -> None: ...
180+
@property
181+
def writer_max_inflight_requests_per_bucket(self) -> int: ...
182+
@writer_max_inflight_requests_per_bucket.setter
183+
def writer_max_inflight_requests_per_bucket(self, num: int) -> None: ...
184+
@property
185+
def writer_buffer_memory_size(self) -> int: ...
186+
@writer_buffer_memory_size.setter
187+
def writer_buffer_memory_size(self, size: int) -> None: ...
188+
@property
189+
def writer_buffer_wait_timeout_ms(self) -> int: ...
190+
@writer_buffer_wait_timeout_ms.setter
191+
def writer_buffer_wait_timeout_ms(self, timeout: int) -> None: ...
192+
@property
177193
def connect_timeout_ms(self) -> int: ...
178194
@connect_timeout_ms.setter
179195
def connect_timeout_ms(self, timeout: int) -> None: ...

bindings/python/src/config.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,38 @@ impl Config {
9797
))
9898
})?;
9999
}
100+
"writer.enable-idempotence" => {
101+
config.writer_enable_idempotence = match value.as_str() {
102+
"true" => true,
103+
"false" => false,
104+
other => {
105+
return Err(FlussError::new_err(format!(
106+
"Invalid value '{other}' for '{key}', expected 'true' or 'false'"
107+
)));
108+
}
109+
};
110+
}
111+
"writer.max-inflight-requests-per-bucket" => {
112+
config.writer_max_inflight_requests_per_bucket =
113+
value.parse::<usize>().map_err(|e| {
114+
FlussError::new_err(format!(
115+
"Invalid value '{value}' for '{key}': {e}"
116+
))
117+
})?;
118+
}
119+
"writer.buffer.memory-size" => {
120+
config.writer_buffer_memory_size = value.parse::<usize>().map_err(|e| {
121+
FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
122+
})?;
123+
}
124+
"writer.buffer.wait-timeout-ms" => {
125+
config.writer_buffer_wait_timeout_ms =
126+
value.parse::<u64>().map_err(|e| {
127+
FlussError::new_err(format!(
128+
"Invalid value '{value}' for '{key}': {e}"
129+
))
130+
})?;
131+
}
100132
"writer.bucket.no-key-assigner" => {
101133
config.writer_bucket_no_key_assigner = match value.as_str() {
102134
"round_robin" => fcore::config::NoKeyAssigner::RoundRobin,
@@ -255,6 +287,54 @@ impl Config {
255287
self.inner.writer_batch_timeout_ms = timeout;
256288
}
257289

290+
/// Get whether idempotent writes are enabled
291+
#[getter]
292+
fn writer_enable_idempotence(&self) -> bool {
293+
self.inner.writer_enable_idempotence
294+
}
295+
296+
/// Set whether idempotent writes are enabled
297+
#[setter]
298+
fn set_writer_enable_idempotence(&mut self, enabled: bool) {
299+
self.inner.writer_enable_idempotence = enabled;
300+
}
301+
302+
/// Get the max in-flight requests per bucket
303+
#[getter]
304+
fn writer_max_inflight_requests_per_bucket(&self) -> usize {
305+
self.inner.writer_max_inflight_requests_per_bucket
306+
}
307+
308+
/// Set the max in-flight requests per bucket
309+
#[setter]
310+
fn set_writer_max_inflight_requests_per_bucket(&mut self, num: usize) {
311+
self.inner.writer_max_inflight_requests_per_bucket = num;
312+
}
313+
314+
/// Get the writer buffer memory size
315+
#[getter]
316+
fn writer_buffer_memory_size(&self) -> usize {
317+
self.inner.writer_buffer_memory_size
318+
}
319+
320+
/// Set the writer buffer memory size
321+
#[setter]
322+
fn set_writer_buffer_memory_size(&mut self, size: usize) {
323+
self.inner.writer_buffer_memory_size = size;
324+
}
325+
326+
/// Get the writer buffer wait timeout in milliseconds
327+
#[getter]
328+
fn writer_buffer_wait_timeout_ms(&self) -> u64 {
329+
self.inner.writer_buffer_wait_timeout_ms
330+
}
331+
332+
/// Set the writer buffer wait timeout in milliseconds
333+
#[setter]
334+
fn set_writer_buffer_wait_timeout_ms(&mut self, timeout: u64) {
335+
self.inner.writer_buffer_wait_timeout_ms = timeout;
336+
}
337+
258338
/// Get the connect timeout in milliseconds
259339
#[getter]
260340
fn connect_timeout_ms(&self) -> u64 {

0 commit comments

Comments
 (0)