Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions crates/fluss/tests/integration/log_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mod table_test {
ColumnPlan, array_dt_basics_columns, as_row_type, create_partitions, create_table,
dt_array_int, dt_map_string_int, dt_row_seq_label, extract_ids_from_batches,
get_shared_cluster, make_int_array, make_string_array, map_dt_basics_columns,
row_dt_basics_columns, scalar_dt_columns,
row_dt_basics_columns, scalar_dt_columns, wait_for_partitions_ready, wait_for_table_ready,
};
use arrow::array::record_batch;
use fluss::client::{EARLIEST_OFFSET, FlussTable, TableScan};
Expand Down Expand Up @@ -169,8 +169,7 @@ mod table_test {

create_table(&admin, &table_path, &table_descriptor).await;

// Wait for table to be fully initialized
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
wait_for_table_ready(&admin, &table_path).await;

// Test earliest offset (should be 0 for empty table)
let earliest_offsets = admin
Expand Down Expand Up @@ -475,7 +474,7 @@ mod table_test {
&TableDescriptor::builder().schema(schema).build().unwrap(),
)
.await;
tokio::time::sleep(Duration::from_secs(1)).await;
wait_for_table_ready(&admin, &table_path).await;
Comment thread
fresh-borzoni marked this conversation as resolved.

let table = connection.get_table(&table_path).await.unwrap();
let scanner = table.new_scan().create_record_batch_log_scanner().unwrap();
Expand Down Expand Up @@ -595,8 +594,8 @@ mod table_test {
// Create partitions
create_partitions(&admin, &table_path, "region", &["US", "EU"]).await;

// Wait for partitions to be available
tokio::time::sleep(Duration::from_secs(2)).await;
// Wait for partition bucket leaders to be available.
wait_for_partitions_ready(&admin, &table_path, &["US", "EU"]).await;

let table = connection
.get_table(&table_path)
Expand Down
13 changes: 7 additions & 6 deletions crates/fluss/tests/integration/record_batch_log_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
mod reader_test {
use crate::integration::utils::{
create_partitions, create_table, extract_ids_from_batches, get_shared_cluster,
wait_for_partitions_ready, wait_for_table_buckets_ready, wait_for_table_ready,
};
use arrow::array::record_batch;
use fluss::client::{EARLIEST_OFFSET, FlussConnection, RecordBatchLogReader};
Expand Down Expand Up @@ -48,7 +49,7 @@ mod reader_test {
.build()
.expect("Failed to build table");
create_table(&admin, &table_path, &table_descriptor).await;
tokio::time::sleep(Duration::from_secs(1)).await;
wait_for_table_ready(&admin, &table_path).await;

let table = connection
.get_table(&table_path)
Expand Down Expand Up @@ -121,7 +122,7 @@ mod reader_test {
.build()
.expect("Failed to build table");
create_table(&admin, &table_path, &table_descriptor).await;
tokio::time::sleep(Duration::from_secs(1)).await;
wait_for_table_ready(&admin, &table_path).await;

let table = connection
.get_table(&table_path)
Expand Down Expand Up @@ -189,7 +190,7 @@ mod reader_test {
.build()
.expect("Failed to build table");
create_table(&admin, &table_path, &table_descriptor).await;
tokio::time::sleep(Duration::from_secs(1)).await;
wait_for_table_ready(&admin, &table_path).await;

let table = connection
.get_table(&table_path)
Expand Down Expand Up @@ -284,7 +285,7 @@ mod reader_test {
.build()
.expect("Failed to build table");
create_table(&admin, &table_path, &table_descriptor).await;
tokio::time::sleep(Duration::from_secs(1)).await;
wait_for_table_buckets_ready(&admin, &table_path, &[0, 1]).await;

let table = connection
.get_table(&table_path)
Expand Down Expand Up @@ -380,7 +381,7 @@ mod reader_test {
.expect("Failed to build table");

create_table(&admin, &table_path, &table_descriptor).await;
tokio::time::sleep(Duration::from_secs(1)).await;
wait_for_table_ready(&admin, &table_path).await;

let table = connection
.get_table(&table_path)
Expand Down Expand Up @@ -453,7 +454,7 @@ mod reader_test {

create_table(&admin, &table_path, &table_descriptor).await;
create_partitions(&admin, &table_path, "region", &["US", "EU"]).await;
tokio::time::sleep(Duration::from_secs(2)).await;
wait_for_partitions_ready(&admin, &table_path, &["US", "EU"]).await;

let table = connection
.get_table(&table_path)
Expand Down
157 changes: 130 additions & 27 deletions crates/fluss/tests/integration/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use fluss::metadata::{
use fluss::record::ScanBatch;
use fluss::row::FlussArray;
use fluss::row::binary_array::FlussArrayWriter;
use fluss::rpc::message::OffsetSpec;
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;
Expand Down Expand Up @@ -100,6 +102,110 @@ pub async fn create_table(
.expect("Failed to create table");
}

const READINESS_TIMEOUT: Duration = Duration::from_secs(30);
const READINESS_POLL_INTERVAL: Duration = Duration::from_millis(200);

async fn poll_until<F, Fut>(
timeout: Duration,
interval: Duration,
timeout_message: String,
mut probe: F,
) where
F: FnMut() -> Fut,
Fut: Future<Output = Result<(), String>>,
{
let start = std::time::Instant::now();

loop {
match probe().await {
Ok(()) => return,
Err(err) => {
if start.elapsed() >= timeout {
panic!(
"{timeout_message} after {} seconds. Last error: {err}",
timeout.as_secs()
);
}
}
}

tokio::time::sleep(interval).await;
}
}

/// Waits until the default bucket of a non-partitioned table can serve offset requests.
///
/// Newly-created tables may not have bucket leaders immediately. Polling list offsets avoids
/// fixed sleeps that are either flaky on slow CI or waste time when the cluster is ready sooner.
pub async fn wait_for_table_ready(admin: &FlussAdmin, table_path: &TablePath) {
wait_for_table_buckets_ready(admin, table_path, &[0]).await;
Comment thread
fresh-borzoni marked this conversation as resolved.
}

/// Waits until the specified buckets of a non-partitioned table can serve offset requests.
pub async fn wait_for_table_buckets_ready(
admin: &FlussAdmin,
table_path: &TablePath,
buckets: &[i32],
) {
poll_until(
READINESS_TIMEOUT,
READINESS_POLL_INTERVAL,
format!("Timed out waiting for table '{table_path}' buckets {buckets:?} to become ready"),
|| async {
admin
.list_offsets(table_path, buckets, OffsetSpec::Latest)
.await
.map(|_| ())
.map_err(|err| format!("{err:?}"))
},
)
.await;
}

/// Waits until all listed partition values can serve offset requests for the default bucket.
pub async fn wait_for_partitions_ready(
admin: &FlussAdmin,
table_path: &TablePath,
partition_values: &[&str],
) {
for partition_value in partition_values {
wait_for_partition_ready(admin, table_path, partition_value).await;
}
}

/// Waits until one partition value can serve offset requests for the default bucket.
pub async fn wait_for_partition_ready(
admin: &FlussAdmin,
table_path: &TablePath,
partition_value: &str,
) {
wait_for_partition_buckets_ready(admin, table_path, partition_value, &[0]).await;
}

/// Waits until the specified buckets of a partition can serve offset requests.
pub async fn wait_for_partition_buckets_ready(
admin: &FlussAdmin,
table_path: &TablePath,
partition_value: &str,
buckets: &[i32],
) {
poll_until(
READINESS_TIMEOUT,
READINESS_POLL_INTERVAL,
format!(
"Timed out waiting for table '{table_path}' partition '{partition_value}' buckets {buckets:?} to become ready"
),
|| async {
admin
.list_partition_offsets(table_path, partition_value, buckets, OffsetSpec::Latest)
.await
.map(|_| ())
.map_err(|err| format!("{err:?}"))
},
)
.await;
}

pub fn make_string_array(values: &[Option<&str>]) -> FlussArray {
let mut writer = FlussArrayWriter::new(values.len(), &DataTypes::string());
for (idx, value) in values.iter().enumerate() {
Expand Down Expand Up @@ -141,38 +247,35 @@ pub fn extract_ids_from_batches(batches: &[ScanBatch]) -> Vec<i32> {

/// Similar to wait_for_cluster_ready but connects with SASL credentials.
pub async fn wait_for_cluster_ready_with_sasl(cluster: &FlussTestingCluster) {
let timeout = Duration::from_secs(30);
let poll_interval = Duration::from_millis(500);
let start = std::time::Instant::now();

let (username, password) = cluster
.sasl_users()
.first()
.expect("SASL cluster must have at least one user");

loop {
let connection = cluster
.get_fluss_connection_with_sasl(username, password)
.await;
if connection
.get_metadata()
.get_cluster()
.get_one_available_server()
.is_some()
{
return;
}

if start.elapsed() >= timeout {
panic!(
"SASL server readiness check timed out after {} seconds. \
CoordinatorEventProcessor may not be initialized or TabletServer may not be available.",
timeout.as_secs()
);
}

tokio::time::sleep(poll_interval).await;
}
poll_until(
Duration::from_secs(30),
Duration::from_millis(500),
"SASL server readiness check timed out".to_string(),
|| async {
let connection = cluster
.get_fluss_connection_with_sasl(username, password)
.await;
if connection
.get_metadata()
.get_cluster()
.get_one_available_server()
.is_some()
{
Ok(())
} else {
Err(
"CoordinatorEventProcessor may not be initialized or TabletServer may not be available"
.to_string(),
)
}
},
)
.await;
}

/// Creates partitions for a partitioned table.
Expand Down
Loading