
[client] Allow configuring scanner fetch parameters #417

Merged
luoyuxia merged 5 commits into apache:main from Prajwal-banakar:configure-scanner
Mar 12, 2026


@Prajwal-banakar

Contributor

Purpose

Linked issue: close #391

Make log scanner fetch parameters configurable (max/min bytes and max wait time) instead of using hardcoded constants, so users can tune scan behavior via Config.

Brief change log

  • Removed hardcoded scanner fetch constants from client/table/scanner.rs.
  • Added fetch tuning fields to LogFetcher and wired them from crate::config::Config.
  • Updated fetch request construction to use configured values (including per-bucket max fetch cap).
  • Added/updated unit tests to validate configured fetch params are applied.
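
The change log above can be pictured with a minimal sketch. The struct and constant names here are illustrative stand-ins, not the actual fluss-rs definitions. The 16 MB and 1 MB defaults come from the doc comments quoted in this review; the min-bytes and max-wait defaults, and the `i32` field types, are placeholder assumptions.

```rust
// Illustrative sketch only: names mirror the fields discussed in this PR,
// but this is not the actual fluss-rs code.

/// Total bytes per fetch response (16 MB, per the doc comment in this PR).
pub const DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
/// Per-bucket cap (1 MB, per the doc comment in this PR).
pub const DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET: i32 = 1024 * 1024;
/// ASSUMED placeholders: the real defaults are not shown in this thread.
pub const ASSUMED_FETCH_MIN_BYTES: i32 = 1;
pub const ASSUMED_FETCH_WAIT_MAX_TIME_MS: i32 = 500;

#[derive(Debug, Clone, PartialEq)]
pub struct ScannerFetchConfig {
    pub scanner_log_fetch_max_bytes: i32,
    pub scanner_log_fetch_max_bytes_for_bucket: i32,
    pub scanner_log_fetch_min_bytes: i32,
    pub scanner_log_fetch_wait_max_time_ms: i32,
}

impl Default for ScannerFetchConfig {
    fn default() -> Self {
        Self {
            scanner_log_fetch_max_bytes: DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES,
            scanner_log_fetch_max_bytes_for_bucket:
                DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET,
            scanner_log_fetch_min_bytes: ASSUMED_FETCH_MIN_BYTES,
            scanner_log_fetch_wait_max_time_ms: ASSUMED_FETCH_WAIT_MAX_TIME_MS,
        }
    }
}

/// Hypothetical fetcher that copies the tuning knobs from the config once,
/// at construction, instead of reading hardcoded constants.
pub struct LogFetcherSketch {
    pub fetch_max_bytes: i32,
    pub fetch_max_bytes_for_bucket: i32,
    pub fetch_min_bytes: i32,
    pub fetch_wait_max_time_ms: i32,
}

impl LogFetcherSketch {
    pub fn new(config: &ScannerFetchConfig) -> Self {
        Self {
            fetch_max_bytes: config.scanner_log_fetch_max_bytes,
            fetch_max_bytes_for_bucket: config.scanner_log_fetch_max_bytes_for_bucket,
            fetch_min_bytes: config.scanner_log_fetch_min_bytes,
            fetch_wait_max_time_ms: config.scanner_log_fetch_wait_max_time_ms,
        }
    }
}
```

With this shape, a caller who overrides only one field (for example via `ScannerFetchConfig { scanner_log_fetch_max_bytes: 4096, ..Default::default() }`) keeps the defaults for the others.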

Tests

  • cargo fmt --all
  • cargo clippy -p fluss-rs --all-features
  • cargo test -p fluss-rs
  • Doc-tests: cargo test -p fluss-rs --doc

API and Format

  • API: Yes (adds/uses scanner fetch tuning knobs in Config; no breaking changes intended if defaults remain the same)
  • Storage format: No

Documentation

  • No new user-facing docs added (existing configuration/usage remains the same; defaults unchanged).

fresh-borzoni left a comment

Member

@Prajwal-banakar Ty for the PR, left some comments, PTAL


/// Maximum bytes per fetch response for LogScanner.
/// Default: 16777216 (16MB)
#[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES)]

Member

New config fields need Python bindings (FfiConfig + .pyi stub), C++ bindings, and website docs (configuration.md for both languages). Also missing from api-reference.md.

Contributor Author

bindings and docs updated, PTAL!

Member

Ty, ffi_converter.hpp wiring is still missing

Contributor Author

Thanks for catching it, updated!

- max_fetch_bytes: 1024 * 1024,
+ max_fetch_bytes: self
+     .fetch_max_bytes
+     .min(DEFAULT_BUCKET_MAX_FETCH_BYTES),

Member

This couples the total-response limit with the per-bucket cap, which is incorrect.
Java has client.scanner.log.fetch.max-bytes-for-bucket for this.
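
The point of this comment is that the total-response limit and the per-bucket cap are independent knobs. A minimal sketch of building a fetch request where each limit comes from its own setting follows; the request and parameter types are hypothetical and none of the names are taken from fluss-rs.

```rust
// Hypothetical request shapes; field names are illustrative only.
#[derive(Debug, PartialEq)]
pub struct BucketFetchReq {
    pub bucket_id: i32,
    pub fetch_offset: i64,
    /// Per-bucket cap on returned bytes.
    pub max_fetch_bytes: i32,
}

#[derive(Debug, PartialEq)]
pub struct FetchLogReq {
    /// Limit on the whole response, across all buckets.
    pub max_bytes: i32,
    pub min_bytes: i32,
    pub max_wait_ms: i32,
    pub buckets: Vec<BucketFetchReq>,
}

/// Hypothetical holder for the four configured values.
pub struct FetchParams {
    pub fetch_max_bytes: i32,
    pub fetch_max_bytes_for_bucket: i32,
    pub fetch_min_bytes: i32,
    pub fetch_wait_max_time_ms: i32,
}

/// Builds a request from `(bucket_id, fetch_offset)` pairs.
/// The per-bucket cap is read from its own setting and is never derived
/// from (or clamped by) the total-response limit.
pub fn build_fetch_request(params: &FetchParams, offsets: &[(i32, i64)]) -> FetchLogReq {
    let buckets = offsets
        .iter()
        .map(|&(bucket_id, fetch_offset)| BucketFetchReq {
            bucket_id,
            fetch_offset,
            max_fetch_bytes: params.fetch_max_bytes_for_bucket,
        })
        .collect();

    FetchLogReq {
        max_bytes: params.fetch_max_bytes,
        min_bytes: params.fetch_min_bytes,
        max_wait_ms: params.fetch_wait_max_time_ms,
        buckets,
    }
}
```

Keeping the two values separate means a user can lower the total limit without silently changing how much a single bucket may return, and vice versa.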

projected_fields: Option<Vec<usize>>,
) -> Result<Self> {
config
.validate_scanner_fetch()

Member

Maybe we should validate during connection creation, like we do with the SASL fields?
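
A minimal sketch of what validating at connection creation could look like. All type names are hypothetical, the constructor is synchronous for brevity, and the specific rules (positive minimum, maximum not below minimum, positive per-bucket cap, non-negative wait) are assumptions; the thread does not state which checks `validate_scanner_fetch` performs.

```rust
// Illustrative sketch; not the actual fluss-rs Config, Error, or connection type.
#[derive(Debug, Clone)]
pub struct ConfigSketch {
    pub scanner_log_fetch_max_bytes: i32,
    pub scanner_log_fetch_max_bytes_for_bucket: i32,
    pub scanner_log_fetch_min_bytes: i32,
    pub scanner_log_fetch_wait_max_time_ms: i32,
}

impl ConfigSketch {
    /// ASSUMED rules: the real checks are not shown in this review thread.
    pub fn validate_scanner_fetch(&self) -> Result<(), String> {
        if self.scanner_log_fetch_min_bytes <= 0 {
            return Err("scanner_log_fetch_min_bytes must be > 0".to_string());
        }
        if self.scanner_log_fetch_max_bytes < self.scanner_log_fetch_min_bytes {
            return Err(
                "scanner_log_fetch_max_bytes must be >= scanner_log_fetch_min_bytes".to_string(),
            );
        }
        if self.scanner_log_fetch_max_bytes_for_bucket <= 0 {
            return Err("scanner_log_fetch_max_bytes_for_bucket must be > 0".to_string());
        }
        if self.scanner_log_fetch_wait_max_time_ms < 0 {
            return Err("scanner_log_fetch_wait_max_time_ms must be >= 0".to_string());
        }
        Ok(())
    }
}

#[derive(Debug, PartialEq)]
pub enum ErrorSketch {
    IllegalArgument { message: String },
}

#[derive(Debug)]
pub struct ConnectionSketch {
    pub config: ConfigSketch,
}

impl ConnectionSketch {
    /// Rejects a bad config up front, before any scanner is created.
    pub fn new(arg: ConfigSketch) -> Result<Self, ErrorSketch> {
        arg.validate_scanner_fetch()
            .map_err(|msg| ErrorSketch::IllegalArgument { message: msg })?;
        Ok(Self { config: arg })
    }
}
```

Failing at connection creation surfaces a misconfiguration once, at startup, rather than each time a scanner is constructed.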

@fresh-borzoni

Member

@Prajwal-banakar Ty, but I don't see that the cpp bindings have been wired

Comment thread crates/fluss/src/client/connection.rs Outdated
pub async fn new(arg: Config) -> Result<Self> {
arg.validate_security()
.map_err(|msg| Error::IllegalArgument { message: msg })?;
arg.validate_scanner_fetch() // ← add this

Member

we don't need this comment

Contributor Author

Sorry, I just wired the cpp bindings, PTAL!

fresh-borzoni left a comment

Member

Ty, @Prajwal-banakar Left comments, PTAL


"scanner_log_fetch_min_bytes",
&self.scanner_log_fetch_min_bytes,
)
.field(

Member

missing scanner_log_fetch_max_bytes_for_bucket

Contributor Author

updated

Comment thread crates/fluss/src/config.rs Outdated
pub writer_batch_timeout_ms: i64,

/// Maximum bytes per fetch response **per bucket** for LogScanner.
/// Default: 1048576 (1MB) (or whatever DEFAULT_BUCKET_MAX_FETCH_BYTES is)

Member

let's remove 'or whatever ...' part, it's inconsistent

fresh-borzoni left a comment

Member

@Prajwal-banakar TY, left minor comment, PTAL

Comment thread bindings/cpp/src/ffi_converter.hpp Outdated
ffi_config.scanner_log_fetch_wait_max_time_ms = config.scanner_log_fetch_wait_max_time_ms;
ffi_config.scanner_log_fetch_max_bytes_for_bucket = config.scanner_log_fetch_max_bytes_for_bucket;
ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;
ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;

Member

duplicated line

Comment thread bindings/python/src/config.rs Outdated
))
})?;
}
"scanner.log.fetch-max-bytes" => {

Member

Java uses the scanner.log.fetch prefix, so we should match it. Also, don't forget the docs changes here.
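
A minimal sketch of key matching that follows the dotted `scanner.log.fetch` prefix. The function, the struct, and the exact key spellings are assumptions for illustration; only the `max-bytes-for-bucket` suffix is confirmed by the Java option name quoted earlier in this review.

```rust
// Illustrative sketch; not the actual Python-binding code.
#[derive(Debug, Default, PartialEq)]
pub struct FetchOverrides {
    pub max_bytes: Option<i32>,
    pub max_bytes_for_bucket: Option<i32>,
}

/// Applies one string key/value pair. Keys are ASSUMED spellings that share
/// the `scanner.log.fetch.` prefix; unknown keys are rejected.
pub fn apply_property(out: &mut FetchOverrides, key: &str, value: &str) -> Result<(), String> {
    // Parse helper that reports which key carried the bad value.
    let parse = |v: &str| -> Result<i32, String> {
        v.parse::<i32>()
            .map_err(|e| format!("invalid value '{v}' for '{key}': {e}"))
    };

    match key {
        "scanner.log.fetch.max-bytes" => out.max_bytes = Some(parse(value)?),
        "scanner.log.fetch.max-bytes-for-bucket" => {
            out.max_bytes_for_bucket = Some(parse(value)?)
        }
        other => return Err(format!("unknown config key '{other}'")),
    }
    Ok(())
}
```

Under these assumed spellings, the hyphenated form from the snippet under review (`scanner.log.fetch-max-bytes`) would be rejected as an unknown key, which makes a mismatch with the Java naming visible immediately.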

@Prajwal-banakar

Contributor Author

Hi @fresh-borzoni, sorry for the repeated back-and-forth; I'm still getting familiar with the codebase. I've fixed the remaining issues. Thanks for the patience and the thorough review!

fresh-borzoni left a comment

Member

@Prajwal-banakar All good, I understand it takes time, so happy to review and help.

Thank you, LGTM

luoyuxia left a comment

Contributor

@Prajwal-banakar Could you please rebase onto the main branch? Then I'll merge it.

@Prajwal-banakar

Contributor Author

Hi @luoyuxia, I've rebased, PTAL!

luoyuxia merged commit 8642ea4 into apache:main on Mar 12, 2026 (9 checks passed)
Prajwal-banakar deleted the configure-scanner branch on March 12, 2026 04:36

Development

Successfully merging this pull request may close these issues.

Allow to configure scanner fetch parameters

3 participants