dbt integration with Airflow

Dilmeet Malhi
Published in Drop Engineering
7 min read · Dec 22, 2022

As Drop continues to grow, it becomes increasingly difficult to deal with enormous amounts of data. Transforming, deduplicating, organizing and filtering data are some of the challenges that data engineers face on a daily basis, so how do we overcome them with relative ease and speed?

In today's fast-paced market, we constantly see new tools emerging that help accelerate data transformation processes, and dbt is one such tool that has caught everyone's eye.

So, what is dbt?

dbt, or data build tool, is an open-source tool written in Python that is used as part of the transformation layer in Extract-Load-Transform (ELT) pipelines. dbt helps improve and speed up data transformation by creating and modularizing data models written in SQL.

At Drop, dbt will be used to build dimension and fact tables, and write tests to ensure and preserve data quality in the process of building a more reliable data warehouse in Redshift.

Why do we want dbt?

dbt has become a key component of the modern data engineering workflow and here’s why.

  • dbt comes with an automated testing framework. There are multiple packages you can find that help with complex tests. You can also develop custom tests for your data using Jinja or SQL.
  • dbt has various materializations like views, tables and incremental tables. It optimizes the workflow for incremental models as we no longer need to code incremental logic in long running SQL scripts.
  • dbt automatically creates data documentation and definitions. It centralizes documentation into a consistent and consumable repository that can be accessed by anyone.
  • dbt has a gentle learning curve. Anyone who can write SQL queries and has basic knowledge of Python can write dbt models!

How to set up dbt with Airflow?

At Drop, Airflow and dbt will be used together to achieve the goal of providing trustworthy data to internal teams while utilizing a common interface.

In this blog post, we will walk through the integration of dbt with Airflow in detail. First, we will install dbt and run some basic dbt commands to get it working. Then, we will set up dbt with Airflow and, finally, write a basic DAG that invokes dbt commands to run and test all of your models in the data warehouse.

Airflow is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows. Airflow’s Python framework allows you to develop dbt workflows. It provides a scalable environment to run your SQL dbt models. One of the key features of Airflow that we will be using is that it enables you to trigger models only after the necessary tasks have run successfully. It also has an interactive web interface that helps you monitor and debug your pipelines in case of failure.

dbt setup in an existing Airflow project

  • Create dbt project

The first step is to create your dbt repository, which includes all of the data models, tests, sources and configurations. dbt has great tutorials on how to write data models and tests, and you can find them here. It is good practice to include a requirements.txt file listing the packages needed to run dbt jobs, such as dbt-core and dbt-redshift.

To run dbt jobs successfully, you need to set the credentials for your Redshift database in the profiles.yml file. Storing your credentials as environment variables is recommended.

Credentials can be set by using IAM roles or by using a username and password. For the purpose of this demo, we will use the username-password method, and your profiles.yml should look like the one below.

Drop:
  target: dev
  outputs:
    dev:
      type: redshift
      host: hostname.region.redshift.amazonaws.com
      user: username
      password: password
      port: 5439
      dbname: dev
      schema: personal_schema
      threads: 4
      keepalives_idle: 240 # default 240 seconds
      connect_timeout: 10 # default 10 seconds
      # search_path: public # optional, not recommended
      ra3_node: true # enables cross-database sources
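
One way to keep the password out of profiles.yml is to export the credentials as environment variables and read them in the profile with dbt's env_var Jinja function. The sketch below is a small pre-flight check, not part of dbt, that reports which variables are still unset before a dbt command runs. The variable names (DBT_HOST, DBT_USER, DBT_PASSWORD) are assumptions for illustration; use whatever names your own profiles.yml references.

```python
import os

# Hypothetical variable names; match them to the ones your profiles.yml reads.
REQUIRED_VARS = ("DBT_HOST", "DBT_USER", "DBT_PASSWORD")


def missing_env_vars(required=REQUIRED_VARS, environ=None):
    """Return the names in `required` that are unset or empty in `environ`."""
    env = os.environ if environ is None else environ
    return [name for name in required if not env.get(name)]


# Demo with an explicit dict so the result does not depend on your shell.
example_env = {"DBT_HOST": "hostname.region.redshift.amazonaws.com", "DBT_USER": "username"}
print(missing_env_vars(environ=example_env))  # → ['DBT_PASSWORD']
```

Calling missing_env_vars() with no arguments checks the real environment, which makes it easy to fail fast at the start of a job instead of waiting for dbt to report a connection error.
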
  • Create virtual environment

Run the following commands to create a virtual environment and install all the required packages.

python -m venv .venv  
source .venv/bin/activate
pip install -r requirements.txt
  • At this point, you should be able to run dbt commands. Run the following command to test whether the connection to Redshift is successful.
dbt debug
  • The following commands run and test data models and generate the documentation. It is important to note that when you run these commands, dbt writes a file called manifest.json to the project's target/ directory. This file is very handy, as it has the name of every model, every test (the assumptions you make about your data), and the dependency relationships between the models!
dbt run
dbt test
dbt docs generate
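
To get a feel for what manifest.json holds, the sketch below groups the entries under its "nodes" key by resource type, so models and tests are listed separately. The sample manifest is a tiny hand-written stand-in and its project and model names are made up; a real file has many more fields per node. The default path in load_manifest assumes dbt's standard target/ folder.

```python
import json
from collections import defaultdict
from pathlib import Path


def nodes_by_type(manifest: dict) -> dict:
    """Group node names from a dbt manifest by resource type (model, test, ...)."""
    grouped = defaultdict(list)
    for node in manifest.get("nodes", {}).values():
        grouped[node["resource_type"]].append(node["name"])
    return {kind: sorted(names) for kind, names in grouped.items()}


def load_manifest(path: str = "target/manifest.json") -> dict:
    """Read manifest.json from disk (path is an assumption about your layout)."""
    return json.loads(Path(path).read_text())


# Hand-written stand-in for a real manifest; all names are hypothetical.
sample_manifest = {
    "nodes": {
        "model.drop.stg_orders": {"name": "stg_orders", "resource_type": "model"},
        "model.drop.fct_orders": {"name": "fct_orders", "resource_type": "model"},
        "test.drop.unique_fct_orders_id": {
            "name": "unique_fct_orders_id",
            "resource_type": "test",
        },
    }
}

print(nodes_by_type(sample_manifest))
# → {'model': ['fct_orders', 'stg_orders'], 'test': ['unique_fct_orders_id']}
```

Against a real project you would call nodes_by_type(load_manifest()) after any of the commands above has run.
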

How to orchestrate dbt jobs using Airflow?

To build an analytics workflow in Airflow, we need to understand how it works. A workflow is represented as a DAG (Directed Acyclic Graph) written in Python. It consists of tasks, which behave as individual units and are ordered according to their dependencies. Here, each task represents a dbt command, and Airflow triggers each task once the tasks it depends on have finished. The workflow would ideally update the staging tables first, then perform the transformations that create the dimension tables, followed by the fact tables.
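
The ordering idea behind a DAG can be illustrated with Python's standard library alone. This is only a sketch of how dependencies determine run order; Airflow's scheduler does far more (retries, parallelism, scheduling). The table names are hypothetical.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each key is a task; its value is the set of tasks that must finish first.
# The table names are hypothetical.
upstream = {
    "stg_orders": set(),
    "dim_customers": {"stg_orders"},
    "fct_orders": {"dim_customers"},
}

# static_order() yields every task after all of its upstream tasks.
run_order = list(TopologicalSorter(upstream).static_order())
print(run_order)  # → ['stg_orders', 'dim_customers', 'fct_orders']
```

In an Airflow DAG file the same chain would be declared with the >> operator between tasks, and Airflow works out the order for you.
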

The first step is to move the dbt repository into the folder where Airflow is set up. At this point, the Airflow repository should include all of your DAGs and the dbt project.

  • Initial set up

If you have an existing Airflow Docker container running, update the container's requirements.txt file to include dbt-redshift, dbt-core and airflow-dbt so that it can run dbt commands. The dbt-redshift package is the dbt adapter for Redshift, and the airflow-dbt package provides Airflow operators for easy integration with dbt. Once that is completed locally, rebuild the Docker image.

  • Building the DAG

To write the tasks in the DAG, I used the PyPI package airflow-dbt. It has the following operators, which provide a predefined template to run your dbt commands with ease:

  1. DbtDocsGenerateOperator: Calls dbt docs generate
  2. DbtDepsOperator: Calls dbt deps
  3. DbtSeedOperator: Calls dbt seed
  4. DbtSnapshotOperator: Calls dbt snapshot
  5. DbtRunOperator: Calls dbt run
  6. DbtTestOperator: Calls dbt test

Each of the above operators accepts the following arguments:

  • profiles_dir: If set, passed as the --profiles-dir argument to the dbt command
  • target: If set, passed as the --target argument to the dbt command
  • dir: The directory to run the dbt command in
  • full_refresh: If set to True, passes --full-refresh
  • models: If set, passed as the --models argument to the dbt command
  • exclude: If set, passed as the --exclude argument to the dbt command
  • select: If set, passed as the --select argument to the dbt command
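
The mapping from arguments to command-line flags can be sketched in a few lines. This is an illustration of the list above, not the actual airflow-dbt implementation; build_dbt_command is a hypothetical helper. The dir argument is left out because it sets the working directory rather than a flag.

```python
def build_dbt_command(
    verb,
    profiles_dir=None,
    target=None,
    full_refresh=False,
    models=None,
    exclude=None,
    select=None,
):
    """Assemble a dbt command line (as a list) from operator-style arguments."""
    command = ["dbt", verb]
    if profiles_dir is not None:
        command += ["--profiles-dir", profiles_dir]
    if target is not None:
        command += ["--target", target]
    if full_refresh:
        command.append("--full-refresh")
    if models is not None:
        command += ["--models", models]
    if exclude is not None:
        command += ["--exclude", exclude]
    if select is not None:
        command += ["--select", select]
    return command


print(" ".join(build_dbt_command("run", target="dev", select="example_model")))
# → dbt run --target dev --select example_model
```

Arguments that are not set simply add nothing to the command, which is why most operators can be created with only a handful of them.
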

With these operators, each Airflow task runs one dbt command against the models you select. A basic DAG should look something like this:

from datetime import datetime

from airflow import DAG
from airflow_dbt.operators.dbt_operator import (
    DbtRunOperator,
    DbtTestOperator,
)

# Placeholder values; replace them with the target and paths of your own project.
target_env = "dev"
profiles_dir = "/opt/airflow/dbt"
project_dir = "/opt/airflow/dbt"
args = {"owner": "airflow"}

with DAG(
    "dbt_basic_dag",
    start_date=datetime(2022, 11, 23),
    description="An Airflow DAG to invoke dbt runs to update and test tables",
    schedule_interval="@once",
    catchup=False,
    default_args=args,
) as dag:
    # Build the selected model
    dbt_run = DbtRunOperator(
        task_id="dbt_run",
        target=target_env,
        profiles_dir=profiles_dir,
        dir=project_dir,
        full_refresh=False,
        select="example_model",  # Allows you to specify models to run
    )
    # Test the same model
    dbt_test = DbtTestOperator(
        task_id="dbt_test",
        target=target_env,
        profiles_dir=profiles_dir,
        dir=project_dir,
        select="example_model",  # Allows you to specify models to test
    )

    # Run the tests only after the model has been built
    dbt_run >> dbt_test

You can also run dbt commands by using the BashOperator. The following task tests your database connection, which ideally would be the first task you run in your DAG.

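from airflow.operators.bash import BashOperator

# Place this task inside the `with DAG(...)` block; it checks the database connection
dbt_debug = BashOperator(
    task_id="dbt_debug",
    bash_command=f"dbt debug --profiles-dir {profiles_dir} --project-dir {project_dir} --target {target_env}",
)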

Once the DAG is built and the Airflow container is running, you can trigger the DAG manually from your local Airflow interface or from the command line.

Airflow interface

Once you open the Airflow interface locally, you will be able to check whether the DAG has run successfully; your tasks should turn green! You can also verify that the data has been updated by checking your database.

In case of failure, Airflow can send alerts through automated emails or Slack notifications. You will also be able to see the logs in the interface, which are very helpful for troubleshooting.
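
A minimal sketch of failure alerting, assuming you pass the dictionary below as default_args when creating the DAG. Airflow calls on_failure_callback with a context dictionary when a task fails, and email_on_failure requires SMTP to be configured for your deployment. The email address is a placeholder, and the print call stands in for a real Slack or email client. The simulated context at the end only exists so the sketch runs without Airflow installed.

```python
from types import SimpleNamespace


def format_failure_message(context: dict) -> str:
    """Build a short alert text from the context Airflow passes to a failure callback."""
    ti = context["task_instance"]
    error = context.get("exception")
    return f"Task {ti.dag_id}.{ti.task_id} failed: {error}. Logs: {ti.log_url}"


def notify_failure(context: dict) -> None:
    """Failure callback; printing stands in for a real Slack or email call."""
    print(format_failure_message(context))


# Wiring sketch: pass this dict as default_args when creating the DAG.
default_args = {
    "email": ["data-team@example.com"],  # placeholder address
    "email_on_failure": True,
    "on_failure_callback": notify_failure,
}

# Simulated context so the sketch runs without Airflow installed.
fake_context = {
    "task_instance": SimpleNamespace(
        dag_id="dbt_basic_dag",
        task_id="dbt_run",
        log_url="http://localhost:8080/log",
    ),
    "exception": RuntimeError("dbt run exited with code 1"),
}
notify_failure(fake_context)
```

Keeping the message formatting in its own function makes it easy to test without triggering a real failure.
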

One of the challenges I faced was backfilling an incremental fact table. It contained billions of rows and complex SQL logic, so a plain dbt run took a long time and a lot of CPU. Luckily, there is a custom materialization called insert_by_period (originally shipped in the dbt-utils package) which allows you to insert records into a table one period at a time. We were able to backfill the table using yearly periods and once that was done, we proceeded with our incremental runs.
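
The idea of loading one period at a time can be illustrated by splitting a date range into yearly windows, each of which would be inserted separately. This is only a sketch of the concept, not how the insert_by_period materialization is implemented; the function name and the dates are made up.

```python
from datetime import date


def yearly_periods(start: date, end: date):
    """Split [start, end) into consecutive windows that break at each January 1."""
    periods = []
    cursor = start
    while cursor < end:
        # The window ends at the next New Year or at `end`, whichever comes first.
        next_boundary = min(date(cursor.year + 1, 1, 1), end)
        periods.append((cursor, next_boundary))
        cursor = next_boundary
    return periods


for window_start, window_end in yearly_periods(date(2019, 6, 1), date(2022, 1, 1)):
    print(window_start, "->", window_end)
```

Each window covers a bounded slice of the data, so a single slow query over billions of rows becomes several smaller inserts.
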

Future Ideas

With the help of the manifest.json file, which contains information about all of the models, we should be able to automate writing DAG files for all dbt projects and make the pipelines dynamic!
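
A rough sketch of that idea, assuming the manifest layout dbt produces (a "nodes" mapping whose entries carry resource_type, name and depends_on). It derives one dbt command per model plus the upstream/downstream pairs between models. plan_tasks is a hypothetical helper, and the sample manifest is hand-written with made-up names.

```python
def plan_tasks(manifest: dict):
    """Return (commands, edges) describing one dbt task per model.

    commands maps a model name to the dbt command that builds it;
    edges lists (upstream, downstream) pairs between models.
    """
    models = {
        unique_id: node
        for unique_id, node in manifest["nodes"].items()
        if node["resource_type"] == "model"
    }
    commands = {
        node["name"]: f"dbt run --select {node['name']}" for node in models.values()
    }
    edges = [
        (models[parent]["name"], node["name"])
        for node in models.values()
        for parent in node["depends_on"]["nodes"]
        if parent in models  # skip sources, seeds, and other non-model parents
    ]
    return commands, edges


# Hand-written stand-in for a real manifest; all names are hypothetical.
sample_manifest = {
    "nodes": {
        "model.drop.stg_orders": {
            "name": "stg_orders",
            "resource_type": "model",
            "depends_on": {"nodes": ["source.drop.raw.orders"]},
        },
        "model.drop.dim_customers": {
            "name": "dim_customers",
            "resource_type": "model",
            "depends_on": {"nodes": ["model.drop.stg_orders"]},
        },
        "model.drop.fct_orders": {
            "name": "fct_orders",
            "resource_type": "model",
            "depends_on": {"nodes": ["model.drop.dim_customers"]},
        },
    }
}

commands, edges = plan_tasks(sample_manifest)
print(edges)  # → [('stg_orders', 'dim_customers'), ('dim_customers', 'fct_orders')]
```

In a DAG file, each entry in commands could become a BashOperator, and each pair in edges could be wired as upstream >> downstream, so new models show up in Airflow without anyone editing the DAG by hand.
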

Impact

With dbt, we were able to produce a more reliable warehouse without needing to write complex queries to load incremental tables. With dbt's testing functionality, we can now actively test our tables for duplicates, which we had no way of doing before and which was a big challenge when dealing with some large fact tables. With improved data lineage and traceability throughout the lifecycle, all the way out to reporting, we now know which models connect to which, and that allows us to refresh data upstream or downstream as needed with simple commands. We are also able to provide transparency and visibility into what the data is for and how it is produced in the age of data-driven decision making.

Thanks to its compatibility with Airflow, it is easy to orchestrate and schedule dbt jobs to provide usable data on a daily basis!

Helpful Sources

Datafold, dbt Labs and Astronomer have multiple blog posts explaining different ways to run dbt and Airflow together. I found one such post particularly useful, and it is attached here.
