Skip to content

feat: support subscribe from remote#76

Merged
luoyuxia merged 3 commits into
apache:mainfrom
luoyuxia:introduce-file-io
Dec 13, 2025
Merged

feat: support subscribe from remote#76
luoyuxia merged 3 commits into
apache:mainfrom
luoyuxia:introduce-file-io

Conversation

@luoyuxia
Copy link
Copy Markdown
Contributor

@luoyuxia luoyuxia commented Dec 7, 2025

Purpose

Linked issue: close #36

Brief change log

Tests

API and Format

Documentation

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds support for subscribing to and scanning remote log segments in Fluss, enabling the client to fetch and read log data that has been tiered to remote storage (filesystem or memory-backed storage). The implementation introduces a new I/O abstraction layer using OpenDAL for storage operations and extends the log scanner to handle both in-memory and remote log segments.

Key Changes:

  • Added I/O module with FileIO and Storage abstractions supporting filesystem and in-memory storage backends via OpenDAL
  • Implemented remote log download and processing capabilities in the scanner, allowing transparent fetching of tiered log segments
  • Modified create_log_scanner() to return Result<LogScanner> for proper error handling during scanner initialization

Reviewed changes

Copilot reviewed 18 out of 18 changed files in this pull request and generated 19 comments.

Show a summary per file
File Description
crates/fluss/src/io/mod.rs New module exposing FileIO and Storage abstractions for remote storage operations
crates/fluss/src/io/file_io.rs FileIO implementation providing input file operations and storage URL parsing
crates/fluss/src/io/storage.rs Storage enum supporting multiple backend types (memory, filesystem) with path handling
crates/fluss/src/io/storage_fs.rs Filesystem storage backend configuration using OpenDAL
crates/fluss/src/io/storage_memory.rs In-memory storage backend configuration for testing
crates/fluss/src/client/table/remote_log.rs Core remote log functionality including segment downloading, caching, and record parsing
crates/fluss/src/client/table/scanner.rs Extended log fetcher to handle remote log segments alongside in-memory records
crates/fluss/src/client/table/mod.rs Added remote_log module declaration
crates/fluss/src/error.rs New error variants for I/O operations (IoUnsupported, IoUnexpected)
crates/fluss/src/lib.rs Exposed new io module publicly
crates/fluss/src/metadata/datatype.rs Minor formatting improvements using shorthand syntax
crates/fluss/Cargo.toml Added dependencies (opendal, url, async-trait, uuid) and feature flags for storage backends
crates/fluss/tests/integration/table_remote_scan.rs New integration test verifying remote log scanning functionality
crates/fluss/tests/integration/fluss_cluster.rs Extended test cluster builder to support remote data directory mounting
crates/fluss/tests/integration/table.rs Updated to handle new create_log_scanner() Result return type and import ordering
crates/fluss/tests/test_fluss.rs Added table_remote_scan test module
crates/examples/src/example_table.rs Updated to handle new create_log_scanner() Result return type
bindings/python/src/table.rs Updated Python bindings to handle new create_log_scanner() Result return type

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread crates/fluss/tests/integration/table_remote_scan.rs
Comment thread crates/fluss/tests/integration/table_remote_scan.rs
Comment thread crates/fluss/tests/integration/fluss_cluster.rs Outdated
Comment thread crates/fluss/src/error.rs Outdated
Comment thread crates/fluss/src/client/table/remote_log.rs
Comment thread crates/fluss/Cargo.toml Outdated
Comment thread crates/fluss/tests/integration/table_remote_scan.rs Outdated
Comment thread crates/fluss/src/client/table/remote_log.rs
Comment thread crates/fluss/src/io/storage.rs Outdated
Comment thread crates/fluss/src/client/table/remote_log.rs Outdated
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 19 out of 19 changed files in this pull request and generated 6 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread crates/fluss/src/client/table/remote_log.rs
Comment thread crates/fluss/src/client/table/remote_log.rs
Comment thread crates/fluss/src/io/storage.rs
Comment thread crates/fluss/src/client/table/scanner.rs
Comment thread crates/fluss/src/io/mod.rs
Comment thread crates/fluss/src/io/file_io.rs
metadata: Arc<Metadata>,
log_scanner_status: Arc<LogScannerStatus>,
read_context: ReadContext,
remote_log_downloader: Arc<RemoteLogDownloader>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unsure whether RemoteLogDownloader needs to be wrapped with ARC. From my understanding, LogScanner exclusively owns RemoteLogDownloader; RemoteLogDownloader is created synchronously when LogScanner is created, and destroyed when LogScanner is destroyed, and there's no need for multi-threaded sharing?

So, is it sufficient to directly use RemoteLogDownloader with exclusive ownership semantics? Analogous to unique_ptr in C++.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. We don't need ARC.

.request(crate::rpc::message::FetchLogRequest::new(fetch_request))
.await?;

for pb_fetch_log_resp in fetch_response.tables_resp {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, remote reads download all files first before consuming them, right? One potential optimization is to make it a streaming process, where downloaded files can be consumed immediately.

Our internal index building scenario typically involves consuming files from a day ago or even a few hours ago. I'm concerned about efficiency issues if we use the current model.

I'll submit an issue to follow up on this.
#89

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I'm intending to make it as an optimzation in following pr.
But i'll be great that you submit an issue to follow up it.

};

// delete the downloaded local file to free disk
delete_file(file_path).await;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this understanding correct? If the code call before delete_file fails, file cleanup won't be triggered.

It seems like we can rely on Raii to guarantee that cleanup will be triggered regardless? I'm not very familiar with Rust yet. In C++, a temporary guard object is usually used to ensure that the guard's destructor will perform the corresponding cleanup work outside the current function scope.

Perhaps we could mark it as a todo item first.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the

local_log_dir: TempDir,

in RemoteLogDownloader will clean up the whole directory that stores local files. The local_log_dir should be deleted after TempDir destory.

Comment thread crates/fluss/src/client/table/remote_log.rs Outdated
@zhaohaidao
Copy link
Copy Markdown
Contributor

Thanks. the latest changes LGTM. @luoyuxia

@luoyuxia luoyuxia merged commit fe72d85 into apache:main Dec 13, 2025
13 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Allow read log tiered to remote

3 participants