diff --git a/Cargo.lock b/Cargo.lock
index 97ee25d658..7832f4e777 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3525,6 +3525,7 @@ version = "0.8.0"
 dependencies = [
  "anyhow",
  "async-trait",
+ "dashmap",
  "datafusion",
  "expect-test",
  "futures",
diff --git a/Cargo.toml b/Cargo.toml
index d099398dbd..faa726002c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -62,6 +62,7 @@ bytes = "1.10"
 chrono = "0.4.41"
 clap = { version = "4.5.48", features = ["derive", "cargo"] }
 ctor = "0.2.8"
+dashmap = "6"
 datafusion = "51.0"
 datafusion-cli = "51.0"
 datafusion-sqllogictest = "51.0"
diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml
index 0ee1738b4f..fd3e489e4b 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -31,6 +31,7 @@ repository = { workspace = true }
 [dependencies]
 anyhow = { workspace = true }
 async-trait = { workspace = true }
+dashmap = { workspace = true }
 datafusion = { workspace = true }
 futures = { workspace = true }
 iceberg = { workspace = true }
diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs
index 31bbdbd67f..e8b70f051a 100644
--- a/crates/integrations/datafusion/src/schema.rs
+++ b/crates/integrations/datafusion/src/schema.rs
@@ -16,16 +16,17 @@
 // under the License.
 
 use std::any::Any;
-use std::collections::HashMap;
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use dashmap::DashMap;
 use datafusion::catalog::SchemaProvider;
 use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result as DFResult};
 use futures::future::try_join_all;
+use iceberg::arrow::arrow_schema_to_schema_auto_assign_ids;
 use iceberg::inspect::MetadataTableType;
-use iceberg::{Catalog, NamespaceIdent, Result};
+use iceberg::{Catalog, NamespaceIdent, Result, TableCreation};
 
 use crate::table::IcebergTableProvider;
 use crate::to_datafusion_error;
@@ -34,10 +35,15 @@ use crate::to_datafusion_error;
 /// access to table providers within a specific namespace.
 #[derive(Debug)]
 pub(crate) struct IcebergSchemaProvider {
-    /// A `HashMap` where keys are table names
+    /// Reference to the Iceberg catalog
+    catalog: Arc<dyn Catalog>,
+    /// The namespace this schema represents
+    namespace: NamespaceIdent,
+    /// A concurrent map where keys are table names
     /// and values are dynamic references to objects implementing the
     /// [`TableProvider`] trait.
-    tables: HashMap<String, Arc<IcebergTableProvider>>,
+    /// Wrapped in Arc to allow sharing across async boundaries in register_table.
+    tables: Arc<DashMap<String, Arc<IcebergTableProvider>>>,
 }
 
 impl IcebergSchemaProvider {
@@ -71,13 +77,16 @@ impl IcebergSchemaProvider {
         )
         .await?;
 
-        let tables: HashMap<String, Arc<IcebergTableProvider>> = table_names
-            .into_iter()
-            .zip(providers.into_iter())
-            .map(|(name, provider)| (name, Arc::new(provider)))
-            .collect();
+        let tables = Arc::new(DashMap::new());
+        for (name, provider) in table_names.into_iter().zip(providers.into_iter()) {
+            tables.insert(name, Arc::new(provider));
+        }
 
-        Ok(IcebergSchemaProvider { tables })
+        Ok(IcebergSchemaProvider {
+            catalog: client,
+            namespace,
+            tables,
+        })
     }
 }
 
@@ -89,13 +98,16 @@
 
     fn table_names(&self) -> Vec<String> {
         self.tables
-            .keys()
-            .flat_map(|table_name| {
+            .iter()
+            .flat_map(|entry| {
+                let table_name = entry.key().clone();
                 [table_name.clone()]
                     .into_iter()
-                    .chain(MetadataTableType::all_types().map(|metadata_table_name| {
-                        format!("{}${}", table_name.clone(), metadata_table_name.as_str())
-                    }))
+                    .chain(
+                        MetadataTableType::all_types().map(move |metadata_table_name| {
+                            format!("{}${}", table_name, metadata_table_name.as_str())
+                        }),
+                    )
             })
             .collect()
     }
@@ -127,7 +139,61 @@
         Ok(self
             .tables
             .get(name)
-            .cloned()
-            .map(|t| t as Arc<dyn TableProvider>))
+            .map(|entry| entry.value().clone() as Arc<dyn TableProvider>))
     }
+
+    fn register_table(
+        &self,
+        name: String,
+        table: Arc<dyn TableProvider>,
+    ) -> DFResult<Option<Arc<dyn TableProvider>>> {
+        // Convert DataFusion schema to Iceberg schema
+        // DataFusion schemas don't have field IDs, so we use the function that assigns them automatically
+        let df_schema = table.schema();
+        let iceberg_schema = arrow_schema_to_schema_auto_assign_ids(df_schema.as_ref())
+            .map_err(to_datafusion_error)?;
+
+        // Create the table in the Iceberg catalog
+        let table_creation = TableCreation::builder()
+            .name(name.clone())
+            .schema(iceberg_schema)
+            .build();
+
+        let catalog = self.catalog.clone();
+        let namespace = self.namespace.clone();
+        let tables = self.tables.clone();
+        let name_clone = name.clone();
+
+        // Use tokio's spawn_blocking to handle the async work on a blocking thread pool
+        let result = tokio::task::spawn_blocking(move || {
+            // Create a new runtime handle to execute the async work
+            let rt = tokio::runtime::Handle::current();
+            rt.block_on(async move {
+                catalog
+                    .create_table(&namespace, table_creation)
+                    .await
+                    .map_err(to_datafusion_error)?;
+
+                // Create a new table provider using the catalog reference
+                let table_provider = IcebergTableProvider::try_new(
+                    catalog.clone(),
+                    namespace.clone(),
+                    name_clone.clone(),
+                )
+                .await
+                .map_err(to_datafusion_error)?;
+
+                // Store the new table provider
+                let old_table = tables.insert(name_clone, Arc::new(table_provider));
+
+                Ok(old_table.map(|t| t as Arc<dyn TableProvider>))
+            })
+        });
+
+        // Block on the spawned task to get the result
+        // This is safe because spawn_blocking moves the blocking to a dedicated thread pool
+        futures::executor::block_on(result).map_err(|e| {
+            DataFusionError::Execution(format!("Failed to create Iceberg table: {e}"))
+        })?
+    }
 }
diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs
index e9f93287d8..487d8dc977 100644
--- a/crates/sqllogictest/src/engine/datafusion.rs
+++ b/crates/sqllogictest/src/engine/datafusion.rs
@@ -93,8 +93,7 @@ impl DataFusionEngine {
         let namespace = NamespaceIdent::new("default".to_string());
         catalog.create_namespace(&namespace, HashMap::new()).await?;
 
-        // Create test tables
-        Self::create_unpartitioned_table(&catalog, &namespace).await?;
+        // Create partitioned test table (unpartitioned tables are now created via SQL)
         Self::create_partitioned_table(&catalog, &namespace).await?;
 
         Ok(Arc::new(
@@ -102,35 +101,9 @@ impl DataFusionEngine {
         ))
     }
 
-    /// Create an unpartitioned test table with id and name columns
-    /// TODO: this can be removed when we support CREATE TABLE
-    async fn create_unpartitioned_table(
-        catalog: &impl Catalog,
-        namespace: &NamespaceIdent,
-    ) -> anyhow::Result<()> {
-        let schema = Schema::builder()
-            .with_fields(vec![
-                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
-                NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
-            ])
-            .build()?;
-
-        catalog
-            .create_table(
-                namespace,
-                TableCreation::builder()
-                    .name("test_unpartitioned_table".to_string())
-                    .schema(schema)
-                    .build(),
-            )
-            .await?;
-
-        Ok(())
-    }
-
     /// Create a partitioned test table with id, category, and value columns
     /// Partitioned by category using identity transform
-    /// TODO: this can be removed when we support CREATE TABLE
+    /// TODO: this can be removed when we support CREATE EXTERNAL TABLE
     async fn create_partitioned_table(
         catalog: &impl Catalog,
         namespace: &NamespaceIdent,
diff --git a/crates/sqllogictest/testdata/schedules/df_test.toml b/crates/sqllogictest/testdata/schedules/df_test.toml
index df5e638d5a..1d7f42c8d4 100644
--- a/crates/sqllogictest/testdata/schedules/df_test.toml
+++ b/crates/sqllogictest/testdata/schedules/df_test.toml
@@ -22,6 +22,10 @@ df = { type = "datafusion" }
 engine = "df"
 slt = "df_test/show_tables.slt"
 
+[[steps]]
+engine = "df"
+slt = "df_test/create_table.slt"
+
 [[steps]]
 engine = "df"
 slt = "df_test/insert_into.slt"
diff --git a/crates/sqllogictest/testdata/slts/df_test/create_table.slt b/crates/sqllogictest/testdata/slts/df_test/create_table.slt
new file mode 100644
index 0000000000..2eab1b6bab
--- /dev/null
+++ b/crates/sqllogictest/testdata/slts/df_test/create_table.slt
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test CREATE TABLE with explicit schema
+statement ok
+CREATE TABLE default.default.empty_table (id INT NOT NULL, name STRING)
+
+# Verify the empty table exists and has correct schema
+query IT rowsort
+SELECT * FROM default.default.empty_table
+----
+
+# Insert data into the created table
+query I
+INSERT INTO default.default.empty_table VALUES (1, 'Alice')
+----
+1
+
+# Verify the inserted data
+query IT rowsort
+SELECT * FROM default.default.empty_table
+----
+1 Alice
+
+# Insert multiple rows
+query I
+INSERT INTO default.default.empty_table VALUES (2, 'Bob'), (3, 'Charlie')
+----
+2
+
+# Verify all rows
+query IT rowsort
+SELECT * FROM default.default.empty_table
+----
+1 Alice
+2 Bob
+3 Charlie
+
+# Test CREATE TABLE with different column types
+statement ok
+CREATE TABLE default.default.typed_table (id BIGINT NOT NULL, value DOUBLE, flag BOOLEAN)
+
+# Verify the typed table exists
+query IDT rowsort
+SELECT * FROM default.default.typed_table
+----
+
+# Insert data with different types
+query I
+INSERT INTO default.default.typed_table VALUES (100, 3.14, true), (200, 2.71, false)
+----
+2
+
+# Verify typed data
+query IDT rowsort
+SELECT * FROM default.default.typed_table
+----
+100 3.14 true
+200 2.71 false
+
+# Test CREATE TABLE with nullable columns
+statement ok
+CREATE TABLE default.default.nullable_table (id INT NOT NULL, optional_name STRING)
+
+# Insert with NULL value
+query I
+INSERT INTO default.default.nullable_table VALUES (1, 'Value'), (2, NULL)
+----
+2
+
+# Verify NULL handling
+query IT rowsort
+SELECT * FROM default.default.nullable_table
+----
+1 Value
+2 NULL
diff --git a/crates/sqllogictest/testdata/slts/df_test/insert_into.slt b/crates/sqllogictest/testdata/slts/df_test/insert_into.slt
index 2ba33afcd1..1e07844326 100644
--- a/crates/sqllogictest/testdata/slts/df_test/insert_into.slt
+++ b/crates/sqllogictest/testdata/slts/df_test/insert_into.slt
@@ -15,6 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
+# Create unpartitioned test table
+statement ok
+CREATE TABLE default.default.test_unpartitioned_table (id INT NOT NULL, name STRING)
+
 # Verify the table is initially empty
 query IT rowsort
 SELECT * FROM default.default.test_unpartitioned_table
diff --git a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
index c5da5f6276..770072f9dc 100644
--- a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
+++ b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
@@ -28,9 +28,6 @@ datafusion information_schema views VIEW
 default default test_partitioned_table BASE TABLE
 default default test_partitioned_table$manifests BASE TABLE
 default default test_partitioned_table$snapshots BASE TABLE
-default default test_unpartitioned_table BASE TABLE
-default default test_unpartitioned_table$manifests BASE TABLE
-default default test_unpartitioned_table$snapshots BASE TABLE
 default information_schema columns VIEW
 default information_schema df_settings VIEW
 default information_schema parameters VIEW