diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index 8957aced..c48bdcca 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -18,10 +18,7 @@ use crate::client::credentials::CredentialsReceiver; use crate::error::{Error, Result}; use crate::io::{FileIO, Storage}; use crate::metadata::TableBucket; -use crate::metrics::{ - SCANNER_REMOTE_FETCH_BYTES_TOTAL, SCANNER_REMOTE_FETCH_ERRORS_TOTAL, - SCANNER_REMOTE_FETCH_REQUESTS_TOTAL, -}; +use crate::metrics::ScannerMetrics; use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment}; use futures::TryStreamExt; use parking_lot::Mutex; @@ -394,6 +391,10 @@ struct DownloadCoordinator { max_concurrent_downloads: usize, recycle_notify: Arc, fetcher: Arc, + /// Per-table scanner metric handles cloned by every spawned download + /// task to attribute remote-fetch metrics to the owning scanner's + /// `(database, table)`. + metrics: Arc, } impl DownloadCoordinator { @@ -462,10 +463,11 @@ impl DownloadCoordinator { // Clone data for the spawned task let fetcher = self.fetcher.clone(); let recycle_notify = self.recycle_notify.clone(); + let metrics = Arc::clone(&self.metrics); // Spawn download task self.active_downloads.spawn(async move { - spawn_download_task(request, permit, fetcher, recycle_notify).await + spawn_download_task(request, permit, fetcher, recycle_notify, metrics).await }); self.in_flight += 1; } @@ -491,6 +493,7 @@ async fn spawn_download_task( permit: tokio::sync::OwnedSemaphorePermit, fetcher: Arc, recycle_notify: Arc, + metrics: Arc, ) -> DownloadResult { // Check if receiver still alive (early cancellation check) if request.result_sender.is_closed() { @@ -501,7 +504,7 @@ async fn spawn_download_task( // Java reference: RemoteLogDownloader.java increments `remoteFetchRequestCount` // immediately before initiating the download. Each retry of the same segment // counts as a separate request (matches Java behavior). - metrics::counter!(SCANNER_REMOTE_FETCH_REQUESTS_TOTAL).increment(1); + metrics.record_remote_fetch_request(); // Try download ONCE let download_result = fetcher.fetch(&request).await; @@ -509,8 +512,7 @@ async fn spawn_download_task( match download_result { Ok(fetch_result) => { // Success - permit will be released on drop (FileSource handles file deletion) - metrics::counter!(SCANNER_REMOTE_FETCH_BYTES_TOTAL) - .increment(fetch_result.file_size as u64); + metrics.record_remote_fetch_bytes(fetch_result.file_size as u64); DownloadResult::Success { result: RemoteLogFile { file_path: fetch_result.file_path, @@ -528,7 +530,7 @@ async fn spawn_download_task( Err(e) => { // Download failed - check if we should retry or give up // Counted per attempt, so retries each contribute one error. - metrics::counter!(SCANNER_REMOTE_FETCH_ERRORS_TOTAL).increment(1); + metrics.record_remote_fetch_error(); let retry_count = request.retry_count + 1; if retry_count > MAX_RETRY_COUNT { @@ -781,12 +783,13 @@ pub struct RemoteLogDownloader { } impl RemoteLogDownloader { - pub fn new( + pub(crate) fn new( local_log_dir: TempDir, max_prefetch_segments: usize, max_concurrent_downloads: usize, remote_log_read_concurrency: usize, credentials_rx: CredentialsReceiver, + metrics: Arc, ) -> Result { let fetcher = Arc::new(ProductionFetcher { credentials_rx, @@ -794,14 +797,20 @@ impl RemoteLogDownloader { remote_log_read_concurrency, }); - Self::new_with_fetcher(fetcher, max_prefetch_segments, max_concurrent_downloads) + Self::new_with_fetcher( + fetcher, + max_prefetch_segments, + max_concurrent_downloads, + metrics, + ) } /// Create a RemoteLogDownloader with a custom fetcher (for testing). - pub fn new_with_fetcher( + pub(crate) fn new_with_fetcher( fetcher: Arc, max_prefetch_segments: usize, max_concurrent_downloads: usize, + metrics: Arc, ) -> Result { let (request_sender, request_receiver) = mpsc::unbounded_channel(); @@ -813,6 +822,7 @@ impl RemoteLogDownloader { max_concurrent_downloads, recycle_notify: Arc::new(Notify::new()), fetcher, + metrics, }; // Spawn coordinator task - it will exit when request_sender is dropped @@ -976,6 +986,8 @@ impl RemoteLogDownloader { #[cfg(test)] mod tests { use super::*; + use crate::metadata::TablePath; + use crate::test_utils::test_scanner_metrics; use std::sync::atomic::{AtomicUsize, Ordering}; /// Helper function to create a TableBucket for testing @@ -983,6 +995,14 @@ mod tests { TableBucket::new(table_id, bucket_id) } + /// `ScannerMetrics` instance shared across the local test fixtures. The + /// labels are arbitrary because none of the tests in this module install + /// a metrics recorder; the metrics just need to exist for the API + /// surface. + fn metrics() -> Arc { + test_scanner_metrics(&TablePath::new("db", "tbl")) + } + /// Simplified fake fetcher for testing struct FakeFetcher { completion_gate: Arc, @@ -1172,6 +1192,7 @@ mod tests { fake_fetcher.clone(), 10, // High prefetch limit 2, // Max concurrent downloads = 2 + metrics(), ) .unwrap(); @@ -1219,6 +1240,7 @@ mod tests { fake_fetcher, 2, // Max prefetch = 2 10, // High concurrent limit + metrics(), ) .unwrap(); @@ -1278,7 +1300,7 @@ mod tests { let fake_fetcher = Arc::new(FakeFetcher::new(2, true)); // Fail twice, succeed third time let downloader = - RemoteLogDownloader::new_with_fetcher(fake_fetcher.clone(), 10, 1).unwrap(); + RemoteLogDownloader::new_with_fetcher(fake_fetcher.clone(), 10, 1, metrics()).unwrap(); let bucket = create_table_bucket(1, 0); let seg = create_segment("seg1", 0, 1000, bucket); @@ -1303,7 +1325,7 @@ mod tests { let seg2 = create_segment("seg2", 100, 1000, create_table_bucket(1, 0)); let fake_fetcher2 = Arc::new(FakeFetcher::new(100, true)); // Fail forever let downloader2 = - RemoteLogDownloader::new_with_fetcher(fake_fetcher2.clone(), 10, 1).unwrap(); + RemoteLogDownloader::new_with_fetcher(fake_fetcher2.clone(), 10, 1, metrics()).unwrap(); let future2 = downloader2.request_remote_log("dir", &seg2); tokio::time::sleep(Duration::from_millis(50)).await; diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 5fa8bb92..d7e2dd2c 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -27,10 +27,7 @@ use crate::config::Config; use crate::error::Error::UnsupportedOperation; use crate::error::{ApiError, Error, FlussError, Result}; use crate::metadata::{LogFormat, PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath}; -use crate::metrics::{ - SCANNER_BYTES_PER_REQUEST, SCANNER_FETCH_LATENCY_MS, SCANNER_FETCH_REQUESTS_TOTAL, - SCANNER_POLL_IDLE_RATIO, SCANNER_TIME_BETWEEN_POLL_MS, -}; +use crate::metrics::ScannerMetrics; use crate::proto::{ ErrorResponse, FetchLogRequest, FetchLogResponse, PbFetchLogReqForBucket, PbFetchLogReqForTable, }; @@ -293,6 +290,9 @@ struct LogScannerInner { /// scanner trip a `debug_assert!` in `record_poll_start` (debug /// builds) or emit a `log::warn!` (release builds). poll_state: Mutex, + /// Per-table scanner metric handles, pre-bound with `database`/`table` + /// labels. + metrics: Arc, } /// Snapshot state used to derive the scanner poll-timing metrics. @@ -365,6 +365,7 @@ impl LogScannerInner { None => to_arrow_schema(full_row_type)?, }; + let metrics = Arc::new(ScannerMetrics::new(&table_info.table_path)); Ok(Self { table_path: table_info.table_path.clone(), table_id: table_info.table_id, @@ -378,10 +379,12 @@ impl LogScannerInner { log_scanner_status.clone(), config, projected_fields, + Arc::clone(&metrics), )?, arrow_schema, reader_active: std::sync::atomic::AtomicBool::new(false), poll_state: Mutex::new(PollState::default()), + metrics, }) } @@ -485,7 +488,7 @@ impl LogScannerInner { until the overlap clears" ); } - metrics::gauge!(SCANNER_TIME_BETWEEN_POLL_MS).set(between_ms); + self.metrics.record_time_between_poll_ms(between_ms); } /// Computes `poll_idle_ratio = poll_time / (poll_time + between_time)`. @@ -522,7 +525,7 @@ impl LogScannerInner { return; } if let Some(r) = ratio { - metrics::gauge!(SCANNER_POLL_IDLE_RATIO).set(r); + self.metrics.record_poll_idle_ratio(r); } } @@ -918,6 +921,9 @@ struct LogFetcher { security_token_manager: Arc, log_fetch_buffer: Arc, nodes_with_pending_fetch_requests: Arc>>, + /// Per-table scanner metric handles shared with the owning + /// `LogScannerInner` and `RemoteLogDownloader`. + metrics: Arc, max_poll_records: usize, fetch_max_bytes: i32, fetch_min_bytes: i32, @@ -932,6 +938,8 @@ struct FetchResponseContext { read_context: ReadContext, remote_read_context: ReadContext, remote_log_downloader: Arc, + /// Per-table scanner metric handles for `scanner.fetch_*` recording. + metrics: Arc, /// `Instant` captured immediately before the FetchLog RPC; used to compute /// `scanner.fetch_latency_ms` on a successful response. request_start_time: Instant, @@ -945,6 +953,7 @@ impl LogFetcher { log_scanner_status: Arc, config: &Config, projected_fields: Option>, + metrics: Arc, ) -> Result { let full_row_type = table_info.get_row_type(); let full_arrow_schema = to_arrow_schema(full_row_type)?; @@ -988,6 +997,7 @@ impl LogFetcher { config.remote_file_download_thread_num, config.scanner_remote_log_read_concurrency, credentials_rx, + Arc::clone(&metrics), )?); // Start the background token refresh task @@ -1005,6 +1015,7 @@ impl LogFetcher { security_token_manager, log_fetch_buffer, nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())), + metrics, max_poll_records: config.scanner_log_max_poll_records, fetch_max_bytes: config.scanner_log_fetch_max_bytes, fetch_min_bytes: config.scanner_log_fetch_min_bytes, @@ -1177,6 +1188,7 @@ impl LogFetcher { let remote_log_downloader = Arc::clone(&self.remote_log_downloader); let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone(); let metadata = self.metadata.clone(); + let metrics = Arc::clone(&self.metrics); // Spawn async task to handle the fetch request // Note: These tasks are not explicitly tracked or cancelled when LogFetcher is dropped. // This is acceptable because: @@ -1211,7 +1223,7 @@ impl LogFetcher { // Java increment the fetch counter and capture `requestStartTime` immediately // before the RPC. Failed connection acquisition above is not counted. let request_start_time = Instant::now(); - metrics::counter!(SCANNER_FETCH_REQUESTS_TOTAL).increment(1); + metrics.record_fetch_request(); let fetch_response = match con .request(message::FetchLogRequest::new(fetch_request.clone())) @@ -1237,6 +1249,7 @@ impl LogFetcher { read_context, remote_read_context, remote_log_downloader, + metrics, request_start_time, }; Self::handle_fetch_response(fetch_response, response_context).await; @@ -1267,6 +1280,7 @@ impl LogFetcher { read_context, remote_read_context, remote_log_downloader, + metrics, request_start_time, } = context; @@ -1274,9 +1288,8 @@ impl LogFetcher { // both report the serialized API message body size, excluding protocol // headers and framing. Recorded unconditionally (including zero-record // responses) to match Java's histogram semantics. - metrics::histogram!(SCANNER_FETCH_LATENCY_MS) - .record(request_start_time.elapsed().as_secs_f64() * 1000.0); - metrics::histogram!(SCANNER_BYTES_PER_REQUEST).record(fetch_response.encoded_len() as f64); + metrics.record_fetch_latency_ms(request_start_time.elapsed().as_secs_f64() * 1000.0); + metrics.record_bytes_per_request(fetch_response.encoded_len() as f64); for pb_fetch_log_resp in fetch_response.tables_resp { let table_id = pb_fetch_log_resp.table_id; @@ -2046,7 +2059,9 @@ mod tests { use crate::record::MemoryLogRecordsArrowBuilder; use crate::row::{Datum, GenericRow}; use crate::rpc::FlussError; - use crate::test_utils::{build_cluster_arc, build_table_info}; + use crate::test_utils::{ + assert_scanner_entries_labeled, build_cluster_arc, build_table_info, test_scanner_metrics, + }; fn build_records(table_info: &TableInfo, table_path: Arc) -> Result> { let mut builder = MemoryLogRecordsArrowBuilder::new( @@ -2084,6 +2099,7 @@ mod tests { status.clone(), &Config::default(), None, + test_scanner_metrics(&table_path), )?; let bucket = TableBucket::new(1, 0); @@ -2117,6 +2133,7 @@ mod tests { status, &Config::default(), None, + test_scanner_metrics(&table_path), )?; let bucket = TableBucket::new(1, 0); @@ -2154,6 +2171,7 @@ mod tests { status, &Config::default(), None, + test_scanner_metrics(&table_path), )?; fetcher.nodes_with_pending_fetch_requests.lock().insert(1); @@ -2178,6 +2196,7 @@ mod tests { status.clone(), &Config::default(), None, + test_scanner_metrics(&table_path), )?; let response = FetchLogResponse { @@ -2203,6 +2222,7 @@ mod tests { read_context: fetcher.read_context.clone(), remote_read_context: fetcher.remote_read_context.clone(), remote_log_downloader: fetcher.remote_log_downloader.clone(), + metrics: Arc::clone(&fetcher.metrics), request_start_time: Instant::now(), }; @@ -2229,6 +2249,7 @@ mod tests { status.clone(), &Config::default(), None, + test_scanner_metrics(&table_path), )?; let bucket = TableBucket::new(1, 0); @@ -2257,6 +2278,7 @@ mod tests { read_context: fetcher.read_context.clone(), remote_read_context: fetcher.remote_read_context.clone(), remote_log_downloader: fetcher.remote_log_downloader.clone(), + metrics: Arc::clone(&fetcher.metrics), request_start_time: Instant::now(), }; @@ -2361,6 +2383,7 @@ mod tests { status, &config, None, + test_scanner_metrics(&table_path), )?; let requests = fetcher.prepare_fetch_log_requests().await; @@ -2474,6 +2497,10 @@ mod tests { (0.0..=1.0).contains(&ratio), "poll_idle_ratio must be in [0, 1], got {ratio}" ); + + // Both gauges must carry `database=db` / `table=tbl` (the fixture + // values from `with_test_log_scanner_inner`). + assert_scanner_entries_labeled(&snapshotter.snapshot().into_vec(), "db", "tbl"); } /// Java parity: `ScannerMetricGroup.recordPollStart` emits @@ -2501,6 +2528,7 @@ mod tests { between, 0.0, "first-poll time_between_poll_ms must be 0.0 (Java parity), got {between}" ); + assert_scanner_entries_labeled(&snapshotter.snapshot().into_vec(), "db", "tbl"); } /// Pins the single-consumer contract: overlapping `PollGuard`s on the @@ -2560,6 +2588,7 @@ mod tests { status, &Config::default(), None, + test_scanner_metrics(&table_path), ) .expect("build LogFetcher"); @@ -2586,6 +2615,7 @@ mod tests { read_context: fetcher.read_context.clone(), remote_read_context: fetcher.remote_read_context.clone(), remote_log_downloader: fetcher.remote_log_downloader.clone(), + metrics: Arc::clone(&fetcher.metrics), request_start_time: Instant::now(), }; @@ -2623,5 +2653,11 @@ mod tests { vec![expected_bytes], "bytes histogram must record encoded_len() for parity with Java fetchLogResponse.totalSize()", ); + + // Every emitted scanner metric must carry both `database` and `table` + // labels — that's the whole point of `ScannerMetrics`. If a future + // contributor adds a new `metrics::*!` macro inline (bypassing + // `ScannerMetrics`), this assertion catches it. + assert_scanner_entries_labeled(&entries, "db", "tbl"); } } diff --git a/crates/fluss/src/metrics.rs b/crates/fluss/src/metrics.rs index 540e991f..7c62738c 100644 --- a/crates/fluss/src/metrics.rs +++ b/crates/fluss/src/metrics.rs @@ -22,6 +22,7 @@ //! recorder (e.g. `metrics-exporter-prometheus`) to collect them. When no //! recorder is installed, all metric calls are no-ops with zero overhead. +use crate::metadata::TablePath; use crate::rpc::ApiKey; // --------------------------------------------------------------------------- @@ -30,6 +31,10 @@ use crate::rpc::ApiKey; pub const LABEL_API_KEY: &str = "api_key"; +/// Identifies the database and table for per-table scanner metrics. +pub const LABEL_DATABASE: &str = "database"; +pub const LABEL_TABLE: &str = "table"; + // --------------------------------------------------------------------------- // Connection / RPC metrics // @@ -89,9 +94,9 @@ pub const SCANNER_POLL_IDLE_RATIO: &str = "fluss.client.scanner.poll_idle_ratio" // and counters for throughput; the recorder/exporter handles rate // computation (e.g. Prometheus `rate()`). // -// Java emits one `ScannerMetricGroup` per (database, table). Rust currently -// emits without per-table labels — adding `database`/`table` labels is -// tracked separately and intentionally deferred to keep this PR minimal. +// Java emits one `ScannerMetricGroup` per (database, table); Rust matches +// that by attaching `database` + `table` labels to every scanner metric +// (see `ScannerMetrics` below). // --------------------------------------------------------------------------- /// Histogram: elapsed ms for each successful FetchLog RPC. @@ -114,6 +119,130 @@ pub const SCANNER_REMOTE_FETCH_BYTES_TOTAL: &str = "fluss.client.scanner.remote_ pub const SCANNER_REMOTE_FETCH_ERRORS_TOTAL: &str = "fluss.client.scanner.remote_fetch_errors.total"; +// --------------------------------------------------------------------------- +// Per-table scanner metric handles +// --------------------------------------------------------------------------- + +/// Cached `(database, table)`-labeled scanner metric handles. +/// +/// Adding a new scanner metric: declare the constant above, add one +/// field plus an initializer line in [`Self::new`] using the matching +/// `scanner_{gauge,counter,histogram}` helper, and a `record_*` method. +/// The helpers are the single source of truth for the label set, so a +/// future label addition (e.g. `cluster_id`) is a one-line change. +/// +/// # Recorder binding +/// +/// `metrics::counter!(...)` / `gauge!(...)` / `histogram!(...)` resolve +/// the recorder at the macro callsite. Because this struct caches the +/// returned handles, every cached handle is bound to whichever recorder +/// is installed when [`Self::new`] runs. Construct the scanner *after* +/// installing the production recorder; in tests, construct it inside +/// the `metrics::with_local_recorder(...)` closure. With no recorder +/// installed, all `record_*` calls are zero-overhead no-ops. +pub(crate) struct ScannerMetrics { + time_between_poll_ms: metrics::Gauge, + poll_idle_ratio: metrics::Gauge, + fetch_requests_total: metrics::Counter, + fetch_latency_ms: metrics::Histogram, + bytes_per_request: metrics::Histogram, + remote_fetch_requests_total: metrics::Counter, + remote_fetch_bytes_total: metrics::Counter, + remote_fetch_errors_total: metrics::Counter, +} + +impl ScannerMetrics { + /// Build a fresh handle cache for `table_path`. Resolves the + /// currently installed recorder once per metric. + pub(crate) fn new(table_path: &TablePath) -> Self { + let database = table_path.database(); + let table = table_path.table(); + Self { + time_between_poll_ms: scanner_gauge(SCANNER_TIME_BETWEEN_POLL_MS, database, table), + poll_idle_ratio: scanner_gauge(SCANNER_POLL_IDLE_RATIO, database, table), + fetch_requests_total: scanner_counter(SCANNER_FETCH_REQUESTS_TOTAL, database, table), + fetch_latency_ms: scanner_histogram(SCANNER_FETCH_LATENCY_MS, database, table), + bytes_per_request: scanner_histogram(SCANNER_BYTES_PER_REQUEST, database, table), + remote_fetch_requests_total: scanner_counter( + SCANNER_REMOTE_FETCH_REQUESTS_TOTAL, + database, + table, + ), + remote_fetch_bytes_total: scanner_counter( + SCANNER_REMOTE_FETCH_BYTES_TOTAL, + database, + table, + ), + remote_fetch_errors_total: scanner_counter( + SCANNER_REMOTE_FETCH_ERRORS_TOTAL, + database, + table, + ), + } + } + + pub(crate) fn record_time_between_poll_ms(&self, value: f64) { + self.time_between_poll_ms.set(value); + } + + pub(crate) fn record_poll_idle_ratio(&self, value: f64) { + self.poll_idle_ratio.set(value); + } + + pub(crate) fn record_fetch_request(&self) { + self.fetch_requests_total.increment(1); + } + + pub(crate) fn record_fetch_latency_ms(&self, value: f64) { + self.fetch_latency_ms.record(value); + } + + pub(crate) fn record_bytes_per_request(&self, value: f64) { + self.bytes_per_request.record(value); + } + + pub(crate) fn record_remote_fetch_request(&self) { + self.remote_fetch_requests_total.increment(1); + } + + pub(crate) fn record_remote_fetch_bytes(&self, bytes: u64) { + self.remote_fetch_bytes_total.increment(bytes); + } + + pub(crate) fn record_remote_fetch_error(&self) { + self.remote_fetch_errors_total.increment(1); + } +} + +// Per-table scanner handle factories. These centralize the +// `(database, table)` label set so a future schema change (renaming a +// label, adding `cluster_id`, etc.) is a one-line edit instead of +// touching every callsite in `ScannerMetrics::new`. + +fn scanner_gauge(name: &'static str, database: &str, table: &str) -> metrics::Gauge { + metrics::gauge!( + name, + LABEL_DATABASE => database.to_string(), + LABEL_TABLE => table.to_string(), + ) +} + +fn scanner_counter(name: &'static str, database: &str, table: &str) -> metrics::Counter { + metrics::counter!( + name, + LABEL_DATABASE => database.to_string(), + LABEL_TABLE => table.to_string(), + ) +} + +fn scanner_histogram(name: &'static str, database: &str, table: &str) -> metrics::Histogram { + metrics::histogram!( + name, + LABEL_DATABASE => database.to_string(), + LABEL_TABLE => table.to_string(), + ) +} + /// Returns a label value for reportable API keys, matching Java's /// `ConnectionMetrics.REPORT_API_KEYS` filter (`ProduceLog`, `FetchLog`, /// `PutKv`, `Lookup`). Returns `None` for admin/metadata/auth calls to @@ -131,6 +260,7 @@ pub(crate) fn api_key_label(api_key: ApiKey) -> Option<&'static str> { #[cfg(test)] mod tests { use super::*; + use crate::test_utils::assert_scanner_entries_labeled; use metrics_util::debugging::DebuggingRecorder; macro_rules! find_counter { @@ -339,8 +469,10 @@ mod tests { let snapshotter = recorder.snapshotter(); metrics::with_local_recorder(&recorder, || { - metrics::gauge!(SCANNER_TIME_BETWEEN_POLL_MS).set(200.0); - metrics::gauge!(SCANNER_POLL_IDLE_RATIO).set(0.8); + let table_path = TablePath::new("db", "tbl"); + let m = ScannerMetrics::new(&table_path); + m.record_time_between_poll_ms(200.0); + m.record_poll_idle_ratio(0.8); }); let snapshot = snapshotter.snapshot(); @@ -351,6 +483,7 @@ mod tests { Some(200.0) ); assert_eq!(find_gauge!(entries, SCANNER_POLL_IDLE_RATIO), Some(0.8)); + assert_scanner_entries_labeled(&entries, "db", "tbl"); } #[test] @@ -359,9 +492,11 @@ mod tests { let snapshotter = recorder.snapshotter(); metrics::with_local_recorder(&recorder, || { - metrics::counter!(SCANNER_FETCH_REQUESTS_TOTAL).increment(1); - metrics::histogram!(SCANNER_FETCH_LATENCY_MS).record(15.5); - metrics::histogram!(SCANNER_BYTES_PER_REQUEST).record(4096.0); + let table_path = TablePath::new("db", "tbl"); + let m = ScannerMetrics::new(&table_path); + m.record_fetch_request(); + m.record_fetch_latency_ms(15.5); + m.record_bytes_per_request(4096.0); }); let snapshot = snapshotter.snapshot(); @@ -379,6 +514,7 @@ mod tests { find_histogram!(entries, SCANNER_BYTES_PER_REQUEST), Some(vec![4096.0]) ); + assert_scanner_entries_labeled(&entries, "db", "tbl"); } #[test] @@ -387,9 +523,13 @@ mod tests { let snapshotter = recorder.snapshotter(); metrics::with_local_recorder(&recorder, || { - metrics::counter!(SCANNER_REMOTE_FETCH_REQUESTS_TOTAL).increment(3); - metrics::counter!(SCANNER_REMOTE_FETCH_BYTES_TOTAL).increment(1024); - metrics::counter!(SCANNER_REMOTE_FETCH_ERRORS_TOTAL).increment(1); + let table_path = TablePath::new("db", "tbl"); + let m = ScannerMetrics::new(&table_path); + m.record_remote_fetch_request(); + m.record_remote_fetch_request(); + m.record_remote_fetch_request(); + m.record_remote_fetch_bytes(1024); + m.record_remote_fetch_error(); }); let snapshot = snapshotter.snapshot(); @@ -407,5 +547,71 @@ mod tests { find_counter!(entries, SCANNER_REMOTE_FETCH_ERRORS_TOTAL), Some(1) ); + assert_scanner_entries_labeled(&entries, "db", "tbl"); + } + + /// Two scanners on different tables must produce independent metric + /// series. + #[test] + fn different_table_paths_produce_separate_metric_series() { + use std::collections::HashMap; + + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + metrics::with_local_recorder(&recorder, || { + let m1 = ScannerMetrics::new(&TablePath::new("db1", "t1")); + let m2 = ScannerMetrics::new(&TablePath::new("db2", "t2")); + + for _ in 0..5 { + m1.record_fetch_request(); + } + for _ in 0..3 { + m2.record_fetch_request(); + } + }); + + let snapshot = snapshotter.snapshot(); + let entries: Vec<_> = snapshot.into_vec(); + + let request_entries: Vec<_> = entries + .iter() + .filter(|(key, _, _, _)| key.key().name() == SCANNER_FETCH_REQUESTS_TOTAL) + .collect(); + + assert_eq!( + request_entries.len(), + 2, + "(db1,t1) and (db2,t2) must be separate metric series" + ); + + let mut counter_by_table: HashMap<(String, String), u64> = HashMap::new(); + for (key, _, _, val) in request_entries { + let mut database = None; + let mut table = None; + for label in key.key().labels() { + if label.key() == LABEL_DATABASE { + database = Some(label.value().to_string()); + } else if label.key() == LABEL_TABLE { + table = Some(label.value().to_string()); + } + } + let database = database.expect("scanner metric must include database label"); + let table = table.expect("scanner metric must include table label"); + let counter_value = match val { + metrics_util::debugging::DebugValue::Counter(v) => *v, + other => panic!("expected Counter, got {other:?}"), + }; + counter_by_table.insert((database, table), counter_value); + } + + assert_eq!( + counter_by_table.get(&("db1".to_string(), "t1".to_string())), + Some(&5), + ); + assert_eq!( + counter_by_table.get(&("db2".to_string(), "t2".to_string())), + Some(&3), + ); } } diff --git a/crates/fluss/src/test_utils.rs b/crates/fluss/src/test_utils.rs index 47bb2ea8..f1e17e5f 100644 --- a/crates/fluss/src/test_utils.rs +++ b/crates/fluss/src/test_utils.rs @@ -20,6 +20,7 @@ use crate::metadata::{ DataField, DataTypes, PhysicalTablePath, Schema, TableBucket, TableDescriptor, TableInfo, TablePath, }; +use crate::metrics::{LABEL_DATABASE, LABEL_TABLE, ScannerMetrics}; use std::collections::HashMap; use std::sync::Arc; @@ -87,3 +88,59 @@ pub(crate) fn build_cluster_arc( ) -> Arc { Arc::new(build_cluster(table_path, table_id, buckets)) } + +/// Build an `Arc` for tests. Most callers don't install +/// a recorder, so the cached handles are no-ops; tests that *do* install +/// `metrics::with_local_recorder(...)` must call this *inside* the +/// recorder closure for the cached handles to bind to that recorder. +pub(crate) fn test_scanner_metrics(table_path: &TablePath) -> Arc { + Arc::new(ScannerMetrics::new(table_path)) +} + +/// Asserts that every entry whose name starts with `fluss.client.scanner.` +/// carries both the `database` and `table` labels matching the expected +/// values. Use after a `Snapshotter::snapshot().into_vec()` to verify all +/// emitted scanner metrics in one shot — protects against future scanner +/// metrics that bypass [`ScannerMetrics`]. +pub(crate) fn assert_scanner_entries_labeled( + entries: &[( + metrics_util::CompositeKey, + Option, + Option, + metrics_util::debugging::DebugValue, + )], + expected_database: &str, + expected_table: &str, +) { + for (key, _, _, _) in entries { + let name = key.key().name(); + if !name.starts_with("fluss.client.scanner.") { + continue; + } + let labels: Vec<_> = key + .key() + .labels() + .map(|l| (l.key().to_string(), l.value().to_string())) + .collect(); + let database = labels + .iter() + .find(|(k, _)| k == LABEL_DATABASE) + .unwrap_or_else(|| { + panic!("scanner metric `{name}` is missing the database label; labels={labels:?}") + }); + let table = labels + .iter() + .find(|(k, _)| k == LABEL_TABLE) + .unwrap_or_else(|| { + panic!("scanner metric `{name}` is missing the table label; labels={labels:?}") + }); + assert_eq!( + database.1, expected_database, + "scanner metric `{name}` has unexpected database label" + ); + assert_eq!( + table.1, expected_table, + "scanner metric `{name}` has unexpected table label" + ); + } +} diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md index 3f94841e..5d983030 100644 --- a/website/docs/user-guide/rust/api-reference.md +++ b/website/docs/user-guide/rust/api-reference.md @@ -141,6 +141,8 @@ Complete API reference for the Fluss Rust client. Single-consumer: do not call `poll` concurrently on the same scanner (e.g. from `tokio::join!` or two tasks sharing an `Arc`). Mirrors Java's `LogScannerImpl.acquire()` guard. Debug builds surface overlapping calls via a `debug_assert!`; release builds skip the check for performance and produce skewed poll-timing metrics (`fluss.client.scanner.time_between_poll_ms`, `fluss.client.scanner.poll_idle_ratio`) if the contract is violated. +All `fluss.client.scanner.*` metrics carry `database` and `table` labels (matching Java's per-`TablePath` `ScannerMetricGroup`), so multi-table consumers get one time series per scanned table. + | Method | Description | |-----------------------------------------------------------------------------------------------------------|----------------------------------------------------------| | `async fn subscribe(&self, bucket_id: i32, start_offset: i64) -> Result<()>` | Subscribe to a bucket |