Chapter 4

Manually testing a data pipeline
ETL AND ELT IN PYTHON

Jake Roach
Data Engineer
Testing data pipelines

Data pipelines should be thoroughly tested

Validate that data is extracted, transformed, and loaded as expected
Validating pipelines limits maintenance efforts after deployment
Identify and fix data quality issues
Improves data reliability

Tools and techniques to test data pipelines

End-to-end testing
Validating data at "checkpoints"
Unit testing

ETL AND ELT IN PYTHON


Testing and production environments


Testing a pipeline end-to-end

End-to-end testing

Confirm that pipeline runs on repeated attempts
Validate data at pipeline checkpoints
Engage in peer review, incorporate feedback
Ensure consumer access and satisfaction with solution
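
One way to check the first point, that the pipeline runs on repeated attempts, is to execute it twice against the same target and confirm that the result does not change. The sketch below is an illustration under stated assumptions: an in-memory SQLite database stands in for the real target, and the `load` and `run_pipeline` helpers and the sample rows are hypothetical, not the course's own pipeline.

```python
import sqlite3

# Hypothetical sample rows standing in for transformed stock data
SAMPLE_ROWS = [
    ("1997-05-15 13:30:00", 1443120000, 0.121875, 0.097917),
    ("1997-05-16 13:30:00", 294000000, 0.098438, 0.086458),
    ("1997-05-19 13:30:00", 122136000, 0.088021, 0.085417),
]

def load(rows, connection):
    # Rebuild the table on every run so repeated runs do not duplicate rows
    connection.execute("DROP TABLE IF EXISTS clean_stock_data")
    connection.execute(
        "CREATE TABLE clean_stock_data "
        "(timestamps TEXT, volume INTEGER, open REAL, close REAL)"
    )
    connection.executemany(
        "INSERT INTO clean_stock_data VALUES (?, ?, ?, ?)", rows
    )
    connection.commit()

def run_pipeline(connection):
    # Assumed pipeline: extraction and transformation are skipped in this sketch
    load(SAMPLE_ROWS, connection)
    cursor = connection.execute("SELECT COUNT(*) FROM clean_stock_data")
    return cursor.fetchone()[0]

connection = sqlite3.connect(":memory:")
first_run_count = run_pipeline(connection)
second_run_count = run_pipeline(connection)

# The second run should leave the table in the same state as the first
assert first_run_count == second_run_count
```

If the load step appended rows instead of rebuilding the table, the second count would double and the final assertion would fail, which is the kind of problem a repeated run is meant to surface.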


Validating pipeline checkpoints
# Extract, transform, and load data as part of a pipeline
...

# Take a look at the data made available in a Postgres database
loaded_data = pd.read_sql("SELECT * FROM clean_stock_data", con=db_engine)
print(loaded_data.shape)

(6438, 4)

print(loaded_data.head())

         timestamps      volume      open     close
1997-05-15 13:30:00  1443120000  0.121875  0.097917
1997-05-16 13:30:00   294000000  0.098438  0.086458
1997-05-19 13:30:00   122136000  0.088021  0.085417
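
The same checkpoint can be turned into explicit checks rather than a visual inspection. The following is a minimal sketch, assuming an in-memory SQLite database in place of Postgres so that it runs without external services; the three-row DataFrame is hypothetical sample data modeled on the rows printed above.

```python
import sqlite3
import pandas as pd

# Hypothetical transformed data, modeled on the rows printed above
clean_stock_data = pd.DataFrame({
    "timestamps": [
        "1997-05-15 13:30:00",
        "1997-05-16 13:30:00",
        "1997-05-19 13:30:00",
    ],
    "volume": [1443120000, 294000000, 122136000],
    "open": [0.121875, 0.098438, 0.088021],
    "close": [0.097917, 0.086458, 0.085417],
})

# SQLite stands in for Postgres so the sketch has no external dependencies
connection = sqlite3.connect(":memory:")
clean_stock_data.to_sql("clean_stock_data", con=connection, index=False)

# Checkpoint: read the table back and compare it to what was loaded
loaded_data = pd.read_sql("SELECT * FROM clean_stock_data", con=connection)

assert loaded_data.shape == clean_stock_data.shape
assert list(loaded_data.columns) == ["timestamps", "volume", "open", "close"]
```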


Validating DataFrames
# Extract, transform, and load data, as part of a pipeline
...

# Take a look at the data made available in a Postgres database
loaded_data = pd.read_sql("SELECT * FROM clean_stock_data", con=db_engine)

# Compare the two DataFrames.
print(clean_stock_data.equals(loaded_data))

True
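
Note that `.equals()` requires corresponding columns to have the same dtype, so a database round trip that changes a type can make it return False without saying why. As a hedged alternative, `pandas.testing.assert_frame_equal` raises an `AssertionError` that describes the difference, and its `check_dtype=False` option compares values only. The two small DataFrames below are hypothetical.

```python
import pandas as pd
from pandas.testing import assert_frame_equal

# Hypothetical frames: same values, but "volume" differs in dtype
clean_stock_data = pd.DataFrame({
    "volume": [294000000, 122136000],
    "open": [0.098438, 0.088021],
})
loaded_data = clean_stock_data.astype({"volume": "float64"})

# .equals() is strict about dtypes, so this comparison is False
strict_match = clean_stock_data.equals(loaded_data)

# assert_frame_equal raises an AssertionError describing any difference;
# check_dtype=False relaxes the dtype comparison and checks values only
assert_frame_equal(clean_stock_data, loaded_data, check_dtype=False)
```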


Unit-testing a data pipeline
Validating a data pipeline with unit tests
Unit tests:

Commonly used in software engineering workflows

Ensure code works as expected


Help to validate data


pytest for unit testing
import pandas as pd
from pipeline import extract, transform, load

# Build a unit test, asserting the type of clean_stock_data
def test_transformed_data():
    raw_stock_data = extract("raw_stock_data.csv")
    clean_stock_data = transform(raw_stock_data)
    assert isinstance(clean_stock_data, pd.DataFrame)

> python -m pytest

test_transformed_data . [100%]
================================ 1 passed in 1.17s ===============================


assert and isinstance
pipeline_type = "ETL"

# Check if pipeline_type is an instance of a str
isinstance(pipeline_type, str)

True

# Assert that the pipeline does indeed take value "ETL"
assert pipeline_type == "ETL"

# Combine assert and isinstance
assert isinstance(pipeline_type, str)


AssertionError
pipeline_type = "ETL"

# Create an AssertionError
assert isinstance(pipeline_type, float)

Traceback (most recent call last):
  File "<stdin>", line 4, in <module>
AssertionError
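
A bare `AssertionError` like the one above gives no hint about what went wrong. Python's `assert` statement accepts an optional message after a comma, which becomes the text of the error. The sketch below catches the error only to show that message; in a real test the assertion would simply be left to fail.

```python
pipeline_type = "ETL"

try:
    # The expression after the comma becomes the AssertionError message
    assert isinstance(pipeline_type, float), (
        f"Expected a float, got {type(pipeline_type).__name__}"
    )
except AssertionError as error:
    message = str(error)

print(message)  # prints "Expected a float, got str"
```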


Mocking data pipeline components with fixtures
import pandas as pd
import pytest
from pipeline import extract, transform

@pytest.fixture()
def clean_data():
    raw_stock_data = extract("raw_stock_data.csv")
    clean_stock_data = transform(raw_stock_data)
    return clean_stock_data

def test_transformed_data(clean_data):
    assert isinstance(clean_data, pd.DataFrame)
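
The fixture above still reads a CSV file from disk. Where a test should not depend on that file, a fixture can return a small hand-built DataFrame instead. This is a sketch under that assumption; `build_clean_data` and its two rows are hypothetical stand-ins for the output of `extract` and `transform`.

```python
import pandas as pd
import pytest

def build_clean_data():
    # Hypothetical stand-in for extract() and transform(): a hand-built DataFrame
    return pd.DataFrame({
        "timestamps": pd.to_datetime(
            ["1997-05-15 13:30:00", "1997-05-16 13:30:00"]
        ),
        "volume": [1443120000, 294000000],
        "open": [0.121875, 0.098438],
        "close": [0.097917, 0.086458],
    })

@pytest.fixture()
def clean_data():
    # pytest passes this return value to any test with a "clean_data" parameter
    return build_clean_data()

def test_transformed_data(clean_data):
    assert isinstance(clean_data, pd.DataFrame)
    assert len(clean_data) == 2
```

Keeping the data construction in a plain function means it can also be reused outside pytest, since a function decorated with `@pytest.fixture` is not meant to be called directly.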


Unit testing DataFrames
def test_transformed_data(clean_data):
    # Include other assert statements here
    ...

    # Check number of columns
    assert len(clean_data.columns) == 4

    # Check the lower bound of a column
    assert clean_data["open"].min() >= 0

    # Check the range of a column by chaining statements with "and"
    assert clean_data["open"].min() >= 0 and clean_data["open"].max() <= 1000
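
Other properties of a DataFrame can be asserted in the same way. The sketch below checks column names, missing values, and numeric dtypes; the `check_stock_data` helper, the expected column list, and the sample data are hypothetical and chosen to match the four columns shown earlier.

```python
import pandas as pd
from pandas.api.types import is_numeric_dtype

# Assumed schema, based on the four columns shown earlier in the deck
EXPECTED_COLUMNS = ["timestamps", "volume", "open", "close"]

def check_stock_data(clean_data):
    # Column names and order match the expected schema
    assert list(clean_data.columns) == EXPECTED_COLUMNS
    # No missing values in any column
    assert not clean_data.isna().any().any()
    # Price columns hold numeric data
    assert is_numeric_dtype(clean_data["open"])
    assert is_numeric_dtype(clean_data["close"])

# Hypothetical sample that satisfies every check
sample = pd.DataFrame({
    "timestamps": ["1997-05-15 13:30:00", "1997-05-16 13:30:00"],
    "volume": [1443120000, 294000000],
    "open": [0.121875, 0.098438],
    "close": [0.097917, 0.086458],
})
check_stock_data(sample)
```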


Running a data pipeline in production
Data pipeline architecture patterns
# Define ETL function
...
def load(clean_data):
    ...

# Run the data pipeline
raw_stock_data = extract("raw_stock_data.csv")
clean_stock_data = transform(raw_stock_data)
load(clean_stock_data)

> ls
etl_pipeline.py

# Import extract, transform, and load functions
from pipeline_utils import extract, transform, load

# Run the data pipeline
raw_stock_data = extract("raw_stock_data.csv")
clean_stock_data = transform(raw_stock_data)
load(clean_stock_data)

> ls
etl_pipeline.py
pipeline_utils.py


Running a data pipeline end-to-end
import logging
from pipeline_utils import extract, transform, load

logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.DEBUG)

try:
    # Extract, transform, and load data
    raw_stock_data = extract("raw_stock_data.csv")
    clean_stock_data = transform(raw_stock_data)
    load(clean_stock_data)

    logging.info("Successfully extracted, transformed and loaded data.")  # Log success message

# Handle exceptions, log messages
except Exception as e:
    logging.error(f"Pipeline failed with error: {e}")
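
The same pattern can be wrapped in a function that reports success or failure, so that whatever launches the pipeline can react to the outcome. This is a sketch, not the course's implementation: `run_pipeline` and the stand-in step functions are hypothetical, and `logging.exception` is used in place of `logging.error` because it also records the traceback.

```python
import logging

logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.DEBUG)

def run_pipeline(extract, transform, load, source):
    """Run the three steps and report whether the run succeeded."""
    try:
        raw_data = extract(source)
        clean_data = transform(raw_data)
        load(clean_data)
    except Exception:
        # logging.exception logs at ERROR level and appends the traceback
        logging.exception("Pipeline failed")
        return False
    logging.info("Successfully extracted, transformed and loaded data.")
    return True

# Hypothetical stand-ins for the real pipeline functions
def fake_extract(source):
    return [1, 2, 3]

def fake_transform(rows):
    return [row * 2 for row in rows]

def failing_extract(source):
    raise FileNotFoundError(source)

loaded = []  # Collects whatever the load step receives

succeeded = run_pipeline(fake_extract, fake_transform, loaded.extend, "raw_stock_data.csv")
failed = run_pipeline(failing_extract, fake_transform, loaded.extend, "missing.csv")
```

Passing the step functions in as arguments is one possible design choice; it keeps the runner testable with stand-ins like the ones above.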


Orchestrating data pipelines in production

1 https://open.substack.com/pub/seattledataguy/p/the-state-of-data-engineering-part?r=1po78c&utm_campaign=post&utm_medium=web


Congratulations!
Designing and building data pipelines

Designing sound data pipelines

Extract, transform, and load architecture


Exception handling and logging


Advanced ETL techniques

Handling nested JSON
Advanced transformation logic
Persisting data to SQL databases

{
    "863703000": {
        "volume": 1443120000,
        "price": {
            "close": 0.09791,
            "open": 0.12187
        }
    },
    ...
}
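
As a reminder of what handling this structure involves, the sketch below flattens the single entry shown above into one row per key. It is an illustration only: `flatten_stock_data` is a hypothetical helper, and treating the keys as Unix timestamps in seconds (UTC) is an assumption, though it is consistent with the 1997-05-15 13:30:00 row shown earlier.

```python
import json
from datetime import datetime, timezone

# Only the entry visible on the slide; the elided entries are not reproduced
raw_json = """
{
    "863703000": {
        "volume": 1443120000,
        "price": {"close": 0.09791, "open": 0.12187}
    }
}
"""

def flatten_stock_data(nested):
    rows = []
    for timestamp, entry in nested.items():
        rows.append({
            # Assumption: keys are Unix timestamps in seconds (UTC)
            "timestamps": datetime.fromtimestamp(int(timestamp), tz=timezone.utc),
            "volume": entry["volume"],
            "open": entry["price"]["open"],
            "close": entry["price"]["close"],
        })
    return rows

rows = flatten_stock_data(json.loads(raw_json))
```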


Deploying and maintaining data pipelines

Validate and test data pipelines

Running a pipeline in a production setting

Orchestration tools


Next steps

Introduction to Airflow in Python Course
Data Engineer Career Track
Associate Data Engineer Certification

Apache Airflow
Astronomer
Snowflake


Thank you!