Building a Data Pipeline with DBT, PostgreSQL, and Airflow

* Based on a video from Data Engineering Academy; steps still need verification.

Overview

This technical guide demonstrates how to set up a complete data pipeline using Docker, PostgreSQL, DBT, and Airflow in a Linux environment (Ubuntu) running on Windows Subsystem for Linux (WSL). The pipeline includes data ingestion from a weather API, data storage in PostgreSQL, data transformation using DBT, and orchestration with Airflow.

Prerequisites

  • Windows machine with administrative access
  • Basic understanding of Linux commands, SQL, and Python
  • Familiarity with data engineering concepts

System Setup

1. Enable Windows Subsystem for Linux (WSL)

  1. Search for "Windows features" in the Windows search bar
  2. Check the box for "Windows Subsystem for Linux"
  3. Click "OK" and restart if prompted

2. Install Ubuntu on WSL

  1. Open Visual Studio Code
  2. Click the "Remote Window" button in the bottom left corner
  3. Select "Connect to WSL"
  4. Choose "Install new distro" and select "Ubuntu 24"
  5. Create a username and password when prompted

3. Configure Docker with WSL

  1. Install Docker Desktop for Windows if not already installed
  2. Open Docker Desktop settings
  3. Navigate to "Resources" > "WSL Integration"
  4. Enable integration with Ubuntu
  5. Apply changes and restart Docker

Project Setup

1. Create Project Structure

# Navigate to home directory
cd ~

# Create project directories
mkdir -p repos/dbt_postgres_airflow/{dbt,postgres,airflow}

2. Set Up PostgreSQL with Docker

Navigate to the postgres folder:

cd ~/repos/dbt_postgres_airflow/postgres

Create a docker-compose.yaml file. The credentials defined here are reused later by DBT, the ingestion script, and psql, so they must match:

services:
  dw:
    image: postgres:latest
    ports:
      - "5432:5432"
    environment:
      POSTGRES_DB: dw
      POSTGRES_USER: dw_user
      POSTGRES_PASSWORD: dw_password

Start PostgreSQL:

docker compose up -d

Installing Required Software

1. Install Homebrew

/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"

# Add Homebrew to PATH (follow terminal instructions after installation)
# The exact command will be provided after installation 
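# Typically the installer's "Next steps" output looks like the following;
# verify against your own terminal output before running it:
#   echo 'eval "$(/home/linuxbrew/.linuxbrew/bin/brew shellenv)"' >> ~/.profile
#   eval "$(/home/linuxbrew/.linuxbrew/bin/brew shellenv)"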

2. Install Python Using Pyenv

# Install prerequisites
sudo apt-get update
sudo apt-get install -y build-essential

# Install additional libraries required for Python
sudo apt-get install -y libssl-dev zlib1g-dev libbz2-dev \
  libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev \
  libncursesw5-dev xz-utils tk-dev libffi-dev liblzma-dev \
  python3-openssl git

# Install Pyenv and the pyenv-virtualenv plugin (needed for the virtualenv commands below)
brew install pyenv pyenv-virtualenv

# Add Pyenv to shell configuration
echo 'export PYENV_ROOT="$HOME/.pyenv"' >> ~/.profile
echo 'export PATH="$PYENV_ROOT/bin:$PATH"' >> ~/.profile
echo 'eval "$(pyenv init --path)"' >> ~/.profile

echo 'export PYENV_ROOT="$HOME/.pyenv"' >> ~/.bashrc
echo 'export PATH="$PYENV_ROOT/bin:$PATH"' >> ~/.bashrc
echo 'eval "$(pyenv init --path)"' >> ~/.bashrc

# Load Pyenv
source ~/.profile
source ~/.bashrc

# Enable automatic activation of Pyenv
echo 'eval "$(pyenv virtualenv-init -)"' >> ~/.bashrc
source ~/.bashrc

# Install Python 3.10
pyenv install 3.10.17
pyenv global 3.10.17

Setting Up Virtual Environments

1. Create and Configure DBT Environment

# Navigate to the DBT directory
cd ~/repos/dbt_postgres_airflow/dbt

# Create a virtual environment and activate it for this directory
pyenv virtualenv 3.10.17 demo-dbt
pyenv local demo-dbt

# Install DBT with PostgreSQL adapter
pip install --upgrade pip
pip install dbt-postgres

# Initialize DBT project
dbt init my_project

# Project configuration
# - Database type: postgres
# - Host: localhost
# - Port: 5432
# - User: dw_user
# - Password: dw_password
# - Database name: dw
# - Schema: dev
# - Threads: 1

2. Create and Configure Airflow Environment

# Navigate to the Airflow directory
cd ~/repos/dbt_postgres_airflow/airflow

# Create a virtual environment and activate it for this directory
pyenv virtualenv 3.10.17 demo-airflow
pyenv local demo-airflow

# Set Airflow home directory (use an absolute path rather than $(pwd),
# which would be re-evaluated every time .bashrc is sourced)
echo 'export AIRFLOW_HOME="$HOME/repos/dbt_postgres_airflow/airflow"' >> ~/.bashrc
source ~/.bashrc

# Install Airflow
pip install --upgrade pip
pip install "apache-airflow==2.6.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.10.txt"

# Start Airflow in standalone mode
airflow standalone

Setting Up DBT

1. Configure DBT Project

Navigate to your DBT project:

cd ~/repos/dbt_postgres_airflow/dbt/my_project

Test the DBT connection:

dbt debug

Run the example models:

dbt run

2. Create Custom Models

Create the folder structure for the models:

cd ~/repos/dbt_postgres_airflow/dbt/my_project/models
mkdir -p {sources,staging,mart}

Create source definitions:

# models/sources/sources.yaml
version: 2
sources:
  - name: dev
    database: dw
    tables:
      - name: weather_report
        columns:
          - name: id
          - name: city
          - name: temperature
          - name: weather_descriptions
          - name: wind_speed
          - name: time
          - name: inserted_at
          - name: utc_offset

Create the staging model:

-- models/staging/stage_weather_report.sql
select
  id,
  city,
  temperature,
  weather_descriptions,
  wind_speed,
  time as weather_time_local,
  inserted_at,
  -- Convert UTC time to local time using the offset
  -- (depending on the format the API returns for utc_offset, this cast may need
  --  adjusting, e.g. (utc_offset || ' hours')::interval for an offset given in hours)
  inserted_at::timestamp + utc_offset::interval as inserted_at_local,
  utc_offset
from {{ source('dev', 'weather_report') }}

Create a macro that truncates the mart table before each run (used as a pre-hook below):

-- macros/delete_insert.sql
{% macro macro_delete_insert() %}
  {% if adapter.get_relation(this.database, this.schema, this.identifier) %}
    truncate table {{ this }};
  {% endif %}
{% endmacro %}

Create the incremental mart model:

-- models/mart/delete_insert.sql
{{
  config(
    materialized='incremental',
    unique_key='id',
    pre_hook="{{ macro_delete_insert() }}"
  )
}}

with ranked as (
  select
    *,
    row_number() over (partition by id order by inserted_at desc) as row_num
  from {{ ref('stage_weather_report') }}
)
select
  id,
  city,
  temperature,
  weather_descriptions,
  wind_speed,
  weather_time_local,
  inserted_at,
  inserted_at_local,
  utc_offset
from ranked
where row_num = 1
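
The mart model keeps only the most recent row for each id. As a plain-Python illustration of that deduplication rule (the sample records below are hypothetical):

# Illustration of the "latest record per id" rule used in the mart model
records = [
    {"id": 1, "city": "New York", "inserted_at": "2024-05-01 10:00:00"},
    {"id": 1, "city": "New York", "inserted_at": "2024-05-01 11:00:00"},
    {"id": 2, "city": "London", "inserted_at": "2024-05-01 10:30:00"},
]

latest = {}
for record in records:
    current = latest.get(record["id"])
    # Keep the record with the most recent inserted_at per id
    if current is None or record["inserted_at"] > current["inserted_at"]:
        latest[record["id"]] = record

print(list(latest.values()))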

Setting Up API Data Ingestion

1. Create Helper Functions

Create a utilities folder inside the Airflow project:

mkdir -p ~/repos/dbt_postgres_airflow/airflow/utilities

Install the required packages (logging is part of the Python standard library and does not need to be installed separately):

pip install requests python-dotenv psycopg2-binary

Create helper functions:

# utilities/helper_functions.py
import os
import requests
import logging
import psycopg2
from dotenv import load_dotenv

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

def fetch_data():
    """Fetch weather data from the WeatherStack API"""
    logger.info("Fetching weather data from WeatherStack API")
    try:
        url = "http://api.weatherstack.com/current"
        params = {
            "access_key": os.getenv("WEATHERSTACK_API_KEY"),
            "query": "New York"  # You can parameterize this
        }
        response = requests.get(url, params=params)
        response.raise_for_status()
        logger.info("API response received successfully")
        return response.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"API request error: {e}", exc_info=True)
        raise

def connect_to_db():
    """Connect to the PostgreSQL database"""
    logger.info("Connecting to the PostgreSQL database")
    try:
        connection = psycopg2.connect(
            host="localhost",
            port=5432,
            dbname="dw",
            user="dw_user",
            password="dw_password"
        )
        logger.info("Database connection established")
        return connection
    except psycopg2.Error as e:
        logger.error(f"Database connection failed: {e}", exc_info=True)
        raise

def create_schema_table(conn):
    """Create schema and table if they don't exist"""
    logger.info("Creating schema and table if not exists")
    try:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE SCHEMA IF NOT EXISTS dev;
            CREATE TABLE IF NOT EXISTS dev.weather_report (
                id SERIAL PRIMARY KEY,
                city TEXT,
                temperature FLOAT,
                weather_descriptions TEXT,
                wind_speed FLOAT,
                time TIMESTAMP,
                inserted_at TIMESTAMP DEFAULT NOW(),
                utc_offset TEXT
            );
        """)
        conn.commit()
        logger.info("Schema and table created")
    except psycopg2.Error as e:
        logger.error(f"Failed to create schema or table: {e}", exc_info=True)
        raise

def insert_records(conn, data):
    """Insert weather data into the database"""
    logger.info("Inserting weather data into the database")
    try:
        cursor = conn.cursor()
        weather = data['current']
        location = data['location']
        
        logger.info(f"Preparing insert for city: {location['name']}")
        
        cursor.execute("""
            INSERT INTO dev.weather_report (
                city, 
                temperature, 
                weather_descriptions, 
                wind_speed, 
                time, 
                inserted_at, 
                utc_offset
            ) VALUES (%s, %s, %s, %s, %s, NOW(), %s)
        """, (
            location['name'],
            weather['temperature'],
            weather['weather_descriptions'][0],
            weather['wind_speed'],
            location['localtime'],
            location['utc_offset']
        ))
        
        conn.commit()
        logger.info("Insert successfully completed")
    except psycopg2.Error as e:
        logger.error(f"Error inserting data into the database: {e}", exc_info=True)
        raise

def main():
    """Main function to execute the ETL process"""
    connection = None
    try:
        logger.info("Starting weather data ETL process")
        data = fetch_data()
        logger.info("Fetched data from API")
        connection = connect_to_db()
        logger.info("Connected to the database")
        create_schema_table(connection)
        insert_records(connection, data)
    except Exception as e:
        logger.error(f"An error occurred during execution: {e}", exc_info=True)
    finally:
        if connection:
            connection.close()
            logger.info("Database connection closed")

if __name__ == "__main__":
    main()

Create .env file for API key:

# utilities/.env
WEATHERSTACK_API_KEY=your_api_key_here
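
Before wiring this into Airflow, you may want to confirm the API key and database settings work. Below is a minimal smoke test run from the utilities folder, assuming the PostgreSQL container is running and the .env file contains a valid key; the file name check_ingestion.py is only an illustration:

# utilities/check_ingestion.py -- hypothetical quick check, run from the utilities folder
from helper_functions import fetch_data, connect_to_db

# Verify the API key works and inspect a couple of fields
data = fetch_data()
print("City:", data["location"]["name"], "Temperature:", data["current"]["temperature"])

# Verify the database is reachable with the configured credentials
conn = connect_to_db()
conn.close()
print("Database connection OK")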

2. Create Airflow DAGs

API Orchestrator DAG

# dags/api_orchestrator.py
import os
import sys
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

# Make the utilities folder importable from the dags folder
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'utilities')))
from helper_functions import main

with DAG(
    dag_id="data_fetch_daily",
    start_date=pendulum.today(),
    schedule_interval="@daily",
    catchup=False
) as dag:

    fetch_weather_task = PythonOperator(
        task_id="ingest_weather_data",
        python_callable=main
    )
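
Optionally, the DAG file can be made directly runnable for a quick local check: dag.test() executes all tasks in-process without a scheduler (available from Airflow 2.5 onward). If you want that convenience, append the following to the end of dags/api_orchestrator.py:

# Optional: run `python dags/api_orchestrator.py` to execute the DAG in-process
if __name__ == "__main__":
    dag.test()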

DBT Orchestrator DAG

# dags/dbt_orchestrator.py
import os
import json
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

# Set paths
home_dir = os.environ.get('HOME')
dbt_project_dir = os.path.join(home_dir, 'repos', 'dbt_postgres_airflow', 'dbt', 'my_project')
manifest_path = os.path.join(dbt_project_dir, 'target', 'manifest.json')

# Load the manifest file (produced by dbt run / dbt compile)
with open(manifest_path) as f:
    manifest = json.load(f)

# Define DAG
with DAG(
    dag_id="dbt_orchestrator",
    start_date=pendulum.today(),
    schedule_interval="@hourly",
    catchup=False
) as dag:

    # First pass: create one task per model in the manifest
    model_tasks = {}
    for node_name, node in manifest['nodes'].items():
        # Only include models, not tests or other node types
        if node.get('resource_type') == 'model':
            model_tasks[node_name] = BashOperator(
                task_id=f"dbt_run_{node.get('name')}",
                # Assumes the dbt CLI is available on the PATH of the
                # environment in which Airflow executes tasks
                bash_command=f"cd {dbt_project_dir} && dbt run --select {node.get('name')}"
            )

    # Second pass: wire dependencies based on the refs in each model,
    # after all tasks exist (avoids referencing a task before it is created)
    for node_name, task in model_tasks.items():
        for upstream_node in manifest['nodes'][node_name].get('depends_on', {}).get('nodes', []):
            # Only create dependencies to other models (sources have no tasks)
            if upstream_node in model_tasks:
                model_tasks[upstream_node] >> task
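
To see which dependencies the manifest actually encodes before relying on it in the DAG, a small standalone script can print the model-to-model edges. This sketch assumes the same project path as above and that dbt run (or dbt compile) has already produced target/manifest.json; the script name print_dbt_dependencies.py is only an illustration:

# print_dbt_dependencies.py -- illustrative helper, not part of the pipeline itself
import json
import os

home_dir = os.environ.get('HOME')
manifest_path = os.path.join(
    home_dir, 'repos', 'dbt_postgres_airflow', 'dbt', 'my_project', 'target', 'manifest.json'
)

with open(manifest_path) as f:
    manifest = json.load(f)

# Print each model together with the models it depends on
for node_name, node in manifest['nodes'].items():
    if node.get('resource_type') == 'model':
        upstream_models = [
            manifest['nodes'][dep]['name']
            for dep in node.get('depends_on', {}).get('nodes', [])
            if dep in manifest['nodes'] and manifest['nodes'][dep].get('resource_type') == 'model'
        ]
        print(f"{node['name']} <- {upstream_models or 'no model dependencies'}")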

Querying and Validating

1. Query PostgreSQL Database

# Connect to PostgreSQL
docker compose exec dw psql -U dw_user -d dw

# List schemas
\dn

# List tables in dev schema
\dt dev.*

# Query weather data
SELECT * FROM dev.weather_report;

# Exit PostgreSQL
\q
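
The same check can be scripted. Below is a minimal sketch using psycopg2 with the connection settings from the compose file; the file name validate_load.py is only an illustration:

# validate_load.py -- illustrative check that rows are landing in dev.weather_report
import psycopg2

conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname="dw",
    user="dw_user",
    password="dw_password"
)
with conn.cursor() as cur:
    # Count rows and report the most recent load timestamp
    cur.execute("SELECT count(*), max(inserted_at) FROM dev.weather_report;")
    row_count, latest = cur.fetchone()
    print(f"{row_count} rows, latest inserted_at: {latest}")
conn.close()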

2. Validate DBT Models

# Run DBT models
cd ~/repos/dbt_postgres_airflow/dbt/my_project
dbt run

# Check DBT documentation
dbt docs generate
dbt docs serve

3. Monitor Airflow DAGs

  1. Access the Airflow UI at http://localhost:8080
  2. Log in with the admin credentials printed in the terminal when airflow standalone starts (they are also written to standalone_admin_password.txt in the Airflow home directory)
  3. Monitor the execution of both DAGs:
    • data_fetch_daily: Runs once per day to fetch weather data
    • dbt_orchestrator: Runs hourly to process the models

Conclusion

You have successfully set up a complete data pipeline that:

  1. Fetches data from an external API
  2. Stores it in PostgreSQL
  3. Transforms it using DBT models
  4. Orchestrates the entire pipeline with Airflow

This foundation can be extended to support more complex ETL processes, additional data sources, and sophisticated transformations.

Questions for Further Clarification

  1. What specific version of PostgreSQL is being used? The documentation uses "latest," but a specific version might be preferable for production.
  2. What is the purpose of having both "delete_insert" and "merge_insert" models? The documentation mentions them both but only shows code for one.
  3. What are the specific requirements for the WeatherStack API account? (Free tier limitations, API rate limits, etc.)
  4. Are there any specific settings needed for running this in a production environment rather than a development setup?
  5. What types of data validation or quality checks should be implemented alongside this pipeline?
  6. How should the pipeline handle errors such as API failures or database connectivity issues?
  7. What monitoring solutions would be recommended for long-term operation of this pipeline?
