1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -62,6 +62,7 @@ bytes = "1.10"
chrono = "0.4.41"
clap = { version = "4.5.48", features = ["derive", "cargo"] }
ctor = "0.2.8"
dashmap = "6"
datafusion = "51.0"
datafusion-cli = "51.0"
datafusion-sqllogictest = "51.0"
1 change: 1 addition & 0 deletions crates/integrations/datafusion/Cargo.toml
@@ -31,6 +31,7 @@ repository = { workspace = true }
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
dashmap = { workspace = true }
datafusion = { workspace = true }
futures = { workspace = true }
iceberg = { workspace = true }
100 changes: 83 additions & 17 deletions crates/integrations/datafusion/src/schema.rs
@@ -16,16 +16,17 @@
// under the License.

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use dashmap::DashMap;
use datafusion::catalog::SchemaProvider;
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result as DFResult};
use futures::future::try_join_all;
use iceberg::arrow::arrow_schema_to_schema_auto_assign_ids;
use iceberg::inspect::MetadataTableType;
use iceberg::{Catalog, NamespaceIdent, Result};
use iceberg::{Catalog, NamespaceIdent, Result, TableCreation};

use crate::table::IcebergTableProvider;
use crate::to_datafusion_error;
@@ -34,10 +35,15 @@ use crate::to_datafusion_error;
/// access to table providers within a specific namespace.
#[derive(Debug)]
pub(crate) struct IcebergSchemaProvider {
/// A `HashMap` where keys are table names
/// Reference to the Iceberg catalog
catalog: Arc<dyn Catalog>,
/// The namespace this schema represents
namespace: NamespaceIdent,
/// A concurrent map where keys are table names
/// and values are dynamic references to objects implementing the
/// [`TableProvider`] trait.
tables: HashMap<String, Arc<IcebergTableProvider>>,
/// Wrapped in Arc to allow sharing across async boundaries in register_table.
tables: Arc<DashMap<String, Arc<IcebergTableProvider>>>,
}

impl IcebergSchemaProvider {
@@ -71,13 +77,16 @@ impl IcebergSchemaProvider {
)
.await?;

let tables: HashMap<String, Arc<IcebergTableProvider>> = table_names
.into_iter()
.zip(providers.into_iter())
.map(|(name, provider)| (name, Arc::new(provider)))
.collect();
let tables = Arc::new(DashMap::new());
for (name, provider) in table_names.into_iter().zip(providers.into_iter()) {
tables.insert(name, Arc::new(provider));
}

Ok(IcebergSchemaProvider { tables })
Ok(IcebergSchemaProvider {
catalog: client,
namespace,
tables,
})
}
}

@@ -89,13 +98,16 @@ impl SchemaProvider for IcebergSchemaProvider {

fn table_names(&self) -> Vec<String> {
self.tables
.keys()
.flat_map(|table_name| {
.iter()
.flat_map(|entry| {
let table_name = entry.key().clone();
[table_name.clone()]
.into_iter()
.chain(MetadataTableType::all_types().map(|metadata_table_name| {
format!("{}${}", table_name.clone(), metadata_table_name.as_str())
}))
.chain(
MetadataTableType::all_types().map(move |metadata_table_name| {
format!("{}${}", table_name, metadata_table_name.as_str())
}),
)
})
.collect()
}
@@ -127,7 +139,61 @@
Ok(self
.tables
.get(name)
.cloned()
.map(|t| t as Arc<dyn TableProvider>))
.map(|entry| entry.value().clone() as Arc<dyn TableProvider>))
}

fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> DFResult<Option<Arc<dyn TableProvider>>> {
// Convert DataFusion schema to Iceberg schema
// DataFusion schemas don't have field IDs, so we use the function that assigns them automatically
let df_schema = table.schema();
let iceberg_schema = arrow_schema_to_schema_auto_assign_ids(df_schema.as_ref())
.map_err(to_datafusion_error)?;

// Create the table in the Iceberg catalog
let table_creation = TableCreation::builder()
.name(name.clone())
.schema(iceberg_schema)
.build();

let catalog = self.catalog.clone();
let namespace = self.namespace.clone();
let tables = self.tables.clone();
let name_clone = name.clone();

// Use tokio's spawn_blocking to handle the async work on a blocking thread pool
let result = tokio::task::spawn_blocking(move || {
// Create a new runtime handle to execute the async work
let rt = tokio::runtime::Handle::current();
rt.block_on(async move {
catalog
.create_table(&namespace, table_creation)
.await
.map_err(to_datafusion_error)?;

// Create a new table provider using the catalog reference
let table_provider = IcebergTableProvider::try_new(
catalog.clone(),
namespace.clone(),
name_clone.clone(),
)
.await
.map_err(to_datafusion_error)?;

// Store the new table provider
let old_table = tables.insert(name_clone, Arc::new(table_provider));

Ok(old_table.map(|t| t as Arc<dyn TableProvider>))
})
});

// Block on the spawned task to get the result
// This is safe because spawn_blocking moves the blocking to a dedicated thread pool
futures::executor::block_on(result).map_err(|e| {
DataFusionError::Execution(format!("Failed to create Iceberg table: {e}"))
})?
}
}
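
To illustrate how the new `register_table` hook is exercised end to end, here is a minimal usage sketch (not part of this diff). It assumes some `Arc<dyn Catalog>` implementation is available (for example the in-memory catalog used by the sqllogictest engine), uses this crate's existing `IcebergCatalogProvider`, and the table name `events` is hypothetical.

```rust
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use iceberg::Catalog;
use iceberg_datafusion::IcebergCatalogProvider;

// Sketch only: wire an Iceberg catalog into DataFusion and issue CREATE TABLE.
// `catalog` is assumed to be any Iceberg catalog implementation, e.g. a
// memory-backed catalog used in tests.
async fn create_table_via_sql(catalog: Arc<dyn Catalog>) -> anyhow::Result<()> {
    // Wrap the Iceberg catalog so DataFusion can resolve `default.default.*` names.
    let provider = IcebergCatalogProvider::try_new(catalog).await?;

    let ctx = SessionContext::new();
    ctx.register_catalog("default", Arc::new(provider));

    // DataFusion routes this DDL to `SchemaProvider::register_table`, which with
    // this change creates the table in the Iceberg catalog and registers a
    // fresh `IcebergTableProvider` for it.
    ctx.sql("CREATE TABLE default.default.events (id INT NOT NULL, name STRING)")
        .await?
        .collect()
        .await?;

    Ok(())
}
```

This mirrors the flow covered by the new `create_table.slt` test below, just driven programmatically instead of through the sqllogictest runner.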
31 changes: 2 additions & 29 deletions crates/sqllogictest/src/engine/datafusion.rs
@@ -93,44 +93,17 @@ impl DataFusionEngine {
let namespace = NamespaceIdent::new("default".to_string());
catalog.create_namespace(&namespace, HashMap::new()).await?;

// Create test tables
Self::create_unpartitioned_table(&catalog, &namespace).await?;
// Create partitioned test table (unpartitioned tables are now created via SQL)
Self::create_partitioned_table(&catalog, &namespace).await?;

Ok(Arc::new(
IcebergCatalogProvider::try_new(Arc::new(catalog)).await?,
))
}

/// Create an unpartitioned test table with id and name columns
/// TODO: this can be removed when we support CREATE TABLE
async fn create_unpartitioned_table(
catalog: &impl Catalog,
namespace: &NamespaceIdent,
) -> anyhow::Result<()> {
let schema = Schema::builder()
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
])
.build()?;

catalog
.create_table(
namespace,
TableCreation::builder()
.name("test_unpartitioned_table".to_string())
.schema(schema)
.build(),
)
.await?;

Ok(())
}

/// Create a partitioned test table with id, category, and value columns
/// Partitioned by category using identity transform
/// TODO: this can be removed when we support CREATE TABLE
/// TODO: this can be removed when we support CREATE EXTERNAL TABLE
async fn create_partitioned_table(
catalog: &impl Catalog,
namespace: &NamespaceIdent,
4 changes: 4 additions & 0 deletions crates/sqllogictest/testdata/schedules/df_test.toml
@@ -22,6 +22,10 @@ df = { type = "datafusion" }
engine = "df"
slt = "df_test/show_tables.slt"

[[steps]]
engine = "df"
slt = "df_test/create_table.slt"

[[steps]]
engine = "df"
slt = "df_test/insert_into.slt"
90 changes: 90 additions & 0 deletions crates/sqllogictest/testdata/slts/df_test/create_table.slt
@@ -0,0 +1,90 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Test CREATE TABLE with explicit schema
statement ok
CREATE TABLE default.default.empty_table (id INT NOT NULL, name STRING)

# Verify the empty table exists and has correct schema
query IT rowsort
SELECT * FROM default.default.empty_table
----

# Insert data into the created table
query I
INSERT INTO default.default.empty_table VALUES (1, 'Alice')
----
1

# Verify the inserted data
query IT rowsort
SELECT * FROM default.default.empty_table
----
1 Alice

# Insert multiple rows
query I
INSERT INTO default.default.empty_table VALUES (2, 'Bob'), (3, 'Charlie')
----
2

# Verify all rows
query IT rowsort
SELECT * FROM default.default.empty_table
----
1 Alice
2 Bob
3 Charlie

# Test CREATE TABLE with different column types
statement ok
CREATE TABLE default.default.typed_table (id BIGINT NOT NULL, value DOUBLE, flag BOOLEAN)

# Verify the typed table exists
query IDT rowsort
SELECT * FROM default.default.typed_table
----

# Insert data with different types
query I
INSERT INTO default.default.typed_table VALUES (100, 3.14, true), (200, 2.71, false)
----
2

# Verify typed data
query IDT rowsort
SELECT * FROM default.default.typed_table
----
100 3.14 true
200 2.71 false

# Test CREATE TABLE with nullable columns
statement ok
CREATE TABLE default.default.nullable_table (id INT NOT NULL, optional_name STRING)

# Insert with NULL value
query I
INSERT INTO default.default.nullable_table VALUES (1, 'Value'), (2, NULL)
----
2

# Verify NULL handling
query IT rowsort
SELECT * FROM default.default.nullable_table
----
1 Value
2 NULL
4 changes: 4 additions & 0 deletions crates/sqllogictest/testdata/slts/df_test/insert_into.slt
@@ -15,6 +15,10 @@
# specific language governing permissions and limitations
# under the License.

# Create unpartitioned test table
statement ok
CREATE TABLE default.default.test_unpartitioned_table (id INT NOT NULL, name STRING)

# Verify the table is initially empty
query IT rowsort
SELECT * FROM default.default.test_unpartitioned_table
3 changes: 0 additions & 3 deletions crates/sqllogictest/testdata/slts/df_test/show_tables.slt
@@ -28,9 +28,6 @@ datafusion information_schema views VIEW
default default test_partitioned_table BASE TABLE
default default test_partitioned_table$manifests BASE TABLE
default default test_partitioned_table$snapshots BASE TABLE
default default test_unpartitioned_table BASE TABLE
default default test_unpartitioned_table$manifests BASE TABLE
default default test_unpartitioned_table$snapshots BASE TABLE
default information_schema columns VIEW
default information_schema df_settings VIEW
default information_schema parameters VIEW