From 18158103ad1349714eadd4ac6d840cf362e4a88b Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Wed, 23 Jul 2025 14:50:57 -0400 Subject: [PATCH 1/3] payments: initial staging and source tables for enghouse data --- .../staging/payments/enghouse/_enghouse.yml | 155 ++++++++++++++++++ .../enghouse/stg_enghouse__pay_windows.sql | 86 ++++++++++ .../payments/enghouse/stg_enghouse__taps.sql | 100 +++++++++++ .../enghouse/stg_enghouse__tickets.sql | 102 ++++++++++++ .../enghouse/stg_enghouse__transactions.sql | 102 ++++++++++++ 5 files changed, 545 insertions(+) create mode 100644 warehouse/models/staging/payments/enghouse/_enghouse.yml create mode 100644 warehouse/models/staging/payments/enghouse/stg_enghouse__pay_windows.sql create mode 100644 warehouse/models/staging/payments/enghouse/stg_enghouse__taps.sql create mode 100644 warehouse/models/staging/payments/enghouse/stg_enghouse__tickets.sql create mode 100644 warehouse/models/staging/payments/enghouse/stg_enghouse__transactions.sql diff --git a/warehouse/models/staging/payments/enghouse/_enghouse.yml b/warehouse/models/staging/payments/enghouse/_enghouse.yml new file mode 100644 index 0000000000..146ab7ce8b --- /dev/null +++ b/warehouse/models/staging/payments/enghouse/_enghouse.yml @@ -0,0 +1,155 @@ +version: 2 + +sources: + - name: external_enghouse + description: Hive-partitioned external tables reading Enghouse payments data. + database: "{{ env_var('GOOGLE_CLOUD_PROJECT', var('GOOGLE_CLOUD_PROJECT')) }}" + schema: external_enghouse + tables: + - name: taps + - name: tickets + - name: transactions + - name: pay_windows + +models: + - name: stg_enghouse__taps + description: Add description + columns: + - name: tap_id + description: To do + - name: mapping_terminal_id + description: To do + - name: mapping_merchant_id + description: To do + - name: terminal + description: To do + - name: token + description: To do + - name: masked_pan + description: To do + - name: expiry + description: To do + - name: server_date + description: To do + - name: terminal_date + description: To do + - name: tx_number + description: To do + - name: tx_status + description: To do + - name: payment_reference + description: To do + - name: terminal_spdh_code + description: To do + - name: denylist_version + description: To do + - name: transit_data + description: To do + - name: currency + description: To do + - name: par + description: To do + + - name: stg_enghouse__tickets + description: Add description + columns: + - name: id + description: To do + - name: ticket_id + description: To do + - name: station_name + description: To do + - name: amount + description: To do + - name: clearing_id + description: To do + - name: operator_id + description: To do + - name: reason + description: To do + - name: tap_id + description: To do + - name: ticket_type + description: To do + - name: created_dttm + description: To do + - name: kafka_send_status + description: To do + - name: line + description: To do + - name: start_station + description: To do + - name: end_station + description: To do + - name: start_dttm + description: To do + - name: end_dttm + description: To do + - name: ticket_code + description: To do + - name: additional_infos + description: To do + + - name: stg_enghouse__transactions + description: Add description + columns: + - name: id + description: To do + - name: operation + description: To do + - name: terminal_id + description: To do + - name: mapping_terminal_id + description: To do + - name: mapping_merchant_id + description: To do + - name: timestamp + description: To do + - name: amount + description: To do + - name: payment_reference + description: To do + - name: spdh_response + description: To do + - name: response_type + description: To do + - name: response_message + description: To do + - name: token + description: To do + - name: issuer_response + description: To do + - name: core_response + description: To do + - name: rrn + description: To do + - name: authorization_code + description: To do + - name: par + description: To do + - name: brand + description: To do + + - name: stg_enghouse__pay_windows + description: Add description + columns: + - name: id + description: To do + - name: token + description: To do + - name: amount_settled + description: To do + - name: amount_to_settle + description: To do + - name: debt_settled + description: To do + - name: stage + description: To do + - name: vs + description: To do + - name: terminal_id + description: To do + - name: open_date + description: To do + - name: close_date + description: To do diff --git a/warehouse/models/staging/payments/enghouse/stg_enghouse__pay_windows.sql b/warehouse/models/staging/payments/enghouse/stg_enghouse__pay_windows.sql new file mode 100644 index 0000000000..062f2ca304 --- /dev/null +++ b/warehouse/models/staging/payments/enghouse/stg_enghouse__pay_windows.sql @@ -0,0 +1,86 @@ +WITH source AS ( + SELECT * FROM {{ source('external_enghouse', 'pay_windows') }} +), + +clean_columns AS ( + SELECT +-- reintroduce trim_make_empty_string_null/safe_cast for fields below + id, + token, + amount_settled, + amount_to_settle, + debt_settled, + stage, + vs, + terminal_id, + open_date, + close_date, + +-- reintroduce handling of content generated by us below + CAST(_line_number AS INTEGER) AS _line_number, + `instance`, + extract_filename, + -- revisit the use of this littlepay-specific macro + {{ extract_littlepay_filename_ts() }} AS enghouse_export_ts, + {{ extract_littlepay_filename_date() }} AS enghouse_export_date, + ts, + -- hash all content not generated by us to enable deduping full dup rows + -- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream + -- {{ dbt_utils.generate_surrogate_key(['','', '', '']) }} AS _content_hash, + FROM source +), + +-- revisit deduplication logic below + +-- add_keys_drop_full_dupes AS ( +-- SELECT +-- *, +-- -- generate keys now that input columns have been trimmed & cast +-- {{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key, +-- {{ dbt_utils.generate_surrogate_key(['aggregation_id', 'authorisation_date_time_utc']) }} AS _payments_key, +-- FROM clean_columns +-- {{ qualify_dedupe_full_duplicate_lp_rows() }} +-- ), + +-- -- we have some authorisations where the same aggregation has multiple rows with the same timestamp +-- -- these seem like clear duplicates, and some of them one of the two copies is missing status and RRN; these can be dropped +-- -- the rest need to be handled downstream by checking against settlements data +-- same_timestamp_simple_dupes AS ( +-- SELECT +-- _payments_key, +-- (COUNT(DISTINCT retrieval_reference_number) = 1 AND COUNT(*) > 1) AS drop_candidate, +-- FROM add_keys_drop_full_dupes +-- GROUP BY 1 +-- ), + +stg_enghouse__pay_windows AS ( + SELECT + id, + token, + amount_settled, + amount_to_settle, + debt_settled, + stage, + vs, + terminal_id, + open_date, + close_date, + + _line_number, + `instance`, + extract_filename, + enghouse_export_ts, + enghouse_export_date, + ts, + -- _key, + -- _payments_key, + -- _content_hash, + FROM clean_columns + -- FROM add_keys_drop_full_dupes + -- LEFT JOIN same_timestamp_simple_dupes + -- USING(_payments_key) + -- -- rows to drop are those where RRN is null and it's a duplicate + -- WHERE NOT drop_candidate OR retrieval_reference_number IS NOT NULL +) + +SELECT * FROM stg_enghouse__pay_windows diff --git a/warehouse/models/staging/payments/enghouse/stg_enghouse__taps.sql b/warehouse/models/staging/payments/enghouse/stg_enghouse__taps.sql new file mode 100644 index 0000000000..7765c2d76e --- /dev/null +++ b/warehouse/models/staging/payments/enghouse/stg_enghouse__taps.sql @@ -0,0 +1,100 @@ +WITH source AS ( + SELECT * FROM {{ source('external_enghouse', 'taps') }} +), + +clean_columns AS ( + SELECT +-- reintroduce trim_make_empty_string_null/safe_cast for fields below + tap_id, + mapping_terminal_id, + mapping_merchant_id, + terminal, + token, + masked_pan, + expiry, + server_date, + terminal_date, + tx_number, + tx_status, + payment_reference, + terminal_spdh_code, + denylist_version, + transit_data, + currency, + par, + +-- reintroduce handling of content generated by us below + CAST(_line_number AS INTEGER) AS _line_number, + `instance`, + extract_filename, + -- revisit the use of this littlepay-specific macro + {{ extract_littlepay_filename_ts() }} AS enghouse_export_ts, + {{ extract_littlepay_filename_date() }} AS enghouse_export_date, + ts, + -- hash all content not generated by us to enable deduping full dup rows + -- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream + -- {{ dbt_utils.generate_surrogate_key(['','', '', '']) }} AS _content_hash, + FROM source +), + +-- revisit deduplication logic below + +-- add_keys_drop_full_dupes AS ( +-- SELECT +-- *, +-- -- generate keys now that input columns have been trimmed & cast +-- {{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key, +-- {{ dbt_utils.generate_surrogate_key(['aggregation_id', 'authorisation_date_time_utc']) }} AS _payments_key, +-- FROM clean_columns +-- {{ qualify_dedupe_full_duplicate_lp_rows() }} +-- ), + +-- -- we have some authorisations where the same aggregation has multiple rows with the same timestamp +-- -- these seem like clear duplicates, and some of them one of the two copies is missing status and RRN; these can be dropped +-- -- the rest need to be handled downstream by checking against settlements data +-- same_timestamp_simple_dupes AS ( +-- SELECT +-- _payments_key, +-- (COUNT(DISTINCT retrieval_reference_number) = 1 AND COUNT(*) > 1) AS drop_candidate, +-- FROM add_keys_drop_full_dupes +-- GROUP BY 1 +-- ), + +stg_enghouse__taps AS ( + SELECT + tap_id, + mapping_terminal_id, + mapping_merchant_id, + terminal, + token, + masked_pan, + expiry, + server_date, + terminal_date, + tx_number, + tx_status, + payment_reference, + terminal_spdh_code, + denylist_version, + transit_data, + currency, + par, + + _line_number, + `instance`, + extract_filename, + enghouse_export_ts, + enghouse_export_date, + ts, + -- _key, + -- _payments_key, + -- _content_hash, + FROM clean_columns + -- FROM add_keys_drop_full_dupes + -- LEFT JOIN same_timestamp_simple_dupes + -- USING(_payments_key) + -- -- rows to drop are those where RRN is null and it's a duplicate + -- WHERE NOT drop_candidate OR retrieval_reference_number IS NOT NULL +) + +SELECT * FROM stg_enghouse__taps diff --git a/warehouse/models/staging/payments/enghouse/stg_enghouse__tickets.sql b/warehouse/models/staging/payments/enghouse/stg_enghouse__tickets.sql new file mode 100644 index 0000000000..651b07fee7 --- /dev/null +++ b/warehouse/models/staging/payments/enghouse/stg_enghouse__tickets.sql @@ -0,0 +1,102 @@ +WITH source AS ( + SELECT * FROM {{ source('external_enghouse', 'tickets') }} +), + +clean_columns AS ( + SELECT +-- reintroduce trim_make_empty_string_null/safe_cast for fields below + id, + ticket_id, + station_name, + amount, + clearing_id, + operator_id, + reason, + tap_id, + ticket_type, + created_dttm, + kafka_send_status, + line, + start_station, + end_station, + start_dttm, + end_dttm, + ticket_code, + additional_infos, + +-- reintroduce handling of content generated by us below + CAST(_line_number AS INTEGER) AS _line_number, + `instance`, + extract_filename, + -- revisit the use of this littlepay-specific macro + {{ extract_littlepay_filename_ts() }} AS enghouse_export_ts, + {{ extract_littlepay_filename_date() }} AS enghouse_export_date, + ts, + -- hash all content not generated by us to enable deduping full dup rows + -- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream + -- {{ dbt_utils.generate_surrogate_key(['','', '', '']) }} AS _content_hash, + FROM source +), + +-- revisit deduplication logic below + +-- add_keys_drop_full_dupes AS ( +-- SELECT +-- *, +-- -- generate keys now that input columns have been trimmed & cast +-- {{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key, +-- {{ dbt_utils.generate_surrogate_key(['aggregation_id', 'authorisation_date_time_utc']) }} AS _payments_key, +-- FROM clean_columns +-- {{ qualify_dedupe_full_duplicate_lp_rows() }} +-- ), + +-- -- we have some authorisations where the same aggregation has multiple rows with the same timestamp +-- -- these seem like clear duplicates, and some of them one of the two copies is missing status and RRN; these can be dropped +-- -- the rest need to be handled downstream by checking against settlements data +-- same_timestamp_simple_dupes AS ( +-- SELECT +-- _payments_key, +-- (COUNT(DISTINCT retrieval_reference_number) = 1 AND COUNT(*) > 1) AS drop_candidate, +-- FROM add_keys_drop_full_dupes +-- GROUP BY 1 +-- ), + +stg_enghouse__tickets AS ( + SELECT + id, + ticket_id, + station_name, + amount, + clearing_id, + operator_id, + reason, + tap_id, + ticket_type, + created_dttm, + kafka_send_status, + line, + start_station, + end_station, + start_dttm, + end_dttm, + ticket_code, + additional_infos, + + _line_number, + `instance`, + extract_filename, + enghouse_export_ts, + enghouse_export_date, + ts, + -- _key, + -- _payments_key, + -- _content_hash, + FROM clean_columns + -- FROM add_keys_drop_full_dupes + -- LEFT JOIN same_timestamp_simple_dupes + -- USING(_payments_key) + -- -- rows to drop are those where RRN is null and it's a duplicate + -- WHERE NOT drop_candidate OR retrieval_reference_number IS NOT NULL +) + +SELECT * FROM stg_enghouse__tickets diff --git a/warehouse/models/staging/payments/enghouse/stg_enghouse__transactions.sql b/warehouse/models/staging/payments/enghouse/stg_enghouse__transactions.sql new file mode 100644 index 0000000000..023c193821 --- /dev/null +++ b/warehouse/models/staging/payments/enghouse/stg_enghouse__transactions.sql @@ -0,0 +1,102 @@ +WITH source AS ( + SELECT * FROM {{ source('external_enghouse', 'transactions') }} +), + +clean_columns AS ( + SELECT +-- reintroduce trim_make_empty_string_null/safe_cast for fields below + id, + operation, + terminal_id, + mapping_terminal_id, + mapping_merchant_id, + timestamp, + amount, + payment_reference, + spdh_response, + response_type, + response_message, + token, + issuer_response, + core_response, + rrn, + authorization_code, + par, + brand, + +-- reintroduce handling of content generated by us below + CAST(_line_number AS INTEGER) AS _line_number, + `instance`, + extract_filename, + -- revisit the use of this littlepay-specific macro + {{ extract_littlepay_filename_ts() }} AS enghouse_export_ts, + {{ extract_littlepay_filename_date() }} AS enghouse_export_date, + ts, + -- hash all content not generated by us to enable deduping full dup rows + -- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream + -- {{ dbt_utils.generate_surrogate_key(['','', '', '']) }} AS _content_hash, + FROM source +), + +-- revisit deduplication logic below + +-- add_keys_drop_full_dupes AS ( +-- SELECT +-- *, +-- -- generate keys now that input columns have been trimmed & cast +-- {{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key, +-- {{ dbt_utils.generate_surrogate_key(['aggregation_id', 'authorisation_date_time_utc']) }} AS _payments_key, +-- FROM clean_columns +-- {{ qualify_dedupe_full_duplicate_lp_rows() }} +-- ), + +-- -- we have some authorisations where the same aggregation has multiple rows with the same timestamp +-- -- these seem like clear duplicates, and some of them one of the two copies is missing status and RRN; these can be dropped +-- -- the rest need to be handled downstream by checking against settlements data +-- same_timestamp_simple_dupes AS ( +-- SELECT +-- _payments_key, +-- (COUNT(DISTINCT retrieval_reference_number) = 1 AND COUNT(*) > 1) AS drop_candidate, +-- FROM add_keys_drop_full_dupes +-- GROUP BY 1 +-- ), + +stg_enghouse__transactions AS ( + SELECT + id, + operation, + terminal_id, + mapping_terminal_id, + mapping_merchant_id, + timestamp, + amount, + payment_reference, + spdh_response, + response_type, + response_message, + token, + issuer_response, + core_response, + rrn, + authorization_code, + par, + brand, + + _line_number, + `instance`, + extract_filename, + enghouse_export_ts, + enghouse_export_date, + ts, + -- _key, + -- _payments_key, + -- _content_hash, + FROM clean_columns + -- FROM add_keys_drop_full_dupes + -- LEFT JOIN same_timestamp_simple_dupes + -- USING(_payments_key) + -- -- rows to drop are those where RRN is null and it's a duplicate + -- WHERE NOT drop_candidate OR retrieval_reference_number IS NOT NULL +) + +SELECT * FROM stg_enghouse__transactions From 526815fc72c5fe82205e483cf45e5ae9e57fa7b5 Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Thu, 24 Jul 2025 12:29:22 -0400 Subject: [PATCH 2/3] add casting/handling, reintroduce content hash --- .../enghouse/stg_enghouse__pay_windows.sql | 57 ++++---------- .../payments/enghouse/stg_enghouse__taps.sql | 73 ++++++------------ .../enghouse/stg_enghouse__tickets.sql | 74 ++++++------------ .../enghouse/stg_enghouse__transactions.sql | 76 ++++++------------- 4 files changed, 81 insertions(+), 199 deletions(-) diff --git a/warehouse/models/staging/payments/enghouse/stg_enghouse__pay_windows.sql b/warehouse/models/staging/payments/enghouse/stg_enghouse__pay_windows.sql index 062f2ca304..6a99625bac 100644 --- a/warehouse/models/staging/payments/enghouse/stg_enghouse__pay_windows.sql +++ b/warehouse/models/staging/payments/enghouse/stg_enghouse__pay_windows.sql @@ -4,19 +4,16 @@ WITH source AS ( clean_columns AS ( SELECT --- reintroduce trim_make_empty_string_null/safe_cast for fields below - id, - token, - amount_settled, - amount_to_settle, - debt_settled, - stage, - vs, - terminal_id, - open_date, - close_date, - --- reintroduce handling of content generated by us below + {{ trim_make_empty_string_null('id') }} AS id, + {{ trim_make_empty_string_null('token') }} AS token, + {{ safe_cast('amount_settled', type_numeric()) }} AS amount_settled, + {{ safe_cast('amount_to_settle', type_numeric()) }} AS amount_to_settle, + {{ safe_cast('debt_settled', type_numeric()) }} AS debt_settled, + {{ trim_make_empty_string_null('stage') }} AS stage, + {{ trim_make_empty_string_null('vs') }} AS vs, + {{ trim_make_empty_string_null('terminal_id') }} AS terminal_id, + {{ safe_cast('open_date', type_timestamp()) }} AS open_date, + {{ safe_cast('close_date', type_timestamp()) }} AS close_date, CAST(_line_number AS INTEGER) AS _line_number, `instance`, extract_filename, @@ -26,33 +23,11 @@ clean_columns AS ( ts, -- hash all content not generated by us to enable deduping full dup rows -- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream - -- {{ dbt_utils.generate_surrogate_key(['','', '', '']) }} AS _content_hash, + {{ dbt_utils.generate_surrogate_key(['id', 'token', 'amount_settled', 'amount_to_settle', + 'debt_settled', 'stage', 'vs', 'terminal_id', 'open_date', 'close_date']) }} AS _content_hash, FROM source ), --- revisit deduplication logic below - --- add_keys_drop_full_dupes AS ( --- SELECT --- *, --- -- generate keys now that input columns have been trimmed & cast --- {{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key, --- {{ dbt_utils.generate_surrogate_key(['aggregation_id', 'authorisation_date_time_utc']) }} AS _payments_key, --- FROM clean_columns --- {{ qualify_dedupe_full_duplicate_lp_rows() }} --- ), - --- -- we have some authorisations where the same aggregation has multiple rows with the same timestamp --- -- these seem like clear duplicates, and some of them one of the two copies is missing status and RRN; these can be dropped --- -- the rest need to be handled downstream by checking against settlements data --- same_timestamp_simple_dupes AS ( --- SELECT --- _payments_key, --- (COUNT(DISTINCT retrieval_reference_number) = 1 AND COUNT(*) > 1) AS drop_candidate, --- FROM add_keys_drop_full_dupes --- GROUP BY 1 --- ), - stg_enghouse__pay_windows AS ( SELECT id, @@ -65,7 +40,6 @@ stg_enghouse__pay_windows AS ( terminal_id, open_date, close_date, - _line_number, `instance`, extract_filename, @@ -74,13 +48,8 @@ stg_enghouse__pay_windows AS ( ts, -- _key, -- _payments_key, - -- _content_hash, + _content_hash, FROM clean_columns - -- FROM add_keys_drop_full_dupes - -- LEFT JOIN same_timestamp_simple_dupes - -- USING(_payments_key) - -- -- rows to drop are those where RRN is null and it's a duplicate - -- WHERE NOT drop_candidate OR retrieval_reference_number IS NOT NULL ) SELECT * FROM stg_enghouse__pay_windows diff --git a/warehouse/models/staging/payments/enghouse/stg_enghouse__taps.sql b/warehouse/models/staging/payments/enghouse/stg_enghouse__taps.sql index 7765c2d76e..2bc0d01552 100644 --- a/warehouse/models/staging/payments/enghouse/stg_enghouse__taps.sql +++ b/warehouse/models/staging/payments/enghouse/stg_enghouse__taps.sql @@ -4,26 +4,23 @@ WITH source AS ( clean_columns AS ( SELECT --- reintroduce trim_make_empty_string_null/safe_cast for fields below - tap_id, - mapping_terminal_id, - mapping_merchant_id, - terminal, - token, - masked_pan, - expiry, - server_date, - terminal_date, - tx_number, - tx_status, - payment_reference, - terminal_spdh_code, - denylist_version, - transit_data, - currency, - par, - --- reintroduce handling of content generated by us below + {{ trim_make_empty_string_null('tap_id') }} AS tap_id, + {{ trim_make_empty_string_null('mapping_terminal_id') }} AS mapping_terminal_id, + {{ trim_make_empty_string_null('mapping_merchant_id') }} AS mapping_merchant_id, + {{ trim_make_empty_string_null('terminal') }} AS terminal, + {{ trim_make_empty_string_null('token') }} AS token, + {{ trim_make_empty_string_null('masked_pan') }} AS masked_pan, + {{ safe_cast('expiry', type_int()) }} AS expiry, + {{ safe_cast('server_date', type_timestamp()) }} AS server_date, + {{ safe_cast('terminal_date', type_timestamp()) }} AS terminal_date, + {{ safe_cast('tx_number', type_int()) }} AS tx_number, + {{ trim_make_empty_string_null('tx_status') }} AS tx_status, + {{ trim_make_empty_string_null('payment_reference') }} AS payment_reference, + {{ trim_make_empty_string_null('terminal_spdh_code') }} AS terminal_spdh_code, + {{ trim_make_empty_string_null('denylist_version') }} AS denylist_version, + {{ trim_make_empty_string_null('transit_data') }} AS transit_data, + {{ trim_make_empty_string_null('currency') }} AS currency, + {{ trim_make_empty_string_null('par') }} AS par, CAST(_line_number AS INTEGER) AS _line_number, `instance`, extract_filename, @@ -33,33 +30,13 @@ clean_columns AS ( ts, -- hash all content not generated by us to enable deduping full dup rows -- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream - -- {{ dbt_utils.generate_surrogate_key(['','', '', '']) }} AS _content_hash, + {{ dbt_utils.generate_surrogate_key(['tap_id', 'mapping_terminal_id', 'mapping_merchant_id', 'terminal', + 'token', 'masked_pan', 'expiry', 'server_date', 'terminal_date', 'tx_number', 'tx_status', + 'payment_reference', 'terminal_spdh_code', 'denylist_version', 'transit_data', + 'currency', 'par']) }} AS _content_hash, FROM source ), --- revisit deduplication logic below - --- add_keys_drop_full_dupes AS ( --- SELECT --- *, --- -- generate keys now that input columns have been trimmed & cast --- {{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key, --- {{ dbt_utils.generate_surrogate_key(['aggregation_id', 'authorisation_date_time_utc']) }} AS _payments_key, --- FROM clean_columns --- {{ qualify_dedupe_full_duplicate_lp_rows() }} --- ), - --- -- we have some authorisations where the same aggregation has multiple rows with the same timestamp --- -- these seem like clear duplicates, and some of them one of the two copies is missing status and RRN; these can be dropped --- -- the rest need to be handled downstream by checking against settlements data --- same_timestamp_simple_dupes AS ( --- SELECT --- _payments_key, --- (COUNT(DISTINCT retrieval_reference_number) = 1 AND COUNT(*) > 1) AS drop_candidate, --- FROM add_keys_drop_full_dupes --- GROUP BY 1 --- ), - stg_enghouse__taps AS ( SELECT tap_id, @@ -79,7 +56,6 @@ stg_enghouse__taps AS ( transit_data, currency, par, - _line_number, `instance`, extract_filename, @@ -88,13 +64,8 @@ stg_enghouse__taps AS ( ts, -- _key, -- _payments_key, - -- _content_hash, + _content_hash, FROM clean_columns - -- FROM add_keys_drop_full_dupes - -- LEFT JOIN same_timestamp_simple_dupes - -- USING(_payments_key) - -- -- rows to drop are those where RRN is null and it's a duplicate - -- WHERE NOT drop_candidate OR retrieval_reference_number IS NOT NULL ) SELECT * FROM stg_enghouse__taps diff --git a/warehouse/models/staging/payments/enghouse/stg_enghouse__tickets.sql b/warehouse/models/staging/payments/enghouse/stg_enghouse__tickets.sql index 651b07fee7..f92fec20e9 100644 --- a/warehouse/models/staging/payments/enghouse/stg_enghouse__tickets.sql +++ b/warehouse/models/staging/payments/enghouse/stg_enghouse__tickets.sql @@ -4,27 +4,24 @@ WITH source AS ( clean_columns AS ( SELECT --- reintroduce trim_make_empty_string_null/safe_cast for fields below - id, - ticket_id, - station_name, - amount, - clearing_id, - operator_id, - reason, - tap_id, - ticket_type, - created_dttm, - kafka_send_status, - line, - start_station, - end_station, - start_dttm, - end_dttm, - ticket_code, - additional_infos, - --- reintroduce handling of content generated by us below + {{ trim_make_empty_string_null('id') }} AS id, + {{ trim_make_empty_string_null('ticket_id') }} AS ticket_id, + {{ trim_make_empty_string_null('station_name') }} AS station_name, + {{ safe_cast('amount', type_numeric()) }} AS amount, + {{ trim_make_empty_string_null('clearing_id') }} AS clearing_id, + {{ trim_make_empty_string_null('operator_id') }} AS operator_id, + {{ trim_make_empty_string_null('reason') }} AS reason, + {{ trim_make_empty_string_null('tap_id') }} AS tap_id, + {{ trim_make_empty_string_null('ticket_type') }} AS ticket_type, + {{ safe_cast('created_dttm', type_timestamp()) }} AS created_dttm, + {{ trim_make_empty_string_null('kafka_send_status') }} AS kafka_send_status, + {{ trim_make_empty_string_null('line') }} AS line, + {{ trim_make_empty_string_null('start_station') }} AS start_station, + {{ trim_make_empty_string_null('end_station') }} AS end_station, + {{ safe_cast('start_dttm', type_timestamp()) }} AS start_dttm, + {{ safe_cast('end_dttm', type_timestamp()) }} AS end_dttm, + {{ trim_make_empty_string_null('ticket_code') }} AS ticket_code, + {{ trim_make_empty_string_null('additional_infos') }} AS additional_infos, CAST(_line_number AS INTEGER) AS _line_number, `instance`, extract_filename, @@ -34,33 +31,12 @@ clean_columns AS ( ts, -- hash all content not generated by us to enable deduping full dup rows -- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream - -- {{ dbt_utils.generate_surrogate_key(['','', '', '']) }} AS _content_hash, + {{ dbt_utils.generate_surrogate_key(['id', 'ticket_id', 'station_name', 'amount', 'clearing_id', 'operator_id', + 'reason', 'tap_id', 'ticket_type', 'created_dttm', 'kafka_send_status', 'line', 'start_station', + 'end_station', 'start_dttm', 'end_dttm', 'ticket_code', 'additional_infos']) }} AS _content_hash, FROM source ), --- revisit deduplication logic below - --- add_keys_drop_full_dupes AS ( --- SELECT --- *, --- -- generate keys now that input columns have been trimmed & cast --- {{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key, --- {{ dbt_utils.generate_surrogate_key(['aggregation_id', 'authorisation_date_time_utc']) }} AS _payments_key, --- FROM clean_columns --- {{ qualify_dedupe_full_duplicate_lp_rows() }} --- ), - --- -- we have some authorisations where the same aggregation has multiple rows with the same timestamp --- -- these seem like clear duplicates, and some of them one of the two copies is missing status and RRN; these can be dropped --- -- the rest need to be handled downstream by checking against settlements data --- same_timestamp_simple_dupes AS ( --- SELECT --- _payments_key, --- (COUNT(DISTINCT retrieval_reference_number) = 1 AND COUNT(*) > 1) AS drop_candidate, --- FROM add_keys_drop_full_dupes --- GROUP BY 1 --- ), - stg_enghouse__tickets AS ( SELECT id, @@ -81,7 +57,6 @@ stg_enghouse__tickets AS ( end_dttm, ticket_code, additional_infos, - _line_number, `instance`, extract_filename, @@ -90,13 +65,8 @@ stg_enghouse__tickets AS ( ts, -- _key, -- _payments_key, - -- _content_hash, + _content_hash, FROM clean_columns - -- FROM add_keys_drop_full_dupes - -- LEFT JOIN same_timestamp_simple_dupes - -- USING(_payments_key) - -- -- rows to drop are those where RRN is null and it's a duplicate - -- WHERE NOT drop_candidate OR retrieval_reference_number IS NOT NULL ) SELECT * FROM stg_enghouse__tickets diff --git a/warehouse/models/staging/payments/enghouse/stg_enghouse__transactions.sql b/warehouse/models/staging/payments/enghouse/stg_enghouse__transactions.sql index 023c193821..dd00e43688 100644 --- a/warehouse/models/staging/payments/enghouse/stg_enghouse__transactions.sql +++ b/warehouse/models/staging/payments/enghouse/stg_enghouse__transactions.sql @@ -4,27 +4,24 @@ WITH source AS ( clean_columns AS ( SELECT --- reintroduce trim_make_empty_string_null/safe_cast for fields below - id, - operation, - terminal_id, - mapping_terminal_id, - mapping_merchant_id, - timestamp, - amount, - payment_reference, - spdh_response, - response_type, - response_message, - token, - issuer_response, - core_response, - rrn, - authorization_code, - par, - brand, - --- reintroduce handling of content generated by us below + {{ trim_make_empty_string_null('id') }} AS id, + {{ trim_make_empty_string_null('operation') }} AS operation, + {{ trim_make_empty_string_null('terminal_id') }} AS terminal_id, + {{ trim_make_empty_string_null('mapping_terminal_id') }} AS mapping_terminal_id, + {{ trim_make_empty_string_null('mapping_merchant_id') }} AS mapping_merchant_id, + {{ safe_cast('timestamp', type_timestamp()) }} AS timestamp, + {{ safe_cast('amount', type_numeric()) }} AS amount, + {{ trim_make_empty_string_null('payment_reference') }} AS payment_reference, + {{ trim_make_empty_string_null('spdh_response') }} AS spdh_response, + {{ trim_make_empty_string_null('response_type') }} AS response_type, + {{ trim_make_empty_string_null('response_message') }} AS response_message, + {{ trim_make_empty_string_null('token') }} AS token, + {{ trim_make_empty_string_null('issuer_response') }} AS issuer_response, + {{ trim_make_empty_string_null('core_response') }} AS core_response, + {{ trim_make_empty_string_null('rrn') }} AS rrn, + {{ trim_make_empty_string_null('authorization_code') }} AS authorization_code, + {{ trim_make_empty_string_null('par') }} AS par, + {{ trim_make_empty_string_null('brand') }} AS brand, CAST(_line_number AS INTEGER) AS _line_number, `instance`, extract_filename, @@ -34,33 +31,13 @@ clean_columns AS ( ts, -- hash all content not generated by us to enable deduping full dup rows -- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream - -- {{ dbt_utils.generate_surrogate_key(['','', '', '']) }} AS _content_hash, + {{ dbt_utils.generate_surrogate_key(['id', 'operation', 'terminal_id', 'mapping_terminal_id', + 'mapping_merchant_id', 'timestamp', 'amount', 'payment_reference', 'spdh_response', + 'response_type', 'response_message', 'token', 'issuer_response', 'core_response','rrn', + 'authorization_code', 'par', 'brand']) }} AS _content_hash, FROM source ), --- revisit deduplication logic below - --- add_keys_drop_full_dupes AS ( --- SELECT --- *, --- -- generate keys now that input columns have been trimmed & cast --- {{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key, --- {{ dbt_utils.generate_surrogate_key(['aggregation_id', 'authorisation_date_time_utc']) }} AS _payments_key, --- FROM clean_columns --- {{ qualify_dedupe_full_duplicate_lp_rows() }} --- ), - --- -- we have some authorisations where the same aggregation has multiple rows with the same timestamp --- -- these seem like clear duplicates, and some of them one of the two copies is missing status and RRN; these can be dropped --- -- the rest need to be handled downstream by checking against settlements data --- same_timestamp_simple_dupes AS ( --- SELECT --- _payments_key, --- (COUNT(DISTINCT retrieval_reference_number) = 1 AND COUNT(*) > 1) AS drop_candidate, --- FROM add_keys_drop_full_dupes --- GROUP BY 1 --- ), - stg_enghouse__transactions AS ( SELECT id, @@ -82,7 +59,7 @@ stg_enghouse__transactions AS ( par, brand, - _line_number, + _line_number, `instance`, extract_filename, enghouse_export_ts, @@ -90,13 +67,8 @@ stg_enghouse__transactions AS ( ts, -- _key, -- _payments_key, - -- _content_hash, + _content_hash, FROM clean_columns - -- FROM add_keys_drop_full_dupes - -- LEFT JOIN same_timestamp_simple_dupes - -- USING(_payments_key) - -- -- rows to drop are those where RRN is null and it's a duplicate - -- WHERE NOT drop_candidate OR retrieval_reference_number IS NOT NULL ) SELECT * FROM stg_enghouse__transactions From 2c526acacb90cbf223328304cb4edceafb5427ae Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Mon, 4 Aug 2025 10:21:12 -0400 Subject: [PATCH 3/3] line spacing --- .../staging/payments/enghouse/stg_enghouse__transactions.sql | 1 - 1 file changed, 1 deletion(-) diff --git a/warehouse/models/staging/payments/enghouse/stg_enghouse__transactions.sql b/warehouse/models/staging/payments/enghouse/stg_enghouse__transactions.sql index dd00e43688..3f6451ae2a 100644 --- a/warehouse/models/staging/payments/enghouse/stg_enghouse__transactions.sql +++ b/warehouse/models/staging/payments/enghouse/stg_enghouse__transactions.sql @@ -58,7 +58,6 @@ stg_enghouse__transactions AS ( authorization_code, par, brand, - _line_number, `instance`, extract_filename,