Introduction to Data Pipelines
Cleaning Data with PySpark
Mike Metzger
Data Engineering Consultant
What is a data pipeline?
A set of steps to process data from source(s) to final output
Can consist of any number of steps or components
Can span many systems
We will focus on data pipelines within Spark
What does a data pipeline look like?
Input(s)
CSV, JSON, web services, databases
Transformations
.withColumn(), .filter(), .drop()
Output(s)
CSV, Parquet, database
Validation
Analysis
Pipeline details
Not formally defined in Spark
Typically consists of whatever normal Spark code is required for the task
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField('name', StringType(), False),
    StructField('age', StringType(), False)
])

# Define the schema before loading the file
df = spark.read.format('csv').schema(schema).load('datafile')
df = df.withColumn('id', monotonically_increasing_id())
...
df.write.parquet('outdata.parquet')
df.write.json('outdata.json')
Let's Practice!
Data handling techniques
What are we trying to parse?
Focused on CSV data with issues such as:
Incorrect data
Empty rows
Commented lines
Headers
Nested structures
Multiple delimiters
Non-regular data (differing numbers of columns per row)
Example rows:
width, height, image
# This is a comment
200 300 affenpinscher;0
600 450 Collie;307 Collie;101
600 449 Japanese_spaniel;23
Stanford ImageNet annotations
Identifies dog breeds in images
Provides a list of all identified dogs in an image
Other metadata (base folder, image size, etc.)
Example rows:
02111277 n02111277_3206 500 375 Newfoundland,110,73,416,298
02108422 n02108422_4375 500 375 bull_mastiff,101,90,214,356 \
bull_mastiff,282,74,416,370
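As a hedged sketch (not the course's exact solution), one way to pull the breed and bounding box out of a single annotation entry such as 'Newfoundland,110,73,416,298', assuming a DataFrame df with that entry in a column named dog:

from pyspark.sql.functions import split, col

# Split the comma-separated entry into the breed plus four bounding-box values
df = df.withColumn('dog_parts', split(col('dog'), ','))
df = df.withColumn('breed', col('dog_parts').getItem(0))
df = df.withColumn('start_x', col('dog_parts').getItem(1).cast('integer'))
df = df.withColumn('start_y', col('dog_parts').getItem(2).cast('integer'))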
Removing blank lines, headers, and comments
Spark's CSV parser:
Automatically removes blank lines
Can remove comments using an optional argument
df1 = spark.read.csv('datafile.csv.gz', comment='#')
Handles header fields
Defined via the header argument
Ignored if a schema is defined
df1 = spark.read.csv('datafile.csv.gz', header=True)
Automatic column creation
Spark will:
Automatically create columns in a DataFrame based on the sep argument
df1 = spark.read.csv('datafile.csv.gz', sep=',')
Defaults to using ,
Can still successfully parse if sep is not present in the data
df1 = spark.read.csv('datafile.csv.gz', sep='*')
Stores all of the data in a single column (defaulting to _c0)
Allows you to properly handle nested separators
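A minimal sketch of this technique; the file name, the tab delimiter, and the column handling below are assumptions for illustration:

from pyspark.sql.functions import split

# Read with a separator that never occurs in the data, so each row lands
# whole in the single default column _c0
df1 = spark.read.csv('datafile.csv.gz', sep='*')

# Split manually on the real outer delimiter (assumed here to be a tab),
# leaving any nested comma-separated values intact for later parsing
df1 = df1.withColumn('fields', split(df1['_c0'], '\t'))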
Let's practice!
Data validation
Definition
Validation is:
Verifying that a dataset complies with the expected format
Number of rows / columns
Data types
Complex validation rules
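A minimal sketch of the simpler checks; the expected column count, row count, and column type below are placeholders, not values from the course data:

df = spark.read.parquet('parsed_data.parquet')

# Number of rows / columns
assert len(df.columns) == 5, 'Unexpected number of columns'
assert df.count() > 0, 'No rows found'

# Data types
assert dict(df.dtypes)['company'] == 'string', 'Unexpected type for company'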
Validating via joins
Compares data against known values
Easy to find data in a given set
Comparatively fast
parsed_df = spark.read.parquet('parsed_data.parquet')
company_df = spark.read.parquet('companies.parquet')
verified_df = parsed_df.join(company_df, parsed_df.company == company_df.company)
This inner join automatically removes any rows with a company not present in company_df!
Complex rule validation
Using Spark components to validate logic:
Calculations
Verifying against external source
Likely uses a UDF to modify / verify the DataFrame
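A hedged sketch of rule-based validation with a UDF; the column names and the rule itself are illustrative assumptions rather than the course's data:

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

# Hypothetical rule: the stored total must equal the sum of the line items
def total_matches(total, line_items):
    return total == sum(line_items)

udfTotalMatches = udf(total_matches, BooleanType())

validated_df = df.withColumn('is_valid', udfTotalMatches(df.total, df.line_items))
validated_df = validated_df.filter(validated_df.is_valid)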
Let's practice!
Final analysis and delivery
Analysis calculations (UDF)
Calculations using a UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def getAvgSale(saleslist):
    totalsales = 0
    count = 0
    for sale in saleslist:
        # Each entry contributes two sale amounts (indexes 2 and 3)
        totalsales += sale[2] + sale[3]
        count += 2
    return totalsales / count

udfGetAvgSale = udf(getAvgSale, DoubleType())
df = df.withColumn('avg_sale', udfGetAvgSale(df.sales_list))
Analysis calculations (inline)
Inline calculations
df = spark.read.csv('datafile')
df = df.withColumn('avg', (df.total_sales / df.sales_count))
df = df.withColumn('sq_ft', df.width * df.length)
# udfComputeTotal is a UDF assumed to be defined earlier in the pipeline
df = df.withColumn('total_avg_size', udfComputeTotal(df.entries) / df.numEntries)
Let's practice!
Congratulations and next steps
Next Steps
Review Spark documentation
Try working with data on actual clusters
Work with various datasets
Thank you!