diff --git a/Cargo.lock b/Cargo.lock
index d659a6a7f8..141234148b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3511,6 +3511,7 @@ dependencies = [
  "strum",
  "tempfile",
  "tokio",
+ "tracing",
 ]
 
 [[package]]
diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml
index eff605f466..5696215a71 100644
--- a/crates/catalog/sql/Cargo.toml
+++ b/crates/catalog/sql/Cargo.toml
@@ -33,6 +33,7 @@ async-trait = { workspace = true }
 iceberg = { workspace = true }
 sqlx = { version = "0.8.1", features = ["any"], default-features = false }
 strum = { workspace = true }
+tracing = { workspace = true }
 
 [dev-dependencies]
 itertools = { workspace = true }
diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs
index 7e468e7e37..548e28c8f6 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -41,6 +41,9 @@ pub const SQL_CATALOG_PROP_URI: &str = "uri";
 pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
 /// catalog sql bind style
 pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style";
+/// catalog schema version: "V0" (the default) or "V1", matched case-insensitively;
+/// setting it to "V1" migrates an existing V0 catalog table to the V1 schema
+pub const SQL_CATALOG_PROP_SCHEMA_VERSION: &str = "sql.schema-version";
 
 static CATALOG_TABLE_NAME: &str = "iceberg_tables";
 static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
@@ -221,6 +224,19 @@ pub struct SqlCatalog {
     warehouse_location: String,
     fileio: FileIO,
     sql_bind_style: SqlBindStyle,
+    schema_version: SchemaVersion,
+}
+
+/// Schema version of the `iceberg_tables` catalog table.
+///
+/// Parsed case-insensitively from the `sql.schema-version` catalog property.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::EnumString, strum::Display)]
+#[strum(ascii_case_insensitive)]
+pub enum SchemaVersion {
+    /// Original schema without the `iceberg_type` column.
+    V0,
+    /// Extended schema with the `iceberg_type` column for view support.
+    V1,
 }
 
 #[derive(Debug, PartialEq, strum::EnumString, strum::Display)]
@@ -297,15 +313,83 @@ impl SqlCatalog {
         .await
         .map_err(from_sqlx_error)?;
 
+        // Probe for the record-type column: selecting it with `LIMIT 0` returns no rows
+        // and only succeeds when the column exists, i.e. when the schema is already V1.
+        // Any failure of the probe is treated as "column missing".
+        let is_v1 = sqlx::query(&format!(
+            "SELECT {CATALOG_FIELD_RECORD_TYPE} FROM {CATALOG_TABLE_NAME} LIMIT 0"
+        ))
+        .execute(&pool)
+        .await
+        .is_ok();
+
+        // Migrate the schema to V1 only if the catalog table lacks the column and the
+        // caller opted in through `sql.schema-version`.
+        let schema_version = if is_v1 {
+            tracing::debug!("{CATALOG_TABLE_NAME} already supports views");
+            SchemaVersion::V1
+        } else {
+            // A value that does not parse is rejected instead of silently falling back
+            // to V0, so a typo cannot quietly skip a migration the caller asked for.
+            let requested = match config.props.get(SQL_CATALOG_PROP_SCHEMA_VERSION) {
+                Some(raw) => raw.parse::<SchemaVersion>().map_err(|e| {
+                    iceberg::Error::new(
+                        iceberg::ErrorKind::DataInvalid,
+                        format!(
+                            "Invalid value for {SQL_CATALOG_PROP_SCHEMA_VERSION}: {raw}: {e}"
+                        ),
+                    )
+                })?,
+                None => SchemaVersion::V0,
+            };
+            if requested == SchemaVersion::V1 {
+                tracing::debug!("{CATALOG_TABLE_NAME} is being updated to support views");
+                sqlx::query(&format!(
+                    "ALTER TABLE {CATALOG_TABLE_NAME} \
+                     ADD COLUMN {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5)"
+                ))
+                .execute(&pool)
+                .await
+                .map_err(from_sqlx_error)?;
+                SchemaVersion::V1
+            } else {
+                tracing::warn!(
+                    "SqlCatalog is initialized without view support. To auto-migrate the database's schema and enable view support, set {}=V1",
+                    SQL_CATALOG_PROP_SCHEMA_VERSION
+                );
+                SchemaVersion::V0
+            }
+        };
+
         Ok(SqlCatalog {
             name: config.name.to_owned(),
             connection: pool,
             warehouse_location: config.warehouse_location,
             fileio,
             sql_bind_style: config.sql_bind_style,
+            schema_version,
         })
     }
 
+    /// Returns the SQL predicate, including its leading `AND`, that restricts a query on
+    /// the catalog table to table records.
+    ///
+    /// Under V1 a row counts as a table when its record type is the table marker or NULL;
+    /// the V0 row in the migration test gets no value for the new column and must still
+    /// be listed. Under V0 the predicate is empty.
+    ///
+    /// NOTE(review): this filter is interpolated into `WHERE` clauses only; confirm that
+    /// statements writing the record-type column are also adjusted for V0 catalogs.
+    fn record_type_filter(&self) -> String {
+        match self.schema_version {
+            SchemaVersion::V1 => format!(
+                "AND ({CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' \
+                 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL)"
+            ),
+            SchemaVersion::V0 => String::new(),
+        }
+    }
+
     /// SQLX Any does not implement PostgresSQL bindings, so we have to do this.
     fn replace_placeholders(&self, query: &str) -> String {
         match self.sql_bind_style {
@@ -674,10 +758,8 @@ impl Catalog for SqlCatalog {
                              FROM {CATALOG_TABLE_NAME}
                              WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
                              AND {CATALOG_FIELD_CATALOG_NAME} = ?
-                             AND (
-                                 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
-                                 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
-                             )",
+                             {}",
+                            self.record_type_filter()
                         ),
                         vec![Some(&namespace.join(".")), Some(&self.name)],
                     )
@@ -713,10 +795,8 @@ impl Catalog for SqlCatalog {
                      WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
                      AND {CATALOG_FIELD_CATALOG_NAME} = ?
                      AND {CATALOG_FIELD_TABLE_NAME} = ?
-                     AND (
-                         {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
-                         OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
-                     )"
+                     {}",
+                    self.record_type_filter()
                 ),
                 vec![Some(&namespace), Some(&self.name), Some(table_name)],
             )
@@ -740,10 +820,8 @@ impl Catalog for SqlCatalog {
                  WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
                  AND {CATALOG_FIELD_TABLE_NAME} = ?
                  AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
-                 AND (
-                     {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
-                     OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
-                 )"
+                 {}",
+                self.record_type_filter()
             ),
             vec![
                 Some(&self.name),
@@ -781,10 +859,8 @@ impl Catalog for SqlCatalog {
                      WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
                      AND {CATALOG_FIELD_TABLE_NAME} = ?
                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
-                     AND (
-                         {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
-                         OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
-                     )"
+                     {}",
+                    self.record_type_filter()
                 ),
                 vec![
                     Some(&self.name),
@@ -907,10 +983,8 @@ impl Catalog for SqlCatalog {
                  WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
                  AND {CATALOG_FIELD_TABLE_NAME} = ?
                  AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
-                 AND (
-                     {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
-                     OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
-                 )"
+                 {}",
+                self.record_type_filter()
             ),
             vec![
                 Some(dest.name()),
@@ -978,11 +1052,9 @@ impl Catalog for SqlCatalog {
                  WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
                  AND {CATALOG_FIELD_TABLE_NAME} = ?
                  AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
-                 AND (
-                     {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
-                     OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
-                 )
-                 AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
+                 {}
+                 AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?",
+                self.record_type_filter()
             ),
             vec![
                 Some(&staged_metadata_location_str),
@@ -1020,14 +1092,15 @@ mod tests {
     use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
     use itertools::Itertools;
     use regex::Regex;
+    use sqlx::any::install_default_drivers;
     use sqlx::migrate::MigrateDatabase;
     use tempfile::TempDir;
 
     use crate::catalog::{
-        NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI,
-        SQL_CATALOG_PROP_WAREHOUSE,
+        NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE,
+        SQL_CATALOG_PROP_SCHEMA_VERSION, SQL_CATALOG_PROP_URI, SQL_CATALOG_PROP_WAREHOUSE,
     };
-    use crate::{SqlBindStyle, SqlCatalogBuilder};
+    use crate::{SchemaVersion, SqlBindStyle, SqlCatalogBuilder};
 
     const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
 
@@ -2029,4 +2102,148 @@ mod tests {
             format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
         );
     }
+
+    /// Creates a V0 SQLite database (no `iceberg_type` column) with one pre-inserted table row.
+    /// Returns the SQLite URI and the temp dir that owns the database file.
+    async fn create_v0_sqlite_db() -> (String, TempDir) {
+        let temp_dir = TempDir::new().unwrap();
+        let uri = format!(
+            "sqlite:{}",
+            temp_dir.path().join("catalog.db").to_str().unwrap()
+        );
+        sqlx::Sqlite::create_database(&uri).await.unwrap();
+        let pool = sqlx::AnyPool::connect(&uri).await.unwrap();
+        sqlx::query(
+            "CREATE TABLE iceberg_tables (
+                catalog_name VARCHAR(255) NOT NULL,
+                table_namespace VARCHAR(255) NOT NULL,
+                table_name VARCHAR(255) NOT NULL,
+                metadata_location VARCHAR(1000),
+                previous_metadata_location VARCHAR(1000),
+                PRIMARY KEY (catalog_name, table_namespace, table_name)
+            )",
+        )
+        .execute(&pool)
+        .await
+        .unwrap();
+        sqlx::query(
+            "INSERT INTO iceberg_tables
+                (catalog_name, table_namespace, table_name, metadata_location)
+             VALUES ('iceberg', 'ns', 'tbl', '/tmp/fake-location')",
+        )
+        .execute(&pool)
+        .await
+        .unwrap();
+        pool.close().await;
+        (uri, temp_dir)
+    }
+
+    #[tokio::test]
+    async fn test_v0_schema_migration() {
+        install_default_drivers();
+
+        let (uri, temp_dir) = create_v0_sqlite_db().await;
+
+        // Opening the catalog with sql.schema-version=V1 should migrate the V0 schema.
+        let props = HashMap::from_iter([
+            (SQL_CATALOG_PROP_URI.to_string(), uri),
+            (
+                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
+                temp_dir.path().to_str().unwrap().to_string(),
+            ),
+            (
+                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
+                SqlBindStyle::QMark.to_string(),
+            ),
+            (
+                SQL_CATALOG_PROP_SCHEMA_VERSION.to_string(),
+                SchemaVersion::V1.to_string(),
+            ),
+        ]);
+        let catalog = SqlCatalogBuilder::default()
+            .with_storage_factory(Arc::new(LocalFsStorageFactory))
+            .load("iceberg", props)
+            .await
+            .expect("should open V0 catalog and migrate schema when sql.schema-version=V1");
+
+        assert_eq!(catalog.schema_version, SchemaVersion::V1);
+
+        // The V0 row (no "iceberg_type" column) should be treated as a TABLE after migration.
+        let ns = NamespaceIdent::from_strs(["ns"]).unwrap();
+        let tables = catalog.list_tables(&ns).await.unwrap();
+        assert_eq!(tables.len(), 1);
+        assert_eq!(tables[0].name(), "tbl");
+    }
+
+    #[tokio::test]
+    async fn test_v0_schema_no_migration_without_property() {
+        install_default_drivers();
+
+        let (uri, temp_dir) = create_v0_sqlite_db().await;
+
+        // Opening without sql.schema-version=V1 should NOT migrate — but should still work.
+        let props = HashMap::from_iter([
+            (SQL_CATALOG_PROP_URI.to_string(), uri.clone()),
+            (
+                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
+                temp_dir.path().to_str().unwrap().to_string(),
+            ),
+            (
+                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
+                SqlBindStyle::QMark.to_string(),
+            ),
+        ]);
+        let catalog = SqlCatalogBuilder::default()
+            .with_storage_factory(Arc::new(LocalFsStorageFactory))
+            .load("iceberg", props)
+            .await
+            .expect("should open V0 catalog without migrating");
+
+        assert_eq!(catalog.schema_version, SchemaVersion::V0);
+
+        // The table should still be visible via V0 queries (no iceberg_type filter).
+        let ns = NamespaceIdent::from_strs(["ns"]).unwrap();
+        let tables = catalog.list_tables(&ns).await.unwrap();
+        assert_eq!(tables.len(), 1);
+        assert_eq!(tables[0].name(), "tbl");
+
+        // Confirm the column was NOT added to the database.
+        let probe_pool = sqlx::AnyPool::connect(&uri).await.unwrap();
+        let column_exists = sqlx::query("SELECT iceberg_type FROM iceberg_tables LIMIT 0")
+            .execute(&probe_pool)
+            .await
+            .is_ok();
+        probe_pool.close().await;
+        assert!(
+            !column_exists,
+            "iceberg_type column should not exist when sql.schema-version=V1 was not set"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_invalid_schema_version_is_rejected() {
+        install_default_drivers();
+
+        let (uri, temp_dir) = create_v0_sqlite_db().await;
+
+        // An unparsable sql.schema-version must fail the load instead of silently meaning V0.
+        let props = HashMap::from_iter([
+            (SQL_CATALOG_PROP_URI.to_string(), uri),
+            (
+                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
+                temp_dir.path().to_str().unwrap().to_string(),
+            ),
+            (
+                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
+                SqlBindStyle::QMark.to_string(),
+            ),
+            (SQL_CATALOG_PROP_SCHEMA_VERSION.to_string(), "V2".to_string()),
+        ]);
+        let result = SqlCatalogBuilder::default()
+            .with_storage_factory(Arc::new(LocalFsStorageFactory))
+            .load("iceberg", props)
+            .await;
+
+        assert!(result.is_err(), "an invalid sql.schema-version should be rejected");
+    }
 }
diff --git a/crates/catalog/sql/src/lib.rs b/crates/catalog/sql/src/lib.rs
index b76006ed3b..3d6e9cb2e8 100644
--- a/crates/catalog/sql/src/lib.rs
+++ b/crates/catalog/sql/src/lib.rs
@@ -29,7 +29,7 @@
 //!     SqlBindStyle, SqlCatalogBuilder,
 //! };
 //!
-//! #[tokio::main]
+//! #[tokio::main(flavor = "current_thread")]
 //! async fn main() {
 //!     let catalog = SqlCatalogBuilder::default()
 //!         .load(