Class 06 IntroToSpark
Introduction to Spark
Arunkumar Bagavathi
Department of Computer Science
Oklahoma State University
1
Limitations of MapReduce
1. No support for interactive jobs and models
2. Challenges with many iterative algorithms:
• Graph algorithms like PageRank and the Travelling Salesman Problem
• ML algorithms like k-Means clustering, k-NN, and neural networks
3. Supports only batch processing
4. Intermediate disk read/write for each MapReduce job
5. Dependent tasks cannot be parallelized
6. Difficulty of programming
Apache Spark
• Unified architecture – supports a wide range of big data analytics tasks in a single
computing engine:
data loading
SQL queries
Machine learning
Streaming
• Supports configuring a varied set of libraries for a variety of tasks
Programming languages (Python, Java, Scala, and R)
Web applications (Node.js and Django)
Data storage (AWS S3, HDFS, Apache Kafka)
Analytics (Spark MLlib, Spark SQL, and Spark GraphX)
• Deployment
Standalone
Amazon EC2
Hadoop YARN
Kubernetes
Source: Spark: The Definitive Guide book 3
Spark Background
• Developed by Dr. Matei Zaharia from the AMPLab at UC Berkeley
• Written in Scala
• First Published paper - 2011
• Databricks extended support to Spark
• Spark Summit started in 2013
to assemble scientists, analysts, and engineers
to give workshops on Spark, big data, data science, and machine learning
• Enterprises using Spark at scale: Facebook, Hotels.com, Netflix, Microsoft, etc.
• Open source
• Largest open source community in big data
• 1000s of contributors from 250+ organizations around the globe
Spark documentation: https://spark.apache.org/docs/2.0.0/
Spark 2.0 release video: https://www.youtube.com/watch?v=RUTeY4E2MoQ
4
MapReduce Word Count
5
Spark Word Count
Scala:
val inputRDD = sc.textFile(filepath)
val rdd1 = inputRDD.flatMap(line => line.split(" "))
val rdd2 = rdd1.map(word => (word,1))
val rdd3 = rdd2.reduceByKey(_ + _)
rdd3.saveAsTextFile(filepath2)

Python:
inputRDD = sc.textFile(filepath)
rdd1 = inputRDD.flatMap(lambda line: line.split(" "))
rdd2 = rdd1.map(lambda word: (word,1))
rdd3 = rdd2.reduceByKey(lambda a, b: a + b)
rdd3.saveAsTextFile(filepath2)
6
Resilient Distributed Datasets (RDD)
• Drawback of MapReduce in terms of data analysis:
more I/O than computation
• Spark tries to keep as much data as possible in memory
instead of spilling it to disk
• Spark creates logically partitioned datasets called RDDs
• An RDD is a data structure that stores an immutable collection of objects
(Java, Scala, or Python) that can be distributed and processed in
parallel across the cluster
• Main component in Spark
• The user has complete control over
Partitioning
Storage (memory or disk)
Parallel operations on the data (transformations and actions)
• Fault-tolerant: automatically rebuilt on failure 7
Resilient Distributed Datasets (RDD)
• RDDs are created by (a short sketch follows below):
Loading an external dataset
Parallelizing existing data (e.g., data stored in lists, dictionaries, etc.)
Performing operations on existing RDDs
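A minimal PySpark sketch of these three creation paths, assuming an existing SparkContext sc (the file path is illustrative):
fileRDD = sc.textFile("hdfs:///data/input.txt")    # 1. load an external dataset
listRDD = sc.parallelize([1, 2, 3, 4, 5])          # 2. parallelize existing in-memory data
squaredRDD = listRDD.map(lambda x: x * x)          # 3. transform an existing RDD into a new one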
8
Spark Concepts
• Spark follows driver-executor architecture
• Every Spark job/application starts with the Driver program – very
similar to the Driver function in MapReduce
• Driver
runs in a primary node
accesses Spark through SparkContext – builds RDDs
converts the user application into smaller tasks
assigns tasks to executor nodes
schedules job execution
manages RDDs
• Executor
executes tasks
reads/writes data
stores intermediate results in memory, in cache, or on disk
Source: Learning Spark book 9
Spark Execution
• Standalone application – created in Java/Python and run on the local
machine or on configured machines
• YARN
Client
Cluster
• User programs are the same for all execution modes
10
Spark Standalone Execution
• Cluster Manager – acquires and allocates resources for tasks in the
cluster
YARN is the cluster manager when the execution is in YARN mode
• This execution mode works only for lightweight applications
11
Spark Execution in YARN Client mode
• The Spark Driver runs on the Client side, where the job is submitted
• The Application Master requests resources from YARN
• The Client communicates with the executors to schedule the tasks
Source: Cloudera 12
Spark Execution in YARN Cluster mode
• The Spark Driver runs in the Application Master on the cluster
• A single node is responsible for running the Driver and requesting resources from
YARN
• The client launching the application doesn't need to be active for the complete
lifetime of the application
Source: Cloudera 13
Spark Application Execution Overview
1. Users submit their application using the spark-submit command in the terminal
(a sample invocation is sketched after this list)
2. A SparkContext is instantiated and the application becomes a Driver
3. The Driver requests resources from the cluster manager to complete the tasks in
the application
4. The cluster manager launches executors
5. Depending on data locations and the resources required for transformations and
actions, the Driver sends tasks to executors
6. Executors run their corresponding tasks and output results as commanded
(save in memory for future use or store in HDFS)
7. If an executor crashes, its tasks are sent to other executors
8. If the application ends (with SparkContext.stop()) or if the main
method crashes, all tasks are terminated and resources are released
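A sample spark-submit invocation (the script name, paths, and resource sizes are illustrative; the flags are standard spark-submit options):
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 4 \
  --executor-memory 2g \
  wordcount.py hdfs:///data/input.txt hdfs:///data/output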
14
Simple RDD Creation Step (PySpark)
• Create SparkContext:
from pyspark import SparkContext
sc = SparkContext("local", "PySpark Example")
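A quick sanity check that the context works (a sketch; the numbers are arbitrary):
rdd = sc.parallelize(range(10))
print(rdd.count())    # 10
# sc.stop() releases the context's resources once the application is finished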
15
Spark Operations: Transformations
• Transformations are performed when computation is needed over an RDD
(data distributed over the executors)
• A transformation creates a new RDD based on its operation
Source: Spark documentation
16
Spark Operations: Transformations
Source: Spark documentation
17
Spark Operations: Transformations
Source: Spark documentation 18
Spark Operations: Actions
19
Spark Operations: Actions
Source: Spark documentation
20
Example Transformations
• map(func) – takes each element of the input data and applies a function (func) to it
Example:
inputRDD = sc.textFile("input.txt")
mapRDD = inputRDD.map(lambda s: s + '-' + s)
• mapPartitions(func) – similar to map(func), but runs once per partition of the RDD. The func
must accept an Iterator as input and produce an Iterator as output
Example:
def f(partitionData):                # User-defined function for a Spark transformation
    toReturn = []
    for d in partitionData:
        toReturn.append(d + '-' + d)
    return toReturn
inputArray = ["one", "two", "three", "four", "five"]
inputRDD = sc.parallelize(inputArray, 2)
mapRDD = inputRDD.mapPartitions(f)   # Data processing on partitions
21
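Since mapPartitions consumes and produces an iterator per partition, a generator-based variant of f is also idiomatic. A minimal sketch reusing the names above (f_gen is a hypothetical alternative name):
def f_gen(partitionData):
    for d in partitionData:
        yield d + '-' + d            # emit elements lazily, one at a time

mapRDD = inputRDD.mapPartitions(f_gen)
mapRDD.collect()                     # ['one-one', 'two-two', 'three-three', 'four-four', 'five-five']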
map(func) Example-1
• Input.txt
• I live in Oklahoma
• I work at OSU
Given program:
inputRDD = sc.textFile("input.txt")
mapRDD = inputRDD.map(lambda s: s + '-' + s)
mapRDD.collect()
Output:
I live in Oklahoma-I live in Oklahoma
I work at OSU-I work at OSU
22
map(func) Example-2
Given program:
def mapFunc(x):
    return x + '-' + x
sampleInput = ["I", "live", "in", "Oklahoma"]
inputRDD = sc.parallelize(sampleInput)
mapRDD = inputRDD.map(mapFunc)
mapRDD.collect()
Output:
I-I
live-live
in-in
Oklahoma-Oklahoma
23
map(func) Example-3
Given program:
sampleInp = [["I", "enjoy", "programs"], ["Spark", "is", "amazing"]]
inputRDD = sc.parallelize(sampleInp)
mapRDD = inputRDD.map(lambda s: s + '-' + s)
mapRDD.collect()
Output:
?????
24
Example Transformations
25
Example Transformations
• flatMap(func) – creates an RDD after executing func on each element of the
input. Similar to map(), but flatMap allows returning 0 or more output
elements per input. Thus, the output type of func should be an Iterator
Example:
inputRDD = sc.textFile(filepath, 2)
flatmapRDD = inputRDD.flatMap(lambda line: line.split(" "))
26
flatMap(func) Example-1
• Input.txt
• I live in Oklahoma
• I work at OSU
Given program:
inputRDD = sc.textFile("input.txt")
flatmapRDD = inputRDD.flatMap(lambda line: line.split(' '))
flatmapRDD.collect()
Output:
["I", "live", "in", "Oklahoma", "I", "work", "at", "OSU"]
27
flatMap(func) Example-2
• Input.txt
• I live in Oklahoma
• I work at OSU
Output:
?????
28
Example Transformations
• filter(func) – creates an RDD that keeps only the elements of the input for
which the filter func returns true
Example:
inputArray = [1, 2, 3, 4, 5, 6, 7, 8]
inputRDD = sc.parallelize(inputArray, 2)
filterRDD = inputRDD.filter(lambda num: num % 2 == 0)
results = filterRDD.collect()
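Here results would be [2, 4, 6, 8], since only the even numbers satisfy the predicate.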
29
Example Transformations
• mapValues(func)
• Takes input only as key-value pairs
• Passes the value part of each key-value pair through the map function
• It does not modify the keys
• It also retains the original RDD's partitioning
Example:
inputRDD = sc.parallelize([("a", [1,2,3]), ("b", [5,6])])
def func(x):
    return len(x)
inputRDD.mapValues(func).collect()
Output:
[('a', 3), ('b', 2)]
30
Example Transformations
• groupByKey([numPartitions]) – works only on (K,V) pairs. Outputs <K, [V]>,
with the values for each key grouped across all data partitions. Use this only to group values
• reduceByKey(func, [numPartitions]) – similar to groupByKey(), but applies a
func to the grouped values and outputs <K,V> (a comparison sketch follows below)
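A small comparison sketch of the two (the pairs are arbitrary; output order may vary):
pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

# groupByKey: ships every value for a key across the network, then groups them
pairs.groupByKey().mapValues(list).collect()     # e.g., [('a', [1, 1]), ('b', [1])]

# reduceByKey: combines values locally within each partition before shuffling
pairs.reduceByKey(lambda a, b: a + b).collect()  # e.g., [('a', 2), ('b', 1)]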
31
Examples
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.groupByKey.html
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.reduceByKey.html
32
Spark Transformation Types
• Narrow vs Wide transformations (a tagged sketch follows below)
• Narrow transformations:
A series of executions can be performed
on a single node on an element-by-element
basis
Re-computation of a lost RDD partition is
efficient
• Wide transformations:
Require data to be collected from parent
partitions and shuffled across nodes, like
MapReduce
A failure in a node may cause a large
amount of re-execution
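A minimal sketch tagging each step of a small pipeline as narrow or wide (the data is arbitrary):
nums = sc.parallelize(range(100), 4)
pairs = nums.map(lambda x: (x % 10, x * 2))        # narrow: element-by-element, no shuffle
evens = pairs.filter(lambda kv: kv[1] % 4 == 0)    # narrow: stays within each partition
sums = evens.reduceByKey(lambda a, b: a + b)       # wide: values must be shuffled by key
sums.collect()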
33
Source: Spark: Cluster Computing with Working Sets paper
Example Actions
• collect() – returns all elements from the transformation(s) as an array to the
Driver program
Do not use this for applications that return a big chunk of data
• count() – returns the number of elements in the transformation(s) output
• first() – returns the first element of the RDD
• take(n) – returns the first 'n' elements of the RDD
• saveAsTextFile(filepath) – writes the RDD elements as text files under filepath
in the local file system or HDFS (a short sketch follows below)
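A brief sketch exercising these actions on a small RDD (the values and the output path are arbitrary):
rdd = sc.parallelize([5, 3, 8, 1, 9])
rdd.count()       # 5
rdd.first()       # 5
rdd.take(3)       # [5, 3, 8]
rdd.collect()     # [5, 3, 8, 1, 9]
rdd.saveAsTextFile("output_dir")   # "output_dir" is an illustrative path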
34
Spark Documentation
• Check complete list of operation functions in:
1. https://spark.apache.org/docs/latest/index.html
2. https://spark.apache.org/docs/latest/api/python/reference/pyspark.html
35
Example Spark Program: WordCount
Scala:
val inputRDD = sc.textFile(filepath)
val rdd1 = inputRDD.flatMap(line => line.split(" "))
val rdd2 = rdd1.map(word => (word,1))
val rdd3 = rdd2.reduceByKey(_ + _)
rdd3.saveAsTextFile(filepath2)

Python:
inputRDD = sc.textFile(filepath)
rdd1 = inputRDD.flatMap(lambda line: line.split(" "))
rdd2 = rdd1.map(lambda word: (word,1))
rdd3 = rdd2.reduceByKey(lambda a, b: a + b)
rdd3.saveAsTextFile(filepath2)
36
Lazy Transformations
inputRDD = sc.textFile(filepath)
rdd1 = inputRDD.flatMap(lambda line: line.split(" "))
rdd2 = rdd1.map(lambda word: (word,1))
rdd3 = rdd2.reduceByKey(lambda a, b: a + b)
rdd4 = rdd1.map(lambda word: word + '-')
rdd3.saveAsTextFile(filepath2)
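Note that transformations are lazy: nothing is computed until an action is called. The only action here is saveAsTextFile on rdd3, so rdd4 is defined but never materialized; no work is done for it unless an action such as rdd4.count() is called later.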
38
Spark Execution Overview
inputRDD = sc.textFile(filepath)
rdd1 = inputRDD.flatMap(lambda line: line.split(" "))
[Diagram: the Driver sends these operations as tasks to an Executor holding Block 1 and its cache; results are returned to the Driver]
40
Spark Fault Tolerance – DAG (2)
• Spark handles fault tolerance with the help of DAGs
• DAGs help to backtrack the lineage in case of any failure between operations
• Example:
Consider an RDD that is lost due to an executor node failure
Spark uses the DAG to backtrack and recompute the RDD
[Diagram: lineage DAG – RDD-1 (file read) and RDD-2 (DB read) feed a join producing RDD-3, a map producing RDD-4, and a filter producing RDD-5]
41
Spark Data Partitioning
• An RDD is a collection of data partitions. A data partition does
not span across nodes
• Each node can hold more than one data partition
• The number of partitions is configurable. Having too many or too few
partitions hurts performance
• Configuring partitioning is useful when data is used multiple
times in key-oriented operations like join()
• Why partitioning? To minimize network traffic between operations.
Unpartitioned data causes more data shuffling in wide
transformations such as leftOuterJoin(), groupByKey(),
reduceByKey(), and lookup()
• Types of partitioners: HashPartitioner, RangePartitioner, and
CustomPartitioner (a sketch of partitionBy follows below)
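A minimal PySpark sketch of controlling partitioning for a pair RDD (the data and partition count are arbitrary; PySpark's partitionBy hash-partitions by key by default):
pairs = sc.parallelize([("a", 1), ("b", 2), ("c", 3), ("a", 4)])
partitioned = pairs.partitionBy(4)     # hash-partition by key into 4 partitions
partitioned.getNumPartitions()         # 4

# Keyed operations such as reduceByKey() or join() on `partitioned`
# can reuse this partitioning and avoid an extra shuffle.
counts = partitioned.reduceByKey(lambda a, b: a + b)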
42
Spark Execution Overview
inputRDD = sc.textFile(filepath)
rdd1 = inputRDD.flatMap(lambda line: line.split(" "))
rdd2 = rdd1.map(lambda word: (word,1))
rdd3 = rdd2.reduceByKey(lambda a, b: a + b)
rdd3.saveAsTextFile(filepath2)
[Diagram: the Driver distributes these tasks to Executors holding Block 1, Block 2, and Block 3; each Executor processes and caches its data]
What happens when the following appears after the file save?
rdd4 = rdd1.map(lambda word: word + "-")
rdd4.count()
These jobs are sequential!
43
Spark Data Persistence
• In some applications same RDD should be used multiple times
without recomputing it
• Spark facilitates three types of RDD persistence:
In-memory deserialized object – used for faster performance but
memory inefficient
In-memory serialized object – used for memory efficient storage but
lower performance
On-disk – used when the RDD is too large to be persisted in memory
• Try to always persist static data while wide transformations
• Spark’s Least Recently Used (LRU) policy
Spark evicts least recently used RDD from memory
Such RDD must recomputed again if necessary
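A PySpark sketch of these options using the standard StorageLevel API (the RDD itself is arbitrary):
from pyspark import StorageLevel

words = sc.textFile(filepath).flatMap(lambda line: line.split(" "))

words.persist(StorageLevel.MEMORY_ONLY)        # keep in memory; equivalent to words.cache()
# words.persist(StorageLevel.MEMORY_AND_DISK)  # spill partitions to disk when memory is tight
# words.persist(StorageLevel.DISK_ONLY)        # on-disk only, for RDDs too large for memory

words.count()        # the first action computes and persists the RDD
words.take(5)        # later actions reuse the persisted data
words.unpersist()    # release the storage when no longer needed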
44
Shared Variables
• A variable defined in the Driver can be used in tasks
• But each task gets its own copy of the variable
• Updates made in tasks will not be propagated back to the Driver
Illustration (from the diagram):
Driver {
    var count = 1
    count = count + 2
    println(count)
}
tasks:
RDD.map(line => {
    count = count + 2 /* What will be the 'count' value here??? */
})
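A PySpark sketch of the same pitfall (counter is an arbitrary name): each task increments its own copy of the closure variable, so the Driver's value does not change.
counter = 0

def increment(x):
    global counter
    counter += 1          # updates a per-task copy on the executor, not the Driver's counter
    return x

sc.parallelize(range(100)).map(increment).count()
print(counter)            # still 0 on the Driver (behavior can differ in local mode)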
48
Source: Spark: Cluster Computing with Working Sets paper
Conclusion
• Spark is a powerful tool to perform advanced data analytics
• It provides all facilities (graph computation, ML, and SQL) in one
framework
• Fault tolerant
• Easy to use
• Easy to adapt
• Fast processing of big data
• Inefficient for small datasets!
49
Questions???
50
Acknowledgements
Some of the content in these slides is based on materials from:
1. Dr. Srinivas Akella – UNC Charlotte
2. Spark: The Definitive Guide textbook by Chambers and Zaharia
3. Learning Spark: Lightning-Fast Big Data Analysis textbook by Karau,
Konwinski, Wendell, and Zaharia
4. spark.apache.org
5. Zaharia, Matei, et al. "Spark: Cluster computing with working
sets." HotCloud 10.10-10 (2010): 95. (Paper)
6. Zaharia, Matei, et al. "Resilient distributed datasets: A fault-tolerant
abstraction for in-memory cluster computing." 9th USENIX Symposium on
Networked Systems Design and Implementation (NSDI 12), 2012. (Paper)
51