An Airflow operator and hook to interface with the dbt-core
Python package.
Although dbt
is meant to be installed and used as a CLI, we may not have control over the environment where Airflow is running, which can rule out using dbt
as a CLI.
This is exactly what happens when using Amazon's Managed Workflows for Apache Airflow (MWAA): although a list of Python requirements can be passed, the CLI cannot be found in the worker's PATH.
There is a workaround which involves using Airflow's BashOperator
and running Python from the command line:
```python
from airflow.operators.bash import BashOperator

BASH_COMMAND = "python -c 'from dbt.main import main; main()' run"
operator = BashOperator(
    task_id="dbt_run",
    bash_command=BASH_COMMAND,
)
```
But it can get sloppy when appending all potential arguments a dbt run
command (or other subcommand) can take.
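For instance, appending a selector and a couple of flags could already look something like this (a sketch; the model names below are hypothetical):

```python
# A sketch of the same workaround with a few dbt run flags appended.
BASH_COMMAND = (
    "python -c 'from dbt.main import main; main()' "
    "run --select model_one model_two --full-refresh --fail-fast"
)
```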
That's where airflow-dbt-python
comes in: it abstracts the complexity of interfacing with dbt-core
and exposes one operator for each dbt
subcommand that can be instantiated with all the corresponding arguments that the dbt
CLI would take.
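For example, a dbt run invocation like the one above could be expressed as operator arguments instead (a sketch; the paths and model names are placeholders):

```python
from airflow_dbt_python.operators.dbt import DbtRunOperator

# Roughly equivalent to:
#   dbt run --select model_one model_two --full-refresh --fail-fast
dbt_run = DbtRunOperator(
    task_id="dbt_run",
    project_dir="/path/to/my/dbt/project/",
    profiles_dir="~/.dbt/",
    select=["model_one", "model_two"],
    full_refresh=True,
    fail_fast=True,
)
```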
The alternative airflow-dbt
package, by default, would not work if the dbt
CLI is not in PATH, which means it would not be usable in MWAA. There is a workaround via the dbt_bin
argument, which can be set to "python -c 'from dbt.main import main; main()' run"
, in a similar fashion to the BashOperator
example. Yet this approach is not without its limitations:
- airflow-dbt works by wrapping the dbt CLI, which makes our code dependent on the environment in which it runs.
- airflow-dbt does not support the full range of arguments a command can take. For example, DbtRunOperator does not have an attribute for fail_fast.
- airflow-dbt does not offer access to dbt artifacts created during execution. airflow-dbt-python does so by pushing any artifacts to XCom.
Besides running dbt
as one would do manually, airflow-dbt-python
also supports a few additional features to bring dbt
closer to being a first-class citizen of Airflow.
The arguments profiles_dir
and project_dir
would normally point to a directory containing a profiles.yml
file and a dbt project in the local environment respectively. airflow-dbt-python
extends these arguments to also take an AWS S3 URL (identified by an s3://
scheme):
- If an S3 URL is used for profiles_dir, then this URL must point to a directory in S3 that contains a profiles.yml file. The profiles.yml file will be downloaded and made available for the operator to use when running.
- If an S3 URL is used for project_dir, then this URL must point to a directory in S3 containing all the files required for a dbt project to run. All of the contents of this directory will be downloaded and made available for the operator. The URL may also point to a zip file containing all the files of a dbt project, which will be downloaded, uncompressed, and made available for the operator.
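A minimal sketch of what this could look like (the bucket and prefixes below are placeholders):

```python
from airflow_dbt_python.operators.dbt import DbtRunOperator

# Both the profiles.yml file and the dbt project files are fetched from S3
# before dbt runs; the s3:// URLs are hypothetical.
dbt_run = DbtRunOperator(
    task_id="dbt_run_from_s3",
    project_dir="s3://my-bucket/dbt/project/",
    profiles_dir="s3://my-bucket/dbt/profiles/",
)
```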
This feature is intended to work in line with Airflow's description of the task concept:
> Tasks don’t pass information to each other by default, and run entirely independently.
In our world, that means each task should be responsible for fetching all the dbt-related files it needs in order to run independently. This is particularly relevant for an Airflow deployment with a remote executor, as Airflow does not guarantee which worker will run a particular task.
Each dbt
execution produces several JSON artifacts that may be valuable for obtaining metrics, building conditional workflows, reporting, or other uses. airflow-dbt-python
can push these artifacts to XCom as requested by exposing a do_xcom_push_artifacts
argument, which takes a list of artifacts to push. This way, artifacts may be pulled and operated on by downstream tasks. For example:
```python
with DAG(
    dag_id="example_dbt_artifacts",
    schedule_interval="0 0 * * *",
    start_date=days_ago(1),
    catchup=False,
    dagrun_timeout=dt.timedelta(minutes=60),
) as dag:
    dbt_run = DbtRunOperator(
        task_id="dbt_run_daily",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        select=["+tag:daily"],
        exclude=["tag:deprecated"],
        target="production",
        profile="my-project",
        full_refresh=True,
        do_xcom_push_artifacts=["manifest.json", "run_results.json"],
    )

    process_artifacts = PythonOperator(
        task_id="process_artifacts",
        python_callable=process_dbt_artifacts,
        provide_context=True,
    )

    dbt_run >> process_artifacts
```
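As a rough sketch, a callable like process_dbt_artifacts could then pull the artifacts from XCom in the downstream task (this assumes, as in the example above, that artifacts are pushed as parsed dictionaries under keys matching their file names):

```python
def process_dbt_artifacts(ti, **kwargs):
    """Hypothetical callable that inspects dbt artifacts pushed to XCom."""
    run_results = ti.xcom_pull(task_ids="dbt_run_daily", key="run_results.json")
    manifest = ti.xcom_pull(task_ids="dbt_run_daily", key="manifest.json")

    # For example, report how many results the run produced and how many
    # nodes the project manifest contains.
    print(f"dbt run produced {len(run_results['results'])} results")
    print(f"Manifest contains {len(manifest['nodes'])} nodes")
```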
See the full example here.
Currently, the following dbt
commands are supported:
- clean
- compile
- debug
- deps
- ls
- parse
- run
- run-operation
- seed
- snapshot
- source
- test
The example below uses a few of these operators together in a single DAG:

```python
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago

from airflow_dbt_python.operators.dbt import (
    DbtRunOperator,
    DbtSeedOperator,
    DbtTestOperator,
)

args = {
    'owner': 'airflow',
}

with DAG(
    dag_id='example_dbt_operator',
    default_args=args,
    schedule_interval='0 0 * * *',
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example', 'example2'],
) as dag:
    dbt_test = DbtTestOperator(
        task_id="dbt_test",
        selector_name=["pre-run-tests"],
    )

    dbt_seed = DbtSeedOperator(
        task_id="dbt_seed",
        select=["/path/to/first.csv", "/path/to/second.csv"],
        full_refresh=True,
    )

    dbt_run = DbtRunOperator(
        task_id="dbt_run",
        select=["/path/to/models"],
        full_refresh=True,
        fail_fast=True,
    )

    dbt_test >> dbt_seed >> dbt_run
```
More examples can be found in the examples/
directory.
To line up with dbt-core
, airflow-dbt-python
supports Python 3.7, 3.8, and 3.9. We also include Python 3.10 in our testing pipeline, although dbt-core
does not yet support it.
On the Airflow side, we unit test with version 1.10.12 and the latest version 2 release.
Finally, airflow-dbt-python
requires at least dbt-core
version 1.0.0. Since dbt-core
follows semantic versioning, we do not impose any restrictions on the minor and patch versions, but do keep in mind that the latest dbt-core
features incorporated as minor releases may not yet be supported.
airflow-dbt-python can be installed with pip:

```shell
pip install airflow-dbt-python
```
Any dbt
adapters you require may be installed by specifying extras:
```shell
pip install airflow-dbt-python[snowflake,postgres]
```
For a development installation, first clone the repo:

```shell
git clone https://github.com/tomasfarias/airflow-dbt-python.git
cd airflow-dbt-python
```
With poetry:

```shell
poetry install
```
Install any extras you need, and only those you need:

```shell
poetry install -E postgres -E redshift
```
To use airflow-dbt-python in MWAA, add it to your requirements.txt
file and edit your Airflow environment to use this new requirements.txt
file.
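For example, the requirements.txt entry could look something like this (the redshift extra is just one possibility; pick whichever adapters your project needs):

```
# Hypothetical entry: choose the adapter extras your project requires.
airflow-dbt-python[redshift]
```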
Tests are written using pytest
, are located in tests/
, and can be run locally with poetry
:

```shell
poetry run pytest tests/ -vv
```