Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion bindings/python/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,11 @@ impl TableBucket {

/// Convert to core TableBucket (internal use)
pub fn to_core(&self) -> fcore::metadata::TableBucket {
fcore::metadata::TableBucket::new(self.table_id, self.bucket)
fcore::metadata::TableBucket::new_with_partition(
self.table_id,
self.partition_id,
self.bucket,
)
}
}

Expand Down
6 changes: 5 additions & 1 deletion crates/fluss/src/client/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,11 @@ impl FlussAdmin {
// Convert proto response to LakeSnapshot
let mut table_buckets_offset = HashMap::new();
for bucket_snapshot in response.bucket_snapshots {
let table_bucket = TableBucket::new(response.table_id, bucket_snapshot.bucket_id);
let table_bucket = TableBucket::new_with_partition(
response.table_id,
bucket_snapshot.partition_id,
bucket_snapshot.bucket_id,
);
if let Some(log_offset) = bucket_snapshot.log_offset {
table_buckets_offset.insert(table_bucket, log_offset);
}
Expand Down
4 changes: 2 additions & 2 deletions crates/fluss/src/client/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ mod tests {
let leader = metadata
.leader_for(&table_path, &TableBucket::new(1, 0))
.await
.expect("leader request should be Ok")
.expect("leader should exist");
.unwrap()
.expect("leader");
assert_eq!(leader.id(), 1);
}

Expand Down
153 changes: 109 additions & 44 deletions crates/fluss/src/client/table/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use std::{
};
use tempfile::TempDir;

use crate::TableId;
use crate::client::connection::FlussConnection;
use crate::client::credentials::SecurityTokenManager;
use crate::client::metadata::Metadata;
Expand All @@ -43,6 +42,7 @@ use crate::record::{
};
use crate::rpc::{RpcClient, RpcError, message};
use crate::util::FairBucketStatusMap;
use crate::{PartitionId, TableId};
Comment thread
luoyuxia marked this conversation as resolved.

const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
#[allow(dead_code)]
Expand Down Expand Up @@ -88,7 +88,7 @@ impl<'a> TableScan<'a> {
/// # pub async fn example() -> Result<()> {
/// let mut config = Config::default();
/// config.bootstrap_server = "127.0.0.1:9123".to_string();
/// let conn = FlussConnection::new(config).await;
/// let conn = FlussConnection::new(config).await?;
///
/// let table_descriptor = TableDescriptor::builder()
/// .schema(
Expand Down Expand Up @@ -164,7 +164,7 @@ impl<'a> TableScan<'a> {
/// # pub async fn example() -> Result<()> {
/// let mut config = Config::default();
/// config.bootstrap_server = "127.0.0.1:9123".to_string();
/// let conn = FlussConnection::new(config).await;
/// let conn = FlussConnection::new(config).await?;
///
/// let table_descriptor = TableDescriptor::builder()
/// .schema(
Expand Down Expand Up @@ -270,6 +270,7 @@ struct LogScannerInner {
metadata: Arc<Metadata>,
log_scanner_status: Arc<LogScannerStatus>,
log_fetcher: LogFetcher,
is_partitioned_table: bool,
}

impl LogScannerInner {
Expand All @@ -284,6 +285,7 @@ impl LogScannerInner {
Ok(Self {
table_path: table_info.table_path.clone(),
table_id: table_info.table_id,
is_partitioned_table: table_info.is_partitioned(),
metadata: metadata.clone(),
log_scanner_status: log_scanner_status.clone(),
log_fetcher: LogFetcher::new(
Expand Down Expand Up @@ -337,6 +339,13 @@ impl LogScannerInner {
}

async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if self.is_partitioned_table {
return Err(Error::UnsupportedOperation {
message: "The table is a partitioned table, please use \"subscribe_partition\" to \
subscribe a partitioned bucket instead."
.to_string(),
});
}
let table_bucket = TableBucket::new(self.table_id, bucket);
self.metadata
.check_and_update_table_metadata(from_ref(&self.table_path))
Expand All @@ -347,6 +356,13 @@ impl LogScannerInner {
}

async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
if self.is_partitioned_table {
return Err(Error::UnsupportedOperation {
message:
"The table is a partitioned table, subscribe_batch is not supported currently."
.to_string(),
});
}
self.metadata
.check_and_update_table_metadata(from_ref(&self.table_path))
.await?;
Expand All @@ -368,6 +384,29 @@ impl LogScannerInner {
Ok(())
}

async fn subscribe_partition(
&self,
partition_id: PartitionId,
bucket: i32,
offset: i64,
) -> Result<()> {
if !self.is_partitioned_table {
return Err(Error::UnsupportedOperation {
message: "The table is not a partitioned table, please use \"subscribe\" to \
subscribe a non-partitioned bucket instead."
.to_string(),
});
}
let table_bucket =
TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket);
self.metadata
.check_and_update_table_metadata(from_ref(&self.table_path))
.await?;
self.log_scanner_status
.assign_scan_bucket(table_bucket, offset);
Ok(())
}

async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
let result = self.log_fetcher.collect_fetches()?;
if !result.is_empty() {
Expand Down Expand Up @@ -435,6 +474,17 @@ impl LogScanner {
pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
self.inner.subscribe_batch(bucket_offsets).await
}

pub async fn subscribe_partition(
&self,
partition_id: PartitionId,
bucket: i32,
offset: i64,
) -> Result<()> {
self.inner
.subscribe_partition(partition_id, bucket, offset)
.await
}
}

// Implementation for RecordBatchLogScanner (batches mode)
Expand All @@ -451,6 +501,17 @@ impl RecordBatchLogScanner {
pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
self.inner.subscribe_batch(bucket_offsets).await
}

pub async fn subscribe_partition(
&self,
partition_id: PartitionId,
bucket: i32,
offset: i64,
) -> Result<()> {
self.inner
.subscribe_partition(partition_id, bucket, offset)
.await
}
}

struct LogFetcher {
Expand Down Expand Up @@ -617,55 +678,55 @@ impl LogFetcher {
)
}

async fn check_and_update_metadata(&self) -> Result<()> {
let need_update = self
.fetchable_buckets()
.iter()
.any(|bucket| self.get_table_bucket_leader(bucket).is_none());
async fn check_and_update_metadata(&self, table_buckets: &[TableBucket]) -> Result<()> {
let mut partition_ids = Vec::new();
let mut need_update = false;

if !need_update {
return Ok(());
for tb in table_buckets {
if self.get_table_bucket_leader(tb).is_some() {
continue;
}

if self.is_partitioned {
partition_ids.push(tb.partition_id().unwrap());
} else {
need_update = true;
break;
}
}

if self.is_partitioned {
// Fallback to full table metadata refresh until partition-aware updates are available.
let update_result = if self.is_partitioned && !partition_ids.is_empty() {
self.metadata
.update_tables_metadata(&HashSet::from([&self.table_path]), &HashSet::new(), vec![])
.update_tables_metadata(
&HashSet::from([&self.table_path]),
&HashSet::new(),
partition_ids,
)
.await
.or_else(|e| {
if let Error::RpcError { source, .. } = &e
&& matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_))
{
warn!(
"Retrying after encountering error while updating table metadata: {e}"
);
Ok(())
} else {
Err(e)
}
})?;
return Ok(());
}
} else if need_update {
self.metadata.update_table_metadata(&self.table_path).await
} else {
Ok(())
};

// TODO: Handle PartitionNotExist error
self.metadata
.update_tables_metadata(&HashSet::from([&self.table_path]), &HashSet::new(), vec![])
.await
.or_else(|e| {
if let Error::RpcError { source, .. } = &e
&& matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_))
{
warn!("Retrying after encountering error while updating table metadata: {e}");
Ok(())
} else {
Err(e)
}
})
// TODO: Handle PartitionNotExist error like java side
update_result.or_else(|e| {
if let Error::RpcError { source, .. } = &e
&& matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_))
{
warn!("Retrying after encountering error while updating table metadata: {e}");
Ok(())
} else {
Err(e)
}
})?;
Ok(())
}

/// Send fetch requests asynchronously without waiting for responses
async fn send_fetches(&self) -> Result<()> {
self.check_and_update_metadata().await?;
self.check_and_update_metadata(self.fetchable_buckets().as_slice())
.await?;
let fetch_request = self.prepare_fetch_log_requests().await;

for (leader, fetch_request) in fetch_request {
Expand Down Expand Up @@ -774,7 +835,11 @@ impl LogFetcher {

for fetch_log_for_bucket in fetch_log_for_buckets {
let bucket: i32 = fetch_log_for_bucket.bucket_id;
let table_bucket = TableBucket::new(table_id, bucket);
let table_bucket = TableBucket::new_with_partition(
table_id,
fetch_log_for_bucket.partition_id,
bucket,
);

// todo: check fetch result code for per-bucket
let Some(fetch_offset) = log_scanner_status.get_bucket_offset(&table_bucket) else {
Expand Down Expand Up @@ -1302,7 +1367,7 @@ impl LogFetcher {
)
} else {
let fetch_log_req_for_bucket = PbFetchLogReqForBucket {
partition_id: None,
partition_id: bucket.partition_id(),
bucket_id: bucket.bucket_id(),
fetch_offset: offset,
// 1M
Expand Down
2 changes: 1 addition & 1 deletion crates/fluss/src/metadata/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1206,7 +1206,7 @@ pub struct TableBucket {

impl TableBucket {
pub fn new(table_id: TableId, bucket: BucketId) -> Self {
TableBucket {
Self {
table_id,
partition_id: None,
bucket,
Expand Down
66 changes: 63 additions & 3 deletions crates/fluss/tests/integration/log_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,7 @@ mod table_test {
}

#[tokio::test]
async fn partitioned_table_append() {
async fn partitioned_table_append_scan() {
let cluster = get_fluss_cluster();
let connection = cluster.get_fluss_connection().await;

Expand Down Expand Up @@ -1098,11 +1098,71 @@ mod table_test {
"Table partition 'fluss.test_partitioned_log_append(p=NOT Exists)' does not exist."
));

let log_scanner = table
.new_scan()
.create_log_scanner()
.expect("Failed to create log scanner");
let partition_info = admin
.list_partition_infos(&table_path)
.await
.expect("Failed to list partition infos");
for partition_info in partition_info {
log_scanner
.subscribe_partition(partition_info.get_partition_id(), 0, 0)
.await
.expect("Failed to subscribe to partition");
}

let expected_records = vec![
(1, "US", 100i64),
(2, "US", 200i64),
(3, "EU", 300i64),
(4, "EU", 400),
(5, "US", 500i64),
(6, "US", 600i64),
(7, "EU", 700i64),
(8, "EU", 800i64),
];
let expected_records: Vec<(i32, String, i64)> = expected_records
.into_iter()
.map(|(id, region, val)| (id, region.to_string(), val))
.collect();

let mut collected_records: Vec<(i32, String, i64)> = Vec::new();
let start_time = std::time::Instant::now();
while collected_records.len() < expected_records.len()
&& start_time.elapsed() < Duration::from_secs(10)
{
let records = log_scanner
.poll(Duration::from_millis(500))
.await
.expect("Failed to poll log scanner");
for rec in records {
let row = rec.row();
collected_records.push((
row.get_int(0),
row.get_string(1).to_string(),
row.get_long(2),
));
}
}

assert_eq!(
collected_records.len(),
expected_records.len(),
"Did not receive all records in time, expect receive {} records, but got {} records",
expected_records.len(),
collected_records.len()
);
collected_records.sort_by_key(|r| r.0);
assert_eq!(
collected_records, expected_records,
"Data mismatch between sent and received"
);

admin
.drop_table(&table_path, false)
.await
.expect("Failed to drop table");

// todo: add scan test in 203
}
}
Loading