Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Microbatch] Leverage static partitioning in dbt-bigquery materialization strategy #802

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Leverage static partitioning in the dbt-bigquery microbatch materialization strategy
time: 2025-02-07T09:03:15.963098Z
custom:
Author: borjav
Issue: "538"
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{% macro generate_dates_in_range(start_date_datetime, end_date_datetime, granularity) %}
    {#--
        Build a list of formatted date/datetime strings that step from
        start_date_datetime (inclusive) up to end_date_datetime (exclusive).

        Args:
            start_date_datetime: datetime at which the range starts; always the first entry
                when the range spans at least one whole step.
            end_date_datetime: datetime at which the range ends; never emitted itself.
            granularity: "day" or "hour" - the step between consecutive entries.

        Returns:
            A list of strings, one per whole step contained in the range. Each string is
            already wrapped in double quotes by the strftime format:
            day  -> "YYYY-MM-DD"
            hour -> "YYYY-MM-DD HH:MM:SS"
            The list is empty when the range is shorter than one step.

        Raises:
            A compiler error when granularity is anything other than "day" or "hour".
            Without this guard the step count and output format would be undefined and the
            macro would fail later with an unrelated-looking error.
    --#}
    {% set total_offset_seconds = (end_date_datetime - start_date_datetime).total_seconds() %}
    {#-- Truncate (not round) to whole hours; the day count is derived from the whole-hour count. --#}
    {% set total_offset_hours = (total_offset_seconds / 3600) | int %}

    {% if granularity == "day" %}
        {% set total_offset = (total_offset_hours / 24) | int %}
        {% set step = modules.datetime.timedelta(days=1) %}
        {% set fmt_out = '"%Y-%m-%d"' %}
    {% elif granularity == "hour" %}
        {% set total_offset = total_offset_hours %}
        {% set step = modules.datetime.timedelta(hours=1) %}
        {% set fmt_out = '"%Y-%m-%d %H:%M:%S"' %}
    {% else %}
        {% do exceptions.raise_compiler_error(
            "generate_dates_in_range only supports 'day' and 'hour' granularity, got: " ~ granularity
        ) %}
    {% endif %}

    {#-- NOTE(review): a trailing partial step (e.g. 36 hours at day granularity) is dropped by the
         truncation above - confirm callers always pass ranges aligned to the granularity. --#}
    {% set date_list = [] %}
    {% for i in range(0, total_offset) %}
        {% do date_list.append((start_date_datetime + step * i).strftime(fmt_out)) %}
    {% endfor %}
    {{ return(date_list) }}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@

{% endmacro %}


{% macro set_partitions(config, strategy, partition_by) %}
    {#--
        Decide which partition list the incremental materialization should use.

        For the microbatch strategy (when copy_partitions is explicitly false) the partitions
        are overridden with a generated static list, forcing a static insert overwrite, which
        is significantly more performant. In every other case the user-supplied `partitions`
        config is returned unchanged (none when it is not set).

        Args:
            config: the model config object; read for `partitions` and handed to the
                static partition generator.
            strategy: the incremental strategy name.
            partition_by: parsed partition_by config; `copy_partitions` and `granularity`
                are read from it.

        Returns:
            The generated list of partition strings, or config.get('partitions', none).
    --#}
    {#-- The static generator only knows how to step by day or hour; for any other
         granularity fall back to the configured partitions instead of failing. --#}
    {% set static_granularities = ["day", "hour"] %}
    {% if strategy == "microbatch"
          and partition_by.copy_partitions is false
          and partition_by.granularity in static_granularities %}
        {{ return(bq_generate_static_partitions(config, partition_by.granularity)) }}
    {% else %}
        {{ return(config.get('partitions', none)) }}
    {% endif %}
{% endmacro %}

{% materialization incremental, adapter='bigquery', supported_languages=['sql', 'python'] -%}

{%- set unique_key = config.get('unique_key') -%}
Expand All @@ -84,7 +94,7 @@

{%- set raw_partition_by = config.get('partition_by', none) -%}
{%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
{%- set partitions = config.get('partitions', none) -%}
{%- set partitions = set_partitions(config, strategy, partition_by) -%}
{%- set cluster_by = config.get('cluster_by', none) -%}

{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,11 @@

{{ return(build_sql) }}
{% endmacro %}

{% macro bq_generate_static_partitions(config, granularity) %}
    {#--
        Produce the static partition list for the current microbatch.

        Reads the batch bounds from the config keys
        "__dbt_internal_microbatch_event_time_start" and
        "__dbt_internal_microbatch_event_time_end" and expands them, at the given
        granularity, through generate_dates_in_range.

        Args:
            config: model config object exposing `get`.
            granularity: step passed straight through to generate_dates_in_range.

        Returns:
            Whatever generate_dates_in_range returns for those bounds.

        NOTE(review): presumably both config values are datetimes populated by dbt's
        microbatch runner - confirm they are always set before this macro is reached.
    --#}
    {{ return(
        generate_dates_in_range(
            config.get("__dbt_internal_microbatch_event_time_start"),
            config.get("__dbt_internal_microbatch_event_time_end"),
            granularity
        )
    ) }}
{% endmacro %}
Loading