
Parallel Processing - Spark
Amir H. Payberah
payberah@kth.se
2022-09-15
The Course Web Page

https://id2221kth.github.io
The Questions-Answers Page

https://tinyurl.com/bdenpwc5
Where Are We?
MapReduce Reminder
Motivation (1/2)

► Acyclic data flow from stable storage to stable storage.
Motivation (2/2)

► MapReduce is expensive (slow), i.e., it always goes to disk and HDFS.
So, Let's Use Spark
Spark vs. MapReduce (1/2)

Spark vs. MapReduce (2/2)
Spark Application
Spark Applications Architecture

► Spark applications consist of:
  • A driver process
  • A set of executor processes

[M. Zaharia et al., Spark: The Definitive Guide, O'Reilly Media, 2018]
Driver Process

► The heart of a Spark application.
► Sits on a node in the cluster.
► Runs the main() function.
► Responsible for three things:
  • Maintaining information about the Spark application
  • Responding to a user's program or input
  • Analyzing, distributing, and scheduling work across the executors
Executors

► Responsible for two things:
  • Executing code assigned to it by the driver
  • Reporting the state of the computation on that executor back to the driver
SparkSession

► A driver process that controls a Spark application.
► Main entry point to Spark functionality.
► A one-to-one correspondence between a SparkSession and a Spark application.
► Available in the console shell as spark.

SparkSession.builder.master(master).appName(appName).getOrCreate()
SparkContext

► The entry point for low-level API functionality.
► You access it through the SparkSession.
► You can access a SparkContext via spark.sparkContext.
► Available in the console shell as sc.

val conf = new SparkConf().setMaster(master).setAppName(appName)
new SparkContext(conf)
SparkSession vs. SparkContext

► Prior to Spark 2.0.0, the Spark driver program used a SparkContext to connect to the cluster.
► To use the SQL, Hive, and streaming APIs, separate SparkContexts had to be created.
► SparkSession provides access to all the Spark functionality that SparkContext does, e.g., SQL, Hive, and streaming.
► SparkSession internally has a SparkContext for the actual computation.
Programming Model
Spark Programming Model

► A job is described as a directed acyclic graph (DAG) of data flow.
► A data flow is composed of any number of data sources, operators, and data sinks by connecting their inputs and outputs.
► Parallelizable operators.
Resilient Distributed Datasets (RDD) (1/3)

► A distributed memory abstraction.
► Immutable collections of objects spread across a cluster.
  • Like a LinkedList<MyObjects>
Resilient Distributed Datasets (RDD) (2/3)

► An RDD is divided into a number of partitions, which are atomic pieces of information.
► Partitions of an RDD can be stored on different nodes of a cluster.
Resilient Distributed Datasets (RDD) (3/3)

► RDDs were the primary API in the Spark 1.x series.
► They are not commonly used in the Spark 2.x series.
► Virtually all Spark code you run compiles down to an RDD.
Types of RDDs

► Two types of RDDs:
  • Generic RDD
  • Key-value RDD
► Both represent a collection of objects.
► Key-value RDDs have special operations, such as aggregation, and a concept of custom partitioning by key (see the sketch below).
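► A minimal sketch of the two types (not from the original slides), assuming the console shell's sc; reduceByKey is one of the pair-only operations:

val generic = sc.parallelize(Seq("a", "b", "c"))                  // generic RDD of strings
val keyValue = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))  // key-value RDD of (String, Int)
keyValue.reduceByKey(_ + _)
// (a,4), (b,2)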
When To Use RDDs?

► Short answer: you should not manually create RDDs unless you have a very specific reason.
► They are a much lower-level API that provides a lot of power.
► But they lack the optimizations that are available in the Structured APIs.
► The most likely reason to use RDDs: custom partitioning of data.
  • Fine-grained control over the physical distribution of data (see the sketch below).
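► A hedged sketch of custom partitioning with illustrative values, assuming the console shell's sc; partitionBy and HashPartitioner only apply to key-value RDDs:

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("a", 4)))
val partitioned = pairs.partitionBy(new HashPartitioner(4))  // equal keys end up in the same partition
partitioned.partitioner
// Some(org.apache.spark.HashPartitioner@...)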
Creating RDDs
Creating RDDs - Parallelized Collections

► Use the parallelize method on a SparkContext.
► This turns a single-node collection into a parallel collection.
► You can also explicitly state the number of partitions.
► In the console shell, you can either use sc or spark.sparkContext.

val numsCollection = Array(1, 2, 3)
val nums = sc.parallelize(numsCollection)

val wordsCollection = "take it easy, this is a test".split(" ")
val words = spark.sparkContext.parallelize(wordsCollection, 2)
Creating RDDs - External Datasets

► Create an RDD from external storage.
  • E.g., local file system, HDFS, Cassandra, HBase, Amazon S3, etc.
► Text file RDDs can be created using the textFile method.

val myFile1 = sc.textFile("file.txt")
val myFile2 = sc.textFile("hdfs://namenode:9000/path/file")
RDD Operations
RDD Operations

► RDDs support two types of operations:
  • Transformations: allow us to build the logical plan
  • Actions: allow us to trigger the computation
Transformations
Transformations

► Create a new RDD from an existing one.
► All transformations are lazy.
  • They do not compute their results right away.
  • They remember the transformations applied to the base dataset.
  • They are only computed when an action requires a result to be returned to the driver program (see the sketch below).
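► A minimal sketch of lazy evaluation (not from the original slides), assuming the console shell's sc:

val nums = sc.parallelize(1 to 1000000)
val doubled = nums.map(_ * 2)        // transformation: only recorded in the lineage
val small = doubled.filter(_ < 100)  // transformation: still nothing computed
small.count()                        // action: triggers evaluation of the whole chain
// 49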
Lineage

► Lineage: the transformations used to build an RDD.
► RDDs are stored as a chain of objects capturing the lineage of each RDD.

val file = sc.textFile("hdfs://...")
val sics = file.filter(_.contains("SICS"))
val cachedSics = sics.cache()
val ones = cachedSics.map(_ => 1)
val count = ones.reduce(_+_)
Generic RDD Transformations (1/3)

► distinct removes duplicates from the RDD.
► filter returns the RDD records that match some predicate function.

val nums = sc.parallelize(Array(1, 2, 3))
val even = nums.filter(x => x % 2 == 0)
// 2

val words = sc.parallelize("this it easy, this is a test".split(" "))
val distinctWords = words.distinct()
// a, this, is, easy,, test, it

def startsWithT(individual: String) = { individual.startsWith("t") }
val tWordList = words.filter(word => startsWithT(word))
// this, test
Generic RDD Transformations (2/3)

► map and flatMap apply a given function on each RDD record independently.

val nums = sc.parallelize(Array(1, 2, 3))
val squares = nums.map(x => x * x)
// 1, 4, 9

val words = sc.parallelize("take it easy, this is a test".split(" "))
val tWords = words.map(word => (word, word.startsWith("t")))
// (take,true), (it,false), (easy,,false), (this,true), (is,false), (a,false), (test,true)
Generic RDD Transformations (3/3)

► sortBy sorts the records of an RDD.

val words = sc.parallelize("take it easy, this is a test".split(" "))
val sortedWords = words.sortBy(word => word.length())
// a, it, is, take, this, test, easy,
Key-Value RDD Transformations - Basics (1/2)

► In a (k, v) pair, k is the key and v is the value.
► To make a key-value RDD:
  • map over your current RDD to a basic key-value structure.
  • Use keyBy to create a key from the current value.
  • Use zip to zip together two RDDs.

val words = sc.parallelize("take it easy, this is a test".split(" "))

val keyword1 = words.map(word => (word, 1))
// (take,1), (it,1), (easy,,1), (this,1), (is,1), (a,1), (test,1)

val keyword2 = words.keyBy(word => word.toSeq(0).toString)
// (t,take), (i,it), (e,easy,), (t,this), (i,is), (a,a), (t,test)

val numRange = sc.parallelize(0 to 6)
val keyword3 = words.zip(numRange)
// (take,0), (it,1), (easy,,2), (this,3), (is,4), (a,5), (test,6)
Key-Value RDD Transformations - Basics (2/2)

► keys and values extract keys and values, respectively.
► lookup looks up the values for a particular key within an RDD.
► mapValues maps over values.

val words = sc.parallelize("take it easy, this is a test".split(" "))
val keyword = words.keyBy(word => word.toLowerCase.toSeq(0).toString)
// (t,take), (i,it), (e,easy,), (t,this), (i,is), (a,a), (t,test)

val k = keyword.keys
val v = keyword.values

val tValues = keyword.lookup("t")
// take, this, test

val mapV = keyword.mapValues(word => word.toUpperCase)
// (t,TAKE), (i,IT), (e,EASY,), (t,THIS), (i,IS), (a,A), (t,TEST)
Key-Value RDD Transformations - Aggregation (1/2)

► Aggregate the values associated with each key.

val kvChars = ...
// (t,1), (a,1), (k,1), (e,1), (i,1), (t,1), (e,1), (a,1), (s,1), (y,1), (,,1), ...

val grpChar = kvChars.groupByKey().map(row => (row._1, row._2.reduce(addFunc)))
// (t,5), (h,1), (,,1), (e,3), (a,3), (i,3), (y,1), (s,4), (k,1)

def addFunc(left: Int, right: Int) = left + right
val redChar = kvChars.reduceByKey(addFunc)
// (t,5), (h,1), (,,1), (e,3), (a,3), (i,3), (y,1), (s,4), (k,1)
Key-Value RDD Transformations - Aggregation (2/2)

► groupByKey or reduceByKey?
► In groupByKey, each executor must hold all values for a given key in memory before applying the function to them.
  • This is problematic with massively skewed keys.
► In reduceByKey, the reduce happens within each partition and does not need to hold everything in memory.
Key-Value RDD Transformations - Join

► join performs an inner join on the key.
► Other variants: fullOuterJoin, leftOuterJoin, rightOuterJoin, and cartesian.

val keyedChars = ...
// (t,4), (h,6), (,,9), (e,8), (a,3), (i,5), (y,2), (s,7), (k,0)

val kvChars = ...
// (t,1), (a,1), (k,1), (e,1), (i,1), (t,1), (e,1), (a,1), (s,1), (y,1), (,,1), ...

val joinedChars = kvChars.join(keyedChars)
// (t,(1,4)), (t,(1,4)), (t,(1,4)), (t,(1,4)), (t,(1,4)), (h,(1,6)), (,,(1,9)), (e,(1,8)), ...
Actions
Actions

► Transformations allow us to build up our logical transformation plan.
► We run an action to trigger the computation.
  • Instructs Spark to compute a result from a series of transformations.
► There are three kinds of actions:
  • Actions to view data in the console
  • Actions to collect data to native objects in the respective language
  • Actions to write to output data sources
RDD Actions (1/6)

► collect returns all the elements of the RDD as an array at the driver.
► first returns the first value in the RDD.

val nums = sc.parallelize(Array(1, 2, 3))

nums.collect()
// Array(1, 2, 3)

nums.first()
// 1
RDD Actions (2/6)

► take returns an array with the first n elements of the RDD.
► Variations on this function: takeOrdered and takeSample.

val words = sc.parallelize("take it easy, this is a test".split(" "))

words.take(5)
// Array(take, it, easy,, this, is)

words.takeOrdered(5)
// Array(a, easy,, is, it, take)

val withReplacement = true
val numberToTake = 6
val randomSeed = 100L
words.takeSample(withReplacement, numberToTake, randomSeed)
// Array(take, it, test, this, test, take)
RDD Actions (3/6)

► count returns the number of elements in the dataset.
► countByValue counts the number of occurrences of each value in the RDD.
► countByKey returns a hashmap of (K, Int) pairs with the count of each key.
  • Only available on key-value RDDs, i.e., (K, V) pairs (see the sketch below).

val words = sc.parallelize("take it easy, this is a test, take it easy".split(" "))

words.count()
// 10

words.countByValue()
// Map(this -> 1, is -> 1, it -> 2, a -> 1, easy, -> 1, test, -> 1, take -> 2, easy -> 1)
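► A short, hedged sketch of countByKey (not on the original slide), reusing the words RDD above and keying each word by its first character:

val keyed = words.keyBy(word => word.toSeq(0).toString)
keyed.countByKey()
// Map(t -> 4, i -> 3, e -> 2, a -> 1)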
RDD Actions (4/6)

► max and min return the maximum and minimum values, respectively.

val nums = sc.parallelize(1 to 20)

val maxValue = nums.max()
// 20

val minValue = nums.min()
// 1
RDD Actions (5/6)

► reduce aggregates the elements of the dataset using a given function.
► The given function should be commutative and associative so that it can be computed correctly in parallel.

sc.parallelize(1 to 20).reduce(_ + _)
// 210

def wordLengthReducer(leftWord: String, rightWord: String): String = {
  if (leftWord.length > rightWord.length)
    return leftWord
  else
    return rightWord
}

words.reduce(wordLengthReducer)
// easy,
RDD Actions (6/6)

► saveAsTextFile writes the elements of an RDD as a text file.
  • Local filesystem, HDFS, or any other Hadoop-supported file system.
► saveAsObjectFile explicitly writes key-value pairs (see the sketch below).

val words = sc.parallelize("take it easy, this is a test".split(" "))

words.saveAsTextFile("file:/tmp/words")
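► A hedged sketch of saveAsObjectFile with an illustrative local path (not on the original slide); objectFile reads the data back:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
pairs.saveAsObjectFile("file:/tmp/pairs")              // writes serialized objects
val restored = sc.objectFile[(String, Int)]("file:/tmp/pairs")
restored.collect()
// Array((a,1), (b,2))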
Example

val textFile = sc.textFile("hdfs://...")

val words = textFile.flatMap(line => line.split(" "))
val ones = words.map(word => (word, 1))
val counts = ones.reduceByKey(_ + _)

counts.saveAsTextFile("hdfs://...")
Cache and Checkpoints
Caching

► When you cache an RDD, each node stores any partitions of it that it computes in memory.
► An RDD that is not cached is re-evaluated each time an action is invoked on that RDD.
► A node reuses the cached RDD in other actions on that dataset.
► There are two functions for caching an RDD:
  • cache caches the RDD into memory
  • persist(level) can cache in memory, on disk, or in off-heap memory (see the sketch below)

val words = sc.parallelize("take it easy, this is a test".split(" "))

words.cache()
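► A hedged sketch of persist with an explicit storage level (the path is illustrative); MEMORY_AND_DISK spills partitions to disk when they do not fit in memory:

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("hdfs://namenode:9000/path/file")
lines.persist(StorageLevel.MEMORY_AND_DISK)   // cache in memory, spill to disk if needed
lines.count()                                 // first action materializes the cache
lines.count()                                 // later actions reuse the cached partitions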
Checkpointing

► checkpoint saves an RDD to disk.
► Checkpointed data is not removed after the SparkContext is destroyed.
► When we reference a checkpointed RDD, it will derive from the checkpoint instead of the source data.

val words = sc.parallelize("take it easy, this is a test".split(" "))

sc.setCheckpointDir("/path/checkpointing")
words.checkpoint()
Execution Engine
More About Lineage

► A DAG representing the computations done on an RDD is called its lineage graph.

val rdd = sc.textFile(...)
val filtered = rdd.map(...).filter(...).persist()
val count = filtered.count()
val reduced = filtered.reduce()

[https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies]
Dependencies

► RDD dependencies encode when data must move across the network.

[https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies]
Two Types of Dependencies (1/2)

► Narrow transformations (dependencies)
  • Each input partition will contribute to only one output partition.
  • With narrow transformations, Spark can perform pipelining.

[https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies]
Two Types of Dependencies (2/2)

► Wide transformations (dependencies)
  • Each input partition may contribute to many output partitions.
  • Usually referred to as a shuffle (see the sketch below).

[https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies]
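► A small sketch contrasting the two kinds (not from the original slides), assuming the console shell's sc; map and filter are narrow, reduceByKey is wide:

val nums = sc.parallelize(1 to 100, 4)
val narrow = nums.map(_ + 1).filter(_ % 2 == 0)             // narrow: pipelined, no shuffle
val wide = narrow.map(n => (n % 10, n)).reduceByKey(_ + _)  // wide: shuffles data across the network
wide.collect()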
Example

[https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies]
Lineages and Fault Tolerance (1/2)

► No replication.
► Lineages are the key to fault tolerance in Spark.
► Recompute only the lost partitions of an RDD.
Lineages and Fault Tolerance (2/2)

► Assume one of the partitions fails.
► We only have to recompute the data shown below to get back on track.

[https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies]
The Anatomy of a Spark Job

[H. Karau et al., High Performance Spark, O'Reilly Media, 2017]
Jobs

► A Spark job is the highest element of Spark's execution hierarchy.
  • Each Spark job corresponds to one action.
  • Each action is called by the driver program of a Spark application.

[H. Karau et al., High Performance Spark, O'Reilly Media, 2017]
Stages

► Each job breaks down into a series of stages.
  • Stages in Spark represent groups of tasks that can be executed together.
  • Wide transformations define the breakdown of jobs into stages.

[H. Karau et al., High Performance Spark, O'Reilly Media, 2017]
Tasks

► A stage consists of tasks, which are the smallest execution unit.
  • Each task represents one local computation.
  • All of the tasks in one stage execute the same code on a different piece of the data (see the sketch below).

[H. Karau et al., High Performance Spark, O'Reilly Media, 2017]
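► A hedged sketch tying the three levels together, reusing the earlier word-count example: the action launches one job, the shuffle at reduceByKey splits it into two stages, and each stage runs one task per partition:

val counts = sc.textFile("hdfs://...")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
counts.count()   // one action => one job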
Summary

► RDD: a distributed memory abstraction
► Two types of operations: transformations and actions
► Lineage graph
► Caching
► Jobs, stages, and tasks
► Wide vs. narrow dependencies
References

► M. Zaharia et al., "Spark: The Definitive Guide", O'Reilly Media, 2018 (Chapters 2, 12, 13, and 14).
► M. Zaharia et al., "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing", USENIX NSDI, 2012.
Questions?
