06 Parallel Processing Part2
Spark
Amir H. Payberah
payberah@kth.se
2022-09-15
The Course Web Page
https://id2221kth.github.io
The Questions-Answers Page
Where Are We?
MapReduce Reminder
Motivation (1/2)
Motivation (2/2)
So, Let’s Use Spark
Spark vs. MapReduce (1/2)
Spark vs. MapReduce (2/2)
Spark Application
Spark Applications Architecture
► Spark applications consist of:
• A driver process
• A set of executor processes
Driver Process
Executors
► Responsible for two things:
• Executing code assigned to it by the driver
• Reporting the state of the computation on that executor back to the driver
SparkSession
SparkContext
SparkSession vs. SparkContext
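The slide's details are not preserved in this extraction; as a minimal, hypothetical sketch of how the two relate (the app name and master are illustrative), a SparkSession wraps a SparkContext:

import org.apache.spark.sql.SparkSession

// build (or reuse) the session; since Spark 2.0 this is the single entry point
val spark = SparkSession.builder()
  .appName("id2221-demo")
  .master("local[*]")
  .getOrCreate()

// the underlying SparkContext, used by the RDD examples that follow
val sc = spark.sparkContext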
Programming Model
Spark Programming Model
► Parallelizable operators
Resilient Distributed Datasets (RDD) (1/3)
Resilient Distributed Datasets (RDD) (2/3)
Resilient Distributed Datasets (RDD) (3/3)
Types of RDDs
When To Use RDDs?
► Short answer: you should not manually create RDDs unless you have a very specific reason.
► RDDs lack the optimizations that are available in the Structured APIs.
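To make the optimization point concrete, a small illustrative contrast (assuming a SparkSession named spark): the Structured API filter below is visible to the Catalyst optimizer, while the equivalent RDD lambda is opaque to it.

// Structured API: the predicate is an expression Catalyst can analyze
val evenDf = spark.range(100).toDF("n").filter("n % 2 = 0")

// RDD API: the closure is a black box to the optimizer
val evenRdd = spark.range(100).rdd.filter(_ % 2 == 0)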
Creating RDDs
Creating RDDs - Parallelized Collections
► Use the parallelize method on a SparkContext.
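The slide's code did not survive extraction; a minimal sketch of parallelize (data and partition count are illustrative):

val nums = sc.parallelize(Array(1, 2, 3))            // distribute a local collection
val letters = sc.parallelize(Seq("a", "b", "c"), 2)  // explicitly request two partitions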
Creating RDDs - External Datasets
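No code survives on this slide either; a hedged sketch of loading an external dataset with textFile (the path is a placeholder, as elsewhere in the deck):

// each line of the file becomes one record of the RDD
val lines = sc.textFile("hdfs://...")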
RDD Operations
Transformations
Lineage
Generic RDD Transformations (1/3)
► distinct removes duplicates from the RDD.
► filter returns the RDD records that match some predicate function.

val nums = sc.parallelize(Array(1, 2, 3))
val even = nums.filter(x => x % 2 == 0)
// 2
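distinct has no example on the slide; a small sketch in the same style:

val dupes = sc.parallelize(Array(1, 2, 2, 3, 3))
val uniq = dupes.distinct()
// 1, 2, 3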
Generic RDD Transformations (2/3)
► map applies a given function to each RDD record.

val nums = sc.parallelize(Array(1, 2, 3))
val squares = nums.map(x => x * x)
// 1, 4, 9
Generic RDD Transformations (3/3)
Key-Value RDD Transformations - Basics (1/2)
► In a (k, v) pair, k is the key, and v is the value.
► To make a key-value RDD:
• map over your current RDD to a basic key-value structure.
• Use keyBy to create a key from the current value.
• Use zip to zip together two RDDs.

val words = sc.parallelize("take it easy, this is a test".split(" "))
val keyword1 = words.map(word => (word, 1))
// (take,1), (it,1), (easy,,1), (this,1), (is,1), (a,1), (test,1)

val numRange = sc.parallelize(0 to 6)
val keyword3 = words.zip(numRange)
// (take,0), (it,1), (easy,,2), (this,3), (is,4), (a,5), (test,6)
Key-Value RDD Transformations - Basics (2/2)
► keys and values extract keys and values, respectively.
► lookup looks up the values for a particular key within an RDD.
► mapValues maps over values.

val words = sc.parallelize("take it easy, this is a test".split(" "))
val keyword = words.keyBy(word => word.toLowerCase.toSeq(0).toString)
// (t,take), (i,it), (e,easy,), (t,this), (i,is), (a,a), (t,test)

val k = keyword.keys
val v = keyword.values
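lookup and mapValues carry no examples on the slide; continuing with the keyword RDD above (an illustrative sketch):

keyword.lookup("t")
// Seq(take, this, test)

keyword.mapValues(word => word.toUpperCase)
// (t,TAKE), (i,IT), (e,EASY,), (t,THIS), (i,IS), (a,A), (t,TEST)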
Key-Value RDD Transformations - Aggregation (1/2)
► reduceByKey aggregates the values of each key with a given function.

val kvChars = ...
// (t,1), (a,1), (k,1), (e,1), (i,1), (t,1), (e,1), (a,1), (s,1), (y,1), (,,1), ...

def addFunc(left: Int, right: Int) = left + right
val redChar = kvChars.reduceByKey(addFunc)
// (t,5), (h,1), (,,1), (e,3), (a,3), (i,3), (y,1), (s,4), (k,1)
Key-Value RDD Transformations - Aggregation (2/2)
► groupByKey or reduceByKey?
► In groupByKey, each executor must hold all values for a given key in memory before applying the function to them.
• This is problematic with massively skewed keys.
► In reduceByKey, the reduce happens within each partition, and does not need to put everything in memory.
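A minimal sketch of the two alternatives on the kvChars pairs (illustrative, assuming Int values as above); both yield the same counts, but only reduceByKey combines partial results before the shuffle:

// groupByKey: all values for a key are gathered in memory, then summed
val viaGroup = kvChars.groupByKey().mapValues(values => values.sum)

// reduceByKey: partial sums happen within each partition, then merge
val viaReduce = kvChars.reduceByKey(_ + _)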
Key-Value RDD Transformations - Join

val keyedChars = ...
// (t,4), (h,6), (,,9), (e,8), (a,3), (i,5), (y,2), (s,7), (k,0)

val kvChars = ...
// (t,1), (a,1), (k,1), (e,1), (i,1), (t,1), (e,1), (a,1), (s,1), (y,1), (,,1), ...
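The join call itself is missing from this extraction; a plausible completion in the slide's style:

val joined = kvChars.join(keyedChars)
// (t,(1,4)), (t,(1,4)), (a,(1,3)), (s,(1,7)), ... one output pair per matching kvChars record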
Actions
RDD Actions (1/6)
► collect returns all the elements of the RDD as an array at the driver.
► first returns the first element of the RDD.

nums.collect()
// Array(1, 2, 3)

nums.first()
// 1
RDD Actions (2/6)
► take returns an array with the first n elements of the RDD.
► Variations on this function: takeOrdered and takeSample.

val words = sc.parallelize("take it easy, this is a test".split(" "))

words.take(5)
// Array(take, it, easy,, this, is)

words.takeOrdered(5)
// Array(a, easy,, is, it, take)

val withReplacement = true
val numberToTake = 6
val randomSeed = 100L
words.takeSample(withReplacement, numberToTake, randomSeed)
// Array(take, it, test, this, test, take)
RDD Actions (3/6)
► count returns the number of elements in the RDD.
► countByValue counts the number of occurrences of each element.

words.count()
// 10

words.countByValue()
// Map(this -> 1, is -> 1, it -> 2, a -> 1, easy, -> 1, test, -> 1, take -> 2, easy -> 1)
RDD Actions (4/6)
► max and min return the maximum and minimum values, respectively.

val maxValue = nums.max()
// 20

val minValue = nums.min()
// 1
RDD Actions (5/6)
► reduce aggregates the elements of the dataset using a given function.
► The given function should be commutative and associative so that it can be computed correctly in parallel.

sc.parallelize(1 to 20).reduce(_ + _)
// 210

words.reduce(wordLengthReducer)
// easy,
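wordLengthReducer is not defined on the slide; a plausible definition (an assumption, not necessarily the deck's code) under which easy, is the longest token and thus the result:

// hypothetical reducer: keep the longer of two words
def wordLengthReducer(leftWord: String, rightWord: String): String =
  if (leftWord.length > rightWord.length) leftWord else rightWord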
RDD Actions (6/6)
Example

val textFile = sc.textFile("hdfs://...")
counts.saveAsTextFile("hdfs://...")
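The middle of this example did not survive extraction; counts is presumably the classic word-count pipeline, sketched here as a hypothetical reconstruction:

// split lines into words, pair each word with 1, and sum the counts per word
val counts = textFile.flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)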
Cache and Checkpoints
Caching
► When you cache an RDD, each node stores any partitions of it that it computes in memory.

words.cache()
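cache() stores partitions at the default level; a related sketch with an explicit storage level (the level chosen here is illustrative):

import org.apache.spark.storage.StorageLevel

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)
words.persist(StorageLevel.MEMORY_AND_DISK)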
Checkpointing
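No code survives on this slide; a minimal sketch of the checkpointing API (the directory is a placeholder):

// save the RDD to reliable storage and truncate its lineage
sc.setCheckpointDir("hdfs://...")
words.checkpoint()
words.count()  // the checkpoint is materialized when an action runs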
Execution Engine
More About Lineage
► A DAG representing the computations done on the RDD is called the lineage graph.

val rdd = sc.textFile(...)
val filtered = rdd.map(...).filter(...).persist()
val count = filtered.count()
val reduced = filtered.reduce()

[https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies]
Dependencies
[https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies]
Two Types of Dependencies (1/2)
► Narrow transformations (dependencies)
• Each input partition will contribute to only one output partition.
• With narrow transformations, Spark can perform pipelining.

[https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies]
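As an illustrative sketch (not from the deck), a narrow chain that Spark pipelines into a single stage:

// map and filter are narrow: each output partition depends on exactly
// one input partition, so no data moves between executors
val pipelined = sc.parallelize(1 to 10).map(x => x * 2).filter(x => x > 5)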
Two Types of Dependencies (2/2)
► Wide transformations (dependencies)
• Each input partition will contribute to many output partitions.
• Usually referred to as a shuffle.

[https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies]
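And a wide counterpart (again illustrative): aggregating by key forces records with the same key into one partition, i.e. a shuffle:

// reduceByKey is wide: records with the same key may start in many
// input partitions and must be shuffled together
val sums = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))).reduceByKey(_ + _)
// (a,4), (b,2)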
Example
[https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies]
Lineages and Fault Tolerance (1/2)
► No replication.
Lineages and Fault Tolerance (2/2)
► Assume one of the partitions fails.
► We only have to recompute the data shown below to get back on track.

[https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies]
The Anatomy of a Spark Job
Jobs
► A Spark job is the highest element of Spark’s execution hierarchy.
• Each Spark job corresponds to one action.
• Each action is called by the driver program of a Spark application.
Stages
► Each job breaks down into a series of stages.
• Stages in Spark represent groups of tasks that can be executed together.
• Wide transformations define the breakdown of jobs into stages.
Tasks
► A stage consists of tasks, which are the smallest execution unit.
• Each task represents one local computation.
• All of the tasks in one stage execute the same code on a different piece of the data.
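Tying the three levels together, an illustrative pipeline (not from the deck): the count() below triggers exactly one job with two stages, split at the reduceByKey shuffle, and each stage runs one task per partition.

val pairs = sc.parallelize(1 to 100, 4)   // 4 partitions => 4 tasks per stage
  .map(x => (x % 10, x))                  // narrow: stays in the first stage
  .reduceByKey(_ + _)                     // wide: stage boundary (shuffle)
pairs.count()                             // action: submits one job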
Summary
► Lineage graph
► Caching
References
Questions?