Skip to content

Commit 8554cc8

Browse files
committed
Implemented dbt snapshot changes introduced in dbt-adapters v1.9
1 parent f852f54 commit 8554cc8

File tree

10 files changed

+171
-77
lines changed

10 files changed

+171
-77
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Configuration variables
2-
VERSION=1.9.0
2+
VERSION=1.9.1
33
PROJ_DIR?=$(shell pwd)
44
VENV_DIR?=${PROJ_DIR}/.bldenv
55
BUILD_DIR=${PROJ_DIR}/build

dbt/adapters/oracle/__version__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@
1414
See the License for the specific language governing permissions and
1515
limitations under the License.
1616
"""
17-
version = "1.9.0"
17+
version = "1.9.1"

dbt/include/oracle/macros/materializations/snapshot/snapshot.sql

+121-50
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,12 @@
4141
{% endmacro %}
4242

4343

44-
{% macro snapshot_staging_table(strategy, source_sql, target_relation) -%}
44+
{% macro oracle__snapshot_staging_table(strategy, source_sql, target_relation) -%}
45+
46+
{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}
47+
{% if strategy.hard_deletes == 'new_record' %}
48+
{% set new_scd_id = snapshot_hash_arguments([columns.dbt_scd_id, snapshot_get_time()]) %}
49+
{% endif %}
4550

4651
with snapshot_query as (
4752

@@ -52,22 +57,27 @@
5257
snapshotted_data as (
5358

5459
select {{ target_relation }}.*,
55-
{{ strategy.unique_key }} as dbt_unique_key
56-
60+
{{ unique_key_fields(strategy.unique_key) }}
5761
from {{ target_relation }}
58-
where dbt_valid_to is null
59-
62+
where
63+
{% if config.get('dbt_valid_to_current') %}
64+
{% set source_unique_key = columns.dbt_valid_to | trim %}
65+
{% set target_unique_key = config.get('dbt_valid_to_current') | trim %}
66+
( {{ equals(source_unique_key, target_unique_key) }} or {{ source_unique_key }} is null )
67+
{% else %}
68+
{{ columns.dbt_valid_to }} is null
69+
{% endif %}
6070
),
6171

6272
insertions_source_data as (
6373

6474
select
6575
snapshot_query.*,
66-
{{ strategy.unique_key }} as dbt_unique_key,
67-
{{ strategy.updated_at }} as dbt_updated_at,
68-
{{ strategy.updated_at }} as dbt_valid_from,
69-
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
70-
{{ strategy.scd_id }} as dbt_scd_id
76+
{{ unique_key_fields(strategy.unique_key) }},
77+
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
78+
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
79+
{{ oracle__get_dbt_valid_to_current(strategy, columns) }},
80+
{{ strategy.scd_id }} as {{ columns.dbt_scd_id }}
7181

7282
from snapshot_query
7383
),
@@ -76,21 +86,21 @@
7686

7787
select
7888
snapshot_query.*,
79-
{{ strategy.unique_key }} as dbt_unique_key,
80-
{{ strategy.updated_at }} as dbt_updated_at,
81-
{{ strategy.updated_at }} as dbt_valid_from,
82-
{{ strategy.updated_at }} as dbt_valid_to
89+
{{ unique_key_fields(strategy.unique_key) }},
90+
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
91+
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
92+
{{ strategy.updated_at }} as {{ columns.dbt_valid_to }}
8393

8494
from snapshot_query
8595
),
8696

87-
{%- if strategy.invalidate_hard_deletes %}
97+
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
8898

8999
deletes_source_data as (
90100

91101
select
92102
snapshot_query.*,
93-
{{ strategy.unique_key }} as dbt_unique_key
103+
{{ unique_key_fields(strategy.unique_key) }}
94104
from snapshot_query
95105
),
96106
{% endif %}
@@ -100,15 +110,16 @@
100110
select
101111
'insert' as dbt_change_type,
102112
source_data.*
113+
{%- if strategy.hard_deletes == 'new_record' -%}
114+
,'False' as {{ columns.dbt_is_deleted }}
115+
{%- endif %}
103116

104117
from insertions_source_data source_data
105-
left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
106-
where snapshotted_data.dbt_unique_key is null
107-
or (
108-
snapshotted_data.dbt_unique_key is not null
109-
and (
110-
{{ strategy.row_changed }}
111-
)
118+
left outer join snapshotted_data
119+
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
120+
where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }}
121+
or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and ({{ strategy.row_changed }})
122+
112123
)
113124

114125
),
@@ -118,53 +129,99 @@
118129
select
119130
'update' as dbt_change_type,
120131
source_data.*,
121-
snapshotted_data.dbt_scd_id
132+
snapshotted_data.{{ columns.dbt_scd_id }}
133+
{%- if strategy.hard_deletes == 'new_record' -%}
134+
, snapshotted_data.{{ columns.dbt_is_deleted }}
135+
{%- endif %}
122136

123137
from updates_source_data source_data
124-
join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
138+
join snapshotted_data
139+
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
125140
where (
126141
{{ strategy.row_changed }}
127142
)
128143
)
129144

130-
{%- if strategy.invalidate_hard_deletes -%}
145+
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' -%}
131146
,
132147

133148
deletes as (
134149

135150
select
136151
'delete' as dbt_change_type,
137152
source_data.*,
138-
{{ snapshot_get_time() }} as dbt_valid_from,
139-
{{ snapshot_get_time() }} as dbt_updated_at,
140-
{{ snapshot_get_time() }} as dbt_valid_to,
141-
snapshotted_data.dbt_scd_id
153+
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
154+
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
155+
{{ snapshot_get_time() }} as {{ columns.dbt_valid_to }},
156+
snapshotted_data.{{ columns.dbt_scd_id }}
157+
{%- if strategy.hard_deletes == 'new_record' -%}
158+
, snapshotted_data.{{ columns.dbt_is_deleted }}
159+
{%- endif %}
142160

143161
from snapshotted_data
144-
left join deletes_source_data source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
145-
where source_data.dbt_unique_key is null
162+
left join deletes_source_data source_data
163+
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
164+
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
165+
)
166+
{%- endif %}
167+
168+
{%- if strategy.hard_deletes == 'new_record' %}
169+
{% set source_sql_cols = get_column_schema_from_query(source_sql) %}
170+
,
171+
deletion_records as (
172+
173+
select
174+
'insert' as dbt_change_type,
175+
{%- for col in source_sql_cols -%}
176+
snapshotted_data.{{ adapter.quote(col.column) }},
177+
{% endfor -%}
178+
{%- if strategy.unique_key | is_list -%}
179+
{%- for key in strategy.unique_key -%}
180+
snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }},
181+
{% endfor -%}
182+
{%- else -%}
183+
snapshotted_data.dbt_unique_key as dbt_unique_key,
184+
{% endif -%}
185+
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
186+
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
187+
snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }},
188+
{{ new_scd_id }} as {{ columns.dbt_scd_id }},
189+
'True' as {{ columns.dbt_is_deleted }}
190+
from snapshotted_data
191+
left join deletes_source_data as source_data
192+
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
193+
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
194+
146195
)
147196
{%- endif %}
148197

149198
select * from insertions
150199
union all
151200
select * from updates
152-
{%- if strategy.invalidate_hard_deletes %}
201+
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
153202
union all
154203
select * from deletes
155204
{%- endif %}
205+
{%- if strategy.hard_deletes == 'new_record' %}
206+
union all
207+
select * from deletion_records
208+
{%- endif %}
156209

157210
{%- endmacro %}
158211

159212

160213

161-
{% macro build_snapshot_table(strategy, sql) %}
214+
{% macro oracle__build_snapshot_table(strategy, sql) %}
215+
{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}
162216

163217
select sbq.*,
164-
{{ strategy.scd_id }} as dbt_scd_id,
165-
{{ strategy.updated_at }} as dbt_updated_at,
166-
{{ strategy.updated_at }} as dbt_valid_from,
167-
cast(nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as TIMESTAMP(9)) as dbt_valid_to
218+
{{ strategy.scd_id }} as {{ columns.dbt_scd_id }},
219+
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
220+
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
221+
{{ oracle__get_dbt_valid_to_current(strategy, columns) }}
222+
{%- if strategy.hard_deletes == 'new_record' -%}
223+
, 'False' as {{ columns.dbt_is_deleted }}
224+
{% endif -%}
168225
from (
169226
{{ sql }}
170227
) sbq
@@ -239,32 +296,38 @@
239296
{% if not target_relation_exists %}
240297

241298
{% set build_sql = build_snapshot_table(strategy, model['compiled_sql']) %}
299+
{% set build_or_select_sql = build_sql %}
242300
{% set final_sql = create_table_as(False, target_relation, build_sql) %}
243301

244302
{% else %}
245303

246-
{{ adapter.valid_snapshot_target(target_relation) }}
304+
{% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %}
247305

306+
{{ adapter.assert_valid_snapshot_target_given_strategy(target_relation, columns, strategy) }}
307+
308+
{% set build_or_select_sql = snapshot_staging_table(strategy, sql, target_relation) %}
248309
{% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %}
249310

250311
-- this may no-op if the database does not require column expansion
251312
{% do adapter.expand_target_column_types(from_relation=staging_table,
252313
to_relation=target_relation) %}
253314

315+
{% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 'dbt_unique_key', 'DBT_UNIQUE_KEY'] %}
316+
{% if unique_key | is_list %}
317+
{% for key in strategy.unique_key %}
318+
{{ remove_columns.append('dbt_unique_key_' + loop.index|string) }}
319+
{{ remove_columns.append('DBT_UNIQUE_KEY_' + loop.index|string) }}
320+
{% endfor %}
321+
{% endif %}
322+
254323
{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
255-
| rejectattr('name', 'equalto', 'dbt_change_type')
256-
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
257-
| rejectattr('name', 'equalto', 'dbt_unique_key')
258-
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
324+
| rejectattr('name', 'in', remove_columns)
259325
| list %}
260326

261327
{% do create_columns(target_relation, missing_columns) %}
262328

263329
{% set source_columns = adapter.get_columns_in_relation(staging_table)
264-
| rejectattr('name', 'equalto', 'dbt_change_type')
265-
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
266-
| rejectattr('name', 'equalto', 'dbt_unique_key')
267-
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
330+
| rejectattr('name', 'in', remove_columns)
268331
| list %}
269332

270333
{% set quoted_source_columns = [] %}
@@ -281,6 +344,8 @@
281344

282345
{% endif %}
283346

347+
{{ check_time_data_types(build_or_select_sql) }}
348+
284349
{% call statement('main') %}
285350
{{ final_sql }}
286351
{% endcall %}
@@ -305,9 +370,15 @@
305370
{% endmaterialization %}
306371

307372
{% macro oracle__snapshot_hash_arguments(args) -%}
308-
ORA_HASH({%- for arg in args -%}
309-
coalesce(cast({{ arg }} as varchar(50) ), '')
373+
STANDARD_HASH({%- for arg in args -%}
374+
coalesce(cast({{ arg }} as varchar(4000) ), '')
310375
{% if not loop.last %} || '|' || {% endif %}
311-
{%- endfor -%})
376+
{%- endfor -%}, 'SHA256')
312377
{%- endmacro %}
313378

379+
{% macro oracle__get_dbt_valid_to_current(strategy, columns) %}
380+
{% set dbt_valid_to_current = config.get('dbt_valid_to_current') or "CAST(null as TIMESTAMP(9)" %}
381+
coalesce(nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}), {{dbt_valid_to_current}}))
382+
as {{ columns.dbt_valid_to }}
383+
{% endmacro %}
384+

dbt/include/oracle/macros/materializations/snapshot/snapshot_merge.sql

+17-9
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,34 @@
1818
{%- set insert_cols_csv = [] -%}
1919

2020
{% for column in insert_cols %}
21-
{% do insert_cols_csv.append("s." + column) %}
21+
{% do insert_cols_csv.append("DBT_INTERNAL_SOURCE." + column) %}
2222
{% endfor %}
2323

2424
{%- set dest_cols_csv = [] -%}
2525

2626
{% for column in insert_cols %}
27-
{% do dest_cols_csv.append("d." + column) %}
27+
{% do dest_cols_csv.append("DBT_INTERNAL_DEST." + column) %}
2828
{% endfor %}
2929

30-
merge into {{ target }} d
31-
using {{ source }} s
32-
on (s.dbt_scd_id = d.dbt_scd_id)
30+
{%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%}
31+
32+
merge into {{ target }} DBT_INTERNAL_DEST
33+
using {{ source }} DBT_INTERNAL_SOURCE
34+
on (DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = DBT_INTERNAL_DEST.{{ columns.dbt_scd_id }})
3335

3436
when matched
3537
then update
36-
set dbt_valid_to = s.dbt_valid_to
37-
where d.dbt_valid_to is null
38-
and s.dbt_change_type in ('update', 'delete')
38+
set {{ columns.dbt_valid_to }} = DBT_INTERNAL_SOURCE.{{ columns.dbt_valid_to }}
39+
where
40+
{% if config.get("dbt_valid_to_current") %}
41+
(DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or
42+
DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null)
43+
{% else %}
44+
DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null
45+
{% endif %}
46+
and DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete')
3947
when not matched
4048
then insert ({{ dest_cols_csv | join(', ') }})
4149
values ({{ insert_cols_csv | join(', ') }})
42-
where s.dbt_change_type = 'insert'
50+
where DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
4351
{% endmacro %}

0 commit comments

Comments
 (0)