From 201b476e7eca99cb894c8dbcd539bb7677b1f993 Mon Sep 17 00:00:00 2001 From: Colin Date: Wed, 5 Feb 2025 11:23:00 -0800 Subject: [PATCH 1/7] Add Snowflake Implementation of Catalog Support --- dbt-snowflake/hatch.toml | 9 ++ .../src/dbt/adapters/snowflake/catalog.py | 87 +++++++++++++++++++ .../src/dbt/adapters/snowflake/impl.py | 11 ++- .../src/dbt/adapters/snowflake/relation.py | 42 +++++---- .../snowflake/relation_configs/catalog.py | 4 + .../relation_configs/dynamic_table.py | 43 ++++++++- .../macros/relations/dynamic_table/create.sql | 8 +- .../relations/dynamic_table/replace.sql | 11 ++- .../macros/relations/table/create.sql | 14 ++- .../test_relation_type_change.py | 2 +- 10 files changed, 199 insertions(+), 32 deletions(-) create mode 100644 dbt-snowflake/src/dbt/adapters/snowflake/catalog.py diff --git a/dbt-snowflake/hatch.toml b/dbt-snowflake/hatch.toml index 91dfb64a1..3007057fb 100644 --- a/dbt-snowflake/hatch.toml +++ b/dbt-snowflake/hatch.toml @@ -38,6 +38,15 @@ docker-dev = [ "docker run --rm -it --name dbt-snowflake-dev -v $(pwd):/opt/code dbt-snowflake-dev", ] +[envs.local] +# TODO: if/when hatch gets support for defining editable dependencies, the +# pre-install commands here and post-install commands in the matrix can be moved +# to the dependencies section +pre-install-commands = [ + "pip install -e ../dbt-adapter", + "pip install -e ../dbt-core/core", + ] + [envs.build] detached = true dependencies = [ diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/catalog.py b/dbt-snowflake/src/dbt/adapters/snowflake/catalog.py new file mode 100644 index 000000000..adb67b797 --- /dev/null +++ b/dbt-snowflake/src/dbt/adapters/snowflake/catalog.py @@ -0,0 +1,87 @@ +from typing import Dict, Optional, Any + +import textwrap + +from dbt.adapters.base import BaseRelation +from dbt.adapters.contracts.catalog import CatalogIntegration, CatalogIntegrationType +from dbt.adapters.contracts.relation import RelationConfig +from dbt.adapters.relation_configs import RelationResults + + +class SnowflakeManagedIcebergCatalogIntegration(CatalogIntegration): + catalog_type = CatalogIntegrationType.managed + + def render_ddl_predicates(self, relation: BaseRelation, config: RelationConfig) -> str: + """ + {{ optional('external_volume', dynamic_table.catalog.external_volume) }} + {{ optional('catalog', dynamic_table.catalog.name) }} + base_location = '{{ dynamic_table.catalog.base_location }}' + :param config: + :param relation: + :return: + """ + base_location: str = f"_dbt/{relation.schema}/{relation.name}" + + if sub_path := config.get("base_location_subpath"): + base_location += f"/{sub_path}" + + iceberg_ddl_predicates: str = f""" + external_volume = '{self.external_volume}' + catalog = 'snowflake' + base_location = '{base_location}' + """ + return textwrap.indent(textwrap.dedent(iceberg_ddl_predicates), " " * 10) + + @classmethod + def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]: + import agate + + # this try block can be removed once enable_iceberg_materializations is retired + try: + catalog_results: "agate.Table" = relation_results["catalog"] + except KeyError: + # this happens when `enable_iceberg_materializations` is turned off + return {} + + if len(catalog_results) == 0: + # this happens when the dynamic table is a standard dynamic table (e.g. not iceberg) + return {} + + # for now, if we get catalog results, it's because this is an iceberg table + # this is because we only run `show iceberg tables` to get catalog metadata + # this will need to be updated once this is in `show objects` + catalog: "agate.Row" = catalog_results.rows[0] + config_dict = { + "table_format": "iceberg", + "name": catalog.get("catalog_name"), + "external_volume": catalog.get("external_volume_name"), + "base_location": catalog.get("base_location"), + } + + return config_dict + + +class SnowflakeGlueCatalogIntegration(CatalogIntegration): + catalog_type = CatalogIntegrationType.glue + auto_refresh: Optional[str] = None # "TRUE" | "FALSE" + replace_invalid_characters: Optional[str] = None # "TRUE" | "FALSE" + + def _handle_adapter_configs(self, adapter_configs: Optional[Dict]) -> None: + if adapter_configs: + if "auto_refresh" in adapter_configs: + self.auto_refresh = adapter_configs["auto_refresh"] + if "replace_invalid_characters" in adapter_configs: + self.replace_invalid_characters = adapter_configs["replace_invalid_characters"] + + def render_ddl_predicates(self, relation: BaseRelation, config: RelationConfig) -> str: + ddl_predicate = f""" + external_volume = '{self.external_volume}' + catalog = '{self.integration_name}' + """ + if self.namespace: + ddl_predicate += f"CATALOG_NAMESPACE = '{self.namespace}'\n" + if self.auto_refresh: + ddl_predicate += f"auto_refresh = {self.auto_refresh}\n" + if self.replace_invalid_characters: + ddl_predicate += f"replace_invalid_characters = {self.replace_invalid_characters}\n" + return ddl_predicate diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/impl.py b/dbt-snowflake/src/dbt/adapters/snowflake/impl.py index 6ae8ef183..7ad5d0399 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/impl.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/impl.py @@ -4,7 +4,12 @@ from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport from dbt.adapters.base.meta import available from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability +from dbt.adapters.contracts.catalog import CatalogIntegrationType from dbt.adapters.contracts.relation import RelationConfig +from dbt.adapters.snowflake.catalog import ( + SnowflakeManagedIcebergCatalogIntegration, + SnowflakeGlueCatalogIntegration, +) from dbt.adapters.sql import SQLAdapter from dbt.adapters.sql.impl import ( LIST_SCHEMAS_MACRO_NAME, @@ -56,6 +61,7 @@ class SnowflakeConfig(AdapterConfig): external_volume: Optional[str] = None base_location_root: Optional[str] = None base_location_subpath: Optional[str] = None + catalog_name: Optional[str] = None class SnowflakeAdapter(SQLAdapter): @@ -64,7 +70,10 @@ class SnowflakeAdapter(SQLAdapter): ConnectionManager = SnowflakeConnectionManager AdapterSpecificConfigs = SnowflakeConfig - + CatalogIntegrations = { + CatalogIntegrationType.managed.value: SnowflakeManagedIcebergCatalogIntegration, + CatalogIntegrationType.glue.value: SnowflakeGlueCatalogIntegration, + } CONSTRAINT_SUPPORT = { ConstraintType.check: ConstraintSupport.NOT_SUPPORTED, ConstraintType.not_null: ConstraintSupport.ENFORCED, diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/relation.py b/dbt-snowflake/src/dbt/adapters/snowflake/relation.py index f3ee3e510..efe1d033d 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/relation.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/relation.py @@ -3,8 +3,9 @@ from dataclasses import dataclass, field from typing import FrozenSet, Optional, Type, Iterator, Tuple - +from dbt.adapters.clients import catalogs as catalogs_client from dbt.adapters.base.relation import BaseRelation +from dbt.adapters.contracts.catalog import CatalogIntegrationConfig, CatalogIntegrationType from dbt.adapters.contracts.relation import ComponentName, RelationConfig from dbt.adapters.events.types import AdapterEventWarning, AdapterEventDebug from dbt.adapters.relation_configs import ( @@ -12,6 +13,7 @@ RelationConfigChangeAction, RelationResults, ) +from dbt.adapters.snowflake.catalog import SnowflakeManagedIcebergCatalogIntegration from dbt.adapters.utils import classproperty from dbt_common.exceptions import DbtRuntimeError from dbt_common.events.functions import fire_event, warn_or_error @@ -64,6 +66,10 @@ def is_dynamic_table(self) -> bool: @property def is_iceberg_format(self) -> bool: + if self.catalog_name: + return ( + catalogs_client.get_catalog(self.catalog_name).table_format == TableFormat.ICEBERG + ) return self.table_format == TableFormat.ICEBERG @classproperty @@ -168,6 +174,12 @@ def get_ddl_prefix_for_create(self, config: RelationConfig, temporary: bool) -> transient_explicitly_set_true: bool = config.get("transient", False) + catalog_name = config.get("catalog_name", None) + if catalog_name: + catalog = catalogs_client.get_catalog(catalog_name) + if catalog.table_format == TableFormat.ICEBERG: + return "iceberg" + # Temporary tables are a Snowflake feature that do not exist in the # Iceberg framework. We ignore the Iceberg status of the model. if temporary: @@ -203,21 +215,21 @@ def get_ddl_prefix_for_alter(self) -> str: else: return "" - def get_iceberg_ddl_options(self, config: RelationConfig) -> str: - # If the base_location_root config is supplied, overwrite the default value ("_dbt/") - base_location: str = ( - f"{config.get('base_location_root', '_dbt')}/{self.schema}/{self.name}" + def add_managed_catalog_integration(self, config: RelationConfig) -> str: + catalog_name = "snowflake_managed" + external_volume = config.get("external_volume") + integration_config = CatalogIntegrationConfig( + catalog_name=catalog_name, + integration_name=catalog_name, + table_format=self.table_format, + catalog_type=CatalogIntegrationType.managed.value, + external_volume=external_volume, ) - - if subpath := config.get("base_location_subpath"): - base_location += f"/{subpath}" - - iceberg_ddl_predicates: str = f""" - external_volume = '{config.get('external_volume')}' - catalog = 'snowflake' - base_location = '{base_location}' - """ - return textwrap.indent(textwrap.dedent(iceberg_ddl_predicates), " " * 10) + catalogs_client.add_catalog( + SnowflakeManagedIcebergCatalogIntegration(integration_config), + catalog_name=catalog_name, + ) + return catalog_name def __drop_conditions(self, old_relation: "SnowflakeRelation") -> Iterator[Tuple[bool, str]]: drop_view_message: str = ( diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/catalog.py b/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/catalog.py index c8d7de40f..309dac579 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/catalog.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/catalog.py @@ -89,6 +89,10 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any return config_dict + @classmethod + def from_relation_config(cls, relation_config: RelationConfig) -> Self: + return cls.from_dict(cls.parse_relation_config(relation_config)) + @classmethod def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]: # this try block can be removed once enable_iceberg_materializations is retired diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/dynamic_table.py b/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/dynamic_table.py index 7361df80a..a6c954d59 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/dynamic_table.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/dynamic_table.py @@ -1,12 +1,16 @@ from dataclasses import dataclass -from typing import Optional, Dict, Any, TYPE_CHECKING +from typing import Optional, Dict, Any, TYPE_CHECKING, Union +from dbt.adapters.contracts.catalog import CatalogIntegrationConfig, CatalogIntegrationType from dbt.adapters.relation_configs import RelationConfigChange, RelationResults +from dbt.adapters.clients import catalogs as catalogs_client from dbt.adapters.contracts.relation import RelationConfig from dbt.adapters.contracts.relation import ComponentName from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11 from typing_extensions import Self +from dbt.adapters.relation_configs.formats import TableFormat +from dbt.adapters.snowflake.catalog import SnowflakeManagedIcebergCatalogIntegration from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase from dbt.adapters.snowflake.relation_configs.catalog import ( SnowflakeCatalogConfig, @@ -37,6 +41,35 @@ def default(cls) -> Self: return cls("ON_CREATE") +def _setup_catalog_integration(catalog_info: Union[Dict, RelationConfig]) -> str: + if not catalog_info: + return "SNOWFLAKE" + elif isinstance(catalog_info, str): + return catalog_info + elif isinstance(catalog_info, dict): + catalog_config = SnowflakeCatalogConfig.from_dict(catalog_info) + else: + catalog_config = SnowflakeCatalogConfig.from_relation_config(catalog_info) + + if catalog_config.table_format != TableFormat.default(): + catalog_name = "snowflake_managed" + integration_config = CatalogIntegrationConfig( + catalog_name=catalog_name, + integration_name=catalog_config.name, + table_format=catalog_config.table_format, + catalog_type=CatalogIntegrationType.managed.value, + external_volume=catalog_config.external_volume, + ) + catalogs_client.add_catalog( + SnowflakeManagedIcebergCatalogIntegration(integration_config), + catalog_name=catalog_name, + ) + return catalog_name + else: + return TableFormat.default().value + + + @dataclass(frozen=True, eq=True, unsafe_hash=True) class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase): """ @@ -60,12 +93,13 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase): query: str target_lag: str snowflake_warehouse: str - catalog: SnowflakeCatalogConfig + catalog: str = "SNOWFLAKE" refresh_mode: Optional[RefreshMode] = RefreshMode.default() initialize: Optional[Initialize] = Initialize.default() @classmethod def from_dict(cls, config_dict: Dict[str, Any]) -> Self: + catalog = _setup_catalog_integration(config_dict["catalog"]) kwargs_dict = { "name": cls._render_part(ComponentName.Identifier, config_dict.get("name")), "schema_name": cls._render_part(ComponentName.Schema, config_dict.get("schema_name")), @@ -75,7 +109,7 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> Self: "query": config_dict.get("query"), "target_lag": config_dict.get("target_lag"), "snowflake_warehouse": config_dict.get("snowflake_warehouse"), - "catalog": SnowflakeCatalogConfig.from_dict(config_dict["catalog"]), + "catalog": catalog, "refresh_mode": config_dict.get("refresh_mode"), "initialize": config_dict.get("initialize"), } @@ -84,6 +118,7 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> Self: @classmethod def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]: + catalog = _setup_catalog_integration(relation_config) config_dict = { "name": relation_config.identifier, "schema_name": relation_config.schema, @@ -91,7 +126,7 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any "query": relation_config.compiled_code, "target_lag": relation_config.config.extra.get("target_lag"), "snowflake_warehouse": relation_config.config.extra.get("snowflake_warehouse"), - "catalog": SnowflakeCatalogConfig.parse_relation_config(relation_config), + "catalog": catalog, } if refresh_mode := relation_config.config.extra.get("refresh_mode"): diff --git a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/create.sql b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/create.sql index 4ebcf145b..d846e7ed2 100644 --- a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/create.sql +++ b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/create.sql @@ -15,7 +15,7 @@ {%- set dynamic_table = relation.from_config(config.model) -%} - {%- if dynamic_table.catalog.table_format == 'iceberg' -%} + {%- if dynamic_table.catalog != 'snowflake' -%} {{ _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }} {%- else -%} {{ _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }} @@ -70,12 +70,12 @@ -- A valid DDL statement which will result in a new dynamic iceberg table. -#} + {% set catalog_integration = adapter.get_catalog_integration(relation.catalog) -%} + create dynamic iceberg table {{ relation }} target_lag = '{{ dynamic_table.target_lag }}' warehouse = {{ dynamic_table.snowflake_warehouse }} - {{ optional('external_volume', dynamic_table.catalog.external_volume) }} - {{ optional('catalog', dynamic_table.catalog.name) }} - base_location = '{{ dynamic_table.catalog.base_location }}' + {{ catalog_integration.render_ddl_predicates(relation, config.model.config) }} {{ optional('refresh_mode', dynamic_table.refresh_mode) }} {{ optional('initialize', dynamic_table.initialize) }} as ( diff --git a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql index 2e7b4566a..0a09b32de 100644 --- a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql +++ b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql @@ -15,7 +15,7 @@ {%- set dynamic_table = relation.from_config(config.model) -%} - {%- if dynamic_table.catalog.table_format == 'iceberg' -%} + {%- if dynamic_table.catalog != 'SNOWFLAKE' -%} {{ _get_replace_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }} {%- else -%} {{ _get_replace_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }} @@ -68,13 +68,16 @@ -- Returns: -- A valid DDL statement which will result in a new dynamic iceberg table. -#} + {% set catalog_integration = adapter.get_catalog_integration(dynamic_table.catalog) -%} + + {% if not catalog_integration -%} + {{ raise('Catalog integration is required for iceberg tables') }} + {%- endif -%} create or replace dynamic iceberg table {{ relation }} target_lag = '{{ dynamic_table.target_lag }}' warehouse = {{ dynamic_table.snowflake_warehouse }} - {{ optional('external_volume', dynamic_table.catalog.external_volume) }} - {{ optional('catalog', dynamic_table.catalog.name) }} - base_location = '{{ dynamic_table.catalog.base_location }}' + {{ catalog_integration.render_ddl_predicates(relation) }} {{ optional('refresh_mode', dynamic_table.refresh_mode) }} {{ optional('initialize', dynamic_table.initialize) }} as ( diff --git a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql index 50bedd78f..b8cf3b4c0 100644 --- a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql +++ b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql @@ -12,6 +12,7 @@ {%- set cluster_by_keys = config.get('cluster_by', default=none) -%} {%- set enable_automatic_clustering = config.get('automatic_clustering', default=false) -%} {%- set copy_grants = config.get('copy_grants', default=false) -%} + {%- set catalog_name = config.get('catalog_name', default=none) -%} {%- if cluster_by_keys is not none and cluster_by_keys is string -%} {%- set cluster_by_keys = [cluster_by_keys] -%} @@ -21,18 +22,25 @@ {% else %} {%- set cluster_by_string = none -%} {%- endif -%} + {%- if catalog_name is not none %} + {%- set catalog_integration = adapter.get_catalog_integration(catalog_name) -%} + {%- endif -%} {%- set sql_header = config.get('sql_header', none) -%} {{ sql_header if sql_header is not none }} create or replace {{ materialization_prefix }} table {{ relation }} - {%- if relation.is_iceberg_format %} + {%- if catalog_integration is not none %} {# Valid DDL in CTAS statements. Plain create statements have a different order. https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table #} - {{ relation.get_iceberg_ddl_options(config.model.config) }} - {%- endif -%} + {{ catalog_integration.render_ddl_predicates(relation=relation, config=config.model.config) }} + {%- elif relation.is_iceberg_format %} + {%- set catalog_name = relation.add_managed_catalog_integration(config.model.config) -%} + {%- set catalog_integration = adapter.get_catalog_integration(catalog_name) -%} + {{ catalog_integration.render_ddl_predicates(relation, config.model.config) }} + {%- endif %} {%- set contract_config = config.get('contract') -%} {%- if contract_config.enforced -%} diff --git a/dbt-snowflake/tests/functional/relation_tests/test_relation_type_change.py b/dbt-snowflake/tests/functional/relation_tests/test_relation_type_change.py index 1024a92ca..244c5cb81 100644 --- a/dbt-snowflake/tests/functional/relation_tests/test_relation_type_change.py +++ b/dbt-snowflake/tests/functional/relation_tests/test_relation_type_change.py @@ -120,7 +120,7 @@ def test_replace(self, project, scenario): assert relation_type == scenario.final.relation_type, scenario.error_message if relation_type == "dynamic_table": dynamic_table = describe_dynamic_table(project, scenario.name) - assert dynamic_table.catalog.table_format == scenario.final.table_format + assert dynamic_table.catalog is not None else: pytest.skip() From 2abc5d987119bfd950da7b7a2d6a2e72aa04656b Mon Sep 17 00:00:00 2001 From: Colin Date: Wed, 5 Feb 2025 13:34:56 -0800 Subject: [PATCH 2/7] update formatting --- .../dbt/adapters/snowflake/relation_configs/dynamic_table.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/dynamic_table.py b/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/dynamic_table.py index a6c954d59..6a243d74f 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/dynamic_table.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/dynamic_table.py @@ -69,7 +69,6 @@ def _setup_catalog_integration(catalog_info: Union[Dict, RelationConfig]) -> str return TableFormat.default().value - @dataclass(frozen=True, eq=True, unsafe_hash=True) class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase): """ @@ -126,7 +125,7 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any "query": relation_config.compiled_code, "target_lag": relation_config.config.extra.get("target_lag"), "snowflake_warehouse": relation_config.config.extra.get("snowflake_warehouse"), - "catalog": catalog, + "catalog": catalog, } if refresh_mode := relation_config.config.extra.get("refresh_mode"): From 96a8abe8cb3c8d7fc82a54b12acd94dd3a6fc4cd Mon Sep 17 00:00:00 2001 From: Colin Date: Wed, 5 Feb 2025 14:23:40 -0800 Subject: [PATCH 3/7] update import refs and add _hand_adapter_properties method --- .../src/dbt/adapters/snowflake/catalog.py | 34 ++++++++++++------ .../src/dbt/adapters/snowflake/impl.py | 2 +- .../src/dbt/adapters/snowflake/relation.py | 2 +- .../relation_configs/dynamic_table.py | 2 +- .../tests/unit/test_iceberg_location.py | 36 ++++++++++++------- 5 files changed, 51 insertions(+), 25 deletions(-) diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/catalog.py b/dbt-snowflake/src/dbt/adapters/snowflake/catalog.py index adb67b797..9a44f0bca 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/catalog.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/catalog.py @@ -3,13 +3,22 @@ import textwrap from dbt.adapters.base import BaseRelation -from dbt.adapters.contracts.catalog import CatalogIntegration, CatalogIntegrationType +from dbt.adapters.base.catalog import CatalogIntegration, CatalogIntegrationType from dbt.adapters.contracts.relation import RelationConfig from dbt.adapters.relation_configs import RelationResults class SnowflakeManagedIcebergCatalogIntegration(CatalogIntegration): catalog_type = CatalogIntegrationType.managed + auto_refresh: Optional[str] = None # "TRUE" | "FALSE" + replace_invalid_characters: Optional[str] = None # "TRUE" | "FALSE" + + def _handle_adapter_properties(self, adapter_properties: Optional[Dict]) -> None: + if adapter_properties: + if "auto_refresh" in adapter_properties: + self.auto_refresh = adapter_properties["auto_refresh"] + if "replace_invalid_characters" in adapter_properties: + self.replace_invalid_characters = adapter_properties["replace_invalid_characters"] def render_ddl_predicates(self, relation: BaseRelation, config: RelationConfig) -> str: """ @@ -20,17 +29,22 @@ def render_ddl_predicates(self, relation: BaseRelation, config: RelationConfig) :param relation: :return: """ - base_location: str = f"_dbt/{relation.schema}/{relation.name}" + base_location: str = f"{config.get('base_location_root', '_dbt')}" + base_location += f"/{relation.schema}/{relation.name}" if sub_path := config.get("base_location_subpath"): base_location += f"/{sub_path}" - iceberg_ddl_predicates: str = f""" + ddl_predicate = f""" external_volume = '{self.external_volume}' catalog = 'snowflake' base_location = '{base_location}' """ - return textwrap.indent(textwrap.dedent(iceberg_ddl_predicates), " " * 10) + if self.auto_refresh: + ddl_predicate += f"auto_refresh = {self.auto_refresh}\n" + if self.replace_invalid_characters: + ddl_predicate += f"replace_invalid_characters = {self.replace_invalid_characters}\n" + return textwrap.indent(textwrap.dedent(ddl_predicate), " " * 10) @classmethod def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]: @@ -66,12 +80,12 @@ class SnowflakeGlueCatalogIntegration(CatalogIntegration): auto_refresh: Optional[str] = None # "TRUE" | "FALSE" replace_invalid_characters: Optional[str] = None # "TRUE" | "FALSE" - def _handle_adapter_configs(self, adapter_configs: Optional[Dict]) -> None: - if adapter_configs: - if "auto_refresh" in adapter_configs: - self.auto_refresh = adapter_configs["auto_refresh"] - if "replace_invalid_characters" in adapter_configs: - self.replace_invalid_characters = adapter_configs["replace_invalid_characters"] + def _handle_adapter_properties(self, adapter_properties: Optional[Dict]) -> None: + if adapter_properties: + if "auto_refresh" in adapter_properties: + self.auto_refresh = adapter_properties["auto_refresh"] + if "replace_invalid_characters" in adapter_properties: + self.replace_invalid_characters = adapter_properties["replace_invalid_characters"] def render_ddl_predicates(self, relation: BaseRelation, config: RelationConfig) -> str: ddl_predicate = f""" diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/impl.py b/dbt-snowflake/src/dbt/adapters/snowflake/impl.py index 7ad5d0399..51b15a80d 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/impl.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/impl.py @@ -4,7 +4,7 @@ from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport from dbt.adapters.base.meta import available from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability -from dbt.adapters.contracts.catalog import CatalogIntegrationType +from dbt.adapters.base.catalog import CatalogIntegrationType from dbt.adapters.contracts.relation import RelationConfig from dbt.adapters.snowflake.catalog import ( SnowflakeManagedIcebergCatalogIntegration, diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/relation.py b/dbt-snowflake/src/dbt/adapters/snowflake/relation.py index efe1d033d..b1c30eb3a 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/relation.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/relation.py @@ -5,7 +5,7 @@ from dbt.adapters.clients import catalogs as catalogs_client from dbt.adapters.base.relation import BaseRelation -from dbt.adapters.contracts.catalog import CatalogIntegrationConfig, CatalogIntegrationType +from dbt.adapters.base.catalog import CatalogIntegrationConfig, CatalogIntegrationType from dbt.adapters.contracts.relation import ComponentName, RelationConfig from dbt.adapters.events.types import AdapterEventWarning, AdapterEventDebug from dbt.adapters.relation_configs import ( diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/dynamic_table.py b/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/dynamic_table.py index 6a243d74f..c90607389 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/dynamic_table.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/dynamic_table.py @@ -1,7 +1,7 @@ from dataclasses import dataclass from typing import Optional, Dict, Any, TYPE_CHECKING, Union -from dbt.adapters.contracts.catalog import CatalogIntegrationConfig, CatalogIntegrationType +from dbt.adapters.base.catalog import CatalogIntegrationConfig, CatalogIntegrationType from dbt.adapters.relation_configs import RelationConfigChange, RelationResults from dbt.adapters.clients import catalogs as catalogs_client from dbt.adapters.contracts.relation import RelationConfig diff --git a/dbt-snowflake/tests/unit/test_iceberg_location.py b/dbt-snowflake/tests/unit/test_iceberg_location.py index dca82b47e..cb838ca72 100644 --- a/dbt-snowflake/tests/unit/test_iceberg_location.py +++ b/dbt-snowflake/tests/unit/test_iceberg_location.py @@ -1,6 +1,19 @@ +from dbt.adapters.base.catalog import CatalogIntegrationConfig import pytest from dbt.adapters.snowflake.relation import SnowflakeRelation - +from dbt.adapters.snowflake.catalog import SnowflakeManagedIcebergCatalogIntegration +@pytest.fixture +def catalog_integration(): + return SnowflakeManagedIcebergCatalogIntegration( + CatalogIntegrationConfig( + catalog_name="my_catalog", + integration_name="my_integration", + table_format="iceberg", + catalog_type="managed", + external_volume="s3_iceberg_snow", + namespace="my_namespace", + ) + ) @pytest.fixture def iceberg_config() -> dict: @@ -14,21 +27,20 @@ def iceberg_config() -> dict: } -def get_actual_base_location(config: dict[str, str]) -> str: +def get_actual_base_location(config: dict[str, str], catalog_integration: SnowflakeManagedIcebergCatalogIntegration) -> str: """Get the actual base location from the configuration by parsing the DDL predicates.""" relation = SnowflakeRelation.create( schema=config["schema"], identifier=config["identifier"], ) - - actual_ddl_predicates = relation.get_iceberg_ddl_options(config).strip() + actual_ddl_predicates = catalog_integration.render_ddl_predicates(relation, config).strip() actual_base_location = actual_ddl_predicates.split("base_location = ")[1] return actual_base_location -def test_iceberg_path_and_subpath(iceberg_config: dict[str, str]): +def test_iceberg_path_and_subpath(iceberg_config: dict[str, str], catalog_integration): """Test when base_location_root and base_location_subpath are provided""" expected_base_location = ( f"'{iceberg_config['base_location_root']}/" @@ -37,10 +49,10 @@ def test_iceberg_path_and_subpath(iceberg_config: dict[str, str]): f"{iceberg_config['base_location_subpath']}'" ).strip() - assert get_actual_base_location(iceberg_config) == expected_base_location + assert get_actual_base_location(iceberg_config, catalog_integration) == expected_base_location -def test_iceberg_only_subpath(iceberg_config: dict[str, str]): +def test_iceberg_only_subpath(iceberg_config: dict[str, str], catalog_integration): """Test when only base_location_subpath is provided""" del iceberg_config["base_location_root"] @@ -51,10 +63,10 @@ def test_iceberg_only_subpath(iceberg_config: dict[str, str]): f"{iceberg_config['base_location_subpath']}'" ).strip() - assert get_actual_base_location(iceberg_config) == expected_base_location + assert get_actual_base_location(iceberg_config, catalog_integration) == expected_base_location -def test_iceberg_only_path(iceberg_config: dict[str, str]): +def test_iceberg_only_path(iceberg_config: dict[str, str], catalog_integration): """Test when only base_location_root is provided""" del iceberg_config["base_location_subpath"] @@ -64,10 +76,10 @@ def test_iceberg_only_path(iceberg_config: dict[str, str]): f"{iceberg_config['identifier']}'" ).strip() - assert get_actual_base_location(iceberg_config) == expected_base_location + assert get_actual_base_location(iceberg_config, catalog_integration) == expected_base_location -def test_iceberg_no_path(iceberg_config: dict[str, str]): +def test_iceberg_no_path(iceberg_config: dict[str, str], catalog_integration): """Test when no base_location_root or is base_location_subpath provided""" del iceberg_config["base_location_root"] del iceberg_config["base_location_subpath"] @@ -76,4 +88,4 @@ def test_iceberg_no_path(iceberg_config: dict[str, str]): f"'_dbt/" f"{iceberg_config['schema']}/" f"{iceberg_config['identifier']}'" ).strip() - assert get_actual_base_location(iceberg_config) == expected_base_location + assert get_actual_base_location(iceberg_config, catalog_integration) == expected_base_location From ee2d86cfc4e74f83e8c5b5b522b06a6c93835640 Mon Sep 17 00:00:00 2001 From: Colin Date: Wed, 5 Feb 2025 15:20:43 -0800 Subject: [PATCH 4/7] refactor unit tests --- .../src/dbt/adapters/snowflake/catalog.py | 16 +++++++--- ...ion.py => test_managed_iceberg_catalog.py} | 32 +++++++++++++++++++ 2 files changed, 43 insertions(+), 5 deletions(-) rename dbt-snowflake/tests/unit/{test_iceberg_location.py => test_managed_iceberg_catalog.py} (73%) diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/catalog.py b/dbt-snowflake/src/dbt/adapters/snowflake/catalog.py index 9a44f0bca..1b42c6d7e 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/catalog.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/catalog.py @@ -6,7 +6,9 @@ from dbt.adapters.base.catalog import CatalogIntegration, CatalogIntegrationType from dbt.adapters.contracts.relation import RelationConfig from dbt.adapters.relation_configs import RelationResults - +from dbt.exceptions import DbtValidationError +_AUTO_REFRESH_VALUES = ["TRUE", "FALSE"] +_REPLACE_INVALID_CHARACTERS_VALUES = ["TRUE", "FALSE"] class SnowflakeManagedIcebergCatalogIntegration(CatalogIntegration): catalog_type = CatalogIntegrationType.managed @@ -15,10 +17,14 @@ class SnowflakeManagedIcebergCatalogIntegration(CatalogIntegration): def _handle_adapter_properties(self, adapter_properties: Optional[Dict]) -> None: if adapter_properties: - if "auto_refresh" in adapter_properties: - self.auto_refresh = adapter_properties["auto_refresh"] - if "replace_invalid_characters" in adapter_properties: - self.replace_invalid_characters = adapter_properties["replace_invalid_characters"] + if auto_refresh := adapter_properties.get("auto_refresh"): + if auto_refresh not in _AUTO_REFRESH_VALUES: + raise DbtValidationError(f"Invalid auto_refresh value: {auto_refresh}") + self.auto_refresh = auto_refresh + if replace_invalid_characters := adapter_properties.get("replace_invalid_characters"): + if replace_invalid_characters not in _REPLACE_INVALID_CHARACTERS_VALUES: + raise DbtValidationError(f"Invalid replace_invalid_characters value: {replace_invalid_characters}") + self.replace_invalid_characters = replace_invalid_characters def render_ddl_predicates(self, relation: BaseRelation, config: RelationConfig) -> str: """ diff --git a/dbt-snowflake/tests/unit/test_iceberg_location.py b/dbt-snowflake/tests/unit/test_managed_iceberg_catalog.py similarity index 73% rename from dbt-snowflake/tests/unit/test_iceberg_location.py rename to dbt-snowflake/tests/unit/test_managed_iceberg_catalog.py index cb838ca72..06ce9b579 100644 --- a/dbt-snowflake/tests/unit/test_iceberg_location.py +++ b/dbt-snowflake/tests/unit/test_managed_iceberg_catalog.py @@ -2,6 +2,7 @@ import pytest from dbt.adapters.snowflake.relation import SnowflakeRelation from dbt.adapters.snowflake.catalog import SnowflakeManagedIcebergCatalogIntegration + @pytest.fixture def catalog_integration(): return SnowflakeManagedIcebergCatalogIntegration( @@ -15,6 +16,23 @@ def catalog_integration(): ) ) +@pytest.fixture +def catalog_integration_with_properties(): + return SnowflakeManagedIcebergCatalogIntegration( + CatalogIntegrationConfig( + catalog_name="my_catalog", + integration_name="my_integration", + table_format="iceberg", + catalog_type="managed", + external_volume="s3_iceberg_snow", + namespace="my_namespace", + adapter_properties={ + "auto_refresh": "TRUE", + "replace_invalid_characters": "TRUE", + } + ) + ) + @pytest.fixture def iceberg_config() -> dict: """Fixture providing standard Iceberg configuration.""" @@ -89,3 +107,17 @@ def test_iceberg_no_path(iceberg_config: dict[str, str], catalog_integration): ).strip() assert get_actual_base_location(iceberg_config, catalog_integration) == expected_base_location + + +def test_managed_iceberg_catalog_with_properties(iceberg_config, catalog_integration_with_properties): + """Test when properties are provided""" + assert catalog_integration_with_properties.auto_refresh == "TRUE" + assert catalog_integration_with_properties.replace_invalid_characters == "TRUE" + + relation = SnowflakeRelation.create( + schema=iceberg_config["schema"], + identifier=iceberg_config["identifier"], + ) + actual_ddl_predicates = catalog_integration_with_properties.render_ddl_predicates(relation, iceberg_config).strip() + assert "auto_refresh = TRUE" in actual_ddl_predicates + assert "replace_invalid_characters = TRUE" in actual_ddl_predicates From f9f1c6ee95ef6361d3fe924e07cfb80ae578c48e Mon Sep 17 00:00:00 2001 From: Colin Date: Wed, 5 Feb 2025 15:35:40 -0800 Subject: [PATCH 5/7] simplify managed iceberg property validation --- .../src/dbt/adapters/snowflake/catalog.py | 15 +++++---- .../src/dbt/adapters/snowflake/relation.py | 2 -- .../unit/test_managed_iceberg_catalog.py | 32 ++++++++++++++++--- 3 files changed, 36 insertions(+), 13 deletions(-) diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/catalog.py b/dbt-snowflake/src/dbt/adapters/snowflake/catalog.py index 1b42c6d7e..b965d3bf7 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/catalog.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/catalog.py @@ -6,23 +6,24 @@ from dbt.adapters.base.catalog import CatalogIntegration, CatalogIntegrationType from dbt.adapters.contracts.relation import RelationConfig from dbt.adapters.relation_configs import RelationResults -from dbt.exceptions import DbtValidationError -_AUTO_REFRESH_VALUES = ["TRUE", "FALSE"] -_REPLACE_INVALID_CHARACTERS_VALUES = ["TRUE", "FALSE"] +from dbt_common.exceptions import DbtValidationError + + +_SNOWFLAKE_BOOLEAN_VALUES = ["TRUE", "FALSE"] class SnowflakeManagedIcebergCatalogIntegration(CatalogIntegration): catalog_type = CatalogIntegrationType.managed - auto_refresh: Optional[str] = None # "TRUE" | "FALSE" - replace_invalid_characters: Optional[str] = None # "TRUE" | "FALSE" + auto_refresh: Optional[str] = None + replace_invalid_characters: Optional[str] = None def _handle_adapter_properties(self, adapter_properties: Optional[Dict]) -> None: if adapter_properties: if auto_refresh := adapter_properties.get("auto_refresh"): - if auto_refresh not in _AUTO_REFRESH_VALUES: + if auto_refresh not in _SNOWFLAKE_BOOLEAN_VALUES: raise DbtValidationError(f"Invalid auto_refresh value: {auto_refresh}") self.auto_refresh = auto_refresh if replace_invalid_characters := adapter_properties.get("replace_invalid_characters"): - if replace_invalid_characters not in _REPLACE_INVALID_CHARACTERS_VALUES: + if replace_invalid_characters not in _SNOWFLAKE_BOOLEAN_VALUES: raise DbtValidationError(f"Invalid replace_invalid_characters value: {replace_invalid_characters}") self.replace_invalid_characters = replace_invalid_characters diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/relation.py b/dbt-snowflake/src/dbt/adapters/snowflake/relation.py index b1c30eb3a..fcf1f34f7 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/relation.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/relation.py @@ -1,5 +1,3 @@ -import textwrap - from dataclasses import dataclass, field from typing import FrozenSet, Optional, Type, Iterator, Tuple diff --git a/dbt-snowflake/tests/unit/test_managed_iceberg_catalog.py b/dbt-snowflake/tests/unit/test_managed_iceberg_catalog.py index 06ce9b579..68a305d44 100644 --- a/dbt-snowflake/tests/unit/test_managed_iceberg_catalog.py +++ b/dbt-snowflake/tests/unit/test_managed_iceberg_catalog.py @@ -2,6 +2,8 @@ import pytest from dbt.adapters.snowflake.relation import SnowflakeRelation from dbt.adapters.snowflake.catalog import SnowflakeManagedIcebergCatalogIntegration +from dbt_common.exceptions import DbtValidationError + @pytest.fixture def catalog_integration(): @@ -16,6 +18,7 @@ def catalog_integration(): ) ) + @pytest.fixture def catalog_integration_with_properties(): return SnowflakeManagedIcebergCatalogIntegration( @@ -29,10 +32,11 @@ def catalog_integration_with_properties(): adapter_properties={ "auto_refresh": "TRUE", "replace_invalid_characters": "TRUE", - } + }, ) ) + @pytest.fixture def iceberg_config() -> dict: """Fixture providing standard Iceberg configuration.""" @@ -45,7 +49,9 @@ def iceberg_config() -> dict: } -def get_actual_base_location(config: dict[str, str], catalog_integration: SnowflakeManagedIcebergCatalogIntegration) -> str: +def get_actual_base_location( + config: dict[str, str], catalog_integration: SnowflakeManagedIcebergCatalogIntegration +) -> str: """Get the actual base location from the configuration by parsing the DDL predicates.""" relation = SnowflakeRelation.create( @@ -109,7 +115,9 @@ def test_iceberg_no_path(iceberg_config: dict[str, str], catalog_integration): assert get_actual_base_location(iceberg_config, catalog_integration) == expected_base_location -def test_managed_iceberg_catalog_with_properties(iceberg_config, catalog_integration_with_properties): +def test_managed_iceberg_catalog_with_properties( + iceberg_config, catalog_integration_with_properties +): """Test when properties are provided""" assert catalog_integration_with_properties.auto_refresh == "TRUE" assert catalog_integration_with_properties.replace_invalid_characters == "TRUE" @@ -118,6 +126,22 @@ def test_managed_iceberg_catalog_with_properties(iceberg_config, catalog_integra schema=iceberg_config["schema"], identifier=iceberg_config["identifier"], ) - actual_ddl_predicates = catalog_integration_with_properties.render_ddl_predicates(relation, iceberg_config).strip() + actual_ddl_predicates = catalog_integration_with_properties.render_ddl_predicates( + relation, iceberg_config + ).strip() assert "auto_refresh = TRUE" in actual_ddl_predicates assert "replace_invalid_characters = TRUE" in actual_ddl_predicates + + +def test_managed_iceberg_catalog_with_invalid_properties(): + with pytest.raises(DbtValidationError): + SnowflakeManagedIcebergCatalogIntegration( + CatalogIntegrationConfig( + catalog_name="my_catalog", + integration_name="my_integration", + table_format="iceberg", + catalog_type="managed", + external_volume="s3_iceberg_snow", + namespace="my_namespace", + adapter_properties={"auto_refresh": "INVALID"}) + ) From b404cb0a34f9dada0d0cc82501f63e108bc7fbac Mon Sep 17 00:00:00 2001 From: Colin Date: Sun, 9 Feb 2025 13:25:27 -0800 Subject: [PATCH 6/7] fix formatting and improve hatch --- dbt-snowflake/hatch.toml | 7 ++++--- .../src/dbt/adapters/snowflake/catalog.py | 7 +++++-- .../tests/unit/test_managed_iceberg_catalog.py | 15 ++++++++------- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/dbt-snowflake/hatch.toml b/dbt-snowflake/hatch.toml index 3007057fb..6feef8690 100644 --- a/dbt-snowflake/hatch.toml +++ b/dbt-snowflake/hatch.toml @@ -43,9 +43,10 @@ docker-dev = [ # pre-install commands here and post-install commands in the matrix can be moved # to the dependencies section pre-install-commands = [ - "pip install -e ../dbt-adapter", - "pip install -e ../dbt-core/core", - ] + "pip install -e ../dbt-adapters", + "pip install -e ../dbt-tests-adapter", + "pip install -e ../../dbt-core/core", +] [envs.build] detached = true diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/catalog.py b/dbt-snowflake/src/dbt/adapters/snowflake/catalog.py index b965d3bf7..d27a51a57 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/catalog.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/catalog.py @@ -11,10 +11,11 @@ _SNOWFLAKE_BOOLEAN_VALUES = ["TRUE", "FALSE"] + class SnowflakeManagedIcebergCatalogIntegration(CatalogIntegration): catalog_type = CatalogIntegrationType.managed auto_refresh: Optional[str] = None - replace_invalid_characters: Optional[str] = None + replace_invalid_characters: Optional[str] = None def _handle_adapter_properties(self, adapter_properties: Optional[Dict]) -> None: if adapter_properties: @@ -24,7 +25,9 @@ def _handle_adapter_properties(self, adapter_properties: Optional[Dict]) -> None self.auto_refresh = auto_refresh if replace_invalid_characters := adapter_properties.get("replace_invalid_characters"): if replace_invalid_characters not in _SNOWFLAKE_BOOLEAN_VALUES: - raise DbtValidationError(f"Invalid replace_invalid_characters value: {replace_invalid_characters}") + raise DbtValidationError( + f"Invalid replace_invalid_characters value: {replace_invalid_characters}" + ) self.replace_invalid_characters = replace_invalid_characters def render_ddl_predicates(self, relation: BaseRelation, config: RelationConfig) -> str: diff --git a/dbt-snowflake/tests/unit/test_managed_iceberg_catalog.py b/dbt-snowflake/tests/unit/test_managed_iceberg_catalog.py index 68a305d44..029da74a4 100644 --- a/dbt-snowflake/tests/unit/test_managed_iceberg_catalog.py +++ b/dbt-snowflake/tests/unit/test_managed_iceberg_catalog.py @@ -137,11 +137,12 @@ def test_managed_iceberg_catalog_with_invalid_properties(): with pytest.raises(DbtValidationError): SnowflakeManagedIcebergCatalogIntegration( CatalogIntegrationConfig( - catalog_name="my_catalog", - integration_name="my_integration", - table_format="iceberg", - catalog_type="managed", - external_volume="s3_iceberg_snow", - namespace="my_namespace", - adapter_properties={"auto_refresh": "INVALID"}) + catalog_name="my_catalog", + integration_name="my_integration", + table_format="iceberg", + catalog_type="managed", + external_volume="s3_iceberg_snow", + namespace="my_namespace", + adapter_properties={"auto_refresh": "INVALID"}, + ) ) From 808d086e5b7fede30938213903dd7ee89d95db81 Mon Sep 17 00:00:00 2001 From: Colin Date: Mon, 10 Feb 2025 12:40:21 -0800 Subject: [PATCH 7/7] set catalog_integration to none --- .../src/dbt/include/snowflake/macros/relations/table/create.sql | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql index b8cf3b4c0..1db6a7899 100644 --- a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql +++ b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql @@ -24,6 +24,8 @@ {%- endif -%} {%- if catalog_name is not none %} {%- set catalog_integration = adapter.get_catalog_integration(catalog_name) -%} + {% else %} + {%- set catalog_integration = none -%} {%- endif -%} {%- set sql_header = config.get('sql_header', none) -%}