diff --git a/dbt-snowflake/.changes/unreleased/Features-20250311-185013.yaml b/dbt-snowflake/.changes/unreleased/Features-20250311-185013.yaml new file mode 100644 index 000000000..e857d5ff5 --- /dev/null +++ b/dbt-snowflake/.changes/unreleased/Features-20250311-185013.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add support for Iceberg REST and AWS Glue catalogs +time: 2025-03-11T18:50:13.180115-04:00 +custom: + Author: mikealfare colin-rogers-dbt + Issue: "792" diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/__init__.py b/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/__init__.py index cfd54a378..9c0528353 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/__init__.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/__init__.py @@ -4,10 +4,26 @@ IcebergManagedCatalogIntegration, IcebergManagedCatalogRelation, ) +from dbt.adapters.snowflake.catalogs._iceberg_rest import ( + IcebergAWSGlueCatalogIntegration, + IcebergRESTCatalogIntegration, + IcebergRESTCatalogRelation, +) -SnowflakeCatalogRelation = Union[IcebergManagedCatalogRelation] -SnowflakeCatalogIntegration = Union[IcebergManagedCatalogIntegration] +SnowflakeCatalogRelation = Union[ + IcebergManagedCatalogRelation, + IcebergRESTCatalogRelation, +] +SnowflakeCatalogIntegration = Union[ + IcebergAWSGlueCatalogIntegration, + IcebergManagedCatalogIntegration, + IcebergRESTCatalogIntegration, +] -CATALOG_INTEGRATIONS = [IcebergManagedCatalogIntegration] +CATALOG_INTEGRATIONS = [ + IcebergAWSGlueCatalogIntegration, + IcebergManagedCatalogIntegration, + IcebergRESTCatalogIntegration, +] diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_iceberg_rest.py b/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_iceberg_rest.py new file mode 100644 index 000000000..5e61d6459 --- /dev/null +++ b/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_iceberg_rest.py @@ -0,0 +1,95 @@ +from dataclasses import dataclass +from typing import Optional + +from dbt.adapters.catalogs import CatalogIntegration, CatalogIntegrationConfig +from dbt.adapters.contracts.relation import RelationConfig + +from dbt.adapters.snowflake.catalogs._parse_relation_config import ( + auto_refresh, + catalog_namespace, + catalog_table, + external_volume, + replace_invalid_characters, +) + + +@dataclass +class IcebergRESTCatalogRelation: + """ + Represents a Snowflake Iceberg REST or AWS Glue catalog relation: + https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table-rest + https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table-aws-glue + """ + + catalog_table: str + catalog_name: Optional[str] = None + catalog_namespace: Optional[str] = None + external_volume: Optional[str] = None + replace_invalid_characters: Optional[bool] = None + auto_refresh: Optional[bool] = None + table_format: str = "iceberg" + + +class IcebergRESTCatalogIntegration(CatalogIntegration): + """ + Implements Snowflake's Iceberg REST Catalog Integration: + https://docs.snowflake.com/en/sql-reference/sql/create-catalog-integration-rest + + Implements Snowflake's AWS Glue Catalog Integration: + https://docs.snowflake.com/en/sql-reference/sql/create-catalog-integration-glue + + While external volumes are a separate, but related concept in Snowflake, + we assume that a catalog integration is always associated with an external volume. + + Attributes: + name (str): the name of the catalog integration, e.g. "my_iceberg_rest_catalog" + catalog_type (str): the type of catalog integration + - must be "iceberg_rest" or "aws_glue" + external_volume (str): the external volume associated with the catalog integration + - if left empty, the default for the database/account will be used + table_format (str): the table format this catalog uses + - must be "iceberg" + allows_writes (bool): identifies whether this catalog integration supports writes + - must be False + """ + + catalog_type: str = "iceberg_rest" + table_format: str = "iceberg" + allows_writes: bool = False + + def __init__(self, config: CatalogIntegrationConfig) -> None: + super().__init__(config) + if config.adapter_properties: + self.catalog_namespace = config.adapter_properties.get("catalog_namespace") + self.replace_invalid_characters = config.adapter_properties.get( + "replace_invalid_characters" + ) + self.auto_refresh = config.adapter_properties.get("auto_refresh") + else: + self.catalog_namespace = None + self.replace_invalid_characters = None + self.auto_refresh = None + + def build_relation(self, model: RelationConfig) -> IcebergRESTCatalogRelation: + + # booleans need to be handled explicitly since False is "None-sey" + _replace_invalid_characters = replace_invalid_characters(model) + if _replace_invalid_characters is None: + _replace_invalid_characters = self.replace_invalid_characters + + _auto_refresh = auto_refresh(model) + if _auto_refresh is None: + _auto_refresh = self.auto_refresh + + return IcebergRESTCatalogRelation( + catalog_table=catalog_table(model), + catalog_name=self.catalog_name, + catalog_namespace=catalog_namespace(model) or self.catalog_namespace, + external_volume=external_volume(model) or self.external_volume, + replace_invalid_characters=_replace_invalid_characters, + auto_refresh=_auto_refresh, + ) + + +class IcebergAWSGlueCatalogIntegration(IcebergRESTCatalogIntegration): + catalog_type: str = "aws_glue" diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_parse_relation_config.py b/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_parse_relation_config.py index dd5c87788..136cc269f 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_parse_relation_config.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_parse_relation_config.py @@ -43,3 +43,27 @@ def automatic_clustering(model: RelationConfig) -> bool: if model.config: return model.config.get("automatic_clustering", False) return False + + +def catalog_table(model: RelationConfig) -> str: + if model.config: + return model.config.get("catalog_table", model.identifier) + return model.identifier + + +def catalog_namespace(model: RelationConfig) -> Optional[str]: + if model.config: + return model.config.get("catalog_namespace") + return None + + +def replace_invalid_characters(model: RelationConfig) -> Optional[bool]: + if model.config: + return model.config.get("replace_invalid_characters") + return None + + +def auto_refresh(model: RelationConfig) -> Optional[bool]: + if model.config: + return model.config.get("auto_refresh") + return None diff --git a/dbt-snowflake/tests/functional/catalog_integration_tests/__init__.py b/dbt-snowflake/tests/functional/catalog_integration_tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dbt-snowflake/tests/functional/catalog_integration_tests/_files.py b/dbt-snowflake/tests/functional/catalog_integration_tests/_files.py new file mode 100644 index 000000000..e62149ce8 --- /dev/null +++ b/dbt-snowflake/tests/functional/catalog_integration_tests/_files.py @@ -0,0 +1,57 @@ +import os + + +ICEBERG_REST_CATALOG = """ +catalogs: +- name: my_catalogs + read_integrations: + - name: my_iceberg_rest_catalog + adapter_type: snowflake + profile: snowflake_secondary + catalog_type: iceberg_rest +""" + + +ICEBERG_REST_MODEL = """ +config( + materialized='table', + catalog_name='my_iceberg_rest_catalog', + catalog_table_name='my_iceberg_rest_table', +) +""" + + +AWS_GLUE_CATALOG = """ +catalogs: +- name: my_catalogs + read_integrations: + - name: my_aws_glue_catalog + adapter_type: snowflake + profile: snowflake_secondary + catalog_type: aws_glue +""" + + +AWS_GLUE_MODEL = """ +config( + materialized='table', + catalog_name='my_aws_glue_catalog', + catalog_table_name='my_aws_glue_table', +) +""" + + +SECONDARY_PROFILE = f""" +secondary_profiles: +- snowflake_secondary: + outputs: + dev: + type: snowflake + threads: 4 + account: {os.getenv("SNOWFLAKE_TEST_ACCOUNT")} + user: {os.getenv("SNOWFLAKE_TEST_USER")}, + password: {os.getenv("SNOWFLAKE_TEST_PASSWORD")}, + database: {os.getenv("SNOWFLAKE_TEST_DATABASE")}, + warehouse: {os.getenv("SNOWFLAKE_TEST_WAREHOUSE")}, + target: dev +""" diff --git a/dbt-snowflake/tests/functional/catalog_integration_tests/test_iceberg_rest.py b/dbt-snowflake/tests/functional/catalog_integration_tests/test_iceberg_rest.py new file mode 100644 index 000000000..d613ffd37 --- /dev/null +++ b/dbt-snowflake/tests/functional/catalog_integration_tests/test_iceberg_rest.py @@ -0,0 +1,41 @@ +import yaml + +import pytest + +from tests.functional.catalog_integration_tests import _files + + +class IcebergREST: + + @pytest.mark.skip(reason="infra is not ready yet") + def test_table_gets_created(self, project): + results = project.run_dbt(["run"]) + assert len(results) == 1 + records = project.run_sql("select * from my_model", fetch="all") + assert len(records) > 0 + + +class TestIcebergREST(IcebergREST): + + @pytest.fixture(scope="class") + def profiles_config_update(self): + catalogs = yaml.load(_files.ICEBERG_REST_CATALOG, Loader=yaml.SafeLoader) + secondary_profile = yaml.load(_files.SECONDARY_PROFILE, Loader=yaml.SafeLoader) + return {**catalogs, **secondary_profile} + + @pytest.fixture(scope="class") + def models(self): + return {"my_model.sql": _files.ICEBERG_REST_MODEL} + + +class TestAWSGlue(IcebergREST): + + @pytest.fixture(scope="class") + def profiles_config_update(self): + catalogs = yaml.load(_files.ICEBERG_REST_CATALOG, Loader=yaml.SafeLoader) + secondary_profile = yaml.load(_files.SECONDARY_PROFILE, Loader=yaml.SafeLoader) + return {**catalogs, **secondary_profile} + + @pytest.fixture(scope="class") + def models(self): + return {"my_model.sql": _files.AWS_GLUE_MODEL} diff --git a/dbt-snowflake/tests/unit/catalog_tests/__init__.py b/dbt-snowflake/tests/unit/catalog_tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dbt-snowflake/tests/unit/test_iceberg_location.py b/dbt-snowflake/tests/unit/catalog_tests/test_iceberg_managed.py similarity index 100% rename from dbt-snowflake/tests/unit/test_iceberg_location.py rename to dbt-snowflake/tests/unit/catalog_tests/test_iceberg_managed.py diff --git a/dbt-snowflake/tests/unit/catalog_tests/test_iceberg_rest.py b/dbt-snowflake/tests/unit/catalog_tests/test_iceberg_rest.py new file mode 100644 index 000000000..f9e98a3f7 --- /dev/null +++ b/dbt-snowflake/tests/unit/catalog_tests/test_iceberg_rest.py @@ -0,0 +1,100 @@ +from dataclasses import dataclass, field +from typing import Any, Dict, Optional + +import pytest + +from dbt.adapters.snowflake.catalogs import ( + IcebergRESTCatalogIntegration, + IcebergRESTCatalogRelation, +) + + +CATALOG_INTEGRATION_NAME = "my_iceberg_rest_catalog_integration" +CATALOG_NAME = "my_iceberg_rest_catalog" + + +@dataclass +class FakeCatalogIntegrationConfig: + name: str = CATALOG_INTEGRATION_NAME + catalog_name: str = CATALOG_NAME + # the above are required, but static (for these tests) parameters + catalog_type: str = "iceberg_rest" + table_format: str = "iceberg" + external_volume: Optional[str] = None + adapter_properties: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class FakeRelationConfig: + database: str = "my_database" + schema: str = "my_schema" + identifier: str = "my_table" + config: Dict[str, Any] = field(default_factory=dict) + + +MINIMUM_CATALOG_INTEGRATION_CONFIG = FakeCatalogIntegrationConfig() +MINIMUM_RELATION_CONFIG = FakeRelationConfig() +MAXIMUM_CATALOG_INTEGRATION_CONFIG = FakeCatalogIntegrationConfig( + external_volume="my_external_volume", + adapter_properties={ + "catalog_namespace": "my_catalog_namespace", + "replace_invalid_characters": True, + "auto_refresh": True, + }, +) +MAXIMUM_RELATION_CONFIG = FakeRelationConfig( + config={ + "catalog_table": "my_special_catalog_table", + "catalog_namespace": "my_special_catalog_namespace", + "external_volume": "my_special_external_volume", + "replace_invalid_characters": False, + "auto_refresh": False, + } +) + + +DEFAULTS = IcebergRESTCatalogRelation( + catalog_table=MINIMUM_RELATION_CONFIG.identifier, + catalog_name=CATALOG_NAME, + catalog_namespace=None, + external_volume=None, + replace_invalid_characters=None, + auto_refresh=None, + table_format="iceberg", +) +CATALOG_DEFAULTS = IcebergRESTCatalogRelation( + catalog_table=MINIMUM_RELATION_CONFIG.identifier, + catalog_name=CATALOG_NAME, + catalog_namespace=MAXIMUM_CATALOG_INTEGRATION_CONFIG.adapter_properties["catalog_namespace"], + external_volume=MAXIMUM_CATALOG_INTEGRATION_CONFIG.external_volume, + replace_invalid_characters=MAXIMUM_CATALOG_INTEGRATION_CONFIG.adapter_properties[ + "replace_invalid_characters" + ], + auto_refresh=MAXIMUM_CATALOG_INTEGRATION_CONFIG.adapter_properties["auto_refresh"], + table_format="iceberg", +) +RELATION_OVERRIDES = IcebergRESTCatalogRelation( + catalog_table=MAXIMUM_RELATION_CONFIG.config["catalog_table"], + catalog_name=CATALOG_NAME, + catalog_namespace=MAXIMUM_RELATION_CONFIG.config["catalog_namespace"], + external_volume=MAXIMUM_RELATION_CONFIG.config["external_volume"], + replace_invalid_characters=MAXIMUM_RELATION_CONFIG.config["replace_invalid_characters"], + auto_refresh=MAXIMUM_RELATION_CONFIG.config["auto_refresh"], + table_format="iceberg", +) + + +@pytest.mark.parametrize( + "catalog_integration_config,relation_config,expected_relation", + [ + (MINIMUM_CATALOG_INTEGRATION_CONFIG, MINIMUM_RELATION_CONFIG, DEFAULTS), + (MAXIMUM_CATALOG_INTEGRATION_CONFIG, MINIMUM_RELATION_CONFIG, CATALOG_DEFAULTS), + (MINIMUM_CATALOG_INTEGRATION_CONFIG, MAXIMUM_RELATION_CONFIG, RELATION_OVERRIDES), + (MAXIMUM_CATALOG_INTEGRATION_CONFIG, MAXIMUM_RELATION_CONFIG, RELATION_OVERRIDES), + ], +) +def test_correct_relations_are_produced( + catalog_integration_config, relation_config, expected_relation +): + catalog_integration = IcebergRESTCatalogIntegration(catalog_integration_config) + assert catalog_integration.build_relation(relation_config) == expected_relation