diff --git a/dbt_project.yml b/dbt_project.yml index 96bd155b1..fd96bcae9 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -1,5 +1,5 @@ name: automate_dv -version: 0.10.1 +version: 0.10.2 require-dbt-version: [">=1.0.0", "<2.0.0"] config-version: 2 diff --git a/macros/internal/metadata_processing/process_payload_column_excludes.sql b/macros/internal/metadata_processing/process_payload_column_excludes.sql index 5fe2462f5..104ace7de 100644 --- a/macros/internal/metadata_processing/process_payload_column_excludes.sql +++ b/macros/internal/metadata_processing/process_payload_column_excludes.sql @@ -3,7 +3,7 @@ * This software includes code developed by the AutomateDV (f.k.a dbtvault) Team at Business Thinking Ltd. Trading as Datavault */ -{%- macro process_payload_column_excludes(src_pk, src_hashdiff, src_payload, src_extra_columns, +{%- macro process_payload_column_excludes(src_pk, src_hashdiff, src_payload, src_extra_columns, src_cdk, src_eff, src_ldts, src_source, source_model) -%} {%- if src_payload is not mapping -%} @@ -11,9 +11,10 @@ {%- endif -%} {%- set source_model_cols = adapter.get_columns_in_relation(ref(source_model)) -%} - {%- set columns_in_metadata = automate_dv.expand_column_list(columns=[src_pk, src_hashdiff, - src_payload, src_extra_columns, - src_eff, src_ldts, src_source]) | map('lower') | list -%} + {%- set columns_in_metadata = automate_dv.expand_column_list( + columns=[src_pk, src_hashdiff, src_cdk, + src_payload, src_extra_columns, + src_eff, src_ldts, src_source]) | map('lower') | list -%} {%- set payload_cols = [] -%} {%- for col in source_model_cols -%} diff --git a/macros/materialisations/error_messages.sql b/macros/materialisations/error_messages.sql index 1bebc27ff..a54a16c49 100644 --- a/macros/materialisations/error_messages.sql +++ b/macros/materialisations/error_messages.sql @@ -87,3 +87,20 @@ {%- endif -%} {%- endmacro -%} + + +{%- macro currently_disabled_error(func_name) -%} + + {%- set message -%} + This functionality ({{ func_name }}) is currently disabled for dbt-sqlserver 1.7.x, + please revert to dbt-sqlserver 1.4.3 and AutomateDV 0.10.1 to use {{ func_name }}. + + This is due to a suspected bug with the SQLServer Adapter in the 1.7.x version. + We are actively working to get this fixed. Thank you for your understanding. + {%- endset -%} + + {%- if execute -%} + {{- exceptions.raise_compiler_error(automate_dv.wrap_warning(message)) -}} + {%- endif -%} + +{%- endmacro -%} diff --git a/macros/materialisations/period_mat_helpers/get_period_boundaries.sql b/macros/materialisations/period_mat_helpers/get_period_boundaries.sql index 269b766ff..0b1cce71a 100644 --- a/macros/materialisations/period_mat_helpers/get_period_boundaries.sql +++ b/macros/materialisations/period_mat_helpers/get_period_boundaries.sql @@ -100,7 +100,7 @@ CAST(COALESCE({{ automate_dv.timestamp_add(datepart, interval, from_date_or_timestamp) }}, {{ current_timestamp() }} ) AS DATETIME2) AS stop_timestamp FROM {{ target_relation }} - ) + ) SELECT start_timestamp, stop_timestamp, diff --git a/macros/materialisations/period_mat_helpers/get_period_filter_sql.sql b/macros/materialisations/period_mat_helpers/get_period_filter_sql.sql index 8f205f4de..dbfde703e 100644 --- a/macros/materialisations/period_mat_helpers/get_period_filter_sql.sql +++ b/macros/materialisations/period_mat_helpers/get_period_filter_sql.sql @@ -34,7 +34,6 @@ {% macro sqlserver__get_period_filter_sql(target_cols_csv, base_sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%} - {%- set filtered_sql = {'sql': base_sql} -%} {%- do filtered_sql.update({'sql': automate_dv.replace_placeholder_with_period_filter(core_sql=filtered_sql.sql, diff --git a/macros/materialisations/period_mat_helpers/replace_placeholder_with_period_filter.sql b/macros/materialisations/period_mat_helpers/replace_placeholder_with_period_filter.sql index bc5fe9a52..8e359cd8c 100644 --- a/macros/materialisations/period_mat_helpers/replace_placeholder_with_period_filter.sql +++ b/macros/materialisations/period_mat_helpers/replace_placeholder_with_period_filter.sql @@ -19,10 +19,11 @@ {% macro default__replace_placeholder_with_period_filter(core_sql, timestamp_field, start_timestamp, stop_timestamp, offset, period) %} {%- set period_filter -%} - (TO_TIMESTAMP({{ timestamp_field }}) - >= DATE_TRUNC('{{ period }}', TO_TIMESTAMP('{{ start_timestamp }}') + INTERVAL '{{ offset }} {{ period }}') AND - TO_TIMESTAMP({{ timestamp_field }}) < DATE_TRUNC('{{ period }}', TO_TIMESTAMP('{{ start_timestamp }}') + INTERVAL '{{ offset }} {{ period }}' + INTERVAL '1 {{ period }}')) - AND (TO_TIMESTAMP({{ timestamp_field }}) >= TO_TIMESTAMP('{{ start_timestamp }}')) + ( + TO_TIMESTAMP({{ timestamp_field }}) >= DATE_TRUNC('{{ period }}', TO_TIMESTAMP('{{ start_timestamp }}') + INTERVAL '{{ offset }} {{ period }}') + AND TO_TIMESTAMP({{ timestamp_field }}) < DATE_TRUNC('{{ period }}', TO_TIMESTAMP('{{ start_timestamp }}') + INTERVAL '{{ offset }} {{ period }}' + INTERVAL '1 {{ period }}')) + AND (TO_TIMESTAMP({{ timestamp_field }}) >= TO_TIMESTAMP('{{ start_timestamp }}') + ) {%- endset -%} {%- set filtered_sql = core_sql | replace("__PERIOD_FILTER__", period_filter) -%} @@ -58,10 +59,13 @@ {# MSSQL cannot CAST datetime2 strings with more than 7 decimal places #} {% set start_timestamp_mssql = start_timestamp[0:27] %} + {%- set period_filter -%} - (CAST({{ timestamp_field }} AS DATETIME2) >= DATEADD({{ period }}, DATEDIFF({{ period }}, 0, DATEADD({{ period }}, {{ offset }}, CAST('{{ start_timestamp_mssql }}' AS DATETIME2))), 0) AND - CAST({{ timestamp_field }} AS DATETIME2) < DATEADD({{ period }}, 1, DATEADD({{ period }}, {{ offset }}, CAST('{{ start_timestamp_mssql }}' AS DATETIME2))) - AND (CAST({{ timestamp_field }} AS DATETIME2) >= CAST('{{ start_timestamp_mssql }}' AS DATETIME2))) + ( + CAST({{ timestamp_field }} AS DATETIME2) >= DATEADD({{ period }}, DATEDIFF({{ period }}, 0, DATEADD({{ period }}, {{ offset }}, CAST('{{ start_timestamp_mssql }}' AS DATETIME2))), 0) + AND CAST({{ timestamp_field }} AS DATETIME2) < DATEADD({{ period }}, 1, DATEADD({{ period }}, {{ offset }}, CAST('{{ start_timestamp_mssql }}' AS DATETIME2))) + AND (CAST({{ timestamp_field }} AS DATETIME2) >= CAST('{{ start_timestamp_mssql }}' AS DATETIME2)) + ) {%- endset -%} {%- set filtered_sql = core_sql | replace("__PERIOD_FILTER__", period_filter) -%} @@ -74,8 +78,8 @@ {%- set period_filter -%} {{ timestamp_field }}::TIMESTAMP >= DATE_TRUNC('{{ period }}', TIMESTAMP '{{ start_timestamp }}' + INTERVAL '{{ offset }} {{ period }}') - AND {{ timestamp_field }}::TIMESTAMP < DATE_TRUNC('{{ period }}', TIMESTAMP '{{ start_timestamp }}' + INTERVAL '{{ offset }} {{ period }}' + INTERVAL '1 {{ period }}') - AND {{ timestamp_field }}::TIMESTAMP >= TIMESTAMP '{{ start_timestamp }}' + AND {{ timestamp_field }}::TIMESTAMP < DATE_TRUNC('{{ period }}', TIMESTAMP '{{ start_timestamp }}' + INTERVAL '{{ offset }} {{ period }}' + INTERVAL '1 {{ period }}') + AND {{ timestamp_field }}::TIMESTAMP >= TIMESTAMP '{{ start_timestamp }}' {%- endset -%} {%- set filtered_sql = core_sql | replace("__PERIOD_FILTER__", period_filter) -%} diff --git a/macros/materialisations/vault_insert_by_period_materialization.sql b/macros/materialisations/vault_insert_by_period_materialization.sql index 840358fd4..4f0305d9c 100644 --- a/macros/materialisations/vault_insert_by_period_materialization.sql +++ b/macros/materialisations/vault_insert_by_period_materialization.sql @@ -18,12 +18,7 @@ {{ automate_dv.experimental_not_recommended_warning(func_name='vault_insert_by_period') }} - {% if target.type == "sqlserver" %} - {%- set target_relation = this.incorporate(type='table') -%} - {% else %} - {%- set target_relation = this -%} - {% endif %} - + {%- set target_relation = this.incorporate(type='table') -%} {%- set existing_relation = load_relation(this) -%} {%- set tmp_relation = make_temp_relation(target_relation) -%} @@ -44,7 +39,6 @@ {{ run_hooks(pre_hooks, inside_transaction=True) }} {% if existing_relation is none %} - {% set filtered_sql = automate_dv.replace_placeholder_with_period_filter(core_sql=sql, timestamp_field=timestamp_field, start_timestamp=start_stop_dates.start_date, stop_timestamp=start_stop_dates.stop_date, @@ -53,7 +47,6 @@ {% do to_drop.append(tmp_relation) %} {% elif existing_relation.is_view %} - {{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table (vault_insert_by_period).") }} {% do adapter.drop_relation(existing_relation) %} {% set build_sql = create_table_as(False, target_relation, filtered_sql) %} @@ -66,21 +59,19 @@ {% elif full_refresh_mode %} {% set filtered_sql = automate_dv.replace_placeholder_with_period_filter(core_sql=sql, timestamp_field=timestamp_field, - start_timestamp=start_stop_dates.start_date, - stop_timestamp=start_stop_dates.stop_date, - offset=0, period=period) %} + start_timestamp=start_stop_dates.start_date, + stop_timestamp=start_stop_dates.stop_date, + offset=0, period=period) %} {% if target.type in ['postgres', 'sqlserver'] %} {{ automate_dv.drop_temporary_special(target_relation) }} {% endif %} {% set build_sql = create_table_as(False, target_relation, filtered_sql) %} {% else %} - {% set period_boundaries = automate_dv.get_period_boundaries(target_relation, - timestamp_field, - start_stop_dates.start_date, - start_stop_dates.stop_date, - period) %} - + {% set period_boundaries = automate_dv.get_period_boundaries(target_relation, timestamp_field, + start_stop_dates.start_date, + start_stop_dates.stop_date, + period) %} {% set target_columns = adapter.get_columns_in_relation(target_relation) %} {%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%} {%- set loop_vars = {'sum_rows_inserted': 0} -%} @@ -96,10 +87,8 @@ {% set tmp_relation = make_temp_relation(target_relation) %} {% set tmp_table_sql = automate_dv.get_period_filter_sql(target_cols_csv, sql, timestamp_field, period, - period_boundaries.start_timestamp, - period_boundaries.stop_timestamp, i) %} - - + period_boundaries.start_timestamp, + period_boundaries.stop_timestamp, i) %} {# This call statement drops and then creates a temporary table #} {# but MSSQL will fail to drop any temporary table created by a previous loop iteration #} @@ -211,4 +200,10 @@ {{ return({'relations': [target_relation]}) }} -{%- endmaterialization %} \ No newline at end of file +{%- endmaterialization %} + +{% materialization vault_insert_by_period, adapter='sqlserver' %} + +{{ automate_dv.currently_disabled_error(func_name='vault_insert_by_period') }} + +{% endmaterialization %} \ No newline at end of file diff --git a/macros/materialisations/vault_insert_by_rank_materialization.sql b/macros/materialisations/vault_insert_by_rank_materialization.sql index e22f9c25f..0e3227ca2 100644 --- a/macros/materialisations/vault_insert_by_rank_materialization.sql +++ b/macros/materialisations/vault_insert_by_rank_materialization.sql @@ -166,4 +166,10 @@ {{ return({'relations': [target_relation]}) }} -{%- endmaterialization %} \ No newline at end of file +{%- endmaterialization %} + +{% materialization vault_insert_by_rank, adapter='sqlserver' %} + +{{ automate_dv.currently_disabled_error(func_name='vault_insert_by_rank') }} + +{% endmaterialization %} \ No newline at end of file diff --git a/macros/supporting/as_of_date_window.sql b/macros/supporting/as_of_date_window.sql index 728e764b9..736a6ac65 100644 --- a/macros/supporting/as_of_date_window.sql +++ b/macros/supporting/as_of_date_window.sql @@ -79,7 +79,7 @@ new_rows_as_of AS ( WHERE a.AS_OF_DATE >= (SELECT LAST_SAFE_LOAD_DATETIME FROM last_safe_load_datetime) UNION {%- endif %} - SELECT as_of_date + SELECT AS_OF_DATE FROM as_of_grain_new_entries ), diff --git a/macros/supporting/data_types/type_binary.sql b/macros/supporting/data_types/type_binary.sql index c1b929407..e8ef30f85 100644 --- a/macros/supporting/data_types/type_binary.sql +++ b/macros/supporting/data_types/type_binary.sql @@ -12,6 +12,8 @@ BINARY(16) {%- elif var('hash', 'MD5') | lower == 'sha' -%} BINARY(32) + {%- elif var('hash', 'MD5') | lower == 'sha1' -%} + BINARY(20) {%- else -%} BINARY(16) {%- endif -%} diff --git a/macros/supporting/data_types/type_string.sql b/macros/supporting/data_types/type_string.sql index 791bbb0ae..2ddd22e5e 100644 --- a/macros/supporting/data_types/type_string.sql +++ b/macros/supporting/data_types/type_string.sql @@ -25,6 +25,8 @@ VARCHAR(16) {%- elif var('hash', 'MD5') | lower == 'sha' -%} VARCHAR(32) + {%- elif var('hash', 'MD5') | lower == 'sha1' -%} + VARCHAR(20) {%- endif -%} {%- else -%} VARCHAR({{ char_length }}) diff --git a/macros/supporting/ghost_records/binary_ghost.sql b/macros/supporting/ghost_records/binary_ghost.sql index cfd418a74..69ee10ce0 100644 --- a/macros/supporting/ghost_records/binary_ghost.sql +++ b/macros/supporting/ghost_records/binary_ghost.sql @@ -13,6 +13,8 @@ {{ automate_dv.cast_binary(column_str=modules.itertools.repeat('0', 32) | join (''), alias=alias, quote=true) }} {%- elif hash | lower == 'sha' -%} {{ automate_dv.cast_binary(column_str=modules.itertools.repeat('0', 64) | join (''), alias=alias, quote=true) }} + {%- elif hash | lower == 'sha1' -%} + {{ automate_dv.cast_binary(column_str=modules.itertools.repeat('0', 40) | join (''), alias=alias, quote=true) }} {%- else -%} {{ automate_dv.cast_binary(column_str=modules.itertools.repeat('0', 32) | join (''), alias=alias, quote=true) }} {%- endif -%} @@ -23,6 +25,8 @@ CAST(REPLICATE(CAST(CAST('0' AS tinyint) AS BINARY(16)), 16) AS BINARY(16)) {%- elif hash | lower == 'sha' -%} CAST(REPLICATE(CAST(CAST('0' AS tinyint) AS BINARY(32)), 32) AS BINARY(32)) + {%- elif hash | lower == 'sha1' -%} + CAST(REPLICATE(CAST(CAST('0' AS tinyint) AS BINARY(20)), 20) AS BINARY(20)) {%- else -%} CAST(REPLICATE(CAST(CAST('0' AS tinyint) AS BINARY(16)), 16) AS BINARY(16)) {%- endif -%} diff --git a/macros/supporting/hash_components/select_hash_alg.sql b/macros/supporting/hash_components/select_hash_alg.sql index d0a74ec26..2c707ae48 100644 --- a/macros/supporting/hash_components/select_hash_alg.sql +++ b/macros/supporting/hash_components/select_hash_alg.sql @@ -5,16 +5,18 @@ {%- macro select_hash_alg(hash) -%} - {%- set available_hash_algorithms = ['md5', 'sha'] -%} + {%- set available_hash_algorithms = ['md5', 'sha', 'sha1'] -%} {%- if execute and hash | lower not in available_hash_algorithms %} - {%- do exceptions.warn("Configured hash ('{}') not recognised. Must be one of: {} (case insensitive)".format(hash | lower, available_hash_algorithms | join(', '))) -%} + {%- do exceptions.warn("Configured hash ('{}') not recognised. Must be one of: {} (case insensitive). Defaulting to MD5 hashing.".format(hash | lower, available_hash_algorithms | join(', '))) -%} {%- endif -%} {%- if hash | lower == 'md5' -%} {%- do return(automate_dv.hash_alg_md5()) -%} {%- elif hash | lower == 'sha' -%} {%- do return(automate_dv.hash_alg_sha256()) -%} + {%- elif hash | lower == 'sha1' -%} + {%- do return(automate_dv.hash_alg_sha1()) -%} {%- else -%} {%- do return(automate_dv.hash_alg_md5()) -%} {%- endif -%} @@ -102,3 +104,43 @@ {% do return('UPPER(SHA2([HASH_STRING_PLACEHOLDER], 256))') %} {% endmacro %} + +{#- SHA1 -#} + +{%- macro hash_alg_sha1() -%} + + {{- adapter.dispatch('hash_alg_sha1', 'automate_dv')() -}} + +{%- endmacro %} + +{% macro default__hash_alg_sha1() -%} + + {% do return(automate_dv.cast_binary('SHA1_BINARY([HASH_STRING_PLACEHOLDER])', quote=false)) %} + +{% endmacro %} + +{% macro bigquery__hash_alg_sha1() -%} + + {% do return(automate_dv.cast_binary('UPPER(TO_HEX(SHA1([HASH_STRING_PLACEHOLDER])))', quote=false)) %} + +{% endmacro %} + +{% macro sqlserver__hash_alg_sha1() -%} + + {% do return(automate_dv.cast_binary("HASHBYTES('SHA1', [HASH_STRING_PLACEHOLDER])", quote=false)) %} + +{% endmacro %} + +{% macro postgres__hash_alg_sha1() -%} + + {%- do exceptions.warn("Configured hash (SHA-1) is not supported on Postgres. + Defaulting to hash 'MD5', alternatively configure your hash as 'SHA' for SHA256 hashing.") -%} + {{ automate_dv.hash_alg_md5() }} + +{% endmacro %} + +{% macro databricks__hash_alg_sha1() -%} + + {% do return('UPPER(SHA1([HASH_STRING_PLACEHOLDER]))') %} + +{% endmacro %} diff --git a/macros/tables/postgres/sat.sql b/macros/tables/postgres/sat.sql index d05d5bb5a..54adb5a94 100644 --- a/macros/tables/postgres/sat.sql +++ b/macros/tables/postgres/sat.sql @@ -37,7 +37,7 @@ latest_records AS ( SELECT {{ automate_dv.prefix(source_cols, 'b', alias_target='target') }} FROM ( SELECT {{ automate_dv.prefix(source_cols, 'current_records', alias_target='target') }}, - RANK() OVER ( + ROW_NUMBER() OVER ( PARTITION BY {{ automate_dv.prefix([src_pk], 'current_records') }} ORDER BY {{ automate_dv.prefix([src_ldts], 'current_records') }} DESC ) AS rank @@ -57,12 +57,12 @@ valid_stg AS ( SELECT {{ automate_dv.prefix(source_cols, 's', alias_target='source') }} FROM source_data AS s LEFT JOIN latest_records AS sat - ON {{ automate_dv.multikey(src_pk, prefix=['s', 'sat'], condition='=') }} - WHERE {{ automate_dv.multikey(src_pk, prefix='sat', condition='IS NULL') }} - OR {{ automate_dv.prefix([src_ldts], 's') }} > ( - SELECT MAX({{ src_ldts }}) FROM latest_records AS sat - WHERE {{ automate_dv.multikey(src_pk, prefix=['sat','s'], condition='=') }} - ) + ON {{ automate_dv.multikey(src_pk, prefix=['s', 'sat'], condition='=') }} + WHERE {{ automate_dv.multikey(src_pk, prefix='sat', condition='IS NULL') }} + OR {{ automate_dv.prefix([src_ldts], 's') }} > ( + SELECT MAX({{ src_ldts }}) FROM latest_records AS sat + WHERE {{ automate_dv.multikey(src_pk, prefix=['sat','s'], condition='=') }} + ) ), {%- endif %} @@ -72,14 +72,14 @@ first_record_in_set AS ( SELECT * FROM ( SELECT {{ automate_dv.prefix(source_cols, 'sd', alias_target='source') }}, - RANK() OVER ( + ROW_NUMBER() OVER ( PARTITION BY {{ automate_dv.prefix([src_pk], 'sd', alias_target='source') }} ORDER BY {{ automate_dv.prefix([src_ldts], 'sd', alias_target='source') }} ASC - ) as asc_rank + ) AS asc_rank {%- if automate_dv.is_any_incremental() and apply_source_filter %} - FROM valid_stg as sd + FROM valid_stg AS sd {%- else %} - FROM source_data as sd + FROM source_data AS sd {%- endif %} ) AS rin WHERE rin.asc_rank = 1 @@ -93,11 +93,11 @@ unique_source_records AS ( {{ automate_dv.prefix(source_cols, 'sd', alias_target='source') }}, LAG({{ automate_dv.prefix([src_hashdiff], 'sd', alias_target='source') }}) OVER ( PARTITION BY {{ automate_dv.prefix([src_pk], 'sd', alias_target='source') }} - ORDER BY {{ automate_dv.prefix([src_ldts], 'sd', alias_target='source') }} ASC) as prev_hashdiff + ORDER BY {{ automate_dv.prefix([src_ldts], 'sd', alias_target='source') }} ASC) AS prev_hashdiff {%- if automate_dv.is_any_incremental() and apply_source_filter %} - FROM valid_stg as sd + FROM valid_stg AS sd {%- else %} - FROM source_data as sd + FROM source_data AS sd {%- endif %} ) AS b WHERE {{ automate_dv.prefix([src_hashdiff], 'b', alias_target='source') }} != b.prev_hashdiff @@ -108,9 +108,9 @@ unique_source_records AS ( ghost AS ( {{ automate_dv.create_ghost_record(src_pk=src_pk, src_hashdiff=src_hashdiff, - src_payload=src_payload, src_extra_columns=src_extra_columns, - src_eff=src_eff, src_ldts=src_ldts, - src_source=src_source, source_model=source_model) }} + src_payload=src_payload, src_extra_columns=src_extra_columns, + src_eff=src_eff, src_ldts=src_ldts, + src_source=src_source, source_model=source_model) }} ), {%- endif %} @@ -121,21 +121,22 @@ records_to_insert AS ( {{ automate_dv.alias_all(source_cols, 'g') }} FROM ghost AS g {%- if automate_dv.is_any_incremental() %} - WHERE NOT EXISTS ( SELECT 1 FROM {{ this }} AS h WHERE {{ automate_dv.prefix([src_hashdiff], 'h', alias_target='target') }} = {{ automate_dv.prefix([src_hashdiff], 'g') }} ) + WHERE NOT EXISTS ( SELECT 1 FROM {{ this }} AS h + WHERE {{ automate_dv.prefix([src_hashdiff], 'h', alias_target='target') }} = {{ automate_dv.prefix([src_hashdiff], 'g') }} ) {%- endif %} UNION {%- endif %} SELECT {{ automate_dv.alias_all(source_cols, 'frin') }} FROM first_record_in_set AS frin {%- if automate_dv.is_any_incremental() %} - LEFT JOIN LATEST_RECORDS lr + LEFT JOIN latest_records AS lr ON {{ automate_dv.multikey(src_pk, prefix=['lr','frin'], condition='=') }} AND {{ automate_dv.prefix([src_hashdiff], 'lr', alias_target='target') }} = {{ automate_dv.prefix([src_hashdiff], 'frin') }} WHERE {{ automate_dv.prefix([src_hashdiff], 'lr', alias_target='target') }} IS NULL {%- endif %} UNION SELECT {{ automate_dv.prefix(source_cols, 'usr', alias_target='source') }} - FROM unique_source_records as usr + FROM unique_source_records AS usr ) SELECT * FROM records_to_insert diff --git a/macros/tables/snowflake/ma_sat.sql b/macros/tables/snowflake/ma_sat.sql index 8b1921177..f50e1b495 100644 --- a/macros/tables/snowflake/ma_sat.sql +++ b/macros/tables/snowflake/ma_sat.sql @@ -6,8 +6,13 @@ {%- macro ma_sat(src_pk, src_cdk, src_hashdiff, src_payload, src_extra_columns, src_eff, src_ldts, src_source, source_model) -%} {{- automate_dv.check_required_parameters(src_pk=src_pk, src_cdk=src_cdk, src_hashdiff=src_hashdiff, - src_payload=src_payload, src_ldts=src_ldts, src_source=src_source, - source_model=source_model) -}} + src_payload=src_payload, src_ldts=src_ldts, src_source=src_source, + source_model=source_model) -}} + + {%- set src_payload = automate_dv.process_payload_column_excludes( + src_pk=src_pk, src_cdk=src_cdk, src_hashdiff=src_hashdiff, src_eff=src_eff, + src_payload=src_payload, src_extra_columns=src_extra_columns, + src_ldts=src_ldts, src_source=src_source, source_model=source_model) -%} {{- automate_dv.prepend_generated_by() }} diff --git a/macros/tables/snowflake/ref_table.sql b/macros/tables/snowflake/ref_table.sql index e38aa21f0..5b83f4297 100644 --- a/macros/tables/snowflake/ref_table.sql +++ b/macros/tables/snowflake/ref_table.sql @@ -23,26 +23,26 @@ {%- set source_cols = automate_dv.expand_column_list(columns=[src_pk, src_extra_columns, src_ldts, src_source]) %} -WITH to_insert AS ( +WITH source_data AS ( {%- for src in source_model %} SELECT DISTINCT - {{ automate_dv.prefix(source_cols, 'a') }} + {{ automate_dv.prefix(source_cols, 'a') }} FROM {{ ref(src) }} AS a WHERE a.{{ src_pk }} IS NOT NULL {%- endfor %} ), -non_historized AS ( +records_to_insert AS ( SELECT - {{ automate_dv.prefix(source_cols, 'a') }} - FROM to_insert AS a + {{ automate_dv.prefix(source_cols, 'a') }} + FROM source_data AS a {%- if automate_dv.is_any_incremental() %} LEFT JOIN {{ this }} AS d - ON {{ automate_dv.multikey(src_pk, prefix=['a','d'], condition='=') }} - WHERE {{ automate_dv.multikey(src_pk, prefix='d', condition='IS NULL') }} + ON {{ automate_dv.multikey(src_pk, prefix=['a','d'], condition='=') }} + WHERE {{ automate_dv.multikey(src_pk, prefix='d', condition='IS NULL') }} {%- endif %} ) -SELECT * FROM non_historized +SELECT * FROM records_to_insert {%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/snowflake/sat.sql b/macros/tables/snowflake/sat.sql index 80b206239..b64876c7e 100644 --- a/macros/tables/snowflake/sat.sql +++ b/macros/tables/snowflake/sat.sql @@ -6,8 +6,8 @@ {%- macro sat(src_pk, src_hashdiff, src_payload, src_extra_columns, src_eff, src_ldts, src_source, source_model) -%} {{- automate_dv.check_required_parameters(src_pk=src_pk, src_hashdiff=src_hashdiff, src_payload=src_payload, - src_ldts=src_ldts, src_source=src_source, - source_model=source_model) -}} + src_ldts=src_ldts, src_source=src_source, + source_model=source_model) -}} {%- set src_payload = automate_dv.process_payload_column_excludes( src_pk=src_pk, src_hashdiff=src_hashdiff, @@ -17,9 +17,9 @@ {{ automate_dv.prepend_generated_by() }} {{ adapter.dispatch('sat', 'automate_dv')(src_pk=src_pk, src_hashdiff=src_hashdiff, - src_payload=src_payload, src_extra_columns=src_extra_columns, - src_eff=src_eff, src_ldts=src_ldts, - src_source=src_source, source_model=source_model) -}} + src_payload=src_payload, src_extra_columns=src_extra_columns, + src_eff=src_eff, src_ldts=src_ldts, + src_source=src_source, source_model=source_model) -}} {%- endmacro -%} @@ -55,16 +55,16 @@ WITH source_data AS ( latest_records AS ( SELECT {{ automate_dv.prefix(source_cols, 'current_records', alias_target='target') }}, - RANK() OVER ( + ROW_NUMBER() OVER ( PARTITION BY {{ automate_dv.prefix([src_pk], 'current_records') }} ORDER BY {{ automate_dv.prefix([src_ldts], 'current_records') }} DESC ) AS rank_num FROM {{ this }} AS current_records - JOIN ( - SELECT DISTINCT {{ automate_dv.prefix([src_pk], 'source_data') }} - FROM source_data - ) AS source_records - ON {{ automate_dv.multikey(src_pk, prefix=['source_records','current_records'], condition='=') }} + JOIN ( + SELECT DISTINCT {{ automate_dv.prefix([src_pk], 'source_data') }} + FROM source_data + ) AS source_records + ON {{ automate_dv.multikey(src_pk, prefix=['source_records','current_records'], condition='=') }} QUALIFY rank_num = 1 ), @@ -74,12 +74,12 @@ valid_stg AS ( SELECT {{ automate_dv.prefix(source_cols, 's', alias_target='source') }} FROM source_data AS s LEFT JOIN latest_records AS sat - ON {{ automate_dv.multikey(src_pk, prefix=['s', 'sat'], condition='=') }} - WHERE {{ automate_dv.multikey(src_pk, prefix='sat', condition='IS NULL') }} - OR {{ automate_dv.prefix([src_ldts], 's') }} > ( - SELECT MAX({{ src_ldts }}) FROM latest_records AS sat - WHERE {{ automate_dv.multikey(src_pk, prefix=['sat','s'], condition='=') }} - ) + ON {{ automate_dv.multikey(src_pk, prefix=['s', 'sat'], condition='=') }} + WHERE {{ automate_dv.multikey(src_pk, prefix='sat', condition='IS NULL') }} + OR {{ automate_dv.prefix([src_ldts], 's') }} > ( + SELECT MAX({{ src_ldts }}) FROM latest_records AS sat + WHERE {{ automate_dv.multikey(src_pk, prefix=['sat','s'], condition='=') }} + ) ), {%- endif %} @@ -88,14 +88,14 @@ valid_stg AS ( first_record_in_set AS ( SELECT {{ automate_dv.prefix(source_cols, 'sd', alias_target='source') }}, - RANK() OVER ( + ROW_NUMBER() OVER ( PARTITION BY {{ automate_dv.prefix([src_pk], 'sd', alias_target='source') }} ORDER BY {{ automate_dv.prefix([src_ldts], 'sd', alias_target='source') }} ASC - ) as asc_rank + ) AS asc_rank {%- if automate_dv.is_any_incremental() and apply_source_filter %} - FROM valid_stg as sd + FROM valid_stg AS sd {%- else %} - FROM source_data as sd + FROM source_data AS sd {%- endif %} QUALIFY asc_rank = 1 ), @@ -104,13 +104,14 @@ unique_source_records AS ( SELECT DISTINCT {{ automate_dv.prefix(source_cols, 'sd', alias_target='source') }} {%- if automate_dv.is_any_incremental() and apply_source_filter %} - FROM valid_stg as sd + FROM valid_stg AS sd {%- else %} - FROM source_data as sd + FROM source_data AS sd {%- endif %} QUALIFY {{ automate_dv.prefix([src_hashdiff], 'sd', alias_target='source') }} != LAG({{ automate_dv.prefix([src_hashdiff], 'sd', alias_target='source') }}) OVER ( PARTITION BY {{ automate_dv.prefix([src_pk], 'sd', alias_target='source') }} - ORDER BY {{ automate_dv.prefix([src_ldts], 'sd', alias_target='source') }} ASC) + ORDER BY {{ automate_dv.prefix([src_ldts], 'sd', alias_target='source') }} ASC + ) ), @@ -145,7 +146,7 @@ records_to_insert AS ( {%- endif %} UNION {%- if target.type == 'bigquery' %} DISTINCT {%- endif %} SELECT {{ automate_dv.prefix(source_cols, 'usr', alias_target='source') }} - FROM unique_source_records as usr + FROM unique_source_records AS usr ) SELECT * FROM records_to_insert