Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 36 additions & 14 deletions crates/fluss/src/client/table/remote_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ use crate::client::credentials::CredentialsReceiver;
use crate::error::{Error, Result};
use crate::io::{FileIO, Storage};
use crate::metadata::TableBucket;
use crate::metrics::{
SCANNER_REMOTE_FETCH_BYTES_TOTAL, SCANNER_REMOTE_FETCH_ERRORS_TOTAL,
SCANNER_REMOTE_FETCH_REQUESTS_TOTAL,
};
use crate::metrics::ScannerMetrics;
use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
use futures::TryStreamExt;
use parking_lot::Mutex;
Expand Down Expand Up @@ -394,6 +391,10 @@ struct DownloadCoordinator {
max_concurrent_downloads: usize,
recycle_notify: Arc<Notify>,
fetcher: Arc<dyn RemoteLogFetcher>,
/// Per-table scanner metric handles cloned by every spawned download
/// task to attribute remote-fetch metrics to the owning scanner's
/// `(database, table)`.
metrics: Arc<ScannerMetrics>,
}

impl DownloadCoordinator {
Expand Down Expand Up @@ -462,10 +463,11 @@ impl DownloadCoordinator {
// Clone data for the spawned task
let fetcher = self.fetcher.clone();
let recycle_notify = self.recycle_notify.clone();
let metrics = Arc::clone(&self.metrics);

// Spawn download task
self.active_downloads.spawn(async move {
spawn_download_task(request, permit, fetcher, recycle_notify).await
spawn_download_task(request, permit, fetcher, recycle_notify, metrics).await
});
self.in_flight += 1;
}
Expand All @@ -491,6 +493,7 @@ async fn spawn_download_task(
permit: tokio::sync::OwnedSemaphorePermit,
fetcher: Arc<dyn RemoteLogFetcher>,
recycle_notify: Arc<Notify>,
metrics: Arc<ScannerMetrics>,
) -> DownloadResult {
// Check if receiver still alive (early cancellation check)
if request.result_sender.is_closed() {
Expand All @@ -501,16 +504,15 @@ async fn spawn_download_task(
// Java reference: RemoteLogDownloader.java increments `remoteFetchRequestCount`
// immediately before initiating the download. Each retry of the same segment
// counts as a separate request (matches Java behavior).
metrics::counter!(SCANNER_REMOTE_FETCH_REQUESTS_TOTAL).increment(1);
metrics.record_remote_fetch_request();

// Try download ONCE
let download_result = fetcher.fetch(&request).await;

match download_result {
Ok(fetch_result) => {
// Success - permit will be released on drop (FileSource handles file deletion)
metrics::counter!(SCANNER_REMOTE_FETCH_BYTES_TOTAL)
.increment(fetch_result.file_size as u64);
metrics.record_remote_fetch_bytes(fetch_result.file_size as u64);
DownloadResult::Success {
result: RemoteLogFile {
file_path: fetch_result.file_path,
Expand All @@ -528,7 +530,7 @@ async fn spawn_download_task(
Err(e) => {
// Download failed - check if we should retry or give up
// Counted per attempt, so retries each contribute one error.
metrics::counter!(SCANNER_REMOTE_FETCH_ERRORS_TOTAL).increment(1);
metrics.record_remote_fetch_error();
let retry_count = request.retry_count + 1;

if retry_count > MAX_RETRY_COUNT {
Expand Down Expand Up @@ -781,27 +783,34 @@ pub struct RemoteLogDownloader {
}

impl RemoteLogDownloader {
pub fn new(
pub(crate) fn new(
local_log_dir: TempDir,
max_prefetch_segments: usize,
max_concurrent_downloads: usize,
remote_log_read_concurrency: usize,
credentials_rx: CredentialsReceiver,
metrics: Arc<ScannerMetrics>,
) -> Result<Self> {
let fetcher = Arc::new(ProductionFetcher {
credentials_rx,
local_log_dir: Arc::new(local_log_dir),
remote_log_read_concurrency,
});

Self::new_with_fetcher(fetcher, max_prefetch_segments, max_concurrent_downloads)
Self::new_with_fetcher(
fetcher,
max_prefetch_segments,
max_concurrent_downloads,
metrics,
)
}

/// Create a RemoteLogDownloader with a custom fetcher (for testing).
pub fn new_with_fetcher(
pub(crate) fn new_with_fetcher(
fetcher: Arc<dyn RemoteLogFetcher>,
max_prefetch_segments: usize,
max_concurrent_downloads: usize,
metrics: Arc<ScannerMetrics>,
) -> Result<Self> {
let (request_sender, request_receiver) = mpsc::unbounded_channel();

Expand All @@ -813,6 +822,7 @@ impl RemoteLogDownloader {
max_concurrent_downloads,
recycle_notify: Arc::new(Notify::new()),
fetcher,
metrics,
};

// Spawn coordinator task - it will exit when request_sender is dropped
Expand Down Expand Up @@ -976,13 +986,23 @@ impl RemoteLogDownloader {
#[cfg(test)]
mod tests {
use super::*;
use crate::metadata::TablePath;
use crate::test_utils::test_scanner_metrics;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Helper function to create a TableBucket for testing
fn create_table_bucket(table_id: i64, bucket_id: i32) -> TableBucket {
TableBucket::new(table_id, bucket_id)
}

/// `ScannerMetrics` instance shared across the local test fixtures. The
/// labels are arbitrary because none of the tests in this module install
/// a metrics recorder; the metrics just need to exist for the API
/// surface.
fn metrics() -> Arc<ScannerMetrics> {
test_scanner_metrics(&TablePath::new("db", "tbl"))
}

/// Simplified fake fetcher for testing
struct FakeFetcher {
completion_gate: Arc<Notify>,
Expand Down Expand Up @@ -1172,6 +1192,7 @@ mod tests {
fake_fetcher.clone(),
10, // High prefetch limit
2, // Max concurrent downloads = 2
metrics(),
)
.unwrap();

Expand Down Expand Up @@ -1219,6 +1240,7 @@ mod tests {
fake_fetcher,
2, // Max prefetch = 2
10, // High concurrent limit
metrics(),
)
.unwrap();

Expand Down Expand Up @@ -1278,7 +1300,7 @@ mod tests {
let fake_fetcher = Arc::new(FakeFetcher::new(2, true)); // Fail twice, succeed third time

let downloader =
RemoteLogDownloader::new_with_fetcher(fake_fetcher.clone(), 10, 1).unwrap();
RemoteLogDownloader::new_with_fetcher(fake_fetcher.clone(), 10, 1, metrics()).unwrap();

let bucket = create_table_bucket(1, 0);
let seg = create_segment("seg1", 0, 1000, bucket);
Expand All @@ -1303,7 +1325,7 @@ mod tests {
let seg2 = create_segment("seg2", 100, 1000, create_table_bucket(1, 0));
let fake_fetcher2 = Arc::new(FakeFetcher::new(100, true)); // Fail forever
let downloader2 =
RemoteLogDownloader::new_with_fetcher(fake_fetcher2.clone(), 10, 1).unwrap();
RemoteLogDownloader::new_with_fetcher(fake_fetcher2.clone(), 10, 1, metrics()).unwrap();

let future2 = downloader2.request_remote_log("dir", &seg2);
tokio::time::sleep(Duration::from_millis(50)).await;
Expand Down
Loading
Loading