Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 242 additions & 0 deletions crates/fluss/src/client/table/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::config::Config;
use crate::error::Error::UnsupportedOperation;
use crate::error::{ApiError, Error, FlussError, Result};
use crate::metadata::{LogFormat, PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath};
use crate::metrics::{SCANNER_POLL_IDLE_RATIO, SCANNER_TIME_BETWEEN_POLL_MS};
use crate::proto::{
ErrorResponse, FetchLogRequest, FetchLogResponse, PbFetchLogReqForBucket, PbFetchLogReqForTable,
};
Expand Down Expand Up @@ -277,6 +278,58 @@ struct LogScannerInner {
/// Guards against subscription changes while a
/// [`crate::client::RecordBatchLogReader`] is iterating.
reader_active: std::sync::atomic::AtomicBool,
/// Holds the snapshot fields used by [`PollGuard`] to derive the
/// scanner poll-timing metrics. The mutex makes each individual
/// `record_poll_start` / `record_poll_end` call atomic, but the
/// start↔end pairing depends on the single-consumer contract
/// documented on [`LogScanner::poll`] and
/// [`RecordBatchLogScanner::poll`] (mirrors Java's
/// `LogScannerImpl.acquire()`). Overlapping polls on the same
/// scanner trip a `debug_assert!` in `record_poll_start`.
poll_state: Mutex<PollState>,
}

/// Snapshot state used to derive the scanner poll-timing metrics.
///
/// It is stored behind a mutex in `LogScannerInner::poll_state`. The mutex
/// makes each `record_poll_start` / `record_poll_end` call atomic with
/// respect to itself. It does **not** by itself preserve start↔end pairing
/// across overlapping `poll()` calls — that invariant relies on the
/// single-consumer contract that mirrors Java's
/// `LogScannerImpl.acquire()`. Concurrent polls on the same scanner are
/// detected by a `debug_assert!` in `record_poll_start` to surface
/// contract violations in tests; release builds favor low overhead and
/// assume callers honor the contract.
#[derive(Default, Debug)]
struct PollState {
/// Start instant of the most recent poll. Overwritten by every
/// `record_poll_start()` and never cleared; `None` only before the
/// first poll.
last_poll_at: Option<Instant>,
/// Start instant of the poll that is currently in flight. `None`
/// whenever no poll is in flight: before the first poll, and after
/// `record_poll_end()` has taken the value.
poll_start_at: Option<Instant>,
/// Milliseconds between the two most recent poll starts (`0.0` for the
/// first poll). Cached by `record_poll_start` so that `record_poll_end`
/// can compute `poll_idle_ratio`.
time_between_poll_ms: f64,
}

/// Scope guard that brackets one `poll()` call for the poll-timing metrics.
///
/// `PollGuard::new` calls `record_poll_start`; dropping the guard calls
/// `record_poll_end`. `poll_records` / `poll_batches` create it as their
/// first statement, so the end is recorded on every exit path, including
/// cancellation (the caller dropping the future drops the guard).
///
/// Marked `#[must_use]` because a guard discarded as a bare expression
/// statement is dropped on the spot and would record a near-zero poll.
#[must_use = "dropping the guard ends the poll; bind it to a named local"]
struct PollGuard<'a> {
    /// Scanner whose poll state is updated when the guard is created and
    /// again when it is dropped.
    inner: &'a LogScannerInner,
}

impl<'a> PollGuard<'a> {
fn new(inner: &'a LogScannerInner) -> Self {
inner.record_poll_start();
Self { inner }
}
}

impl Drop for PollGuard<'_> {
fn drop(&mut self) {
self.inner.record_poll_end();
}
}

impl LogScannerInner {
Expand Down Expand Up @@ -318,6 +371,7 @@ impl LogScannerInner {
)?,
arrow_schema,
reader_active: std::sync::atomic::AtomicBool::new(false),
poll_state: Mutex::new(PollState::default()),
})
}

Expand All @@ -336,6 +390,10 @@ impl LogScannerInner {
}

async fn poll_records(&self, timeout: Duration) -> Result<ScanRecords> {
// Pairs record_poll_start (now) with record_poll_end
// (drop). Runs on every exit, including the cancellation path
// where the caller drops this future.
let _poll_guard = PollGuard::new(self);
let start = Instant::now();
let deadline = start + timeout;

Expand Down Expand Up @@ -374,6 +432,51 @@ impl LogScannerInner {
}
}

/// Marks the beginning of a `poll()` call and publishes
/// `SCANNER_TIME_BETWEEN_POLL_MS`.
///
/// The gauge is the elapsed time since the previous poll *start*. The very
/// first poll publishes `0.0`, matching Java's
/// `ScannerMetricGroup.recordPollStart`
/// (`timeMsBetweenPoll = lastPollMs != 0L ? pollStartMs - lastPollMs : 0L`).
/// The same value is cached in the poll state for `record_poll_end`.
///
/// # Panics
///
/// Debug builds only: panics when the previous poll has not recorded its
/// end yet. That means two `poll()` calls overlap on the same scanner, which
/// violates the single-consumer contract (Java enforces this with
/// `LogScannerImpl.acquire()` and throws
/// `ConcurrentModificationException`).
fn record_poll_start(&self) {
    let started_at = Instant::now();
    let mut poll_state = self.poll_state.lock();
    debug_assert!(
        poll_state.poll_start_at.is_none(),
        "concurrent poll() detected on the same scanner; \
         LogScanner / RecordBatchLogScanner are single-consumer \
         (see LogScannerImpl.acquire() for Java parity)"
    );

    // No previous start means this is the first poll, which reports zero.
    let gap_ms = poll_state.last_poll_at.map_or(0.0, |previous| {
        started_at.duration_since(previous).as_secs_f64() * 1000.0
    });
    metrics::gauge!(SCANNER_TIME_BETWEEN_POLL_MS).set(gap_ms);

    poll_state.time_between_poll_ms = gap_ms;
    poll_state.poll_start_at = Some(started_at);
    poll_state.last_poll_at = Some(started_at);
}

/// Marks the end of the in-flight `poll()` call and publishes
/// `SCANNER_POLL_IDLE_RATIO = poll_time / (poll_time + between_time)`.
///
/// `between_time` is the value cached by `record_poll_start`. On the first
/// poll it is 0, so the ratio is 1.0 (poll-bound). Nothing is published
/// when no poll is in flight, or when the denominator is zero.
fn record_poll_end(&self) {
    let finished_at = Instant::now();
    let mut poll_state = self.poll_state.lock();

    // `take()` clears the in-flight marker so the next start sees no
    // overlapping poll.
    if let Some(began_at) = poll_state.poll_start_at.take() {
        let busy_ms = finished_at.duration_since(began_at).as_secs_f64() * 1000.0;
        let window_ms = busy_ms + poll_state.time_between_poll_ms;
        if window_ms > 0.0 {
            metrics::gauge!(SCANNER_POLL_IDLE_RATIO).set(busy_ms / window_ms);
        }
    }
}

async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
self.check_no_active_reader()?;
if self.is_partitioned_table {
Expand Down Expand Up @@ -516,6 +619,7 @@ impl LogScannerInner {
}

async fn poll_batches(&self, timeout: Duration) -> Result<Vec<ScanBatch>> {
let _poll_guard = PollGuard::new(self);
let start = Instant::now();
let deadline = start + timeout;

Expand Down Expand Up @@ -2204,4 +2308,142 @@ mod tests {
}
Ok(())
}

/// Runs `body` against a freshly built, self-contained `LogScannerInner`.
///
/// Construction and the call to `body` both happen inside `block_on` of a
/// `current_thread` Tokio runtime, so poll-timing tests can create and drop
/// `PollGuard`s synchronously from within the closure.
fn with_test_log_scanner_inner<F>(body: F)
where
    F: FnOnce(&LogScannerInner),
{
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("build current_thread runtime");

    runtime.block_on(async {
        let path = TablePath::new("db".to_string(), "tbl".to_string());
        let info = build_table_info(path.clone(), 1, 1);
        let metadata = Arc::new(Metadata::new_for_test(build_cluster_arc(&path, 1, 1)));
        let rpc_client = Arc::new(RpcClient::new());

        let scanner = LogScannerInner::new(&info, metadata, rpc_client, &Config::default(), None)
            .expect("build LogScannerInner");
        body(&scanner);
    });
}

/// Returns the value of the first gauge called `name` in a fresh snapshot
/// taken from `snapshotter`, or `None` when no gauge with that name exists.
///
/// Entries that share the name but are not gauges are skipped.
fn snapshot_gauge(
    snapshotter: &metrics_util::debugging::Snapshotter,
    name: &str,
) -> Option<f64> {
    use metrics_util::debugging::DebugValue;

    for (key, _, _, value) in snapshotter.snapshot().into_vec() {
        if key.key().name() != name {
            continue;
        }
        if let DebugValue::Gauge(reading) = value {
            return Some(reading.into_inner());
        }
    }
    None
}

/// Drives two consecutive `PollGuard` lifecycles on one scanner and checks
/// the gauges left behind: `time_between_poll_ms` must be positive after
/// the second poll start, and `poll_idle_ratio` must have been published by
/// a guard drop and lie in `[0, 1]`. Guard drop is also what runs when a
/// `poll()` future is cancelled, so this covers that path as well.
#[test]
fn poll_guard_emits_time_between_poll_and_idle_ratio() {
    use crate::metrics::{SCANNER_POLL_IDLE_RATIO, SCANNER_TIME_BETWEEN_POLL_MS};
    use metrics_util::debugging::DebuggingRecorder;

    // Short real sleep so every measured interval is observably non-zero.
    let pause = || std::thread::sleep(std::time::Duration::from_millis(5));

    let recorder = DebuggingRecorder::new();
    let snapshotter = recorder.snapshotter();

    metrics::with_local_recorder(&recorder, || {
        with_test_log_scanner_inner(|inner| {
            // Poll #1: publishes time_between_poll_ms = 0 at start (Java
            // parity: ScannerMetricGroup.recordPollStart emits 0 when there
            // is no previous poll) and an idle ratio of 1.0 on drop
            // (poll_time / (poll_time + 0)).
            let first_poll = PollGuard::new(inner);
            pause();
            drop(first_poll);

            // Gap between polls so the next start-to-start interval is > 0.
            pause();

            // Poll #2: overwrites time_between_poll_ms with a positive value
            // and publishes a fresh idle ratio on drop.
            let second_poll = PollGuard::new(inner);
            pause();
            drop(second_poll);
        });
    });

    let between = snapshot_gauge(&snapshotter, SCANNER_TIME_BETWEEN_POLL_MS)
        .expect("time_between_poll_ms must be emitted on every poll");
    assert!(
        between > 0.0,
        "second-poll time_between_poll_ms must be positive, got {between}"
    );

    let ratio = snapshot_gauge(&snapshotter, SCANNER_POLL_IDLE_RATIO)
        .expect("poll_idle_ratio must be emitted on poll end");
    assert!(
        (0.0..=1.0).contains(&ratio),
        "poll_idle_ratio must be in [0, 1], got {ratio}"
    );
}

/// The very first poll must publish `time_between_poll_ms = 0`, as Java's
/// `ScannerMetricGroup.recordPollStart` does (`timeMsBetweenPoll = 0`), so
/// dashboards see the metric series from poll #1 onwards.
#[test]
fn time_between_poll_ms_emits_zero_on_first_poll() {
    use crate::metrics::SCANNER_TIME_BETWEEN_POLL_MS;
    use metrics_util::debugging::DebuggingRecorder;

    let recorder = DebuggingRecorder::new();
    let snapshotter = recorder.snapshotter();

    metrics::with_local_recorder(&recorder, || {
        with_test_log_scanner_inner(|inner| {
            // SCANNER_TIME_BETWEEN_POLL_MS is written when the guard is
            // created; dropping it merely completes the poll.
            let only_poll = PollGuard::new(inner);
            drop(only_poll);
        });
    });

    let between = snapshot_gauge(&snapshotter, SCANNER_TIME_BETWEEN_POLL_MS)
        .expect("time_between_poll_ms must be emitted on the first poll");
    assert_eq!(
        between, 0.0,
        "first-poll time_between_poll_ms must be 0.0 (Java parity), got {between}"
    );
}

/// Locks in the single-consumer contract: creating a second `PollGuard`
/// while the first is still alive must hit the `debug_assert!` in
/// `record_poll_start`. The check is compiled out of release builds, hence
/// the `debug_assertions` gate.
#[cfg(debug_assertions)]
#[test]
#[should_panic(expected = "concurrent poll() detected")]
fn overlapping_polls_panic_in_debug_builds() {
    with_test_log_scanner_inner(|inner| {
        let _in_flight = PollGuard::new(inner);
        // `_in_flight` is still alive, so `poll_start_at` is `Some` and this
        // second start has to panic.
        let _overlapping = PollGuard::new(inner);
    });
}
}
48 changes: 48 additions & 0 deletions crates/fluss/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,34 @@ pub const CLIENT_BYTES_RECEIVED_TOTAL: &str = "fluss.client.bytes_received.total
pub const CLIENT_REQUEST_LATENCY_MS: &str = "fluss.client.request_latency_ms";
pub const CLIENT_REQUESTS_IN_FLIGHT: &str = "fluss.client.requests_in_flight";

// ---------------------------------------------------------------------------
// Scanner poll-timing metrics
//
// Java reference: ScannerMetricGroup.java, LogScannerImpl.java
//
// These track consumer liveness and processing efficiency at the `poll()`
// boundary. Java records via `volatile long` fields read by gauge suppliers;
// Rust snapshots the values at poll start/end.
//
// Java's `lastPollSecondsAgo` gauge is intentionally NOT ported. Java
// implements it as a gauge supplier evaluated at scrape time, which the
// `metrics` crate facade has no equivalent for. A snapshot-at-poll-start
// port would just duplicate `time_between_poll_ms / 1000` and would not
// advance while a consumer is hung — defeating the metric's purpose
// (detecting a stuck consumer). Revisit if the `metrics` crate gains a
// supplier abstraction or we add a background liveness task.
// ---------------------------------------------------------------------------

/// Gauge: milliseconds between the start of consecutive `poll()` calls. A
/// large value usually means the consumer's downstream processing is slow.
/// The first poll on a scanner reports `0.0`, because there is no earlier
/// poll start to measure from.
pub const SCANNER_TIME_BETWEEN_POLL_MS: &str = "fluss.client.scanner.time_between_poll_ms";

/// Gauge: fraction of wall-clock time spent inside `poll()` —
/// `poll_time_ms / (poll_time_ms + time_between_poll_ms)`, computed when a
/// poll ends. A value near 1.0 means the scanner is starved for data; a low
/// value means the consumer is the bottleneck.
///
/// `time_between_poll_ms` is measured from poll start to poll start, so it
/// already contains the previous poll's duration: back-to-back polls of
/// equal length report about 0.5, and values approach 1.0 only when the
/// current poll is much longer than the interval before it. Nothing is
/// emitted when the denominator is zero.
/// NOTE(review): the formula is presumably kept as-is for parity with
/// Java's `ScannerMetricGroup` — confirm before changing it.
pub const SCANNER_POLL_IDLE_RATIO: &str = "fluss.client.scanner.poll_idle_ratio";

/// Returns a label value for reportable API keys, matching Java's
/// `ConnectionMetrics.REPORT_API_KEYS` filter (`ProduceLog`, `FetchLog`,
/// `PutKv`, `Lookup`). Returns `None` for admin/metadata/auth calls to
Expand Down Expand Up @@ -267,4 +295,24 @@ mod tests {
assert_eq!(counter_by_api_key.get("produce_log"), Some(&5));
assert_eq!(counter_by_api_key.get("fetch_log"), Some(&3));
}

/// Smoke test for the scanner poll-timing metric names: values written to
/// both gauges through a local recorder must be readable from its snapshot.
#[test]
fn scanner_poll_timing_metrics_emit_correctly() {
    let recorder = DebuggingRecorder::new();
    let snapshotter = recorder.snapshotter();

    metrics::with_local_recorder(&recorder, || {
        metrics::gauge!(SCANNER_TIME_BETWEEN_POLL_MS).set(200.0);
        metrics::gauge!(SCANNER_POLL_IDLE_RATIO).set(0.8);
    });

    // One snapshot serves both lookups.
    let entries = snapshotter.snapshot().into_vec();

    let between_poll_ms = find_gauge!(entries, SCANNER_TIME_BETWEEN_POLL_MS);
    assert_eq!(between_poll_ms, Some(200.0));

    let idle_ratio = find_gauge!(entries, SCANNER_POLL_IDLE_RATIO);
    assert_eq!(idle_ratio, Some(0.8));
}
}
4 changes: 3 additions & 1 deletion website/docs/user-guide/rust/api-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ Complete API reference for the Fluss Rust client.

## `LogScanner`

Single-consumer: do not call `poll` concurrently on the same scanner (e.g. from `tokio::join!` or two tasks sharing an `Arc`). Mirrors Java's `LogScannerImpl.acquire()` guard. Debug builds surface overlapping calls via a `debug_assert!`; release builds skip the check for performance and produce skewed poll-timing metrics (`fluss.client.scanner.time_between_poll_ms`, `fluss.client.scanner.poll_idle_ratio`) if the contract is violated.

| Method | Description |
|-----------------------------------------------------------------------------------------------------------|----------------------------------------------------------|
| `async fn subscribe(&self, bucket_id: i32, start_offset: i64) -> Result<()>` | Subscribe to a bucket |
Expand All @@ -151,7 +153,7 @@ Complete API reference for the Fluss Rust client.

## `RecordBatchLogScanner`

Overlapping `poll` calls on clones that share state, or `poll` concurrent with `RecordBatchLogReader::next_batch`, are not supported. Use one active polling/consumption call at a time per underlying scanner state.
Single-consumer: overlapping `poll` calls on handles that share state, or `poll` concurrent with `RecordBatchLogReader::next_batch`, are not supported — use one active polling/consumption call at a time per underlying scanner state. Mirrors Java's `LogScannerImpl.acquire()` guard. Debug builds surface overlapping calls via a `debug_assert!`; release builds skip the check for performance and produce skewed poll-timing metrics (`fluss.client.scanner.time_between_poll_ms`, `fluss.client.scanner.poll_idle_ratio`) if the contract is violated.

| Method | Description |
|-----------------------------------------------------------------------------------------------------------|----------------------------------------------------------|
Expand Down
Loading