diff --git a/.changes/unreleased/Fixes-20250124-001152.yaml b/.changes/unreleased/Fixes-20250124-001152.yaml new file mode 100644 index 000000000..26596f9df --- /dev/null +++ b/.changes/unreleased/Fixes-20250124-001152.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Fix Snapshot to avoid inserting redundant entries for records already marked as deleted +time: 2025-01-24T00:11:52.499074243+01:00 +custom: + Author: lpillmann + Issue: "654" diff --git a/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py b/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py index 310057635..1df275c3f 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py +++ b/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py @@ -182,7 +182,13 @@ delete from {database}.{schema}.seed where id = 1 """ -# If the deletion worked correctly, this should return two rows, with one of them representing the deletion. +# SQL to re-insert the record with id = 1 (the one _delete_sql removes) into the seed table, i.e. the snapshot source data, with a new updated_at value +_insert_sql = """ +insert into {database}.{schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2016-01-01 12:19:28'); +""" + +# SQL to fetch dbt_valid_to, dbt_scd_id and dbt_is_deleted for every snapshot_actual row of the record used in the deletion tests (id = 1) _delete_check_sql = """ select dbt_valid_to, dbt_scd_id, dbt_is_deleted from {schema}.snapshot_actual where id = 1 """ @@ -216,6 +222,10 @@ def update_sql(self): def delete_sql(self): return _delete_sql + @pytest.fixture(scope="class") + def insert_sql(self): + return _insert_sql + def test_snapshot_new_record_mode( self, project, seed_new_record_mode, invalidate_sql, update_sql ): @@ -234,7 +244,7 @@ def test_snapshot_new_record_mode( project.run_sql(_delete_sql) results = run_dbt(["snapshot"]) - assert len(results) == 1 + assert len(results) == 2 check_result = project.run_sql(_delete_check_sql, fetch="all") valid_to = 0 
@@ -264,3 +274,29 @@ def test_snapshot_new_record_mode( == 1 ) assert check_result[0][scd_id] != check_result[1][scd_id] + + # run snapshot with the same source data; neither insert nor update should happen + results = run_dbt(["snapshot"]) + assert len(results) == 0 + check_result = project.run_sql(_delete_check_sql, fetch="all") + assert len(check_result) == 2 + + # insert the record back and run the snapshot again; update and insert expected + project.run_sql(_insert_sql) + results = run_dbt(["snapshot"]) + assert len(results) == 2 + check_result = project.run_sql(_delete_check_sql, fetch="all") + assert len(check_result) == 3 + + # delete it once again and run the snapshot; update and insert expected + project.run_sql(_delete_sql) + results = run_dbt(["snapshot"]) + assert len(results) == 2 + check_result = project.run_sql(_delete_check_sql, fetch="all") + assert len(check_result) == 4 + + # run snapshot with the same source data; neither insert nor update should happen + results = run_dbt(["snapshot"]) + assert len(results) == 0 + check_result = project.run_sql(_delete_check_sql, fetch="all") + assert len(check_result) == 4 diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index e91d66113..42ef6e33a 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -148,6 +148,14 @@ left join deletes_source_data as source_data on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} where {{ unique_key_is_null(strategy.unique_key, "source_data") }} + + {%- if strategy.hard_deletes == 'new_record' %} + and not ( + --avoid updating the record's valid_to if the latest entry is marked as deleted (dbt_is_deleted holds the string 'True') + snapshotted_data.{{ columns.dbt_is_deleted }} = 'True' + and snapshotted_data.{{ columns.dbt_valid_to }} is null + ) + {%- endif %} ) {%- endif %} @@ 
-177,6 +185,11 @@ left join deletes_source_data as source_data on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} where {{ unique_key_is_null(strategy.unique_key, "source_data") }} + and not ( + --avoid inserting another deletion record if the latest entry is already marked as deleted (dbt_is_deleted holds the string 'True'); NOTE(review): assumes open rows have a null dbt_valid_to - confirm behaviour when dbt_valid_to_current is configured + snapshotted_data.{{ columns.dbt_is_deleted }} = 'True' + and snapshotted_data.{{ columns.dbt_valid_to }} is null + ) ) {%- endif %}