Skip to content

Commit 847f5fe

Browse files
chore: introduce max poll records config option (#355)
1 parent c17d224 commit 847f5fe

11 files changed

Lines changed: 73 additions & 24 deletions

File tree

bindings/cpp/include/fluss.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -983,6 +983,8 @@ struct Configuration {
983983
size_t scanner_remote_log_prefetch_num{4};
984984
// Number of threads for downloading remote log data
985985
size_t remote_file_download_thread_num{3};
986+
// Maximum number of records returned in a single call to Poll() for LogScanner
987+
size_t scanner_log_max_poll_records{500};
986988
};
987989

988990
class Connection {

bindings/cpp/src/ffi_converter.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
5353
ffi_config.writer_batch_size = config.writer_batch_size;
5454
ffi_config.scanner_remote_log_prefetch_num = config.scanner_remote_log_prefetch_num;
5555
ffi_config.remote_file_download_thread_num = config.remote_file_download_thread_num;
56+
ffi_config.scanner_log_max_poll_records = config.scanner_log_max_poll_records;
5657
return ffi_config;
5758
}
5859

bindings/cpp/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ mod ffi {
4545
writer_batch_size: i32,
4646
scanner_remote_log_prefetch_num: usize,
4747
remote_file_download_thread_num: usize,
48+
scanner_log_max_poll_records: usize,
4849
}
4950

5051
struct FfiResult {
@@ -614,6 +615,7 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> {
614615
writer_batch_size: config.writer_batch_size,
615616
scanner_remote_log_prefetch_num: config.scanner_remote_log_prefetch_num,
616617
remote_file_download_thread_num: config.remote_file_download_thread_num,
618+
scanner_log_max_poll_records: config.scanner_log_max_poll_records,
617619
};
618620

619621
let conn = RUNTIME.block_on(async { fcore::client::FlussConnection::new(config).await });

bindings/python/fluss/__init__.pyi

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ class Config:
161161
def remote_file_download_thread_num(self) -> int: ...
162162
@remote_file_download_thread_num.setter
163163
def remote_file_download_thread_num(self, num: int) -> None: ...
164+
@property
165+
def scanner_log_max_poll_records(self) -> int: ...
166+
@scanner_log_max_poll_records.setter
167+
def scanner_log_max_poll_records(self, num: int) -> None: ...
164168

165169
class FlussConnection:
166170
@staticmethod

bindings/python/src/config.rs

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -43,32 +43,46 @@ impl Config {
4343
config.bootstrap_servers = value;
4444
}
4545
"writer.request-max-size" => {
46-
if let Ok(size) = value.parse::<i32>() {
47-
config.writer_request_max_size = size;
48-
}
46+
config.writer_request_max_size = value.parse::<i32>().map_err(|e| {
47+
FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
48+
})?;
4949
}
5050
"writer.acks" => {
5151
config.writer_acks = value;
5252
}
5353
"writer.retries" => {
54-
if let Ok(retries) = value.parse::<i32>() {
55-
config.writer_retries = retries;
56-
}
54+
config.writer_retries = value.parse::<i32>().map_err(|e| {
55+
FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
56+
})?;
5757
}
5858
"writer.batch-size" => {
59-
if let Ok(size) = value.parse::<i32>() {
60-
config.writer_batch_size = size;
61-
}
59+
config.writer_batch_size = value.parse::<i32>().map_err(|e| {
60+
FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
61+
})?;
6262
}
6363
"scanner.remote-log.prefetch-num" => {
64-
if let Ok(num) = value.parse::<usize>() {
65-
config.scanner_remote_log_prefetch_num = num;
66-
}
64+
config.scanner_remote_log_prefetch_num =
65+
value.parse::<usize>().map_err(|e| {
66+
FlussError::new_err(format!(
67+
"Invalid value '{value}' for '{key}': {e}"
68+
))
69+
})?;
6770
}
6871
"remote-file.download-thread-num" => {
69-
if let Ok(num) = value.parse::<usize>() {
70-
config.remote_file_download_thread_num = num;
71-
}
72+
config.remote_file_download_thread_num =
73+
value.parse::<usize>().map_err(|e| {
74+
FlussError::new_err(format!(
75+
"Invalid value '{value}' for '{key}': {e}"
76+
))
77+
})?;
78+
}
79+
"scanner.log.max-poll-records" => {
80+
config.scanner_log_max_poll_records =
81+
value.parse::<usize>().map_err(|e| {
82+
FlussError::new_err(format!(
83+
"Invalid value '{value}' for '{key}': {e}"
84+
))
85+
})?;
7286
}
7387
_ => {
7488
return Err(FlussError::new_err(format!("Unknown property: {key}")));
@@ -163,6 +177,18 @@ impl Config {
163177
fn set_remote_file_download_thread_num(&mut self, num: usize) {
164178
self.inner.remote_file_download_thread_num = num;
165179
}
180+
181+
/// Get the scanner log max poll records
182+
#[getter]
183+
fn scanner_log_max_poll_records(&self) -> usize {
184+
self.inner.scanner_log_max_poll_records
185+
}
186+
187+
/// Set the scanner log max poll records
188+
#[setter]
189+
fn set_scanner_log_max_poll_records(&mut self, num: usize) {
190+
self.inner.scanner_log_max_poll_records = num;
191+
}
166192
}
167193

168194
impl Config {

crates/fluss/src/client/table/scanner.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,7 @@ struct LogFetcher {
636636
security_token_manager: Arc<SecurityTokenManager>,
637637
log_fetch_buffer: Arc<LogFetchBuffer>,
638638
nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
639+
max_poll_records: usize,
639640
}
640641

641642
struct FetchResponseContext {
@@ -694,6 +695,7 @@ impl LogFetcher {
694695
security_token_manager,
695696
log_fetch_buffer,
696697
nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())),
698+
max_poll_records: config.scanner_log_max_poll_records,
697699
})
698700
}
699701

@@ -1092,9 +1094,8 @@ impl LogFetcher {
10921094
/// Collect completed fetches from buffer
10931095
/// Reference: LogFetchCollector.collectFetch in Java
10941096
fn collect_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
1095-
const MAX_POLL_RECORDS: usize = 500; // Default max poll records
10961097
let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
1097-
let mut records_remaining = MAX_POLL_RECORDS;
1098+
let mut records_remaining = self.max_poll_records;
10981099

10991100
let collect_result: Result<()> = {
11001101
while records_remaining > 0 {

crates/fluss/src/config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const DEFAULT_WRITER_BATCH_SIZE: i32 = 2 * 1024 * 1024;
2424
const DEFAULT_RETRIES: i32 = i32::MAX;
2525
const DEFAULT_PREFETCH_NUM: usize = 4;
2626
const DEFAULT_DOWNLOAD_THREADS: usize = 3;
27+
const DEFAULT_MAX_POLL_RECORDS: usize = 500;
2728

2829
const DEFAULT_ACKS: &str = "all";
2930

@@ -54,6 +55,11 @@ pub struct Config {
5455
/// Default: 3 (matching Java REMOTE_FILE_DOWNLOAD_THREAD_NUM)
5556
#[arg(long, default_value_t = DEFAULT_DOWNLOAD_THREADS)]
5657
pub remote_file_download_thread_num: usize,
58+
59+
/// Maximum number of records returned in a single call to poll() for LogScanner.
60+
/// Default: 500 (matching Java CLIENT_SCANNER_LOG_MAX_POLL_RECORDS)
61+
#[arg(long, default_value_t = DEFAULT_MAX_POLL_RECORDS)]
62+
pub scanner_log_max_poll_records: usize,
5763
}
5864

5965
impl Default for Config {
@@ -66,6 +72,7 @@ impl Default for Config {
6672
writer_batch_size: DEFAULT_WRITER_BATCH_SIZE,
6773
scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM,
6874
remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS,
75+
scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
6976
}
7077
}
7178
}

website/docs/user-guide/cpp/api-reference.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ Complete API reference for the Fluss C++ client.
2424
| `writer_batch_size` | `int32_t` | `2097152` (2 MB) | Batch size for writes in bytes |
2525
| `scanner_remote_log_prefetch_num` | `size_t` | `4` | Number of remote log segments to prefetch |
2626
| `remote_file_download_thread_num` | `size_t` | `3` | Number of threads for remote log downloads |
27+
| `scanner_log_max_poll_records` | `size_t` | `500` | Maximum number of records returned by a single `Poll()` call |
2728

2829
## `Connection`
2930

website/docs/user-guide/python/api-reference.md

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@ Complete API reference for the Fluss Python client.
1010
| Method / Property | Description |
1111
|------------------------------------|-----------------------------------------------------------|
1212
| `Config(properties: dict = None)` | Create config from a dict of key-value pairs |
13-
| `.bootstrap_servers` | Get/set coordinator server address |
14-
| `.writer_request_max_size` | Get/set max request size in bytes |
15-
| `.writer_acks` | Get/set acknowledgment setting (`"all"` for all replicas) |
16-
| `.writer_retries` | Get/set number of retries on failure |
17-
| `.writer_batch_size` | Get/set write batch size in bytes |
18-
| `.scanner_remote_log_prefetch_num` | Get/set number of remote log segments to prefetch |
19-
| `.remote_file_download_thread_num` | Get/set number of threads for remote log downloads |
13+
| `bootstrap_servers` | Get/set coordinator server address |
14+
| `writer_request_max_size` | Get/set max request size in bytes |
15+
| `writer_acks` | Get/set acknowledgment setting (`"all"` for all replicas) |
16+
| `writer_retries` | Get/set number of retries on failure |
17+
| `writer_batch_size` | Get/set write batch size in bytes |
18+
| `scanner_remote_log_prefetch_num` | Get/set number of remote log segments to prefetch |
19+
| `remote_file_download_thread_num` | Get/set number of threads for remote log downloads |
20+
| `scanner_log_max_poll_records` | Get/set max number of records returned in a single poll() |
2021

2122
## `FlussConnection`
2223

website/docs/user-guide/python/example/configuration.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ with await fluss.FlussConnection.create(config) as conn:
2828
| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` |
2929
| `writer.retries` | Number of retries on failure | `2147483647` |
3030
| `writer.batch-size` | Batch size for writes in bytes | `2097152` (2 MB) |
31+
| `scanner.remote-log.prefetch-num` | Number of remote log segments to prefetch | `4` |
32+
| `remote-file.download-thread-num` | Number of threads for remote log downloads | `3` |
33+
| `scanner.log.max-poll-records` | Maximum number of records returned by a single `poll()` call | `500` |
3134

3235
Remember to close the connection when done:
3336

0 commit comments

Comments
 (0)