Caching in Spark
Introduction to Spark SQL in Python
Mark Plutowski
Data Scientist
What is caching?
Keeping data in memory so it does not have to be recomputed or re-read each time it is used
df.cache()

To uncache it:

df.unpersist()

To check whether a DataFrame is cached, use is_cached:

df.is_cached     # False  (not yet cached)
df.cache()
df.is_cached     # True
df.unpersist()
df.is_cached     # False  (after unpersisting)
The default storage level used by df.cache() has five properties:

1. useDisk = True
2. useMemory = True
3. useOffHeap = False
4. deserialized = True
5. replication = 1
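You can inspect these on a cached DataFrame through its storageLevel property; a minimal sketch, assuming a DataFrame named df:

df.cache()
level = df.storageLevel
print(level)   # e.g. StorageLevel(True, True, False, True, 1)
print(level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, level.replication)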
df.persist() accepts an explicit storage level; called without arguments it uses the same default as df.cache():

import pyspark

df.persist()
df.persist(storageLevel=pyspark.StorageLevel.MEMORY_AND_DISK)
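Other storage levels can be chosen when disk spill is undesirable. A minimal sketch (not from the slides) of persisting in memory only; the existing cache is dropped first because a storage level cannot be changed in place:

from pyspark import StorageLevel

df.unpersist()                          # drop any existing cache first
df.persist(StorageLevel.MEMORY_ONLY)    # keep the data in memory only, no disk spill
print(df.storageLevel)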
A table registered in the catalog can also be cached:

spark.catalog.isCached(tableName='df')   # False
spark.catalog.cacheTable('df')
spark.catalog.isCached(tableName='df')   # True

spark.catalog.clearCache() removes all cached tables:

spark.catalog.clearCache()
spark.catalog.isCached(tableName='df')   # False
Cache selectively
Use the Spark UI to inspect execution
A Spark Task is a unit of execution that runs on a single CPU.
A Spark Stage is a group of tasks that perform the same computation in parallel, each task typically running on a different subset of the data.
A Spark Job is a computation triggered by an action and sliced into one or more stages.
The Spark UI runs on the driver host, on port 4040 by default; if that port is already taken, the next free port is used (see the sketch after this list for looking the URL up programmatically):

1. http://[DRIVER_HOST]:4040
2. http://[DRIVER_HOST]:4041
3. http://[DRIVER_HOST]:4042
4. http://[DRIVER_HOST]:4043
...
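If you are unsure which port the UI ended up on, the SparkContext exposes the address directly; a minimal sketch, assuming an active SparkSession named spark:

# URL of the Spark UI for the current application (None if the UI is disabled)
print(spark.sparkContext.uiWebUrl)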
To uncache a single table:

spark.catalog.uncacheTable('table1')
spark.catalog.isCached('table1')       # False
spark.catalog.dropTempView('table1')   # also remove the temporary view from the catalog
Caching stores the data in memory, or on disk, at a snapshot in time, so later changes to the underlying data are not reflected in the cache.
spark.sql(query3agg).show()
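One way to decide whether caching pays off is to time the query before and after caching the table it reads from. A rough sketch using Python's time module; time_query is a hypothetical helper, and the assumption that query3agg reads from a temp table named 'df' is mine:

import time

def time_query(q):
    # Run a query and report wall-clock time
    start = time.time()
    spark.sql(q).show()
    print("%.3f seconds" % (time.time() - start))

time_query(query3agg)             # first run: reads from the source
spark.catalog.cacheTable('df')    # assumes query3agg reads from the temp table 'df'
time_query(query3agg)             # this run materializes the cache
time_query(query3agg)             # subsequent runs read from the cache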
Logging primer
import logging
import sys

logging.basicConfig(stream=sys.stdout, level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

logging.info("Hello %s", "world")
logging.debug("Hello, take %d", 2)
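With the level set to INFO, only the first message is printed; the DEBUG call is filtered out. As a follow-up (not from the slides), raising the root logger's level makes it appear:

logging.getLogger().setLevel(logging.DEBUG)
logging.debug("Hello, take %d", 2)   # now emitted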
A caveat: Spark programs run with distributed execution, so debugging and logging behave differently than in a single-process program.
def reset(self):
    self.start_time = time.time()
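The snippets below create a timer with t = timer() and call t.elapsed(), so a complete class is assumed. Here is a minimal sketch consistent with the reset method above; the course's exact implementation may differ:

import time
import logging

class timer:
    """Log the seconds elapsed since the timer was created or last reset."""
    def __init__(self):
        self.start_time = time.time()

    def elapsed(self):
        logging.info("elapsed: %.3f seconds", time.time() - self.start_time)
        self.reset()

    def reset(self):
        self.start_time = time.time()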
t = timer()
logging.info("No action here.")                  # no Spark action is triggered
t.elapsed()
logging.debug("df has %d rows.", df.count())
t.elapsed()
# Although DEBUG messages are filtered out at the INFO level, df.count() is
# evaluated before logging.debug is called, so the expensive action still runs.
t = timer()
logger.info("No action here.")
t.elapsed()
if ENABLED:
    logger.info("df has %d rows.", df.count())   # the count now runs only when ENABLED is True
t.elapsed()
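The guarded version assumes a module-level logger and an ENABLED flag, neither of which is defined in the excerpt. One plausible setup (an assumption, not the course's exact code):

import logging
import sys

logging.basicConfig(stream=sys.stdout, level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)   # named logger for this module
ENABLED = False                        # set to True to log row counts (triggers df.count())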
Explain
EXPLAIN SELECT * FROM table1
df.registerTempTable('df')   # deprecated in newer Spark versions; df.createOrReplaceTempView('df') is the replacement
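The EXPLAIN statement above can be run from Python as well; a minimal sketch, assuming the temp table 'df' registered above (the exact call used in the course may differ). The fragment that follows is the kind of plan text it returns:

plan_row = spark.sql("EXPLAIN SELECT * FROM df").first()   # single row whose first column is the plan text
print(plan_row[0])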
Batched: true,
Format: Parquet,
Location: InMemoryFileIndex[file:/temp/df.parquet],
PartitionFilters: [],
PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>
== Physical Plan ==
FileScan parquet [word#963,id#964L,title#965,part#966]
Batched: true, Format: Parquet,
Location: InMemoryFileIndex[file:/temp/df.parquet],
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>
== Physical Plan ==
FileScan parquet [word#712,id#713L,title#714,part#715]
Batched: true, Format: Parquet,
Location: InMemoryFileIndex[file:/temp/df.parquet],
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>
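The next plans include InMemoryTableScan and InMemoryRelation operators, which appear once the DataFrame has been cached. A minimal sequence that produces a plan of this shape, assuming the same parquet-backed df as above:

df.cache()     # mark df for caching; nothing is stored until an action runs
df.count()     # an action materializes the cache
df.explain()   # the plan now shows InMemoryTableScan over InMemoryRelation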
== Physical Plan ==
InMemoryTableScan [word#0, id#1L, title#2, part#3]
+- InMemoryRelation [word#0, id#1L, title#2, part#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- FileScan parquet [word#0,id#1L,title#2,part#3]
Batched: true, Format: Parquet, Location:
InMemoryFileIndex[file:/temp/df.parquet],
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>
== Physical Plan ==
InMemoryTableScan [word#0, id#1L, title#2, part#3]
+- InMemoryRelation [word#0, id#1L, title#2, part#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- FileScan parquet [word#0,id#1L,title#2,part#3]
Batched: true, Format: Parquet,
Location: InMemoryFileIndex[file:/temp/df.parquet],
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>
from pyspark.sql.functions import desc

(df.groupBy('word')
   .count()
   .sort(desc('count'))
   .explain())
Two operators to look for in the plan: InMemoryRelation (the cached data) and InMemoryTableScan (a scan that reads from the cache).
HashAggregate(keys=[word#963], ...)
HashAggregate(keys=[word#963], functions=[count(1)])
The previous plan had the following lines, which are missing from the plan above:
...
+- InMemoryTableScan [word#963]
+- InMemoryRelation [word#963, id#964L, title#965, part#966], true, 10000,
StorageLevel(disk, memory, deserialized, 1 replicas)
...