diff --git a/Makefile b/Makefile index 6ac8918..f3f845b 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ # Configuration variables -VERSION=1.9.0 +VERSION=1.9.1 PROJ_DIR?=$(shell pwd) VENV_DIR?=${PROJ_DIR}/.bldenv BUILD_DIR=${PROJ_DIR}/build diff --git a/dbt/adapters/oracle/__version__.py b/dbt/adapters/oracle/__version__.py index fa0d09a..a2725ee 100644 --- a/dbt/adapters/oracle/__version__.py +++ b/dbt/adapters/oracle/__version__.py @@ -14,4 +14,4 @@ See the License for the specific language governing permissions and limitations under the License. """ -version = "1.9.0" +version = "1.9.1" diff --git a/dbt/adapters/oracle/connection_helper.py b/dbt/adapters/oracle/connection_helper.py index 30a4db2..cb4827b 100644 --- a/dbt/adapters/oracle/connection_helper.py +++ b/dbt/adapters/oracle/connection_helper.py @@ -71,8 +71,6 @@ class OracleNetConfig(dict): 'wallet_password', 'wallet_location', 'expire_time', - 'https_proxy', - 'https_proxy_port', 'retry_count', 'retry_delay', 'tcp_connect_timeout', diff --git a/dbt/adapters/oracle/connections.py b/dbt/adapters/oracle/connections.py index 8221d5f..e4ebe4a 100644 --- a/dbt/adapters/oracle/connections.py +++ b/dbt/adapters/oracle/connections.py @@ -120,6 +120,10 @@ class OracleAdapterCredentials(Credentials): # session info is stored in v$session for each dbt run session_info: Optional[Dict[str, str]] = field(default_factory=dict) + # read http proxy from profiles.yml + https_proxy: Optional[str] = None + https_proxy_port: Optional[int] = None + _ALIASES = { 'dbname': 'database', @@ -243,6 +247,11 @@ def open(cls, connection): elif purity == 'default': conn_config['purity'] = oracledb.ATTR_PURITY_DEFAULT + if credentials.https_proxy and credentials.https_proxy_port: + conn_config['https_proxy'] = credentials.https_proxy + conn_config['https_proxy_port'] = credentials.https_proxy_port + + if SQLNET_ORA_CONFIG is not None: conn_config.update(SQLNET_ORA_CONFIG) diff --git a/dbt/include/oracle/macros/materializations/snapshot/snapshot.sql b/dbt/include/oracle/macros/materializations/snapshot/snapshot.sql index 6a85fb1..bb9175f 100644 --- a/dbt/include/oracle/macros/materializations/snapshot/snapshot.sql +++ b/dbt/include/oracle/macros/materializations/snapshot/snapshot.sql @@ -41,7 +41,12 @@ {% endmacro %} -{% macro snapshot_staging_table(strategy, source_sql, target_relation) -%} +{% macro oracle__snapshot_staging_table(strategy, source_sql, target_relation) -%} + + {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %} + {% if strategy.hard_deletes == 'new_record' %} + {% set new_scd_id = snapshot_hash_arguments([columns.dbt_scd_id, snapshot_get_time()]) %} + {% endif %} with snapshot_query as ( @@ -52,22 +57,27 @@ snapshotted_data as ( select {{ target_relation }}.*, - {{ strategy.unique_key }} as dbt_unique_key - + {{ unique_key_fields(strategy.unique_key) }} from {{ target_relation }} - where dbt_valid_to is null - + where + {% if config.get('dbt_valid_to_current') %} + {% set source_unique_key = columns.dbt_valid_to | trim %} + {% set target_unique_key = config.get('dbt_valid_to_current') | trim %} + ( {{ equals(source_unique_key, target_unique_key) }} or {{ source_unique_key }} is null ) + {% else %} + {{ columns.dbt_valid_to }} is null + {% endif %} ), insertions_source_data as ( select snapshot_query.*, - {{ strategy.unique_key }} as dbt_unique_key, - {{ strategy.updated_at }} as dbt_updated_at, - {{ strategy.updated_at }} as dbt_valid_from, - nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to, - {{ strategy.scd_id }} as dbt_scd_id + {{ unique_key_fields(strategy.unique_key) }}, + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + {{ oracle__get_dbt_valid_to_current(strategy, columns) }}, + {{ strategy.scd_id }} as {{ columns.dbt_scd_id }} from snapshot_query ), @@ -76,21 +86,21 @@ select snapshot_query.*, - {{ strategy.unique_key }} as dbt_unique_key, - {{ strategy.updated_at }} as dbt_updated_at, - {{ strategy.updated_at }} as dbt_valid_from, - {{ strategy.updated_at }} as dbt_valid_to + {{ unique_key_fields(strategy.unique_key) }}, + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_to }} from snapshot_query ), - {%- if strategy.invalidate_hard_deletes %} + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %} deletes_source_data as ( select snapshot_query.*, - {{ strategy.unique_key }} as dbt_unique_key + {{ unique_key_fields(strategy.unique_key) }} from snapshot_query ), {% endif %} @@ -100,15 +110,16 @@ select 'insert' as dbt_change_type, source_data.* + {%- if strategy.hard_deletes == 'new_record' -%} + ,'False' as {{ columns.dbt_is_deleted }} + {%- endif %} from insertions_source_data source_data - left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key - where snapshotted_data.dbt_unique_key is null - or ( - snapshotted_data.dbt_unique_key is not null - and ( - {{ strategy.row_changed }} - ) + left outer join snapshotted_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }} + or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and ({{ strategy.row_changed }}) + ) ), @@ -118,16 +129,20 @@ select 'update' as dbt_change_type, source_data.*, - snapshotted_data.dbt_scd_id + snapshotted_data.{{ columns.dbt_scd_id }} + {%- if strategy.hard_deletes == 'new_record' -%} + , snapshotted_data.{{ columns.dbt_is_deleted }} + {%- endif %} from updates_source_data source_data - join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key + join snapshotted_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} where ( {{ strategy.row_changed }} ) ) - {%- if strategy.invalidate_hard_deletes -%} + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' -%} , deletes as ( @@ -135,36 +150,78 @@ select 'delete' as dbt_change_type, source_data.*, - {{ snapshot_get_time() }} as dbt_valid_from, - {{ snapshot_get_time() }} as dbt_updated_at, - {{ snapshot_get_time() }} as dbt_valid_to, - snapshotted_data.dbt_scd_id + {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }}, + {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, + {{ snapshot_get_time() }} as {{ columns.dbt_valid_to }}, + snapshotted_data.{{ columns.dbt_scd_id }} + {%- if strategy.hard_deletes == 'new_record' -%} + , snapshotted_data.{{ columns.dbt_is_deleted }} + {%- endif %} from snapshotted_data - left join deletes_source_data source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key - where source_data.dbt_unique_key is null + left join deletes_source_data source_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "source_data") }} + ) + {%- endif %} + + {%- if strategy.hard_deletes == 'new_record' %} + {% set source_sql_cols = get_column_schema_from_query(source_sql) %} + , + deletion_records as ( + + select + 'insert' as dbt_change_type, + {%- for col in source_sql_cols -%} + snapshotted_data.{{ adapter.quote(col.column) }}, + {% endfor -%} + {%- if strategy.unique_key | is_list -%} + {%- for key in strategy.unique_key -%} + snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }}, + {% endfor -%} + {%- else -%} + snapshotted_data.dbt_unique_key as dbt_unique_key, + {% endif -%} + {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }}, + {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, + snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }}, + {{ new_scd_id }} as {{ columns.dbt_scd_id }}, + 'True' as {{ columns.dbt_is_deleted }} + from snapshotted_data + left join deletes_source_data as source_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "source_data") }} + ) {%- endif %} select * from insertions union all select * from updates - {%- if strategy.invalidate_hard_deletes %} + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %} union all select * from deletes {%- endif %} + {%- if strategy.hard_deletes == 'new_record' %} + union all + select * from deletion_records + {%- endif %} {%- endmacro %} -{% macro build_snapshot_table(strategy, sql) %} +{% macro oracle__build_snapshot_table(strategy, sql) %} + {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %} select sbq.*, - {{ strategy.scd_id }} as dbt_scd_id, - {{ strategy.updated_at }} as dbt_updated_at, - {{ strategy.updated_at }} as dbt_valid_from, - cast(nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as TIMESTAMP(9)) as dbt_valid_to + {{ strategy.scd_id }} as {{ columns.dbt_scd_id }}, + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + {{ oracle__get_dbt_valid_to_current(strategy, columns) }} + {%- if strategy.hard_deletes == 'new_record' -%} + , 'False' as {{ columns.dbt_is_deleted }} + {% endif -%} from ( {{ sql }} ) sbq @@ -239,32 +296,38 @@ {% if not target_relation_exists %} {% set build_sql = build_snapshot_table(strategy, model['compiled_sql']) %} + {% set build_or_select_sql = build_sql %} {% set final_sql = create_table_as(False, target_relation, build_sql) %} {% else %} - {{ adapter.valid_snapshot_target(target_relation) }} + {% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %} + {{ adapter.assert_valid_snapshot_target_given_strategy(target_relation, columns, strategy) }} + + {% set build_or_select_sql = snapshot_staging_table(strategy, sql, target_relation) %} {% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %} -- this may no-op if the database does not require column expansion {% do adapter.expand_target_column_types(from_relation=staging_table, to_relation=target_relation) %} + {% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 'dbt_unique_key', 'DBT_UNIQUE_KEY'] %} + {% if unique_key | is_list %} + {% for key in strategy.unique_key %} + {{ remove_columns.append('dbt_unique_key_' + loop.index|string) }} + {{ remove_columns.append('DBT_UNIQUE_KEY_' + loop.index|string) }} + {% endfor %} + {% endif %} + {% set missing_columns = adapter.get_missing_columns(staging_table, target_relation) - | rejectattr('name', 'equalto', 'dbt_change_type') - | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') - | rejectattr('name', 'equalto', 'dbt_unique_key') - | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | rejectattr('name', 'in', remove_columns) | list %} {% do create_columns(target_relation, missing_columns) %} {% set source_columns = adapter.get_columns_in_relation(staging_table) - | rejectattr('name', 'equalto', 'dbt_change_type') - | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') - | rejectattr('name', 'equalto', 'dbt_unique_key') - | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | rejectattr('name', 'in', remove_columns) | list %} {% set quoted_source_columns = [] %} @@ -281,6 +344,8 @@ {% endif %} + {{ check_time_data_types(build_or_select_sql) }} + {% call statement('main') %} {{ final_sql }} {% endcall %} @@ -305,9 +370,15 @@ {% endmaterialization %} {% macro oracle__snapshot_hash_arguments(args) -%} - ORA_HASH({%- for arg in args -%} - coalesce(cast({{ arg }} as varchar(50) ), '') + STANDARD_HASH({%- for arg in args -%} + coalesce(cast({{ arg }} as varchar(4000) ), '') {% if not loop.last %} || '|' || {% endif %} - {%- endfor -%}) + {%- endfor -%}, 'SHA256') {%- endmacro %} +{% macro oracle__get_dbt_valid_to_current(strategy, columns) %} + {% set dbt_valid_to_current = config.get('dbt_valid_to_current') or "CAST(null as TIMESTAMP(9)" %} + coalesce(nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}), {{dbt_valid_to_current}})) + as {{ columns.dbt_valid_to }} +{% endmacro %} + diff --git a/dbt/include/oracle/macros/materializations/snapshot/snapshot_merge.sql b/dbt/include/oracle/macros/materializations/snapshot/snapshot_merge.sql index ad355fe..e7fcb70 100644 --- a/dbt/include/oracle/macros/materializations/snapshot/snapshot_merge.sql +++ b/dbt/include/oracle/macros/materializations/snapshot/snapshot_merge.sql @@ -18,26 +18,34 @@ {%- set insert_cols_csv = [] -%} {% for column in insert_cols %} - {% do insert_cols_csv.append("s." + column) %} + {% do insert_cols_csv.append("DBT_INTERNAL_SOURCE." + column) %} {% endfor %} {%- set dest_cols_csv = [] -%} {% for column in insert_cols %} - {% do dest_cols_csv.append("d." + column) %} + {% do dest_cols_csv.append("DBT_INTERNAL_DEST." + column) %} {% endfor %} - merge into {{ target }} d - using {{ source }} s - on (s.dbt_scd_id = d.dbt_scd_id) + {%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%} + + merge into {{ target }} DBT_INTERNAL_DEST + using {{ source }} DBT_INTERNAL_SOURCE + on (DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = DBT_INTERNAL_DEST.{{ columns.dbt_scd_id }}) when matched then update - set dbt_valid_to = s.dbt_valid_to - where d.dbt_valid_to is null - and s.dbt_change_type in ('update', 'delete') + set {{ columns.dbt_valid_to }} = DBT_INTERNAL_SOURCE.{{ columns.dbt_valid_to }} + where + {% if config.get("dbt_valid_to_current") %} + (DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or + DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null) + {% else %} + DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null + {% endif %} + and DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete') when not matched then insert ({{ dest_cols_csv | join(', ') }}) values ({{ insert_cols_csv | join(', ') }}) - where s.dbt_change_type = 'insert' + where DBT_INTERNAL_SOURCE.dbt_change_type = 'insert' {% endmacro %} diff --git a/dbt/include/oracle/macros/materializations/snapshot/strategies.sql b/dbt/include/oracle/macros/materializations/snapshot/strategies.sql index 0350418..f825ee7 100644 --- a/dbt/include/oracle/macros/materializations/snapshot/strategies.sql +++ b/dbt/include/oracle/macros/materializations/snapshot/strategies.sql @@ -17,7 +17,8 @@ {% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} {% set check_cols_config = config['check_cols'] %} {% set primary_key = config['unique_key'] %} - {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %} + {% set hard_deletes = adapter.get_hard_deletes_behavior(config) %} + {% set invalidate_hard_deletes = hard_deletes == 'invalidate' %} {% set select_current_time -%} select {{ snapshot_get_time() }} FROM dual @@ -28,7 +29,7 @@ {% if now is none or now is undefined -%} {%- do exceptions.raise_compiler_error('Could not get a snapshot start time from the database') -%} {%- endif %} - {% set updated_at = snapshot_string_as_time(now) %} + {% set updated_at = config.get('updated_at') or snapshot_string_as_time(now) %} {% set column_added = false %} @@ -59,14 +60,16 @@ ) {%- endset %} - {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} + {% set scd_args = api.Relation.scd_args(primary_key, updated_at) %} + {% set scd_id_expr = snapshot_hash_arguments(scd_args) %} {% do return({ "unique_key": primary_key, "updated_at": updated_at, "row_changed": row_changed_expr, "scd_id": scd_id_expr, - "invalidate_hard_deletes": invalidate_hard_deletes + "invalidate_hard_deletes": invalidate_hard_deletes, + "hard_deletes": hard_deletes }) %} {% endmacro %} @@ -100,21 +103,25 @@ {%- endmacro %} {% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} - {% set primary_key = config['unique_key'] %} - {% set updated_at = config['updated_at'] %} - {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %} + {% set primary_key = config.get('unique_key') %} + {% set updated_at = config.get('updated_at') %} + {% set hard_deletes = adapter.get_hard_deletes_behavior(config) %} + {% set invalidate_hard_deletes = hard_deletes == 'invalidate' %} + {% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %} {% set row_changed_expr -%} - ({{ snapshotted_rel }}.dbt_valid_from < {{ current_rel }}.{{ updated_at }}) + ({{ snapshotted_rel }}.{{ columns.dbt_valid_from }} < {{ current_rel }}.{{ updated_at }}) {%- endset %} {# updated_at should be cast as timestamp because in hash computation "CAST(date as VARCHAR)" truncates time fields #} - {% set scd_id_expr = snapshot_hash_arguments([primary_key, 'CAST(' ~ updated_at ~ ' AS TIMESTAMP)']) %} + {% set scd_args = api.Relation.scd_args(primary_key, updated_at) %} + {% set scd_id_expr = snapshot_hash_arguments(scd_args) %} {% do return({ "unique_key": primary_key, "updated_at": updated_at, "row_changed": row_changed_expr, "scd_id": scd_id_expr, - "invalidate_hard_deletes": invalidate_hard_deletes + "invalidate_hard_deletes": invalidate_hard_deletes, + "hard_deletes": hard_deletes }) %} {% endmacro %} diff --git a/dbt/include/oracle/macros/utils/timestamps.sql b/dbt/include/oracle/macros/utils/timestamps.sql index f660abc..9e01e31 100644 --- a/dbt/include/oracle/macros/utils/timestamps.sql +++ b/dbt/include/oracle/macros/utils/timestamps.sql @@ -23,3 +23,11 @@ {%- set result = "TO_TIMESTAMP('"~ timestamp ~ "','yyyy/mm/dd hh24:mi:ss.FF')" -%} {{ return(result) }} {%- endmacro %} + +{% macro get_snapshot_get_time_data_type() %} + {% set snapshot_time = adapter.dispatch('snapshot_get_time', 'dbt')() %} + {% set time_data_type_sql = 'select ' ~ snapshot_time ~ ' as dbt_snapshot_time from dual' %} + {% set snapshot_time_column_schema = get_column_schema_from_query(time_data_type_sql) %} + {% set time_data_type = snapshot_time_column_schema[0].dtype %} + {{ return(time_data_type or none) }} +{% endmacro %} \ No newline at end of file diff --git a/dbt_adbs_test_project/snapshots/promotion_costs.sql b/dbt_adbs_test_project/snapshots/promotion_costs.sql index 722c29b..cbaf7a2 100644 --- a/dbt_adbs_test_project/snapshots/promotion_costs.sql +++ b/dbt_adbs_test_project/snapshots/promotion_costs.sql @@ -18,7 +18,7 @@ strategy='check', unique_key='promo_id', check_cols='all', - invalidate_hard_deletes=True + hard_deletes='invalidate' ) }} select * from {{ ref('promotion_costs') }} diff --git a/requirements.txt b/requirements.txt index 35ffb48..7519458 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ dbt-common>=1.1.0,<2.0 dbt-adapters>=1.2.1,<2.0 dbt-core>=1.9.1,<2.0 -oracledb==2.5.1 +oracledb==3.0.0 diff --git a/setup.cfg b/setup.cfg index 9e3caab..069a393 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = dbt-oracle -version = 1.9.0 +version = 1.9.1 description = dbt (data build tool) adapter for Oracle Autonomous Database long_description = file: README.md long_description_content_type = text/markdown @@ -35,7 +35,7 @@ install_requires = dbt-common>=1.1.0,<2.0 dbt-adapters>=1.2.1,<2.0 dbt-core~=1.9,<1.10 - oracledb==2.5.1 + oracledb==3.0.0 test_suite=tests test_requires = dbt-tests-adapter~=1.10,<1.11 diff --git a/setup.py b/setup.py index c0e3742..3c20748 100644 --- a/setup.py +++ b/setup.py @@ -43,7 +43,7 @@ "dbt-common>=1.1.0,<2.0", "dbt-adapters>=1.2.1,<2.0", "dbt-core~=1.9,<1.10", - "oracledb==2.5.1" + "oracledb==3.0.0" ] test_requirements = [ @@ -61,7 +61,7 @@ url = 'https://github.com/oracle/dbt-oracle' -VERSION = '1.9.0' +VERSION = '1.9.1' setup( author="Oracle", python_requires='>=3.9',