Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,14 @@ struct Configuration {
// Maximum bytes per fetch response per bucket for LogScanner (1 MB)
int32_t scanner_log_fetch_max_bytes_for_bucket{1024 * 1024};
int64_t writer_batch_timeout_ms{100};
// Whether to enable idempotent writes
bool writer_enable_idempotence{true};
// Maximum number of in-flight requests per bucket for idempotent writes
size_t writer_max_inflight_requests_per_bucket{5};
// Total memory available for buffering write batches (default 64MB)
size_t writer_buffer_memory_size{64 * 1024 * 1024};
// Maximum time in milliseconds to block waiting for buffer memory
uint64_t writer_buffer_wait_timeout_ms{std::numeric_limits<uint64_t>::max()};
// Connect timeout in milliseconds for TCP transport connect
uint64_t connect_timeout_ms{120000};
// Security protocol: "PLAINTEXT" (default, no auth) or "sasl" (SASL auth)
Expand Down
5 changes: 5 additions & 0 deletions bindings/cpp/src/ffi_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
ffi_config.scanner_log_fetch_wait_max_time_ms = config.scanner_log_fetch_wait_max_time_ms;
ffi_config.scanner_log_fetch_max_bytes_for_bucket = config.scanner_log_fetch_max_bytes_for_bucket;
ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;
ffi_config.writer_enable_idempotence = config.writer_enable_idempotence;
ffi_config.writer_max_inflight_requests_per_bucket =
config.writer_max_inflight_requests_per_bucket;
ffi_config.writer_buffer_memory_size = config.writer_buffer_memory_size;
ffi_config.writer_buffer_wait_timeout_ms = config.writer_buffer_wait_timeout_ms;
ffi_config.connect_timeout_ms = config.connect_timeout_ms;
ffi_config.security_protocol = rust::String(config.security_protocol);
ffi_config.security_sasl_mechanism = rust::String(config.security_sasl_mechanism);
Expand Down
8 changes: 8 additions & 0 deletions bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ mod ffi {
scanner_log_fetch_wait_max_time_ms: i32,
scanner_log_fetch_max_bytes_for_bucket: i32,
writer_batch_timeout_ms: i64,
writer_enable_idempotence: bool,
writer_max_inflight_requests_per_bucket: usize,
writer_buffer_memory_size: usize,
writer_buffer_wait_timeout_ms: u64,
connect_timeout_ms: u64,
security_protocol: String,
security_sasl_mechanism: String,
Expand Down Expand Up @@ -676,6 +680,10 @@ fn new_connection(config: &ffi::FfiConfig) -> ffi::FfiPtrResult {
scanner_log_fetch_min_bytes: config.scanner_log_fetch_min_bytes,
scanner_log_fetch_wait_max_time_ms: config.scanner_log_fetch_wait_max_time_ms,
scanner_log_fetch_max_bytes_for_bucket: config.scanner_log_fetch_max_bytes_for_bucket,
writer_enable_idempotence: config.writer_enable_idempotence,
writer_max_inflight_requests_per_bucket: config.writer_max_inflight_requests_per_bucket,
writer_buffer_memory_size: config.writer_buffer_memory_size,
writer_buffer_wait_timeout_ms: config.writer_buffer_wait_timeout_ms,
connect_timeout_ms: config.connect_timeout_ms,
security_protocol: config.security_protocol.to_string(),
security_sasl_mechanism: config.security_sasl_mechanism.to_string(),
Expand Down
16 changes: 16 additions & 0 deletions bindings/python/fluss/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,22 @@ class Config:
@writer_batch_timeout_ms.setter
def writer_batch_timeout_ms(self, timeout: int) -> None: ...
@property
def writer_enable_idempotence(self) -> bool: ...
@writer_enable_idempotence.setter
def writer_enable_idempotence(self, enabled: bool) -> None: ...
@property
def writer_max_inflight_requests_per_bucket(self) -> int: ...
@writer_max_inflight_requests_per_bucket.setter
def writer_max_inflight_requests_per_bucket(self, num: int) -> None: ...
@property
def writer_buffer_memory_size(self) -> int: ...
@writer_buffer_memory_size.setter
def writer_buffer_memory_size(self, size: int) -> None: ...
@property
def writer_buffer_wait_timeout_ms(self) -> int: ...
@writer_buffer_wait_timeout_ms.setter
def writer_buffer_wait_timeout_ms(self, timeout: int) -> None: ...
@property
def connect_timeout_ms(self) -> int: ...
@connect_timeout_ms.setter
def connect_timeout_ms(self, timeout: int) -> None: ...
Expand Down
80 changes: 80 additions & 0 deletions bindings/python/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,38 @@ impl Config {
))
})?;
}
"writer.enable-idempotence" => {
config.writer_enable_idempotence = match value.as_str() {
"true" => true,
"false" => false,
other => {
return Err(FlussError::new_err(format!(
"Invalid value '{other}' for '{key}', expected 'true' or 'false'"
)));
}
};
}
"writer.max-inflight-requests-per-bucket" => {
config.writer_max_inflight_requests_per_bucket =
value.parse::<usize>().map_err(|e| {
FlussError::new_err(format!(
"Invalid value '{value}' for '{key}': {e}"
))
})?;
}
"writer.buffer.memory-size" => {
config.writer_buffer_memory_size = value.parse::<usize>().map_err(|e| {
FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
})?;
}
"writer.buffer.wait-timeout-ms" => {
config.writer_buffer_wait_timeout_ms =
value.parse::<u64>().map_err(|e| {
FlussError::new_err(format!(
"Invalid value '{value}' for '{key}': {e}"
))
})?;
}
"writer.bucket.no-key-assigner" => {
config.writer_bucket_no_key_assigner =
value.parse::<fcore::config::NoKeyAssigner>().map_err(|e| {
Expand Down Expand Up @@ -296,6 +328,54 @@ impl Config {
Ok(())
}

/// Get whether idempotent writes are enabled
#[getter]
fn writer_enable_idempotence(&self) -> bool {
self.inner.writer_enable_idempotence
}

/// Set whether idempotent writes are enabled
#[setter]
fn set_writer_enable_idempotence(&mut self, enabled: bool) {
self.inner.writer_enable_idempotence = enabled;
}

/// Get the max in-flight requests per bucket
#[getter]
fn writer_max_inflight_requests_per_bucket(&self) -> usize {
self.inner.writer_max_inflight_requests_per_bucket
}

/// Set the max in-flight requests per bucket
#[setter]
fn set_writer_max_inflight_requests_per_bucket(&mut self, num: usize) {
self.inner.writer_max_inflight_requests_per_bucket = num;
}

/// Get the writer buffer memory size
#[getter]
fn writer_buffer_memory_size(&self) -> usize {
self.inner.writer_buffer_memory_size
}

/// Set the writer buffer memory size
#[setter]
fn set_writer_buffer_memory_size(&mut self, size: usize) {
self.inner.writer_buffer_memory_size = size;
}

/// Get the writer buffer wait timeout in milliseconds
#[getter]
fn writer_buffer_wait_timeout_ms(&self) -> u64 {
self.inner.writer_buffer_wait_timeout_ms
}

/// Set the writer buffer wait timeout in milliseconds
#[setter]
fn set_writer_buffer_wait_timeout_ms(&mut self, timeout: u64) {
self.inner.writer_buffer_wait_timeout_ms = timeout;
}

/// Get the connect timeout in milliseconds
#[getter]
fn connect_timeout_ms(&self) -> u64 {
Expand Down
4 changes: 4 additions & 0 deletions crates/fluss/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ use std::time::Duration;
use crate::error::{Error, FlussError, Result};
use crate::metadata::TablePath;

// TODO: implement `close(&self, timeout: Duration)` to gracefully shut down the
// writer client (drain pending batches, then force-close on timeout).
// Java's FlussConnection.close() calls writerClient.close(Long.MAX_VALUE).
// WriterClient::close() already exists but is never called from the public API.
pub struct FlussConnection {
metadata: Arc<Metadata>,
network_connects: Arc<RpcClient>,
Expand Down
Loading
Loading