Manually testing a data pipeline
ETL and ELT in Python
Jake Roach
Data Engineer
Testing data pipelines
Data pipelines should be thoroughly tested:
- Validate that data is extracted, transformed, and loaded as expected
- Validating pipelines limits maintenance efforts after deployment
- Identify and fix data quality issues
- Improve data reliability

Tools and techniques to test data pipelines:
- End-to-end testing
- Validating data at "checkpoints"
- Unit testing
Testing and production environments
Testing a pipeline end-to-end
End-to-end testing:
- Confirm that the pipeline runs on repeated attempts
- Validate data at pipeline checkpoints
- Engage in peer review and incorporate feedback
- Ensure consumers can access and are satisfied with the solution
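A minimal sketch of such an end-to-end check, assuming the extract, transform, and load functions from the pipeline module used later in this lesson: it runs the pipeline on repeated attempts and validates a checkpoint after each stage.

import pandas as pd
from pipeline import extract, transform, load

# Run the full pipeline twice to confirm it behaves on repeated attempts
for attempt in range(2):
    raw_stock_data = extract("raw_stock_data.csv")
    clean_stock_data = transform(raw_stock_data)
    load(clean_stock_data)

    # Checkpoint validation after extraction and transformation
    assert isinstance(raw_stock_data, pd.DataFrame)
    assert isinstance(clean_stock_data, pd.DataFrame)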
Validating pipeline checkpoints
# Extract, transform, and load data as part of a pipeline
...

# Take a look at the data made available in a Postgres database
loaded_data = pd.read_sql("SELECT * FROM clean_stock_data", con=db_engine)

print(loaded_data.shape)

(6438, 4)

print(loaded_data.head())

timestamps           volume      open      close
1997-05-15 13:30:00  1443120000  0.121875  0.097917
1997-05-16 13:30:00  294000000   0.098438  0.086458
1997-05-19 13:30:00  122136000   0.088021  0.085417
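The snippet above assumes a db_engine already exists. A minimal sketch of building one with SQLAlchemy (the connection string below is hypothetical):

import sqlalchemy

# Hypothetical Postgres connection string; replace with real credentials
db_engine = sqlalchemy.create_engine(
    "postgresql+psycopg2://user:password@localhost:5432/stock_db"
)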
Validating DataFrames
# Extract, transform, and load data as part of a pipeline
...

# Take a look at the data made available in a Postgres database
loaded_data = pd.read_sql("SELECT * FROM clean_stock_data", con=db_engine)

# Compare the two DataFrames
print(clean_stock_data.equals(loaded_data))

True
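.equals() only returns a boolean. When a mismatch needs diagnosing, pandas also provides pandas.testing.assert_frame_equal, which raises an AssertionError describing the first difference:

import pandas.testing as pdt

# Raises an informative AssertionError if the DataFrames differ
pdt.assert_frame_equal(clean_stock_data, loaded_data)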
Let's practice!
Unit-testing a data pipeline
Validating a data pipeline with unit tests
Unit tests:
- Commonly used in software engineering workflows
- Ensure code works as expected
- Help to validate data
pytest for unit testing
import pandas as pd
from pipeline import extract, transform, load

# Build a unit test, asserting the type of clean_stock_data
def test_transformed_data():
    raw_stock_data = extract("raw_stock_data.csv")
    clean_stock_data = transform(raw_stock_data)
    assert isinstance(clean_stock_data, pd.DataFrame)

> python -m pytest

test_transformed_data .                                                    [100%]
================================ 1 passed in 1.17s ===============================
assert and isinstance
pipeline_type = "ETL"

# Check if pipeline_type is an instance of a str
isinstance(pipeline_type, str)

True

# Assert that pipeline_type indeed takes the value "ETL"
assert pipeline_type == "ETL"

# Combine assert and isinstance
assert isinstance(pipeline_type, str)
AssertionError
pipeline_type = "ETL"

# Create an AssertionError
assert isinstance(pipeline_type, float)

Traceback (most recent call last):
  File "<stdin>", line 4, in <module>
AssertionError
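Inside a test suite, an expected AssertionError can itself be asserted with pytest.raises; a small sketch (the test name is hypothetical):

import pytest

def test_pipeline_type_is_not_float():
    pipeline_type = "ETL"

    # The assert below is expected to fail, so the test passes
    with pytest.raises(AssertionError):
        assert isinstance(pipeline_type, float)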
Mocking data pipeline components with fixtures
import pytest
import pandas as pd
from pipeline import extract, transform

@pytest.fixture()
def clean_data():
    raw_stock_data = extract("raw_stock_data.csv")
    clean_stock_data = transform(raw_stock_data)
    return clean_stock_data

def test_transformed_data(clean_data):
    assert isinstance(clean_data, pd.DataFrame)
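By default, a fixture is rebuilt for every test that requests it. If extraction and transformation are expensive, one option is a module-scoped fixture, which caches the result for all tests in the file (a sketch, reusing the same extract and transform functions):

@pytest.fixture(scope="module")
def clean_data():
    # Built once per test module instead of once per test
    raw_stock_data = extract("raw_stock_data.csv")
    return transform(raw_stock_data)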
Unit testing DataFrames
def test_transformed_data(clean_data):
    # Include other assert statements here
    ...

    # Check number of columns
    assert len(clean_data.columns) == 4

    # Check the lower bound of a column
    assert clean_data["open"].min() >= 0

    # Check the range of a column by chaining statements with "and"
    assert clean_data["open"].min() >= 0 and clean_data["open"].max() <= 1000
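Other data quality checks follow the same pattern. For example, a null check and a column-name check (the expected column list mirrors the head() output shown earlier):

def test_data_quality(clean_data):
    # No missing closing prices
    assert clean_data["close"].notnull().all()

    # Columns match what downstream consumers expect
    assert list(clean_data.columns) == ["timestamps", "volume", "open", "close"]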
Let's practice!
Running a data pipeline in production
Data pipeline architecture patterns
Pattern 1: define and run everything in a single file

# Define ETL functions
...

def load(clean_data):
    ...

# Run the data pipeline
raw_stock_data = extract("raw_stock_data.csv")
clean_stock_data = transform(raw_stock_data)
load(clean_stock_data)

> ls
etl_pipeline.py

Pattern 2: import components from a separate module

# Import extract, transform, and load functions
from pipeline_utils import extract, transform, load

# Run the data pipeline
raw_stock_data = extract("raw_stock_data.csv")
clean_stock_data = transform(raw_stock_data)
load(clean_stock_data)

> ls
etl_pipeline.py    pipeline_utils.py
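For completeness, a sketch of what pipeline_utils.py might contain; the function bodies here are assumptions, only the signatures follow the slides:

# pipeline_utils.py
import pandas as pd

def extract(file_path: str) -> pd.DataFrame:
    # Read raw data from a CSV file
    return pd.read_csv(file_path)

def transform(raw_data: pd.DataFrame) -> pd.DataFrame:
    # Placeholder cleaning logic: drop rows with missing values
    return raw_data.dropna()

def load(clean_data: pd.DataFrame) -> None:
    # Hypothetical destination; a real pipeline might write to Postgres
    clean_data.to_csv("clean_stock_data.csv", index=False)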
Running a data pipeline end-to-end
import logging
from pipeline_utils import extract, transform, load

logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.DEBUG)

try:
    # Extract, transform, and load data
    raw_stock_data = extract("raw_stock_data.csv")
    clean_stock_data = transform(raw_stock_data)
    load(clean_stock_data)

    # Log success message
    logging.info("Successfully extracted, transformed and loaded data.")

# Handle exceptions, log messages
except Exception as e:
    logging.error(f"Pipeline failed with error: {e}")
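Once a pipeline runs unattended, it helps to persist logs rather than print them. basicConfig can write the same records to a file (the filename is hypothetical):

import logging

# Send log records to a file instead of the console
logging.basicConfig(
    filename="pipeline.log",
    format="%(levelname)s: %(message)s",
    level=logging.DEBUG,
)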
Orchestrating data pipelines in production
¹ https://open.substack.com/pub/seattledataguy/p/the-state-of-data-engineering-part?r=1po78c&utm_campaign=post&utm_medium=web
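As a rough sketch of what orchestration can look like, here is a minimal Airflow-style DAG that schedules the same three steps daily (assumes Airflow 2.x and the pipeline_utils module from earlier; the DAG id and schedule are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from pipeline_utils import extract, transform, load

def run_pipeline():
    raw_stock_data = extract("raw_stock_data.csv")
    clean_stock_data = transform(raw_stock_data)
    load(clean_stock_data)

with DAG(
    dag_id="stock_data_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    PythonOperator(task_id="run_pipeline", python_callable=run_pipeline)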
Let's practice!
Congratulations!
Designing and building data pipelines
- Designing sound data pipelines
- Extract, transform, and load architecture
- Exception handling and logging
Advanced ETL techniques
- Handling nested JSON
- Advanced transformation logic
- Persisting data to SQL databases

{
    "863703000": {
        "volume": 1443120000,
        "price": {
            "close": 0.09791,
            "open": 0.12187
        }
    },
    ...
}
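A sketch of flattening that nested structure into tabular rows; the key names follow the JSON shown, while the file name is hypothetical:

import json
import pandas as pd

with open("raw_stock_data.json") as f:
    raw = json.load(f)

# Each top-level key is a timestamp mapping to volume and nested prices
rows = [
    {
        "timestamps": ts,
        "volume": entry["volume"],
        "open": entry["price"]["open"],
        "close": entry["price"]["close"],
    }
    for ts, entry in raw.items()
]
flattened = pd.DataFrame(rows)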
Deploying and maintaining data pipelines
- Validating and testing data pipelines
- Running a pipeline in a production setting
- Orchestration tools
Next steps
Courses and tracks:
- Introduction to Airflow in Python course
- Data Engineer career track
- Associate Data Engineer certification

Tools to explore:
- Apache Airflow
- Astronomer
- Snowflake
Thank you!