Data Pipeline Design for Carbon Tracking System
# Data Pipeline (Design Department)
**Goals:** reliable ingestion → clean, unified storage → fast analytics → ML insights →
dashboards & alerts.
## 1) Sources
- **IoT sensors (CO₂/CO₂e, temp, flow, runtime):** MQTT/HTTP → gateway.
- **Electricity (JEPCO):** API/CSV drops (hourly/daily).
- **Raw materials & transport:** operator entry (web app) + supplier CSV.
- **Maintenance logs:** CMMS or in-house form.
## 2) Transport & Ingestion
- **Protocol:** MQTT (sensors) → Kafka (topic fan-out) or MQTT → Telegraf for simplicity.
- **Batch loads:** JEPCO and supplier files loaded via scheduled Airflow jobs.
- **Back-pressure & retries:** Kafka acks / DLQ topic; Airflow retries + alerting.
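
As a concrete illustration of the sensor path and DLQ idea above, here is a minimal bridge sketch. It assumes the paho-mqtt (1.x-style callbacks) and confluent-kafka client libraries; broker addresses and topic names are placeholders, not fixed parts of the design.

```python
# Minimal MQTT -> Kafka bridge sketch (paho-mqtt 1.x-style callbacks, confluent-kafka).
# Broker addresses and topic names are illustrative placeholders.
import json

import paho.mqtt.client as mqtt
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "kafka:9092"})

def on_delivery(err, msg):
    # Messages Kafka could not accept go to a dead-letter topic for inspection.
    if err is not None:
        producer.produce("telemetry.dlq", msg.value())

def on_message(client, userdata, message):
    try:
        payload = json.loads(message.payload)  # reject non-JSON frames early
        producer.produce("telemetry.raw", json.dumps(payload).encode(), callback=on_delivery)
    except json.JSONDecodeError:
        producer.produce("telemetry.dlq", message.payload)
    producer.poll(0)  # serve delivery callbacks without blocking

client = mqtt.Client()
client.on_message = on_message
client.connect("mqtt-gateway", 1883)
client.subscribe("sensors/#")
client.loop_forever()
```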
## 3) Processing (ELT)
- **Stream (near-real-time):** Kafka → Flink/Spark Structured Streaming (validate, enrich with asset IDs, normalize units).
- **Batch (daily/hourly):** dbt models or Spark jobs (joins, dimension lookups, CO₂e factor
application).
- **Data quality:** Great Expectations (range checks, nulls, schema drift).
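
The row-level checks intended for the quality step, as a plain pandas sketch so it stays tool-agnostic; in the pipeline itself these rules would be expressed as a Great Expectations suite. Column names follow the telemetry schema in section 10; the sensor names and bounds are illustrative.

```python
# Illustrative data-quality checks on a telemetry batch (plain pandas here; the
# pipeline would encode the same rules as a Great Expectations suite).
import pandas as pd

REQUIRED = ["site_id", "asset_id", "sensor_type", "ts_utc", "value", "unit"]
VALUE_RANGES = {"co2_ppm": (0, 10_000), "temp_c": (-40, 200)}  # illustrative bounds

def validate_batch(df: pd.DataFrame) -> pd.DataFrame:
    """Return the rows failing any check; an empty frame means the batch passes."""
    # Schema drift: every required column must be present.
    missing = [c for c in REQUIRED if c not in df.columns]
    if missing:
        raise ValueError(f"schema drift, missing columns: {missing}")

    bad_rows = [df[df[REQUIRED].isna().any(axis=1)]]  # null checks on keys and values

    # Range checks per sensor type.
    for sensor, (lo, hi) in VALUE_RANGES.items():
        sub = df[df["sensor_type"] == sensor]
        bad_rows.append(sub[(sub["value"] < lo) | (sub["value"] > hi)])

    return pd.concat(bad_rows).drop_duplicates()
```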
## 4) Storage (Tiered)
- **Raw (immutable):** Object store (S3/MinIO) partitioned by source/date; key layout sketched after this list.
- **Operational store:**
- **Time-series:** InfluxDB/TimescaleDB for sensor telemetry.
- **Relational (OLTP):** Postgres for assets, sites, materials, users.
- **Analytics (OLAP):** DuckDB/BigQuery/Snowflake for dashboards & reports.
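
A small sketch of the raw-zone key convention implied by "partitioned by source/date"; the bucket name and layout are illustrative assumptions.

```python
# Sketch of the raw-zone object key convention (partitioned by source/date).
# Bucket name and layout are illustrative, not a fixed contract.
from datetime import datetime, timezone

RAW_BUCKET = "carbon-raw"  # placeholder MinIO/S3 bucket

def raw_key(source: str, received_at: datetime, filename: str) -> str:
    """e.g. raw/jepco/2025/01/31/usage_0001.csv"""
    d = received_at.astimezone(timezone.utc)
    return f"raw/{source}/{d:%Y/%m/%d}/{filename}"

# Example: where an hourly JEPCO CSV drop would land today.
print(raw_key("jepco", datetime.now(timezone.utc), "usage_0001.csv"))
```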
## 5) Models & Business Logic
- **Emission factors:** versioned tables keyed by standard, region, and validity window (see the resolution sketch after this list).
- **Core marts:** fact_emissions_hourly, fact_energy_use, dim_asset, dim_site, fact_materials.
- **KPIs:** tCO₂e by scope/site/line, intensity per unit output, uptime, MTBF, energy per
unit.
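
A sketch of how a versioned factor could be resolved and applied when building fact_emissions_hourly; the factor values, units, and in-memory lookup are illustrative, not real grid factors.

```python
# Sketch: resolve a versioned emission factor by activity and validity window,
# then apply it to an hourly energy reading. Factor values are placeholders.
from dataclasses import dataclass
from datetime import date

@dataclass
class EmissionFactor:
    activity: str
    factor: float        # kgCO2e per kWh in this illustration
    valid_from: date
    valid_to: date
    version: int

FACTORS = [
    EmissionFactor("grid_electricity", 0.45, date(2024, 1, 1), date(2024, 12, 31), 1),
    EmissionFactor("grid_electricity", 0.43, date(2025, 1, 1), date(2025, 12, 31), 2),
]

def resolve_factor(activity: str, on: date) -> EmissionFactor:
    """Pick the factor whose validity window covers the activity date."""
    matches = [f for f in FACTORS
               if f.activity == activity and f.valid_from <= on <= f.valid_to]
    return max(matches, key=lambda f: f.version)  # prefer the latest version

def tco2e(energy_kwh: float, activity: str, on: date) -> float:
    """kWh x (kgCO2e/kWh) / 1000 -> tonnes CO2e."""
    return energy_kwh * resolve_factor(activity, on).factor / 1000.0

print(tco2e(1200.0, "grid_electricity", date(2025, 3, 1)))  # 0.516 tCO2e
```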
## 6) ML/AI
- **Predictive maintenance:** anomaly detection (Z-score/Prophet; sketched after this list) + failure classification (XGBoost).
- **Forecasting:** energy/emissions forecasts (Prophet/ARIMA).
- **Generative design hooks:** write constraints & results back to design_variants.
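
A minimal rolling z-score sketch of the anomaly-detection bullet above; window size and threshold are illustrative, and the production version would run against the Timescale telemetry rather than an in-memory series.

```python
# Rolling z-score anomaly flagging on a sensor/energy series (illustrative parameters).
import pandas as pd

def flag_anomalies(values: pd.Series, window: int = 96, threshold: float = 3.0) -> pd.Series:
    """True where a reading sits more than `threshold` standard deviations away
    from the rolling statistics of the preceding `window` readings."""
    mean = values.shift(1).rolling(window, min_periods=window // 2).mean()
    std = values.shift(1).rolling(window, min_periods=window // 2).std()
    return ((values - mean) / std).abs() > threshold

# Example on an hourly energy series: only the 350 reading exceeds 3 sigma.
energy = pd.Series([100, 102, 98, 101, 99, 350, 100, 97])
print(flag_anomalies(energy, window=4))
```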
## 7) Serving & Visualization
- **Dashboards:** Grafana/Metabase/Power BI over OLAP + Timescale.
- **APIs:** FastAPI layer for apps (read KPIs, write operator inputs).
- **Alerts:** Grafana/Alertmanager → Email/Slack when thresholds or anomalies trigger.
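
A minimal sketch of the read side of that API layer; the route path, response shape, and stubbed query are assumptions rather than a settled contract.

```python
# Minimal FastAPI sketch for serving KPIs. Route, response model, and the backing
# query are illustrative assumptions, not the final API contract.
from datetime import date
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI(title="Carbon KPIs")

class EmissionsKpi(BaseModel):
    site_id: str
    day: date
    scope: int
    tco2e: float

@app.get("/kpi/emissions/{site_id}", response_model=list[EmissionsKpi])
def emissions_by_site(site_id: str, start: date, end: date) -> list[EmissionsKpi]:
    # The real service would query the OLAP mart (fact_emissions_hourly rolled up
    # to daily); a stub row keeps the sketch self-contained.
    return [EmissionsKpi(site_id=site_id, day=start, scope=2, tco2e=1.23)]
```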
## 8) Orchestration & Ops
- **Scheduler:** Airflow for batch; Git-versioned DAGs.
- **CI/CD:** GitHub Actions → staging → prod with data smoke tests.
- **Monitoring:** Prometheus + Grafana for pipeline health; logs in Loki/ELK.
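
A sketch of what the Git-versioned daily batch DAG could look like; task bodies, schedule, and retry settings are illustrative placeholders (the `schedule` argument assumes Airflow 2.4+).

```python
# Sketch of the daily batch DAG: load JEPCO/supplier files, run quality checks,
# then rebuild the marts. Task bodies and schedule are placeholders.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

def load_jepco(**_): ...          # pull the JEPCO CSV/API drop into the raw zone
def run_quality_checks(**_): ...  # Great Expectations suite over the new raw data
def build_marts(**_): ...         # dbt run / Spark job for the emission marts

with DAG(
    dag_id="daily_emissions_batch",
    start_date=datetime(2025, 1, 1),
    schedule="0 2 * * *",  # daily close at 02:00 UTC
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=10)},
) as dag:
    ingest = PythonOperator(task_id="load_jepco", python_callable=load_jepco)
    checks = PythonOperator(task_id="quality_checks", python_callable=run_quality_checks)
    marts = PythonOperator(task_id="build_marts", python_callable=build_marts)

    ingest >> checks >> marts
```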
## 9) Security & Governance
- **AuthN/Z:** Keycloak/OAuth; RBAC (Design vs Manufacturing vs Exec).
- **PII:** likely minimal, but classify and mask where needed.
- **Lineage & catalog:** OpenMetadata/Amundsen; dataset SLAs documented.
- **Backups & retention:** Raw: 2–3 yrs; processed marts: 12–24 mo.
## 10) Schemas (examples)
**Telemetry (Timescale):**

    telemetry(site_id, asset_id, sensor_type, ts_utc, value, unit, qflag)

**Emissions mart (OLAP):**

    fact_emissions_hourly(site_id, asset_id, ts_utc, scope, activity_type,
                          energy_kwh, materials_kg, emission_factor_id, tco2e)

**Emission factor:**

    emission_factor(id, name, source_std, region, activity, unit_in, unit_out,
                    factor, valid_from, valid_to, version)
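
A sketch of how the telemetry schema above maps onto a TimescaleDB hypertable via psycopg2; the connection string and column types are assumptions.

```python
# Sketch: create the telemetry table from this section as a TimescaleDB hypertable.
# Connection string and column types are assumptions for illustration.
import psycopg2

DDL = """
CREATE TABLE IF NOT EXISTS telemetry (
    site_id     TEXT             NOT NULL,
    asset_id    TEXT             NOT NULL,
    sensor_type TEXT             NOT NULL,
    ts_utc      TIMESTAMPTZ      NOT NULL,
    value       DOUBLE PRECISION,
    unit        TEXT,
    qflag       SMALLINT
);
SELECT create_hypertable('telemetry', 'ts_utc', if_not_exists => TRUE);
"""

with psycopg2.connect("postgresql://tsdb:5432/carbon") as conn:  # placeholder DSN
    with conn.cursor() as cur:
        cur.execute(DDL)
```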
## 11) Latency Targets
- **Live ops (alarms):** 5–30s end-to-end.
- **Dashboards:** near-real-time (≤1 min) for sensors; hourly/daily for JEPCO & materials.
- **Reports:** daily close + monthly regulatory exports.
## 12) Quick Diagram (text)

    Sensors → MQTT → Kafka/Telegraf ─┐
                                     ├─ Stream proc (Flink) → Timescale (RT) → Grafana/Alerts
    JEPCO API / CSV → Airflow ───────┤
    Supplier CSV / App → API ────────┤
                                     └─ Batch (dbt/Spark) → OLAP (DuckDB/BigQuery) → Dashboards/Reports
                                              ↘ Models (PM/Forecast) → Alerts & API
## 13) How this helps the Software Specialist
- **Clean contracts:** well-defined schemas + API layer mean fewer ad-hoc queries.
- **Faster features:** standardized marts (dbt) speed up new KPIs.
- **Fewer fires:** monitoring, DLQs, and data tests catch issues early.
- **Scalable by design:** streaming + batch tiers handle growth without rewrites.
## 14) Suggested Stack (balanced complexity)
- **Ingestion:** MQTT + Telegraf (simple) or MQTT + Kafka (scalable)
- **Processing:** dbt (+ DuckDB locally; Spark if big)
- **Storage:** TimescaleDB + Postgres + MinIO
- **Orchestration:** Airflow
- **Quality:** Great Expectations
- **Serving:** FastAPI + Grafana/Metabase