Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/catalog/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ async-trait = { workspace = true }
iceberg = { workspace = true }
sqlx = { version = "0.8.1", features = ["any"], default-features = false }
strum = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
itertools = { workspace = true }
Expand Down
218 changes: 190 additions & 28 deletions crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub const SQL_CATALOG_PROP_URI: &str = "uri";
pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
/// catalog sql bind style
pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style";
/// Catalog schema version; setting this to "V1" migrates the catalog table from the V0 to the V1 schema
pub const SQL_CATALOG_PROP_SCHEMA_VERSION: &str = "sql.schema-version";

Comment on lines 43 to 46
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seeing these two together makes me notice the inconsistency: the bind-style key is `sql_bind_style`, yet this new key is `sql.schema-version` (iceberg-java uses `jdbc.schema-version`). I'd prefer a consistent `sql.<kebab-case>` naming scheme for both.

static CATALOG_TABLE_NAME: &str = "iceberg_tables";
static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
Expand Down Expand Up @@ -221,6 +223,17 @@ pub struct SqlCatalog {
warehouse_location: String,
fileio: FileIO,
sql_bind_style: SqlBindStyle,
schema_version: SchemaVersion,
}

/// Schema version of the `iceberg_tables` catalog table.
///
/// Parsed case-insensitively from the `sql.schema-version` catalog property
/// via `strum::EnumString`; rendered back (e.g. in log messages and tests)
/// via `strum::Display`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::EnumString, strum::Display)]
#[strum(ascii_case_insensitive)]
pub enum SchemaVersion {
    /// Original schema without the `iceberg_type` column.
    V0,
    /// Extended schema with the `iceberg_type` column for view support.
    V1,
}

#[derive(Debug, PartialEq, strum::EnumString, strum::Display)]
Expand Down Expand Up @@ -297,15 +310,60 @@ impl SqlCatalog {
.await
.map_err(from_sqlx_error)?;

// Check if the catalog table supports views, indicating that the schema is V1
let is_v1 = sqlx::query(&format!(
"SELECT {CATALOG_FIELD_RECORD_TYPE} FROM {CATALOG_TABLE_NAME} LIMIT 0"
))
.execute(&pool)
.await
.is_ok();

// Migrate the schema to V1 if the catalog table does not support views and the caller opted in.
let schema_version = if is_v1 {
tracing::debug!("{CATALOG_TABLE_NAME} already supports views");
SchemaVersion::V1
} else {
let requested: SchemaVersion = config
.props
.get(SQL_CATALOG_PROP_SCHEMA_VERSION)
.and_then(|v| v.parse().ok())
.unwrap_or(SchemaVersion::V0);
if requested == SchemaVersion::V1 {
tracing::debug!("{CATALOG_TABLE_NAME} is being updated to support views");
sqlx::query(&format!(
"ALTER TABLE {CATALOG_TABLE_NAME} \
ADD COLUMN {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5)"
))
.execute(&pool)
.await
.map_err(from_sqlx_error)?;
SchemaVersion::V1
} else {
tracing::warn!(
"SqlCatalog is initialized without view support. To auto-migrate the database's schema and enable view support, set {}=V1",
SQL_CATALOG_PROP_SCHEMA_VERSION
);
SchemaVersion::V0
}
};

Ok(SqlCatalog {
name: config.name.to_owned(),
connection: pool,
warehouse_location: config.warehouse_location,
fileio,
sql_bind_style: config.sql_bind_style,
schema_version,
})
}

fn record_type_filter(&self) -> &'static str {
match self.schema_version {
SchemaVersion::V1 => "AND (iceberg_type = 'TABLE' OR iceberg_type IS NULL)",
SchemaVersion::V0 => "",
}
}

/// SQLX Any does not implement PostgresSQL bindings, so we have to do this.
fn replace_placeholders(&self, query: &str) -> String {
match self.sql_bind_style {
Expand Down Expand Up @@ -674,10 +732,8 @@ impl Catalog for SqlCatalog {
FROM {CATALOG_TABLE_NAME}
WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
AND {CATALOG_FIELD_CATALOG_NAME} = ?
AND (
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
)",
{}",
self.record_type_filter()
),
vec![Some(&namespace.join(".")), Some(&self.name)],
)
Expand Down Expand Up @@ -713,10 +769,8 @@ impl Catalog for SqlCatalog {
WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
AND {CATALOG_FIELD_CATALOG_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAME} = ?
AND (
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
)"
{}",
self.record_type_filter()
),
vec![Some(&namespace), Some(&self.name), Some(table_name)],
)
Expand All @@ -740,10 +794,8 @@ impl Catalog for SqlCatalog {
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
AND (
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
)"
{}",
self.record_type_filter()
),
vec![
Some(&self.name),
Expand Down Expand Up @@ -781,10 +833,8 @@ impl Catalog for SqlCatalog {
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
AND (
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
)"
{}",
self.record_type_filter()
),
vec![
Some(&self.name),
Expand Down Expand Up @@ -907,10 +957,8 @@ impl Catalog for SqlCatalog {
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
AND (
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
)"
{}",
self.record_type_filter()
),
vec![
Some(dest.name()),
Expand Down Expand Up @@ -978,11 +1026,9 @@ impl Catalog for SqlCatalog {
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
AND (
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
)
AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
{}
AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?",
self.record_type_filter()
),
vec![
Some(&staged_metadata_location_str),
Expand Down Expand Up @@ -1020,14 +1066,15 @@ mod tests {
use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use regex::Regex;
use sqlx::any::install_default_drivers;
use sqlx::migrate::MigrateDatabase;
use tempfile::TempDir;

use crate::catalog::{
NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI,
SQL_CATALOG_PROP_WAREHOUSE,
NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE,
SQL_CATALOG_PROP_SCHEMA_VERSION, SQL_CATALOG_PROP_URI, SQL_CATALOG_PROP_WAREHOUSE,
};
use crate::{SqlBindStyle, SqlCatalogBuilder};
use crate::{SchemaVersion, SqlBindStyle, SqlCatalogBuilder};

const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";

Expand Down Expand Up @@ -2029,4 +2076,119 @@ mod tests {
format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
);
}

/// Creates a V0 SQLite database (no `iceberg_type` column) with one pre-inserted table row.
/// Returns the SQLite URI and the temp dir that owns the database file.
async fn create_v0_sqlite_db() -> (String, TempDir) {
let temp_dir = TempDir::new().unwrap();
let uri = format!(
"sqlite:{}",
temp_dir.path().join("catalog.db").to_str().unwrap()
);
sqlx::Sqlite::create_database(&uri).await.unwrap();
let pool = sqlx::AnyPool::connect(&uri).await.unwrap();
sqlx::query(
"CREATE TABLE iceberg_tables (
catalog_name VARCHAR(255) NOT NULL,
table_namespace VARCHAR(255) NOT NULL,
table_name VARCHAR(255) NOT NULL,
metadata_location VARCHAR(1000),
previous_metadata_location VARCHAR(1000),
PRIMARY KEY (catalog_name, table_namespace, table_name)
)",
)
.execute(&pool)
.await
.unwrap();
sqlx::query(
"INSERT INTO iceberg_tables
(catalog_name, table_namespace, table_name, metadata_location)
VALUES ('iceberg', 'ns', 'tbl', '/tmp/fake-location')",
)
.execute(&pool)
.await
.unwrap();
pool.close().await;
(uri, temp_dir)
}

#[tokio::test]
async fn test_v0_schema_migration() {
install_default_drivers();

let (uri, temp_dir) = create_v0_sqlite_db().await;

// Opening the catalog with sql.schema-version=V1 should migrate the V0 schema.
let props = HashMap::from_iter([
(SQL_CATALOG_PROP_URI.to_string(), uri),
(
SQL_CATALOG_PROP_WAREHOUSE.to_string(),
temp_dir.path().to_str().unwrap().to_string(),
),
(
SQL_CATALOG_PROP_BIND_STYLE.to_string(),
SqlBindStyle::QMark.to_string(),
),
(
SQL_CATALOG_PROP_SCHEMA_VERSION.to_string(),
SchemaVersion::V1.to_string(),
),
]);
let catalog = SqlCatalogBuilder::default()
.with_storage_factory(Arc::new(LocalFsStorageFactory))
.load("iceberg", props)
.await
.expect("should open V0 catalog and migrate schema when sql.schema-version=V1");

// The V0 row (no "iceberg_type" column) should be treated as a TABLE after migration.
let ns = NamespaceIdent::from_strs(["ns"]).unwrap();
let tables = catalog.list_tables(&ns).await.unwrap();
assert_eq!(tables.len(), 1);
assert_eq!(tables[0].name(), "tbl");
}

#[tokio::test]
async fn test_v0_schema_no_migration_without_property() {
install_default_drivers();

let (uri, temp_dir) = create_v0_sqlite_db().await;

// Opening without sql.schema-version=V1 should NOT migrate — but should still work.
let props = HashMap::from_iter([
(SQL_CATALOG_PROP_URI.to_string(), uri.clone()),
(
SQL_CATALOG_PROP_WAREHOUSE.to_string(),
temp_dir.path().to_str().unwrap().to_string(),
),
(
SQL_CATALOG_PROP_BIND_STYLE.to_string(),
SqlBindStyle::QMark.to_string(),
),
]);
let catalog = SqlCatalogBuilder::default()
.with_storage_factory(Arc::new(LocalFsStorageFactory))
.load("iceberg", props)
.await
.expect("should open V0 catalog without migrating");

assert_eq!(catalog.schema_version, SchemaVersion::V0);

// The table should still be visible via V0 queries (no iceberg_type filter).
let ns = NamespaceIdent::from_strs(["ns"]).unwrap();
let tables = catalog.list_tables(&ns).await.unwrap();
assert_eq!(tables.len(), 1);
assert_eq!(tables[0].name(), "tbl");

// Confirm the column was NOT added to the database.
let probe_pool = sqlx::AnyPool::connect(&uri).await.unwrap();
let column_exists = sqlx::query("SELECT iceberg_type FROM iceberg_tables LIMIT 0")
.execute(&probe_pool)
.await
.is_ok();
probe_pool.close().await;
assert!(
!column_exists,
"iceberg_type column should not exist when sql.schema-version=V1 was not set"
);
}
}
2 changes: 1 addition & 1 deletion crates/catalog/sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
//! SqlBindStyle, SqlCatalogBuilder,
//! };
//!
//! #[tokio::main]
//! #[tokio::main(flavor = "current_thread")]
//! async fn main() {
//! let catalog = SqlCatalogBuilder::default()
//! .load(
Expand Down
Loading