|
1 | 1 | from dataclasses import dataclass, field
|
2 |
| -from typing import Any, Dict, Optional, TYPE_CHECKING |
3 |
| -from typing_extensions import Self |
| 2 | +from typing import Any, Dict, Optional |
4 | 3 |
|
5 | 4 | from dbt.adapters.catalogs import CatalogIntegration, CatalogIntegrationConfig
|
6 |
| -from dbt.adapters.relation_configs import RelationResults |
7 |
| -from dbt_common.exceptions import DbtInternalError |
| 5 | +from dbt.adapters.contracts.relation import RelationConfig |
8 | 6 |
|
9 |
| -if TYPE_CHECKING: |
10 |
| - import agate |
| 7 | + |
| 8 | +@dataclass |
| 9 | +class IcebergRESTCatalogRelation: |
| 10 | + base_location: str |
| 11 | + catalog_name: str |
| 12 | + external_volume: Optional[str] = None |
| 13 | + table_format: str = "iceberg" |
11 | 14 |
|
12 | 15 |
|
13 | 16 | @dataclass
|
@@ -60,36 +63,34 @@ class IcebergRESTCatalogIntegration(CatalogIntegration):
|
60 | 63 | - must be False
|
61 | 64 | """
|
62 | 65 |
|
| 66 | + catalog_type: str = "iceberg_rest" |
63 | 67 | table_format: str = "iceberg"
|
64 | 68 | allows_writes: bool = False
|
65 | 69 |
|
66 | 70 | def __init__(self, config: CatalogIntegrationConfig) -> None:
|
67 | 71 | super().__init__(config)
|
68 |
| - if self.catalog_type not in ["iceberg_rest", "aws_glue"]: |
69 |
| - raise DbtInternalError( |
70 |
| - f"Attempting to create IcebergREST catalog integration for catalog {self.name} with catalog type {config.catalog_type}." |
71 |
| - ) |
72 |
| - if self.table_format and self.table_format != "iceberg": |
73 |
| - raise DbtInternalError( |
74 |
| - f"Unsupported table format for catalog {self.name}: {self.table_format}. Expected `iceberg` or unset." |
75 |
| - ) |
76 |
| - |
77 | 72 | if config.adapter_properties:
|
78 | 73 | self.namespace = config.adapter_properties.get("namespace")
|
79 | 74 |
|
80 |
| - @classmethod |
81 |
| - def from_relation_results(cls, relation_results: RelationResults) -> Self: |
82 |
| - table: "agate.Row" = relation_results["table"][0] |
83 |
| - catalog: "agate.Row" = relation_results["catalog"][0] |
| 75 | + def build_relation(self, config: RelationConfig) -> IcebergRESTCatalogRelation: |
| 76 | + return IcebergRESTCatalogRelation( |
| 77 | + base_location=self.__base_location(config), |
| 78 | + external_volume=config.config.extra.get("external_volume", self.external_volume), |
| 79 | + catalog_name=self.catalog_name, |
| 80 | + ) |
| 81 | + |
| 82 | + @staticmethod |
| 83 | + def __base_location(config: RelationConfig) -> str: |
| 84 | + # If the base_location_root config is supplied, overwrite the default value ("_dbt/") |
| 85 | + prefix = config.config.extra.get("base_location_root", "_dbt") |
84 | 86 |
|
85 |
| - adapter_properties = {} |
86 |
| - if namespace := catalog.get("namespace"): |
87 |
| - adapter_properties["namespace"] = namespace |
| 87 | + base_location = f"{prefix}/{config.schema}/{config.identifier}" |
88 | 88 |
|
89 |
| - config = IcebergRESTCatalogIntegrationConfig( |
90 |
| - name=catalog.get("catalog_name"), |
91 |
| - catalog_type=catalog.get("catalog_type"), |
92 |
| - external_volume=table.get("external_volume_name"), |
93 |
| - adapter_properties=adapter_properties, |
94 |
| - ) |
95 |
| - return cls(config) |
| 89 | + if subpath := config.config.extra.get("base_location_subpath"): |
| 90 | + base_location += f"/{subpath}" |
| 91 | + |
| 92 | + return base_location |
| 93 | + |
| 94 | + |
| 95 | +class IcebergAWSGlueCatalogIntegration(IcebergRESTCatalogIntegration): |
| 96 | + catalog_type: str = "aws_glue" |
0 commit comments