Caching in Spark

The document discusses caching in Spark SQL. It explains that caching keeps data in memory to improve performance for repeated operations. It describes how to cache and uncache DataFrames using df.cache() and df.unpersist(). It also covers caching tables using the Spark catalog and provides tips for effective caching.


Caching

INTRODUCTION TO SPARK SQL IN PYTHON

Mark Plutowski
Data Scientist
What is caching?
Keeping data in memory

Spark tends to unload data from memory aggressively, so even cached data can be evicted when memory runs low



Eviction Policy
Least Recently Used (LRU)

Eviction happens independently on each worker

Depends on memory available to each worker



Caching a dataframe
TO CACHE A DATAFRAME:

df.cache()

TO UNCACHE IT:

df.unpersist()
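A minimal usage sketch of this pattern, assuming a SparkSession named spark and the Parquet file path used later in this course:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet('/temp/df.parquet')

df.cache()       # marks the DataFrame for caching; nothing is materialized yet
df.count()       # the first action populates the cache
df.count()       # later actions read the cached partitions
df.unpersist()   # release the cache when the DataFrame is no longer needed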



Determining whether a dataframe is cached
df.is_cached

False

df.cache()
df.is_cached

True



Uncaching a dataframe
df.unpersist()
df.is_cached

False



Storage level
df.unpersist()
df.cache()
df.storageLevel

StorageLevel(True, True, False, True, 1)

In the storage level above the following hold:

1. useDisk = True

2. useMemory = True

3. useOffHeap = False

4. deserialized = True

5. replication = 1
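These fields can also be read back individually from the StorageLevel object; a quick sketch, assuming df is cached as above:

lvl = df.storageLevel
print(lvl.useDisk, lvl.useMemory, lvl.useOffHeap, lvl.deserialized, lvl.replication)
# True True False True 1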



Persisting a dataframe
The following are equivalent in Spark 2.1+:

df.persist()

df.persist(storageLevel=pyspark.StorageLevel.MEMORY_AND_DISK)

df.cache() is the same as df.persist()
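To use a different storage level, pass it to persist() explicitly. For example, a disk-only cache (a sketch, assuming df is defined and not already persisted, since the level of a persisted DataFrame cannot be changed):

import pyspark

df.persist(storageLevel=pyspark.StorageLevel.DISK_ONLY)   # keep partitions on disk only, not in memory
df.storageLevel
# StorageLevel(True, False, False, False, 1)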



Caching a table
df.createOrReplaceTempView('df')
spark.catalog.isCached(tableName='df')

False

spark.catalog.cacheTable('df')
spark.catalog.isCached(tableName='df')

True
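Put together, the round trip looks like this (a sketch, assuming df already exists):

df.createOrReplaceTempView('df')
spark.catalog.cacheTable('df')                 # lazy, just like df.cache()
spark.sql("SELECT COUNT(*) FROM df").show()    # the first action materializes the cached table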



Uncaching a table
spark.catalog.uncacheTable('df')
spark.catalog.isCached(tableName='df')

False

spark.catalog.clearCache()



Tips
Caching is lazy: nothing is materialized until an action runs

Only cache a dataframe if more than one operation is to be performed on it

Unpersist when you no longer need the object

Cache selectively (see the sketch below)
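A minimal sketch of that guidance, assuming a DataFrame df that feeds two separate aggregations:

df.cache()                                    # worth caching: df is reused twice below
word_counts = df.groupBy('word').count()
title_counts = df.groupBy('title').count()
word_counts.show()                            # first action materializes the cache
title_counts.show()                           # second action reuses the cached partitions
df.unpersist()                                # release the memory once both results exist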



Let's practice
The Spark UI
INTRODUCTION TO SPARK SQL IN PYTHON

Mark Plutowski
Data Scientist
Use the Spark UI to inspect execution
A Spark Task is a unit of execution that runs on a single CPU core

A Spark Stage is a group of tasks that perform the same computation in parallel, each task typically running on a different subset of the data

A Spark Job is a computation triggered by an action, sliced into one or more stages.
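As an illustration, a single action on a query that shuffles data produces one job with at least two stages (a sketch, assuming df exists):

# One job, triggered by the collect() action:
#   stage 1 - partial counts computed per partition (map side)
#   stage 2 - final counts combined after the shuffle (reduce side)
df.groupBy('word').count().collect()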



Finding the Spark UI
1. http://[DRIVER_HOST]:4040

2. http://[DRIVER_HOST]:4041

3. http://[DRIVER_HOST]:4042

4. http://[DRIVER_HOST]:4043
...
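Port 4040 is used by the first Spark application on the driver host; additional applications on the same host fall back to 4041, 4042, and so on. The exact URL can also be read programmatically (a sketch, assuming a running SparkSession named spark):

# The driver exposes its UI address, e.g. 'http://driver-host:4040'
print(spark.sparkContext.uiWebUrl)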



Spark catalog operations
spark.catalog.cacheTable('table1')

spark.catalog.uncacheTable('table1')

spark.catalog.isCached('table1')

spark.catalog.dropTempView('table1')



Spark Catalog
spark.catalog.listTables()

[Table(name='text', database=None, description=None, tableType='TEMPORARY', isTemporary=



Spark UI Storage Tab
Shows where data partitions exist, in memory or on disk, across the cluster, at a snapshot in time.



Spark UI SQL tab
query3agg = """
SELECT w1, w2, w3, COUNT(*) as count FROM (
SELECT
word AS w1,
LEAD(word,1) OVER(PARTITION BY part ORDER BY id ) AS w2,
LEAD(word,2) OVER(PARTITION BY part ORDER BY id ) AS w3
FROM df
)
GROUP BY w1, w2, w3
ORDER BY count DESC
"""

spark.sql(query3agg).show()



Let's practice
Logging
INTRODUCTION TO SPARK SQL IN PYTHON

Mark Plutowski
Data Scientist
Logging primer
import sys
import logging

logging.basicConfig(stream=sys.stdout, level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Hello %s", "world")
logging.debug("Hello, take %d", 2)

2019-03-14 15:32:05,359 - INFO - Hello world



Logging with DEBUG level
import sys
import logging

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Hello %s", "world")
logging.debug("Hello, take %d", 2)

2018-03-14 12:00:00,000 - INFO - Hello world


2018-03-14 12:00:00,001 - DEBUG - Hello, take 2



Debugging lazy evaluation
Lazy evaluation: transformations are not computed until an action runs

Distributed execution: the work runs on the cluster's workers, not in the driver process
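A quick way to see lazy evaluation in the timings (a sketch, assuming df already exists):

import time

t0 = time.time()
counts = df.groupBy('word').count()   # transformation only: returns almost immediately
print("transform: %.1f sec" % (time.time() - t0))

t0 = time.time()
counts.show()                         # action: the distributed work actually runs here
print("action: %.1f sec" % (time.time() - t0))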



A simple timer
t = timer()
t.elapsed()

1. elapsed: 0.0 sec

# ...do something that takes about 2 seconds...
t.elapsed()

2. elapsed: 2.0 sec

# ...do something else that takes time, then restart the clock...
t.reset()


t.elapsed()

3. elapsed: 0.0 sec



class timer
import time

class timer:
    def __init__(self):
        self.start_time = time.time()
        self.step = 0

    def elapsed(self, reset=True):
        self.step += 1
        print("%d. elapsed: %.1f sec"
              % (self.step, time.time() - self.start_time))
        if reset:
            self.reset()

    def reset(self):
        self.start_time = time.time()



Stealth CPU wastage
import logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')

# < create dataframe df here >

t = timer()
logging.info("No action here.")
t.elapsed()
logging.debug("df has %d rows.", df.count())  # df.count() runs even though the DEBUG message is suppressed
t.elapsed()

2018-12-23 22:24:20,472 - INFO - No action here.


1. elapsed: 0.0 sec
2. elapsed: 2.0 sec



Disable actions
ENABLED = False
logger = logging.getLogger('Pyspark')

t = timer()
logger.info("No action here.")
t.elapsed()
if ENABLED:
    logger.info("df has %d rows.", df.count())
t.elapsed()

2019-03-14 12:34:56,789 - Pyspark - INFO - No action here.


1. elapsed: 0.0 sec
2. elapsed: 0.0 sec
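An alternative sketch keys the guard off the logger's own level instead of a separate flag, so the expensive action runs only when DEBUG output would actually be emitted (assuming df and the logging setup from the earlier examples):

# df.count() is evaluated only when DEBUG logging is enabled
if logger.isEnabledFor(logging.DEBUG):
    logger.debug("df has %d rows.", df.count())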



Enabling actions
Rerunning the previous example with ENABLED = True triggers the action:

2019-03-14 12:34:56,789 - INFO - No action here.


1. elapsed: 0.0 sec
2019-03-14 12:34:58,789 - INFO - df has 1107014 rows.
2. elapsed: 2.0 sec



Let's practice!
Query Plans
INTRODUCTION TO SPARK SQL IN PYTHON

Mark Plutowski
Data Scientist
Explain
EXPLAIN SELECT * FROM table1



Load dataframe and register
df = spark.read.load('/temp/df.parquet')

df.createOrReplaceTempView('df')



Running an EXPLAIN query
spark.sql('EXPLAIN SELECT * FROM df').first()

Row(plan='== Physical Plan ==\n


*FileScan parquet [word#1928,id#1929L,title#1930,part#1931]
Batched: true,
Format: Parquet,
Location: InMemoryFileIndex[file:/temp/df.parquet],
PartitionFilters: [],
PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>')



Interpreting an EXPLAIN query
== Physical Plan ==

FileScan parquet [word#1928,id#1929L,title#1930,part#1931]

Batched: true,

Format: Parquet,

Location: InMemoryFileIndex[file:/temp/df.parquet],

PartitionFilters: [],

PushedFilters: [],

ReadSchema: struct<word:string,id:bigint,title:string,part:int>



df.explain()
df.explain()

== Physical Plan ==
FileScan parquet [word#963,id#964L,title#965,part#966]
Batched: true, Format: Parquet,
Location: InMemoryFileIndex[file:/temp/df.parquet],
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>

spark.sql("SELECT * FROM df").explain()

== Physical Plan ==
FileScan parquet [word#712,id#713L,title#714,part#715]
Batched: true, Format: Parquet,
Location: InMemoryFileIndex[file:/temp/df.parquet],
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>



df.explain(), on cached dataframe
df.cache()
df.explain()

== Physical Plan ==
InMemoryTableScan [word#0, id#1L, title#2, part#3]
+- InMemoryRelation [word#0, id#1L, title#2, part#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- FileScan parquet [word#0,id#1L,title#2,part#3]
Batched: true, Format: Parquet, Location:
InMemoryFileIndex[file:/temp/df.parquet],
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>

spark.sql("SELECT * FROM df").explain()

== Physical Plan ==
InMemoryTableScan [word#0, id#1L, title#2, part#3]
+- InMemoryRelation [word#0, id#1L, title#2, part#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- FileScan parquet [word#0,id#1L,title#2,part#3]
Batched: true, Format: Parquet,
Location: InMemoryFileIndex[file:/temp/df.parquet],
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>



Words sorted by frequency query
SELECT word, COUNT(*) AS count
FROM df
GROUP BY word
ORDER BY count DESC

Equivalent dot notation approach:

from pyspark.sql.functions import desc

df.groupBy('word') \
  .count() \
  .sort(desc('count')) \
  .explain()



Same query using dataframe dot notation
== Physical Plan ==
*Sort [count#1040L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(count#1040L DESC NULLS LAST, 200)
+- *HashAggregate(keys=[word#963], functions=[count(1)])
+- Exchange hashpartitioning(word#963, 200)
+- *HashAggregate(keys=[word#963], functions=[partial_count(1)])
+- InMemoryTableScan [word#963]
+- InMemoryRelation [word#963, id#964L, title#965, part#966],
true,10000, StorageLevel(disk, memory, deserialized,
1 replicas)
+- *FileScan parquet [word#963,id#964L,title#965,part#966]
Batched: true, Format: Parquet,
Location: InMemoryFileIndex[file:/temp/df.parquet],
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>



Reading from bottom up
FileScan parquet

InMemoryRelation

InMemoryTableScan

HashAggregate(keys=[word#963], ...)

HashAggregate(keys=[word#963], functions=[count(1)])

Sort [count#1040L DESC NULLS LAST]



Query plan
== Physical Plan ==
*Sort [count#1160L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(count#1160L DESC NULLS LAST, 200)
+- *HashAggregate(keys=[word#963], functions=[count(1)])
+- Exchange hashpartitioning(word#963, 200)
+- *HashAggregate(keys=[word#963], functions=[partial_count(1)])
+- *FileScan parquet [word#963] Batched: true, Format: Parquet,
Location: InMemoryFileIndex[file:/temp/df.parquet], PartitionFilters: [],
PushedFilters: [], ReadSchema: struct<word:string>

The previous plan had the following lines, which are missing from the plan above:

...
+- InMemoryTableScan [word#963]
+- InMemoryRelation [word#963, id#964L, title#965, part#966], true, 10000,
StorageLevel(disk, memory, deserialized, 1 replicas)
...
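One way to reproduce that difference is to unpersist the dataframe and re-run the same explain (a sketch, assuming df and desc from the earlier examples):

df.unpersist()            # drop the in-memory copy
df.groupBy('word') \
  .count() \
  .sort(desc('count')) \
  .explain()              # the plan now scans the Parquet file directly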



Let's practice