
v1.9.1 #170

Merged
merged 2 commits on Mar 27, 2025

2 changes: 1 addition & 1 deletion Makefile
@@ -1,5 +1,5 @@
# Configuration variables
VERSION=1.9.0
VERSION=1.9.1
PROJ_DIR?=$(shell pwd)
VENV_DIR?=${PROJ_DIR}/.bldenv
BUILD_DIR=${PROJ_DIR}/build
2 changes: 1 addition & 1 deletion dbt/adapters/oracle/__version__.py
@@ -14,4 +14,4 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
version = "1.9.0"
version = "1.9.1"
2 changes: 0 additions & 2 deletions dbt/adapters/oracle/connection_helper.py
@@ -71,8 +71,6 @@ class OracleNetConfig(dict):
'wallet_password',
'wallet_location',
'expire_time',
'https_proxy',
'https_proxy_port',
'retry_count',
'retry_delay',
'tcp_connect_timeout',
9 changes: 9 additions & 0 deletions dbt/adapters/oracle/connections.py
@@ -120,6 +120,10 @@ class OracleAdapterCredentials(Credentials):
# session info is stored in v$session for each dbt run
session_info: Optional[Dict[str, str]] = field(default_factory=dict)

# read http proxy from profiles.yml
https_proxy: Optional[str] = None
https_proxy_port: Optional[int] = None


_ALIASES = {
'dbname': 'database',
@@ -243,6 +247,11 @@ def open(cls, connection):
elif purity == 'default':
conn_config['purity'] = oracledb.ATTR_PURITY_DEFAULT

if credentials.https_proxy and credentials.https_proxy_port:
conn_config['https_proxy'] = credentials.https_proxy
conn_config['https_proxy_port'] = credentials.https_proxy_port


if SQLNET_ORA_CONFIG is not None:
conn_config.update(SQLNET_ORA_CONFIG)

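A note on the proxy change: the https_proxy and https_proxy_port keys removed from OracleNetConfig in connection_helper.py come back as optional adapter credentials, so the proxy is now configured in profiles.yml rather than in the sqlnet-style connect config. A minimal sketch of how these settings would flow into python-oracledb, with hypothetical profile values (dbt_user, mydb_high, proxy.example.com), follows:

# Sketch only: mirrors the conn_config handling added in connections.py above.
# The user/password/dsn and proxy values are hypothetical examples.
import oracledb

credentials = {
    "user": "dbt_user",
    "password": "secret",
    "https_proxy": "proxy.example.com",   # new optional field read from profiles.yml
    "https_proxy_port": 80,               # new optional field read from profiles.yml
}

conn_config = {
    "user": credentials["user"],
    "password": credentials["password"],
    "dsn": "mydb_high",
}

# Forwarded only when both values are set, matching the check in this diff.
if credentials.get("https_proxy") and credentials.get("https_proxy_port"):
    conn_config["https_proxy"] = credentials["https_proxy"]
    conn_config["https_proxy_port"] = credentials["https_proxy_port"]

# python-oracledb accepts https_proxy/https_proxy_port as connect parameters
# for tunnelling TCPS (TLS) connections through a web proxy.
connection = oracledb.connect(**conn_config)

Both values must be present for the proxy to be applied; setting only one of them is ignored by the conditional above.
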
171 changes: 121 additions & 50 deletions dbt/include/oracle/macros/materializations/snapshot/snapshot.sql
@@ -41,7 +41,12 @@
{% endmacro %}


{% macro snapshot_staging_table(strategy, source_sql, target_relation) -%}
{% macro oracle__snapshot_staging_table(strategy, source_sql, target_relation) -%}

{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}
{% if strategy.hard_deletes == 'new_record' %}
{% set new_scd_id = snapshot_hash_arguments([columns.dbt_scd_id, snapshot_get_time()]) %}
{% endif %}

with snapshot_query as (

@@ -52,22 +57,27 @@
snapshotted_data as (

select {{ target_relation }}.*,
{{ strategy.unique_key }} as dbt_unique_key

{{ unique_key_fields(strategy.unique_key) }}
from {{ target_relation }}
where dbt_valid_to is null

where
{% if config.get('dbt_valid_to_current') %}
{% set source_unique_key = columns.dbt_valid_to | trim %}
{% set target_unique_key = config.get('dbt_valid_to_current') | trim %}
( {{ equals(source_unique_key, target_unique_key) }} or {{ source_unique_key }} is null )
{% else %}
{{ columns.dbt_valid_to }} is null
{% endif %}
),

insertions_source_data as (

select
snapshot_query.*,
{{ strategy.unique_key }} as dbt_unique_key,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
{{ strategy.scd_id }} as dbt_scd_id
{{ unique_key_fields(strategy.unique_key) }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ oracle__get_dbt_valid_to_current(strategy, columns) }},
{{ strategy.scd_id }} as {{ columns.dbt_scd_id }}

from snapshot_query
),
@@ -76,21 +86,21 @@

select
snapshot_query.*,
{{ strategy.unique_key }} as dbt_unique_key,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
{{ strategy.updated_at }} as dbt_valid_to
{{ unique_key_fields(strategy.unique_key) }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_to }}

from snapshot_query
),

{%- if strategy.invalidate_hard_deletes %}
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}

deletes_source_data as (

select
snapshot_query.*,
{{ strategy.unique_key }} as dbt_unique_key
{{ unique_key_fields(strategy.unique_key) }}
from snapshot_query
),
{% endif %}
@@ -100,15 +110,16 @@
select
'insert' as dbt_change_type,
source_data.*
{%- if strategy.hard_deletes == 'new_record' -%}
,'False' as {{ columns.dbt_is_deleted }}
{%- endif %}

from insertions_source_data source_data
left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where snapshotted_data.dbt_unique_key is null
or (
snapshotted_data.dbt_unique_key is not null
and (
{{ strategy.row_changed }}
)
left outer join snapshotted_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }}
or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and ({{ strategy.row_changed }})

)

),
@@ -118,53 +129,99 @@
select
'update' as dbt_change_type,
source_data.*,
snapshotted_data.dbt_scd_id
snapshotted_data.{{ columns.dbt_scd_id }}
{%- if strategy.hard_deletes == 'new_record' -%}
, snapshotted_data.{{ columns.dbt_is_deleted }}
{%- endif %}

from updates_source_data source_data
join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
join snapshotted_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where (
{{ strategy.row_changed }}
)
)

{%- if strategy.invalidate_hard_deletes -%}
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' -%}
,

deletes as (

select
'delete' as dbt_change_type,
source_data.*,
{{ snapshot_get_time() }} as dbt_valid_from,
{{ snapshot_get_time() }} as dbt_updated_at,
{{ snapshot_get_time() }} as dbt_valid_to,
snapshotted_data.dbt_scd_id
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
{{ snapshot_get_time() }} as {{ columns.dbt_valid_to }},
snapshotted_data.{{ columns.dbt_scd_id }}
{%- if strategy.hard_deletes == 'new_record' -%}
, snapshotted_data.{{ columns.dbt_is_deleted }}
{%- endif %}

from snapshotted_data
left join deletes_source_data source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where source_data.dbt_unique_key is null
left join deletes_source_data source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
)
{%- endif %}

{%- if strategy.hard_deletes == 'new_record' %}
{% set source_sql_cols = get_column_schema_from_query(source_sql) %}
,
deletion_records as (

select
'insert' as dbt_change_type,
{%- for col in source_sql_cols -%}
snapshotted_data.{{ adapter.quote(col.column) }},
{% endfor -%}
{%- if strategy.unique_key | is_list -%}
{%- for key in strategy.unique_key -%}
snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }},
{% endfor -%}
{%- else -%}
snapshotted_data.dbt_unique_key as dbt_unique_key,
{% endif -%}
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }},
{{ new_scd_id }} as {{ columns.dbt_scd_id }},
'True' as {{ columns.dbt_is_deleted }}
from snapshotted_data
left join deletes_source_data as source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}

)
{%- endif %}

select * from insertions
union all
select * from updates
{%- if strategy.invalidate_hard_deletes %}
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
union all
select * from deletes
{%- endif %}
{%- if strategy.hard_deletes == 'new_record' %}
union all
select * from deletion_records
{%- endif %}

{%- endmacro %}



{% macro build_snapshot_table(strategy, sql) %}
{% macro oracle__build_snapshot_table(strategy, sql) %}
{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}

select sbq.*,
{{ strategy.scd_id }} as dbt_scd_id,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
cast(nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as TIMESTAMP(9)) as dbt_valid_to
{{ strategy.scd_id }} as {{ columns.dbt_scd_id }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ oracle__get_dbt_valid_to_current(strategy, columns) }}
{%- if strategy.hard_deletes == 'new_record' -%}
, 'False' as {{ columns.dbt_is_deleted }}
{% endif -%}
from (
{{ sql }}
) sbq
@@ -239,32 +296,38 @@
{% if not target_relation_exists %}

{% set build_sql = build_snapshot_table(strategy, model['compiled_sql']) %}
{% set build_or_select_sql = build_sql %}
{% set final_sql = create_table_as(False, target_relation, build_sql) %}

{% else %}

{{ adapter.valid_snapshot_target(target_relation) }}
{% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %}

{{ adapter.assert_valid_snapshot_target_given_strategy(target_relation, columns, strategy) }}

{% set build_or_select_sql = snapshot_staging_table(strategy, sql, target_relation) %}
{% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %}

-- this may no-op if the database does not require column expansion
{% do adapter.expand_target_column_types(from_relation=staging_table,
to_relation=target_relation) %}

{% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 'dbt_unique_key', 'DBT_UNIQUE_KEY'] %}
{% if unique_key | is_list %}
{% for key in strategy.unique_key %}
{{ remove_columns.append('dbt_unique_key_' + loop.index|string) }}
{{ remove_columns.append('DBT_UNIQUE_KEY_' + loop.index|string) }}
{% endfor %}
{% endif %}

{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| rejectattr('name', 'in', remove_columns)
| list %}

{% do create_columns(target_relation, missing_columns) %}

{% set source_columns = adapter.get_columns_in_relation(staging_table)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| rejectattr('name', 'in', remove_columns)
| list %}

{% set quoted_source_columns = [] %}
@@ -281,6 +344,8 @@

{% endif %}

{{ check_time_data_types(build_or_select_sql) }}

{% call statement('main') %}
{{ final_sql }}
{% endcall %}
@@ -305,9 +370,15 @@
{% endmaterialization %}

{% macro oracle__snapshot_hash_arguments(args) -%}
ORA_HASH({%- for arg in args -%}
coalesce(cast({{ arg }} as varchar(50) ), '')
STANDARD_HASH({%- for arg in args -%}
coalesce(cast({{ arg }} as varchar(4000) ), '')
{% if not loop.last %} || '|' || {% endif %}
{%- endfor -%})
{%- endfor -%}, 'SHA256')
{%- endmacro %}

{% macro oracle__get_dbt_valid_to_current(strategy, columns) %}
{% set dbt_valid_to_current = config.get('dbt_valid_to_current') or "CAST(null as TIMESTAMP(9))" %}
coalesce(nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}), {{ dbt_valid_to_current }})
as {{ columns.dbt_valid_to }}
{% endmacro %}
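
A behavioural note on oracle__snapshot_hash_arguments above: dbt_scd_id is now a SHA-256 digest (STANDARD_HASH over the pipe-joined, VARCHAR(4000)-cast arguments) instead of an ORA_HASH number. A rough Python equivalent of the rendered expression, assuming a UTF-8 database character set and inputs whose string form matches Oracle's VARCHAR cast, is:

# Approximation of the rendered expression:
#   STANDARD_HASH(coalesce(cast(arg1 as varchar(4000)), '') || '|' || ..., 'SHA256')
# NULL arguments become empty strings; non-string columns would also need to match
# Oracle's implicit to-char formatting for the digests to agree exactly.
import hashlib

def snapshot_scd_id(*args):
    joined = "|".join("" if a is None else str(a) for a in args)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest().upper()

# Example: a unique key value plus the updated_at timestamp, as the strategy passes them.
print(snapshot_scd_id("42", "2025-03-27 10:15:00.000000"))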

@@ -18,26 +18,34 @@
{%- set insert_cols_csv = [] -%}

{% for column in insert_cols %}
{% do insert_cols_csv.append("s." + column) %}
{% do insert_cols_csv.append("DBT_INTERNAL_SOURCE." + column) %}
{% endfor %}

{%- set dest_cols_csv = [] -%}

{% for column in insert_cols %}
{% do dest_cols_csv.append("d." + column) %}
{% do dest_cols_csv.append("DBT_INTERNAL_DEST." + column) %}
{% endfor %}

merge into {{ target }} d
using {{ source }} s
on (s.dbt_scd_id = d.dbt_scd_id)
{%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%}

merge into {{ target }} DBT_INTERNAL_DEST
using {{ source }} DBT_INTERNAL_SOURCE
on (DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = DBT_INTERNAL_DEST.{{ columns.dbt_scd_id }})

when matched
then update
set dbt_valid_to = s.dbt_valid_to
where d.dbt_valid_to is null
and s.dbt_change_type in ('update', 'delete')
set {{ columns.dbt_valid_to }} = DBT_INTERNAL_SOURCE.{{ columns.dbt_valid_to }}
where
{% if config.get("dbt_valid_to_current") %}
(DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or
DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null)
{% else %}
DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null
{% endif %}
and DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete')
when not matched
then insert ({{ dest_cols_csv | join(', ') }})
values ({{ insert_cols_csv | join(', ') }})
where s.dbt_change_type = 'insert'
where DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
{% endmacro %}