Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ unsafe fn delete_connection(conn: *mut Connection) {

impl Connection {
fn get_admin(&self) -> ffi::FfiPtrResult {
let admin_result = RUNTIME.block_on(async { self.inner.get_admin().await });
let admin_result = self.inner.get_admin();

match admin_result {
Ok(admin) => {
Expand Down
2 changes: 1 addition & 1 deletion bindings/cpp/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ pub fn resolve_row_types(
Some(fcore::metadata::DataType::Decimal(dt)) => {
let (precision, scale) = (dt.precision(), dt.scale());
let bd = bigdecimal::BigDecimal::from_str(cow.as_ref()).map_err(|e| {
anyhow!("Column {idx}: invalid decimal string '{}': {e}", cow)
anyhow!("Column {idx}: invalid decimal string '{cow}': {e}")
})?;
let decimal = fcore::row::Decimal::from_big_decimal(bd, precision, scale)
.map_err(|e| anyhow!("Column {idx}: {e}"))?;
Expand Down
2 changes: 1 addition & 1 deletion bindings/python/example/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async def main():
table_descriptor = fluss.TableDescriptor(fluss_schema)

# Get the admin for Fluss
admin = await conn.get_admin()
admin = conn.get_admin()

# Create a Fluss table
table_path = fluss.TablePath("fluss", "sample_table_types")
Expand Down
2 changes: 1 addition & 1 deletion bindings/python/fluss/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class Config:
class FlussConnection:
@staticmethod
async def create(config: Config) -> FlussConnection: ...
async def get_admin(self) -> FlussAdmin: ...
def get_admin(self) -> FlussAdmin: ...
async def get_table(self, table_path: TablePath) -> FlussTable: ...
def close(self) -> None: ...
def __enter__(self) -> FlussConnection: ...
Expand Down
18 changes: 6 additions & 12 deletions bindings/python/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,13 @@ impl FlussConnection {
}

/// Get admin interface
fn get_admin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let client = self.inner.clone();
fn get_admin(&self, py: Python<'_>) -> PyResult<Py<FlussAdmin>> {
let admin = self
.inner
.get_admin()
.map_err(|e| FlussError::from_core_error(&e))?;

future_into_py(py, async move {
let admin = client
.get_admin()
.await
.map_err(|e| FlussError::from_core_error(&e))?;

let py_admin = FlussAdmin::from_core(admin);

Python::attach(|py| Py::new(py, py_admin))
})
Py::new(py, FlussAdmin::from_core(admin))
}

/// Get a table
Expand Down
1 change: 0 additions & 1 deletion bindings/python/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,6 @@ impl TableScan {

let admin = conn
.get_admin()
.await
.map_err(|e| FlussError::from_core_error(&e))?;

let (projected_schema, projected_row_type) =
Expand Down
4 changes: 2 additions & 2 deletions bindings/python/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ async def _connect_with_retry(bootstrap_servers, timeout=60):
conn = None
try:
conn = await fluss.FlussConnection.create(config)
admin = await conn.get_admin()
admin = conn.get_admin()
nodes = await admin.get_server_nodes()
if any(n.server_type == "TabletServer" for n in nodes):
return conn
Expand Down Expand Up @@ -281,4 +281,4 @@ def plaintext_bootstrap_servers(fluss_cluster):
@pytest_asyncio.fixture(scope="session")
async def admin(connection):
"""Session-scoped admin client."""
return await connection.get_admin()
return connection.get_admin()
4 changes: 2 additions & 2 deletions bindings/python/test/test_sasl_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async def test_sasl_connect_with_valid_credentials(sasl_bootstrap_servers):
"security.sasl.password": "admin-secret",
})
conn = await fluss.FlussConnection.create(config)
admin = await conn.get_admin()
admin = conn.get_admin()

db_name = "py_sasl_test_valid_db"
db_descriptor = fluss.DatabaseDescriptor(comment="created via SASL auth")
Expand All @@ -58,7 +58,7 @@ async def test_sasl_connect_with_second_user(sasl_bootstrap_servers):
"security.sasl.password": "alice-secret",
})
conn = await fluss.FlussConnection.create(config)
admin = await conn.get_admin()
admin = conn.get_admin()

# Basic operation to confirm functional connection
assert not await admin.database_exists("some_nonexistent_db_alice")
Expand Down
2 changes: 1 addition & 1 deletion crates/examples/src/example_kv_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub async fn main() -> Result<()> {

let table_path = TablePath::new("fluss", "rust_upsert_lookup_example");

let admin = conn.get_admin().await?;
let admin = conn.get_admin()?;
admin
.create_table(&table_path, &table_descriptor, true)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/examples/src/example_partitioned_kv_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub async fn main() -> Result<()> {

let table_path = TablePath::new("fluss", "partitioned_kv_example");

let admin = conn.get_admin().await?;
let admin = conn.get_admin()?;
admin
.create_table(&table_path, &table_descriptor, true)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/examples/src/example_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub async fn main() -> Result<()> {

let table_path = TablePath::new("fluss", "rust_test_long");

let admin = conn.get_admin().await?;
let admin = conn.get_admin()?;

admin
.create_table(&table_path, &table_descriptor, true)
Expand Down
2 changes: 1 addition & 1 deletion crates/fluss/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn main() -> Result<()> {
let mut config = Config::default();
config.bootstrap_servers = "127.0.0.1:9123".to_string();
let connection = FlussConnection::new(config).await?;
let admin = connection.get_admin().await?;
let admin = connection.get_admin()?;

// ---- Primary key (KV) table: upsert and lookup ----
let kv_path = TablePath::new("fluss", "users");
Expand Down
2 changes: 1 addition & 1 deletion crates/fluss/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl FlussConnection {
&self.args
}

pub async fn get_admin(&self) -> Result<Arc<FlussAdmin>> {
pub fn get_admin(&self) -> Result<Arc<FlussAdmin>> {
// 1. Fast path: return cached instance if already initialized.
if let Some(admin) = self.admin_client.read().as_ref() {
return Ok(admin.clone());
Expand Down
4 changes: 2 additions & 2 deletions crates/fluss/src/client/table/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl<'a> TableScan<'a> {
/// .build()?,
/// ).build()?;
/// let table_path = TablePath::new("fluss".to_owned(), "rust_test_long".to_owned());
/// let admin = conn.get_admin().await?;
/// let admin = conn.get_admin()?;
/// admin.create_table(&table_path, &table_descriptor, true)
/// .await?;
/// let table_info = admin.get_table_info(&table_path).await?;
Expand Down Expand Up @@ -169,7 +169,7 @@ impl<'a> TableScan<'a> {
/// .build()?,
/// ).build()?;
/// let table_path = TablePath::new("fluss".to_owned(), "rust_test_long".to_owned());
/// let admin = conn.get_admin().await?;
/// let admin = conn.get_admin()?;
/// admin.create_table(&table_path, &table_descriptor, true)
/// .await?;
/// let table = conn.get_table(&table_path).await?;
Expand Down
5 changes: 1 addition & 4 deletions crates/fluss/src/client/write/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,7 @@ impl Sender {
}
}

debug!(
"Updated metadata for unknown leader tables: {:?}",
unknown_leaders
);
debug!("Updated metadata for unknown leader tables: {unknown_leaders:?}");
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion crates/fluss/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
//! let mut config = Config::default();
//! config.bootstrap_servers = "127.0.0.1:9123".to_string();
//! let connection = FlussConnection::new(config).await?;
//! let admin = connection.get_admin().await?;
//! let admin = connection.get_admin()?;
//!
//! // ---- Primary key (KV) table: upsert and lookup ----
//! let kv_path = TablePath::new("fluss", "users");
Expand Down
2 changes: 1 addition & 1 deletion crates/fluss/src/metadata/data_lake_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ mod tests {

for (raw, expected) in cases {
let parsed = raw.parse::<DataLakeFormat>().unwrap();
assert_eq!(parsed, expected, "failed to parse: {}", raw);
assert_eq!(parsed, expected, "failed to parse: {raw}");
}

// negative cases
Expand Down
6 changes: 3 additions & 3 deletions crates/fluss/src/metadata/datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,7 @@ impl RowType {
.iter()
.map(|name| {
self.get_field_index(name).ok_or_else(|| IllegalArgument {
message: format!("Field '{}' does not exist in the row type", name),
message: format!("Field '{name}' does not exist in the row type"),
})
})
.collect::<Result<Vec<_>>>()?;
Expand Down Expand Up @@ -1522,7 +1522,7 @@ fn test_time_valid_precision() {
// Test all valid precision values 0 through 9
for precision in 0..=9 {
let result = TimeType::with_nullable(true, precision);
assert!(result.is_ok(), "precision {} should be valid", precision);
assert!(result.is_ok(), "precision {precision} should be valid");
let time = result.unwrap();
assert_eq!(time.precision(), precision);
}
Expand Down Expand Up @@ -1550,7 +1550,7 @@ fn test_timestamp_valid_precision() {
// Test all valid precision values 0 through 9
for precision in 0..=9 {
let result = TimestampType::with_nullable(true, precision);
assert!(result.is_ok(), "precision {} should be valid", precision);
assert!(result.is_ok(), "precision {precision} should be valid");
let timestamp_type = result.unwrap();
assert_eq!(timestamp_type.precision(), precision);
}
Expand Down
29 changes: 10 additions & 19 deletions crates/fluss/tests/integration/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mod admin_test {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;

let admin = connection.get_admin().await.expect("should get admin");
let admin = connection.get_admin().expect("should get admin");

let db_descriptor = DatabaseDescriptorBuilder::default()
.comment("test_db")
Expand Down Expand Up @@ -73,10 +73,7 @@ mod admin_test {
async fn test_create_table() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection
.get_admin()
.await
.expect("Failed to get admin client");
let admin = connection.get_admin().expect("Failed to get admin client");

let test_db_name = "test_create_table_db";
let db_descriptor = DatabaseDescriptorBuilder::default()
Expand Down Expand Up @@ -202,10 +199,7 @@ mod admin_test {
async fn test_partition_apis() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection
.get_admin()
.await
.expect("Failed to get admin client");
let admin = connection.get_admin().expect("Failed to get admin client");

let test_db_name = "test_partition_apis_db";
let db_descriptor = DatabaseDescriptorBuilder::default()
Expand Down Expand Up @@ -341,10 +335,7 @@ mod admin_test {
async fn test_fluss_error_response() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection
.get_admin()
.await
.expect("Failed to get admin client");
let admin = connection.get_admin().expect("Failed to get admin client");

let table_path = TablePath::new("fluss", "not_exist");

Expand Down Expand Up @@ -375,7 +366,7 @@ mod admin_test {
async fn test_error_database_not_exist() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.unwrap();
let admin = connection.get_admin().unwrap();

// get_database_info for non-existent database
let result = admin.get_database_info("no_such_db").await;
Expand All @@ -394,7 +385,7 @@ mod admin_test {
async fn test_error_database_already_exist() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.unwrap();
let admin = connection.get_admin().unwrap();

let db_name = "test_error_db_already_exist";
let descriptor = DatabaseDescriptorBuilder::default().build();
Expand Down Expand Up @@ -424,7 +415,7 @@ mod admin_test {
async fn test_error_table_already_exist() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.unwrap();
let admin = connection.get_admin().unwrap();

let db_name = "test_error_tbl_already_exist_db";
let descriptor = DatabaseDescriptorBuilder::default().build();
Expand Down Expand Up @@ -472,7 +463,7 @@ mod admin_test {
async fn test_error_table_not_exist() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.unwrap();
let admin = connection.get_admin().unwrap();

let table_path = TablePath::new("fluss", "no_such_table");

Expand All @@ -491,7 +482,7 @@ mod admin_test {
async fn test_get_server_nodes() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.unwrap();
let admin = connection.get_admin().unwrap();

let nodes = admin
.get_server_nodes()
Expand Down Expand Up @@ -534,7 +525,7 @@ mod admin_test {
async fn test_error_table_not_partitioned() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.unwrap();
let admin = connection.get_admin().unwrap();

let db_name = "test_error_not_partitioned_db";
let descriptor = DatabaseDescriptorBuilder::default().build();
Expand Down
10 changes: 5 additions & 5 deletions crates/fluss/tests/integration/kv_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ mod kv_table_test {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;

let admin = connection.get_admin().await.unwrap();
let admin = connection.get_admin().unwrap();

let table_path = TablePath::new("fluss", "test_upsert_and_lookup");

Expand Down Expand Up @@ -172,7 +172,7 @@ mod kv_table_test {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;

let admin = connection.get_admin().await.unwrap();
let admin = connection.get_admin().unwrap();

let table_path = TablePath::new("fluss", "test_composite_pk");

Expand Down Expand Up @@ -282,7 +282,7 @@ mod kv_table_test {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;

let admin = connection.get_admin().await.expect("Failed to get admin");
let admin = connection.get_admin().expect("Failed to get admin");

let table_path = TablePath::new("fluss", "test_partial_update");

Expand Down Expand Up @@ -403,7 +403,7 @@ mod kv_table_test {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;

let admin = connection.get_admin().await.expect("Failed to get admin");
let admin = connection.get_admin().expect("Failed to get admin");

let table_path = TablePath::new("fluss", "test_partitioned_kv_table");

Expand Down Expand Up @@ -573,7 +573,7 @@ mod kv_table_test {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;

let admin = connection.get_admin().await.expect("Failed to get admin");
let admin = connection.get_admin().expect("Failed to get admin");

let table_path = TablePath::new("fluss", "test_all_datatypes");

Expand Down
Loading
Loading