An end-to-end data engineering pipeline that consolidates sales data from three sources — a transactional PostgreSQL database, a logistics CSV file drop, and a marketing REST API — into a single analytics-ready data warehouse built on a star schema.
- Architecture
- Tech Stack
- Project Structure
- Prerequisites
- Setup Instructions
- Data Sources
- Warehouse Schema
- Data Quality Checks
- Airflow DAG
- Adding a New Data Source
- Known Limitations
- Troubleshooting
- Key Gotchas
- What I Learned
- Version History
- Contributors
- Licence
## Architecture

```
┌──────────────────────────────────────────────────────┐
│                       SOURCES                        │
│  PostgreSQL DB      CSV (SFTP)  Marketing REST API   │
└────────┬─────────────────┬──────────────┬────────────┘
         │                 │              │
         ▼                 ▼              ▼
┌──────────────────────────────────────────────────────┐
│               EXTRACT (Python scripts)               │
│                 extract_postgres.py                  │
│               extract_logistics_csv.py               │
│               extract_marketing_api.py               │
└──────────────────────────┬───────────────────────────┘
                           │
                           ▼
┌──────────────────────────────────────────────────────┐
│                      raw schema                      │
│         Unmodified copies of all source data         │
└──────────────────────────┬───────────────────────────┘
                           │
                           ▼
┌──────────────────────────────────────────────────────┐
│            TRANSFORM (dbt staging models)            │
│      Clean, cast types, deduplicate, normalise       │
└──────────────────────────┬───────────────────────────┘
                           │
                           ▼
┌──────────────────────────────────────────────────────┐
│                    staging schema                    │
│     stg_orders    stg_customers    stg_products      │
│  stg_order_items    stg_shipments    stg_campaigns   │
└──────────────────────────┬───────────────────────────┘
                           │
                           ▼
┌──────────────────────────────────────────────────────┐
│             TRANSFORM (dbt marts models)             │
│        Build star schema — facts + dimensions        │
└──────────────────────────┬───────────────────────────┘
                           │
                           ▼
┌──────────────────────────────────────────────────────┐
│                     marts schema                     │
│ fact_sales   dim_customers   dim_products   dim_date │
└──────────────────────────┬───────────────────────────┘
                           │
                           ▼
┌──────────────────────────────────────────────────────┐
│             DATA QUALITY CHECKS (Python)             │
│        6 automated checks → raw.pipeline_audit       │
└──────────────────────────┬───────────────────────────┘
                           │
                           ▼
┌──────────────────────────────────────────────────────┐
│            ORCHESTRATION (Apache Airflow)            │
│             Scheduled daily at 06:30 UTC             │
│       Parallel extraction → dbt → checks → notify    │
└──────────────────────────────────────────────────────┘
```
## Tech Stack
| Component | Tool | Version |
|---|---|---|
| Warehouse | PostgreSQL | 15 |
| Extraction | Python | 3.10+ |
| Transformation | dbt-core + dbt-postgres | 1.11 |
| Orchestration | Apache Airflow | 2.7 |
| Containerisation | Docker + Docker Compose | Latest |
## Project Structure

```
de_project/
├── airflow/
│   ├── dags/
│   │   └── pipeline_dag.py          # Airflow DAG definition
│   ├── dbt/
│   │   └── profiles.yml             # dbt connection profile for Docker
│   └── requirements.txt             # Python packages for Airflow container
├── data/
│   └── csv/                         # Logistics CSV file drop location
├── dbt_project/
│   ├── models/
│   │   ├── staging/                 # Cleaned, typed views per source
│   │   └── marts/                   # Star schema: facts + dimensions
│   └── macros/
│       └── generate_schema_name.sql # Custom schema naming macro
├── extract/
│   ├── extract_postgres.py          # Pulls from transactional source DB
│   ├── extract_logistics_csv.py     # Reads logistics CSV files
│   └── extract_marketing_api.py     # Calls marketing REST API
├── seed/
│   └── seed_postgres.py             # Populates source DB with sample data
├── data_quality_checks.py           # Automated pipeline health checks
├── setup_warehouse.py               # Creates warehouse schemas
├── health_check.py                  # Verifies all connections
├── Dockerfile                       # Custom Airflow image with dependencies
├── docker-compose.yml               # All services: DBs + Airflow
├── .env.example                     # Template for environment variables
├── DATA_DICTIONARY.md               # Column-level documentation
├── LINEAGE.md                       # Data lineage diagram
└── ADR.md                           # Architecture decision records
```
## Prerequisites
- Docker Desktop (running, with at least 4GB RAM allocated)
- Python 3.10+
- Git
## Setup Instructions

Clone the repository:

```bash
git clone https://github.com/thenaijacarguy/de_project.git
cd de_project
```

Configure environment variables:

```bash
cp .env.example .env
# Open .env and update values if needed
# The defaults work out of the box for local development
```

Start the Docker services:

```bash
docker-compose up -d
```

This starts:
| Container | Purpose | Port |
|---|---|---|
| source_db | Transactional source database | 5435 |
| warehouse_db | Data warehouse | 5434 |
| airflow_db | Airflow metadata database | 5432 |
| airflow-scheduler | DAG scheduler | — |
| airflow-webserver | Airflow UI | 8080 |
First run takes 3–5 minutes while Docker builds the custom Airflow image.
Seed the source database, create the warehouse schemas, and verify connectivity:

```bash
python seed/seed_postgres.py
python setup_warehouse.py
python health_check.py
```

You should see two green checkmarks. If not, check that Docker is running and all containers are healthy with `docker ps`.
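The contents of `health_check.py` aren't shown in this README; a minimal connectivity probe in the same spirit could be as simple as a TCP check against the exposed ports (an illustrative sketch, not the repo's actual implementation):

```python
import socket

def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Probe the warehouse and source DB ports exposed by docker-compose
for name, port in [("source_db", 5435), ("warehouse_db", 5434)]:
    status = "OK" if port_open("localhost", port) else "UNREACHABLE"
    print(f"{name} (localhost:{port}): {status}")
```

A real health check would also authenticate and run `SELECT 1`, but a port probe is enough to distinguish "container not running" from "wrong credentials".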
Run the pipeline end to end from the command line:

```bash
# Extract all three sources
python extract/extract_postgres.py
python extract/extract_logistics_csv.py
python extract/extract_marketing_api.py

# Transform with dbt
cd dbt_project && dbt run && dbt test && cd ..

# Run quality checks
python data_quality_checks.py
```

Open http://localhost:8080 and log in with `admin` / `admin`. Find the `sales_pipeline` DAG, enable the toggle, then click the play button to trigger a manual run. Watch each task turn green in the Graph view.
## Data Sources
| Source | Type | Raw Tables | Frequency |
|---|---|---|---|
| Transactional DB | PostgreSQL | orders, order_items, customers, products | Daily |
| Logistics Provider | CSV (SFTP) | shipments | Daily at 06:00 UTC |
| Marketing Platform | REST API | campaigns | Daily |
## Warehouse Schema
### `raw` schema

Unmodified copies of source data. All columns are stored as TEXT. Never queried directly by analysts. Overwritten on each pipeline run (full refresh) or appended idempotently (CSV and API sources).

### `staging` schema

Cleaned, typed, deduplicated views built by dbt. One view per raw table; no joins across sources. Materialised as views so they always reflect the latest raw data without storing a copy.

### `marts` schema

Star schema tables materialised as physical tables for fast query performance.
| Table | Type | Description |
|---|---|---|
| fact_sales | Fact | One row per order line item — the central table |
| dim_customers | Dimension | Customer name, email, region, days since signup |
| dim_products | Dimension | Product name, category, cost price |
| dim_date | Dimension | Calendar attributes for every day from 2023–2025 |
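To show the kind of query the star schema is built for, here is a self-contained sketch using SQLite as a stand-in for the PostgreSQL warehouse; the table and column names follow the schema above, but the sample rows are invented:

```python
import sqlite3

# In-memory stand-in for the warehouse; the real marts live in PostgreSQL.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_products (product_id INTEGER PRIMARY KEY, name TEXT, category TEXT);
    CREATE TABLE fact_sales (order_id INTEGER, product_id INTEGER, quantity INTEGER, unit_price REAL);
    INSERT INTO dim_products VALUES (1, 'Widget', 'Hardware'), (2, 'Gadget', 'Hardware'), (3, 'Ebook', 'Digital');
    INSERT INTO fact_sales VALUES (100, 1, 2, 9.99), (100, 3, 1, 4.50), (101, 2, 1, 19.99);
""")

# Typical mart query: revenue by product category, one join per dimension.
rows = conn.execute("""
    SELECT p.category, ROUND(SUM(f.quantity * f.unit_price), 2) AS revenue
    FROM fact_sales f
    JOIN dim_products p USING (product_id)
    GROUP BY p.category
    ORDER BY revenue DESC
""").fetchall()
print(rows)  # [('Hardware', 39.97), ('Digital', 4.5)]
```

Against the real warehouse the same shape of query runs over `marts.fact_sales` joined to `marts.dim_products`, `marts.dim_customers`, and `marts.dim_date`.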
## Data Quality Checks

Six automated checks run after every pipeline execution. Results are logged to `raw.pipeline_audit` with timestamps so you can track pipeline health over time.
| Check | What It Catches |
|---|---|
| row_count_validation | fact_sales has fewer rows than expected |
| null_rate_{column} | Critical columns contain NULL values |
| referential_integrity_customers | Orphaned customer_id foreign keys in fact_sales |
| referential_integrity_products | Orphaned product_id foreign keys in fact_sales |
| data_freshness | Most recent order date is unexpectedly old |
| revenue_sanity | Line revenue outliers more than 10x the average |
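The code in `data_quality_checks.py` is not reproduced here; as an illustration, a referential-integrity check that emits a record shaped like the `raw.pipeline_audit` columns queried below might look like this (the function name and exact record shape are assumptions):

```python
from datetime import datetime, timezone

def referential_integrity_check(fact_keys, dim_keys, check_name):
    """Return an audit record like those written to raw.pipeline_audit."""
    orphans = set(fact_keys) - set(dim_keys)   # FKs with no matching dimension row
    return {
        "check_name": check_name,
        "status": "pass" if not orphans else "fail",
        "details": f"{len(orphans)} orphaned keys: {sorted(orphans)}",
        "checked_at": datetime.now(timezone.utc).isoformat(),
    }

# customer_id 99 appears in fact_sales but not in dim_customers
record = referential_integrity_check(
    fact_keys=[1, 2, 2, 99],
    dim_keys=[1, 2, 3],
    check_name="referential_integrity_customers",
)
print(record["status"])  # fail
```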
To view the audit log:

```sql
SELECT check_name, status, details, checked_at
FROM raw.pipeline_audit
ORDER BY checked_at DESC;
```

## Airflow DAG

The `sales_pipeline` DAG runs daily at 06:30 UTC with the following task graph:
```
extract_postgres ──┐
extract_csv     ───┼──► dbt_run ──► dbt_test ──► quality_checks ──► branch ──► notify_success
extract_api      ──┘                                                      └──► notify_failure
```
Key settings:

- Schedule: `30 6 * * *` (06:30 UTC daily)
- Retries: 3 attempts per task, 5-minute delay between retries
- Parallelism: the three extraction tasks run simultaneously
- Branching: the final notification task routes to success or failure based on quality check results
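The branching decision can be sketched as a plain function; in Airflow this would be the callable behind a `BranchPythonOperator`, returning the `task_id` to follow (the function name and threshold logic here are illustrative, not lifted from `pipeline_dag.py`):

```python
def choose_notification(check_results):
    """Route to the success or failure notification task.

    check_results: (check_name, status) pairs read from raw.pipeline_audit
    for the current run. Returns the task_id the branch should follow.
    """
    failed = [name for name, status in check_results if status != "pass"]
    return "notify_failure" if failed else "notify_success"

print(choose_notification([("row_count_validation", "pass"), ("data_freshness", "pass")]))  # notify_success
print(choose_notification([("revenue_sanity", "fail")]))                                    # notify_failure
```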
## Adding a New Data Source

- Write an extraction script in `extract/` following the same pattern as the existing scripts — load credentials from `.env`, write raw data to the warehouse, log row counts, handle errors
- Create a staging model in `dbt_project/models/staging/`
- Add column descriptions and tests to `schema.yml`
- Join the new staging model into `fact_sales.sql` if relevant
- Add a new task to the DAG in `airflow/dags/pipeline_dag.py`
- Add the new package to the `Dockerfile` if needed, then rebuild with `docker-compose build --no-cache`
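The extraction pattern described in the first step can be sketched as a generic skeleton (names and structure are illustrative; the real scripts also open database connections and load `.env` credentials):

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("extract")

def run_extraction(source_name, fetch_rows, write_rows):
    """Generic extract step: pull rows, land them in raw, log the count.

    fetch_rows / write_rows are injected callables so the same skeleton
    fits a DB query, a CSV read, or an API call.
    """
    try:
        rows = fetch_rows()
        written = write_rows(rows)
        log.info("%s: extracted %d rows, wrote %d", source_name, len(rows), written)
        return written
    except Exception:
        log.exception("%s: extraction failed", source_name)
        raise

# Toy in-memory example standing in for a real source and warehouse
landed = []
count = run_extraction(
    "demo_source",
    fetch_rows=lambda: [{"id": 1}, {"id": 2}],
    write_rows=lambda rows: landed.extend(rows) or len(landed),
)
```

Keeping the fetch and write sides injectable also makes each script unit-testable without a live database.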
## Known Limitations

- **Seed data is frozen in 2024.** The sample data generated by `seed_postgres.py` has order dates up to December 2024. The data freshness quality check threshold has been relaxed to 500 days to accommodate this. In a production pipeline with live data the threshold would be 1–2 days.
- **Full refresh only.** The extraction scripts reload all data on every run. For tables with millions of rows this would be too slow — incremental loading with CDC (Change Data Capture) would be required at scale.
- **Single-node Airflow.** We use the LocalExecutor, which runs tasks on a single machine. A production deployment would use the CeleryExecutor or KubernetesExecutor to distribute tasks across multiple workers.
- **Mock marketing API.** The marketing API extraction uses JSONPlaceholder as a mock endpoint. The response structure differs from a real marketing platform — the staging model would need updating to match a real API's schema.
- **No SFTP.** The logistics CSV extraction reads from a local folder rather than an actual SFTP server. In production you would use the `paramiko` library to connect to the SFTP server and download files automatically.
## Troubleshooting

**Tasks stuck yellow in Airflow for more than 2 minutes**

```bash
docker ps                                          # check all containers are running
docker-compose logs airflow-scheduler | tail -30   # check scheduler logs
```

**Connection refused errors in extraction scripts**
Scripts running on your Mac use `localhost` with the external ports (5435, 5434). Scripts running inside Airflow containers use the container names (`source_db`, `warehouse_db`) on port 5432. If you see connection errors inside Airflow, check the environment variable overrides in `docker-compose.yml`.
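One way to keep a single script working in both environments is to branch on an environment variable. The `RUNNING_IN_AIRFLOW` flag, user, and database names below are illustrative, not taken from this repo:

```python
import os

def warehouse_dsn():
    """Pick connection settings based on where the script is running."""
    if os.getenv("RUNNING_IN_AIRFLOW"):       # would be set in docker-compose.yml
        host, port = "warehouse_db", 5432     # container name, internal port
    else:
        host, port = "localhost", 5434        # exposed port on the host machine
    return f"postgresql://analytics@{host}:{port}/warehouse"

print(warehouse_dsn())
```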
**dbt writes to the wrong schema (e.g. `public_marts` instead of `marts`)**

Ensure `dbt_project/macros/generate_schema_name.sql` exists and that `dbt_project.yml` has `+schema: staging` and `+schema: marts` under the respective model groups.
**`pip install` permission denied inside the Airflow container**

The Airflow image blocks runtime pip installs. Add the package to the `Dockerfile` `RUN` command and rebuild:

```bash
docker-compose down
docker-compose build --no-cache
docker-compose up -d
```

**GitHub push rejected**

GitHub does not accept account passwords for Git operations. Use a Personal Access Token (PAT) with `repo` scope. Generate one at GitHub → Settings → Developer Settings → Personal Access Tokens.
## Key Gotchas

These caused real debugging time during this project and are worth knowing upfront:

- **Docker networking vs localhost.** Inside Docker, containers reach each other by container name on port 5432, not by localhost on your exposed ports. The same script needs different connection settings depending on whether it runs on your Mac or inside Airflow.
- **dbt schema prefixing.** By default dbt prefixes your custom schema names with the default schema, producing names like `public_marts` instead of `marts`. The fix is a custom `generate_schema_name` macro and an explicit `+schema` config in `dbt_project.yml`.
- **Airflow pip restrictions.** The official Airflow Docker image intentionally blocks pip at runtime. All Python dependencies must be baked into a custom Docker image at build time via a `Dockerfile`.
- **dbt init creates a nested project.** Running `dbt init` inside an existing project folder creates a duplicate nested subfolder. Delete it with `rm -rf dbt_project/de_project`.
- **provide_context deprecation.** In Airflow 2.0+, `provide_context=True` is no longer needed on `PythonOperator` and causes deprecation warnings if included. It is harmless but can be removed.
## Version History
| Version | Date | Description |
|---|---|---|
| 1.0.0 | 2026-02-26 | Initial complete pipeline |
## Contributors
| Name | Role | GitHub |
|---|---|---|
| Gabriel James | Analytics Engineer | @thenaijacarguy |
## Licence
This project is for educational and portfolio purposes. Feel free to fork it, adapt it, and use it as a reference for your own projects.