Building a Data Pipeline with DBT, PostgreSQL, and Airflow
*Based on a video from Data Engineering Academy; steps still need verification.*
Overview
This technical guide demonstrates how to set up a complete data pipeline using Docker, PostgreSQL, DBT, and Airflow in a Linux environment (Ubuntu) running on Windows Subsystem for Linux (WSL). The pipeline includes data ingestion from a weather API, data storage in PostgreSQL, data transformation using DBT, and orchestration with Airflow.
Prerequisites
- Windows machine with administrative access
- Basic understanding of Linux commands, SQL, and Python
- Familiarity with data engineering concepts
System Setup
1. Enable Windows Subsystem for Linux (WSL)
- Search for "Windows features" in the Windows search bar
- Check the box for "Windows Subsystem for Linux"
- Click "OK" and restart if prompted
2. Install Ubuntu on WSL
- Open Visual Studio Code
- Click the "Remote Window" button in the bottom left corner
- Select "Connect to WSL"
- Choose "Install new distro" and select "Ubuntu 24"
- Create a username and password when prompted
3. Configure Docker with WSL
- Install Docker Desktop for Windows if not already installed
- Open Docker Desktop settings
- Navigate to "Resources" > "WSL Integration"
- Enable integration with Ubuntu
- Apply changes and restart Docker
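To confirm the integration works, you can run Docker commands from the Ubuntu (WSL) terminal; both should print version information without errors:
# Run inside the Ubuntu (WSL) terminal
docker --version
docker compose version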
Project Setup
1. Create Project Structure
# Navigate to home directory
cd ~
# Create project directories
mkdir -p repos/dbt_postgres_airflow/{dbt,postgres,airflow}
2. Set Up PostgreSQL with Docker
Navigate to the postgres folder:
cd ~/repos/dbt_postgres_airflow/postgres
Create a docker-compose.yaml file:
services:
  dw:
    image: postgres:latest
    ports:
      - "5432:5432"
    environment:
      POSTGRES_DB: dw
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      # Note: later steps connect as dw_user/dw_password. Make sure that role
      # exists, e.g. by setting POSTGRES_USER/POSTGRES_PASSWORD to those values
      # or by creating it with an init script.
      DB_NAME: dw
      DB_USER: dw_user
      DB_PASSWORD: dw_password
Start PostgreSQL:
docker compose up -d
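To verify that the database container is up and accepting connections (using the service name dw from the compose file above):
docker compose ps
docker compose exec dw pg_isready -U postgres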
Installing Required Software
1. Install Homebrew
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
# Add Homebrew to PATH (follow terminal instructions after installation)
# The exact command will be provided after installation
2. Install Python Using Pyenv
# Install prerequisites
sudo apt-get update
sudo apt-get install -y build-essential
# Install additional libraries required for Python
sudo apt-get install -y libssl-dev zlib1g-dev libbz2-dev \
libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev \
libncursesw5-dev xz-utils tk-dev libffi-dev liblzma-dev \
python3-openssl git
# Install Pyenv (plus the pyenv-virtualenv plugin used for the environments below)
brew install pyenv pyenv-virtualenv
# Add Pyenv to shell configuration
echo 'export PYENV_ROOT="$HOME/.pyenv"' >> ~/.profile
echo 'export PATH="$PYENV_ROOT/bin:$PATH"' >> ~/.profile
echo 'eval "$(pyenv init --path)"' >> ~/.profile
echo 'export PYENV_ROOT="$HOME/.pyenv"' >> ~/.bashrc
echo 'export PATH="$PYENV_ROOT/bin:$PATH"' >> ~/.bashrc
echo 'eval "$(pyenv init --path)"' >> ~/.bashrc
# Load Pyenv
source ~/.profile
source ~/.bashrc
# Enable automatic activation of Pyenv
echo 'eval "$(pyenv virtualenv-init -)"' >> ~/.bashrc
source ~/.bashrc
# Install Python 3.10
pyenv install 3.10.17
pyenv global 3.10.17
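To confirm the interpreter pyenv installed is the one on your PATH:
pyenv versions
python --version   # should report Python 3.10.17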
Setting Up Virtual Environments
1. Create and Configure DBT Environment
# Navigate to the DBT directory
cd ~/repos/dbt_postgres_airflow/dbt
# Create a virtual environment and activate it for this directory
pyenv virtualenv 3.10.17 demo-dbt
pyenv local demo-dbt
# Install DBT with PostgreSQL adapter
pip install --upgrade pip
pip install dbt-postgres
# Initialize DBT project
dbt init my_project
# Project configuration
# - Database type: postgres
# - Host: localhost
# - Port: 5432
# - User: dw_user
# - Password: dw_password
# - Database name: dw
# - Schema: dev
# - Threads: 1
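For reference, the answers above end up in ~/.dbt/profiles.yml. A minimal sketch of what the generated profile should roughly look like, assuming dbt init used the project name as the profile name (verify against the file dbt actually writes):
# ~/.dbt/profiles.yml
my_project:
  target: dev
  outputs:
    dev:
      type: postgres
      host: localhost
      port: 5432
      user: dw_user
      password: dw_password
      dbname: dw
      schema: dev
      threads: 1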
2. Create and Configure Airflow Environment
# Navigate to the Airflow directory
cd ~/repos/dbt_postgres_airflow/airflow
# Create a virtual environment and activate it for this directory
pyenv virtualenv 3.10.17 demo-airflow
pyenv local demo-airflow
# Set the Airflow home directory (expand the path now so .bashrc stores the absolute path)
echo "export AIRFLOW_HOME=\"$(pwd)\"" >> ~/.bashrc
source ~/.bashrc
# Install Airflow
pip install --upgrade pip
pip install "apache-airflow==2.6.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.10.txt"
# Start Airflow in standalone mode
airflow standalone
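Airflow looks for DAG files under $AIRFLOW_HOME/dags by default, which is where the DAG files created later in this guide should live. In a separate terminal (airflow standalone keeps running in the foreground), create the folder now:
mkdir -p "$AIRFLOW_HOME/dags"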
Setting Up DBT
1. Configure DBT Project
Navigate to your DBT project:
cd ~/repos/dbt_postgres_airflow/dbt/my_project
Test the DBT connection:
dbt debug
Run the example models:
dbt run
2. Create Custom Models
Create the models structure:
cd ~/repos/dbt_postgres_airflow/dbt/my_project/models
mkdir -p {sources,staging,mart}
Create source definitions:
# models/sources/sources.yaml
version: 2
sources:
  - name: dev
    database: dw
    tables:
      - name: weather_report
        columns:
          - name: id
          - name: city
          - name: temperature
          - name: weather_descriptions
          - name: wind_speed
          - name: time
          - name: inserted_at
          - name: utc_offset
Create the staging model:
-- models/staging/stage_weather_report.sql
select
    id,
    city,
    temperature,
    weather_descriptions,
    wind_speed,
    time as weather_time_local,
    inserted_at,
    -- Convert the UTC timestamp to local time; the API reports the offset in hours
    inserted_at::timestamp + (utc_offset || ' hour')::interval as inserted_at_local,
    utc_offset
from {{ source('dev', 'weather_report') }}
Create macros:
-- macros/delete_insert.sql
{% macro macro_delete_insert() %}
    {% if adapter.get_relation(this.database, this.schema, this.identifier) %}
        truncate table {{ this }};
    {% endif %}
{% endmacro %}
Create the incremental model:
-- models/mart/delete_insert.sql
{{
    config(
        materialized='incremental',
        unique_key='id',
        pre_hook="{{ macro_delete_insert() }}"
    )
}}

with ranked as (
    select
        *,
        row_number() over (partition by id order by inserted_at desc) as row_num
    from {{ ref('stage_weather_report') }}
)

select
    id,
    city,
    temperature,
    weather_descriptions,
    wind_speed,
    weather_time_local,
    inserted_at,
    inserted_at_local,
    utc_offset
from ranked
where row_num = 1
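Optionally, a small schema file lets dbt test flag duplicate or missing keys in the mart model. A sketch (the file name and tests are suggestions, not part of the original walkthrough):
# models/mart/schema.yml
version: 2
models:
  - name: delete_insert
    columns:
      - name: id
        tests:
          - unique
          - not_null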
Setting Up API Data Ingestion
1. Create Helper Functions
Create the utilities folder:
mkdir -p ~/repos/dbt_postgres_airflow/airflow/utilities
Install required packages (logging is part of the Python standard library, so it is not installed with pip):
pip install requests python-dotenv psycopg2-binary
Create helper functions:
# utilities/helper_functions.py
import os
import logging

import requests
import psycopg2
from dotenv import load_dotenv

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()


def fetch_data():
    """Fetch weather data from the WeatherStack API"""
    logger.info("Fetching weather data from WeatherStack API")
    try:
        url = "http://api.weatherstack.com/current"
        params = {
            "access_key": os.getenv("WEATHERSTACK_API_KEY"),
            "query": "New York"  # You can parameterize this
        }
        response = requests.get(url, params=params)
        response.raise_for_status()
        logger.info("API response received successfully")
        return response.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"API request error: {e}", exc_info=True)
        raise


def connect_to_db():
    """Connect to the PostgreSQL database"""
    logger.info("Connecting to the PostgreSQL database")
    try:
        connection = psycopg2.connect(
            host="localhost",
            port=5432,
            dbname="dw",
            user="dw_user",
            password="dw_password"
        )
        logger.info("Database connection established")
        return connection
    except psycopg2.Error as e:
        logger.error(f"Database connection failed: {e}", exc_info=True)
        raise


def create_schema_table(conn):
    """Create schema and table if they don't exist"""
    logger.info("Creating schema and table if not exists")
    try:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE SCHEMA IF NOT EXISTS dev;
            CREATE TABLE IF NOT EXISTS dev.weather_report (
                id SERIAL PRIMARY KEY,
                city TEXT,
                temperature FLOAT,
                weather_descriptions TEXT,
                wind_speed FLOAT,
                time TIMESTAMP,
                inserted_at TIMESTAMP DEFAULT NOW(),
                utc_offset TEXT
            );
        """)
        conn.commit()
        logger.info("Schema and table created")
    except psycopg2.Error as e:
        logger.error(f"Failed to create schema or table: {e}", exc_info=True)
        raise


def insert_records(conn, data):
    """Insert weather data into the database"""
    logger.info("Inserting weather data into the database")
    try:
        cursor = conn.cursor()
        weather = data['current']
        location = data['location']
        logger.info(f"Preparing insert for city: {location['name']}")
        cursor.execute("""
            INSERT INTO dev.weather_report (
                city,
                temperature,
                weather_descriptions,
                wind_speed,
                time,
                inserted_at,
                utc_offset
            ) VALUES (%s, %s, %s, %s, %s, NOW(), %s)
        """, (
            location['name'],
            weather['temperature'],
            weather['weather_descriptions'][0],
            weather['wind_speed'],
            location['localtime'],
            location['utc_offset']
        ))
        conn.commit()
        logger.info("Insert successfully completed")
    except psycopg2.Error as e:
        logger.error(f"Error inserting data into the database: {e}", exc_info=True)
        raise


def main():
    """Main function to execute the ETL process"""
    connection = None
    try:
        logger.info("Starting weather data ETL process")
        data = fetch_data()
        logger.info("Fetched data from API")
        connection = connect_to_db()
        logger.info("Connected to the database")
        create_schema_table(connection)
        insert_records(connection, data)
    except Exception as e:
        logger.error(f"An error occurred during execution: {e}", exc_info=True)
    finally:
        if connection:
            connection.close()
            logger.info("Database connection closed")


if __name__ == "__main__":
    main()
Create a .env file for the API key:
# utilities/.env
WEATHERSTACK_API_KEY=your_api_key_here
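Before wiring the script into Airflow, you can run it once by hand to confirm that both the API key and the database connection work (assuming the .env file above is populated):
cd ~/repos/dbt_postgres_airflow/airflow
python utilities/helper_functions.py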
2. Create Airflow DAGs
API Orchestrator DAG
# dags/api_orchestrator.py
import os
import sys
import logging

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

# Add the utilities folder to the Python path so helper_functions can be imported
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'utilities')))

from helper_functions import main

logger = logging.getLogger(__name__)

with DAG(
    dag_id="data_fetch_daily",
    start_date=pendulum.today(),
    schedule_interval="@daily",
    catchup=False
) as dag:

    fetch_weather_task = PythonOperator(
        task_id="ingest_weather_data",
        python_callable=main
    )
DBT Orchestrator DAG
# dags/dbt_orchestrator.py
import os
import json

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

# Set paths
home_dir = os.environ.get('HOME')
dbt_project_dir = os.path.join(home_dir, 'repos', 'dbt_postgres_airflow', 'dbt', 'my_project')
manifest_path = os.path.join(dbt_project_dir, 'target', 'manifest.json')

# Load the manifest file (generated in target/ by a prior dbt compile or dbt run)
with open(manifest_path) as f:
    manifest = json.load(f)

# Define DAG
with DAG(
    dag_id="dbt_orchestrator",
    start_date=pendulum.today(),
    schedule_interval="@hourly",
    catchup=False
) as dag:

    # First pass: create one task per model in the manifest
    tasks = {}
    for node_name, node in manifest['nodes'].items():
        # Only include models, not sources
        if node.get('resource_type') == 'model':
            task_id = f"dbt_run_{node.get('name')}"
            bash_command = f"cd {dbt_project_dir} && dbt run --select {node.get('name')}"
            tasks[node_name] = BashOperator(
                task_id=task_id,
                bash_command=bash_command
            )

    # Second pass: wire dependencies based on the refs in each model,
    # so every upstream task already exists when it is referenced
    for node_name, node in manifest['nodes'].items():
        if node.get('resource_type') != 'model':
            continue
        for upstream_node in node.get('depends_on', {}).get('nodes', []):
            # Only create dependencies to other models (not sources)
            if upstream_node in tasks:
                tasks[upstream_node] >> tasks[node_name]
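Once both files are in $AIRFLOW_HOME/dags, you can check that Airflow parses them without waiting on the scheduler: airflow dags list should show both DAG IDs, and airflow tasks test runs a single task without recording state (the date argument is just an example):
airflow dags list
airflow tasks test data_fetch_daily ingest_weather_data 2024-01-01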
Querying and Validating
1. Query PostgreSQL Database
# Connect to PostgreSQL
docker compose exec dw psql -U dw_user -d dw
# Inside psql: list schemas
\dn
# List tables in the dev schema
\dt dev.*
# Query the raw weather data
SELECT * FROM dev.weather_report;
# Exit psql
\q
2. Validate DBT Models
# Run DBT models
cd ~/repos/dbt_postgres_airflow/dbt/my_project
dbt run
# Check DBT documentation
dbt docs generate
dbt docs serve
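After a successful dbt run, you can also inspect the transformed table directly in PostgreSQL (assuming the default target schema dev from the profile, so the mart model lands in dev.delete_insert):
SELECT city, temperature, weather_time_local, inserted_at_local
FROM dev.delete_insert
ORDER BY inserted_at DESC
LIMIT 5;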
3. Monitor Airflow DAGs
- Access the Airflow UI at http://localhost:8080
- Default credentials are displayed in the terminal when starting Airflow
- Monitor the execution of both DAGs:
  - data_fetch_daily: runs once per day to fetch weather data
  - dbt_orchestrator: runs hourly to process the DBT models
Conclusion
You have successfully set up a complete data pipeline that:
- Fetches data from an external API
- Stores it in PostgreSQL
- Transforms it using DBT models
- Orchestrates the entire pipeline with Airflow
This foundation can be extended to support more complex ETL processes, additional data sources, and sophisticated transformations.
Questions for Further Clarification
- What specific version of PostgreSQL is being used? The documentation uses "latest," but a specific version might be preferable for production.
- What is the purpose of having both "delete_insert" and "merge_insert" models? The documentation mentions them both but only shows code for one.
- What are the specific requirements for the WeatherStack API account? (Free tier limitations, API rate limits, etc.)
- Are there any specific settings needed for running this in a production environment rather than a development setup?
- What types of data validation or quality checks should be implemented alongside this pipeline?
- How should the pipeline handle errors such as API failures or database connectivity issues?
- What monitoring solutions would be recommended for long-term operation of this pipeline?