DataFrame column operations
Cleaning Data with PySpark
Mike Metzger
Data Engineering Consultant
DataFrame refresher
DataFrames:
Made up of rows & columns
Immutable
Use various transformation operations to modify data
# Return rows where name starts with "M"
voter_df.filter(voter_df.name.like('M%'))
# Return name and position only
voters = voter_df.select('name', 'position')
Common DataFrame transformations
Filter / Where
voter_df.filter(voter_df.date > '1/1/2019') # or voter_df.where(...)
Select
voter_df.select(voter_df.name)
withColumn
voter_df.withColumn('year', F.year(voter_df.date))  # F is pyspark.sql.functions
drop
voter_df.drop('unused_column')
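A minimal sketch chaining these transformations on the voter_df DataFrame from the refresher (the upper_name column is illustrative):
import pyspark.sql.functions as F

# Each call returns a new, immutable DataFrame
voters = (voter_df
          .where(voter_df.name.isNotNull())           # Filter / Where
          .select('name', 'position')                 # Select
          .withColumn('upper_name', F.upper('name'))  # withColumn
          .drop('name'))                              # drop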
Filtering data
Remove nulls:
voter_df.filter(voter_df['name'].isNotNull())
Remove odd entries:
voter_df.filter(F.year(voter_df.date) > 1800)
Split data from combined sources:
voter_df.where(voter_df['_c0'].contains('VOTE'))
Negate with ~:
voter_df.where(~ voter_df._c1.isNull())
Column string transformations
Contained in pyspark.sql.functions
import pyspark.sql.functions as F
Applied per column as a transformation
voter_df.withColumn('upper', F.upper('name'))
Can create intermediary columns
voter_df.withColumn('splits', F.split('name', ' '))
Can cast to other types
from pyspark.sql.types import IntegerType
voter_df.withColumn('year', voter_df['_c4'].cast(IntegerType()))
ArrayType() column functions
Various utility functions / transformations for interacting with ArrayType() columns:
F.size(<column>) - returns the length of an ArrayType() column
.getItem(<index>) - retrieves a specific item at the given index of a list column
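A short sketch combining these with F.split(), assuming voter_df has a space-separated name column as in the earlier example (the derived column names are illustrative):
import pyspark.sql.functions as F

# Create an ArrayType() column by splitting the name string
voter_df = voter_df.withColumn('splits', F.split('name', ' '))

# Length of each array, and items retrieved by index
voter_df = voter_df.withColumn('name_parts', F.size('splits'))
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))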
Let's practice!
Conditional DataFrame column operations
Conditional clauses
Conditional Clauses are:
Inline version of if / then / else
.when()
.otherwise()
Conditional example
.when(<if condition>, <then x>)
df.select(df.Name, df.Age, F.when(df.Age >= 18, "Adult"))
Name Age
Alice 14
Bob 18 Adult
Candice 38 Adult
Another example
Multiple .when()
df.select(df.Name, df.Age,
          F.when(df.Age >= 18, "Adult")
           .when(df.Age < 18, "Minor"))
Name Age
Alice 14 Minor
Bob 18 Adult
Candice 38 Adult
Otherwise
.otherwise() is like else
df.select(df.Name, df.Age,
          F.when(df.Age >= 18, "Adult")
           .otherwise("Minor"))
Name Age
Alice 14 Minor
Bob 18 Adult
Candice 38 Adult
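As a runnable sketch, the conditional result can also be stored in a named column via .withColumn() (the category column name is illustrative):
import pyspark.sql.functions as F

# Inline if / then / else: Adult when Age >= 18, otherwise Minor
df = df.withColumn('category',
                   F.when(df.Age >= 18, 'Adult')
                    .otherwise('Minor'))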
Let's practice!
User defined functions
Defined...
User defined functions, or UDFs
Python method
Wrapped via the pyspark.sql.functions.udf method
Stored as a variable
Called like a normal Spark function
Reverse string UDF
Define a Python method
def reverseString(mystr):
    return mystr[::-1]
Wrap the function and store as a variable (udf and StringType must be imported)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
udfReverseString = udf(reverseString, StringType())
Use with Spark
user_df = user_df.withColumn('ReverseName',
                             udfReverseString(user_df.Name))
Argument-less example
import random

def sortingCap():
    return random.choice(['G', 'H', 'R', 'S'])
udfSortingCap = udf(sortingCap, StringType())
user_df = user_df.withColumn('Class', udfSortingCap())
Name Age Class
Alice 14 H
Bob 18 S
Candice 63 G
Let's practice!
Partitioning and lazy processing
Partitioning
DataFrames are broken up into partitions
Partition size can vary
Each partition is handled independently
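A small sketch for checking and changing how a DataFrame is partitioned (the target of 4 partitions is arbitrary):
# Number of partitions backing the DataFrame
print(voter_df.rdd.getNumPartitions())

# Redistribute the rows across a different number of partitions
voter_df = voter_df.repartition(4)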
Lazy processing
Transformations are lazy
.withColumn(...)
.select(...)
Nothing is actually done until an action is performed
.count()
.write(...)
Transformations can be re-ordered for best performance
Sometimes causes unexpected behavior
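A sketch of lazy processing in action, assuming voter_df from the earlier examples: the transformation lines return immediately, and no data is touched until the action runs:
import pyspark.sql.functions as F

# Transformations: only recorded in the plan, not executed
voter_df = voter_df.withColumn('upper_name', F.upper('name'))
voter_df = voter_df.select('upper_name')

# Action: triggers the actual computation
print(voter_df.count())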
Adding IDs
Normal ID fields:
Common in relational databases
Usually an increasing, sequential, and unique integer
Not very parallel

id  last name  first name  state
0   Smith      John        TX
1   Wilson     A.          IL
2   Adams      Wendy       OR
Monotonically increasing IDs
pyspark.sql.functions.monotonically_increasing_id()
Integer (64-bit), increases in value, unique
Not necessarily sequential (gaps exist)
Completely parallel
id         last name  first name  state
0          Smith      John        TX
134520871  Wilson     A.          IL
675824594  Adams      Wendy       OR
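A minimal sketch adding such an ID column (the ROW_ID column name is illustrative):
import pyspark.sql.functions as F

# Unique, increasing (but not sequential) 64-bit IDs, generated per partition
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())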
Notes
Remember, Spark is lazy!
IDs are occasionally assigned out of order
If performing a join, IDs may be assigned after the join
Test your transformations
Let's practice!