Lecture 4 - Spark Introduction
Spark Introduction
Agenda
• History of Spark
• Introduction
• Components of the Stack
• Resilient Distributed Dataset – RDD
History of Spark
2004 – MapReduce paper
2006 – Hadoop @Yahoo!
2010 – Spark paper
History of Spark
circa 1979 – Stanford, MIT, CMU, etc.
set/list operations in LISP, Prolog, etc., for parallel processing
www-formal.stanford.edu/jmc/history/lisp/lisp.htm
MapReduce
Most current cluster programming models are
based on acyclic data flow from stable storage to
stable storage
MapReduce
• Acyclic data flow is inefficient for applications that
repeatedly reuse a working set of data:
• Iterative algorithms (machine learning, graphs)
• Interactive data mining tools (R, Excel, Python)
Specialized Systems
Pregel, Giraph, MapReduce, Impala, GraphLab, Storm, S4
Couchbase vs CouchDB, MongoDB vs Neo4j vs Titan vs Giraph vs OrientDB,
Mahout vs MLlib vs H2O, Solr vs Elasticsearch, …
Specialized Systems
General Batch Processing (2004 – 2013): MapReduce
Specialized Systems (2007 – 2015?): Pregel, Giraph, Tez, Drill, Dremel, Mahout, Storm, S4, Impala, GraphLab
General Unified Engine (2014 – ?): Spark (iterative, interactive, ML, streaming, graph, SQL, etc.)
(Figure: Spark stack with SQL, MLlib, Streaming; cluster managers: YARN vs Mesos; in-memory storage: Tachyon)
(Figure: 10x – 100x speedup)
(Figure: reduced running time Tnew, where Tnew < T)
Berkeley AMPLab
• “Launched” January 2011: 6-year plan
• 8 CS faculty
• ~40 students
• 3 software engineers
• Organized for collaboration
Berkeley AMPLab
• Funding:
• XData, CISE Expedition Grant
Databricks
Databricks Cloud:
“A unified platform for building Big Data pipelines – from
ETL to Exploration and Dashboards, to Advanced Analytics
and Data Products.”
History of Spark
2004 – MapReduce paper
2010 – Spark paper
History of Spark
“We present Resilient Distributed Datasets
(RDDs), a distributed memory abstraction that
lets programmers perform in-memory
computations on large clusters in a fault-
tolerant manner.”
April 2012
History of Spark
History of Spark
TwitterUtils.createStream(...)
  .filter(_.getText.contains("Spark"))
  .countByWindow(Seconds(5))
History of Spark
History of Spark
https://amplab.cs.berkeley.edu/wp-content/uploads/2013/05/grades-graphx_with_fonts.pdf
History of Spark
https://www.cs.berkeley.edu/~sameerag/blinkdb_eurosys13.pdf
History of Spark
• Unlike the various specialized systems, Spark’s goal
was to generalize MapReduce to support new apps
within the same engine
• Two reasonably small additions are enough to
express the previous models:
• fast data sharing
• general DAGs
• This allows an approach that is more efficient for the
engine and much simpler for end users
df = spark.read.json("logs.json")
df.where("age > 21").select("name.first").show()
Spark's Python DataFrame API
Read JSON files with automatic schema inference
Components of the Stack
RDD Basics
• RDD:
• Immutable distributed collection of objects
• Split into multiple partitions => can be computed on
different nodes
• All work in Spark is expressed as
• creating new RDDs
• transforming existing RDDs
• calling actions on RDDs
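A minimal PySpark sketch of these three kinds of work; the context, data, and variable names here are illustrative, not from the slides:

from pyspark import SparkContext

sc = SparkContext("local[2]", "rdd-basics")   # local context with two worker threads
nums = sc.parallelize([1, 2, 3, 4])           # create a new RDD
squares = nums.map(lambda x: x * x)           # transform it into another RDD
print(squares.collect())                      # action: [1, 4, 9, 16]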
Example
Load error messages from a log into memory, then
interactively search for various patterns
lines = spark.textFile("hdfs://...")            // Base RDD
errors = lines.filter(_.startsWith("ERROR"))    // Transformed RDD
messages = errors.map(_.split('\t')(2))
cachedMsgs = messages.cache()

cachedMsgs.filter(_.contains("foo")).count      // Action
cachedMsgs.filter(_.contains("bar")).count
. . .

(Diagram: the driver ships tasks to workers; each worker reads its block of the file, caches the filtered messages in memory (Cache 1–3, Block 1–3), and sends results back to the driver.)
RDD Basics
• Two types of operations: transformations and
actions
• Transformations: construct a new RDD from a
previous one, e.g., filter data
• Actions: compute a result based on an RDD, e.g.,
count elements, get first element
Transformations
• Create new RDDs from existing RDDs
• Lazy evaluation
• See the whole chain of transformations
• Compute just the data needed
• Persist contents:
• persist an RDD in memory to reuse it in the future
• persisting RDDs on disk is also possible
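A short PySpark sketch of lazy evaluation plus persistence; the file name and variables are illustrative:

lines = sc.textFile("log.txt")                 # nothing is read yet (lazy)
errors = lines.filter(lambda l: "ERROR" in l)  # still lazy: only the recipe is recorded
errors.persist()                               # keep the filtered RDD in memory once computed
print(errors.count())                          # first action: reads the file and filters it
print(errors.first())                          # reuses the persisted RDD, no re-read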
RDD Operations
• Two types of operations
• Transformations: operations that return a new RDD, e.g.,
map(), filter()
• Actions: operations that return a result to the driver
program or write it to storage, such as count(), first()
• Treated differently by Spark
• Transformations: lazy evaluation
• Actions: eager execution, run as soon as they are called
Transformation
• Example 1. Use filter()
• Python
inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "error" in x)
• Scala
val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line =>
line.contains("error"))
• Java
JavaRDD<String> inputRDD = sc.textFile("log.txt");
JavaRDD<String> errorsRDD = inputRDD.filter(
  new Function<String, Boolean>() {
    public Boolean call(String x) {
      return x.contains("error");
    }
  });
Transformation
• filter()
• does not change the existing inputRDD
• returns a pointer to an entirely new RDD
• inputRDD can still be reused
• union()
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD=inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)
• transformations can operate on any number of input
RDDs
Transformation
• Spark keeps track of dependencies between RDDs,
called the lineage graph
• This allows lost data to be recovered
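One way to inspect the lineage Spark records is toDebugString(), shown here on the RDDs from the union example above (a sketch, not from the slides):

inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)
print(badLinesRDD.toDebugString().decode())   # the lineage graph; toDebugString() returns bytes in PySpark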
Actions
• Example. count the number of errors
• Python
print "Input had " + badLinesRDD.count() + " concerning lines"
print "Here are 10 examples:"
for line in badLinesRDD.take(10):
print line
• Scala
println("Input had " + badLinesRDD.count() + " concerning lines")
println("Here are 10 examples:")
badLinesRDD.take(10).foreach(println)
• Java
System.out.println("Input had " + badLinesRDD.count() + " concerning
lines")
System.out.println("Here are 10 examples:")
for (String line: badLinesRDD.take(10)) {
System.out.println(line);
}
RDD Basics
Transformations: map, flatMap, filter, sample, union, groupByKey, reduceByKey, join, cache, …
Actions: reduce, collect, count, save, lookupKey, …
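A small word-count sketch that combines a few of the operations above (the input file name is a hypothetical placeholder):

words = sc.textFile("input.txt") \
          .flatMap(lambda line: line.split())    # transformation: one record per word
counts = words.map(lambda w: (w, 1)) \
              .reduceByKey(lambda a, b: a + b)   # transformation: sum the counts per word
print(counts.collect()[:5])                      # action: results come back to the driver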
Persistence levels
Persistence
• Example
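A minimal PySpark sketch of persisting an RDD at an explicit storage level (file and variable names are illustrative):

from pyspark import StorageLevel

errors = sc.textFile("log.txt").filter(lambda l: "ERROR" in l)
errors.persist(StorageLevel.MEMORY_AND_DISK)   # spill partitions to disk if they do not fit in memory
print(errors.count())                          # first action materializes and caches the RDD
print(errors.first())                          # served from the persisted copy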
DataFrame
• A primary abstraction in Spark 2.0
• Immutable once constructed
• Tracks lineage information to efficiently re-compute lost data
• Enables operations on collections of elements in parallel
• Ways to construct a DataFrame:
• By parallelizing existing Python collections (lists)
• By transforming an existing Spark or pandas DataFrame
• From files in HDFS or other storage system
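Hedged sketches of the three construction routes, reusing the sqlContext and logs.json that appear elsewhere in these slides:

import pandas as pd

# 1. From an existing Python collection (a list of tuples)
df1 = sqlContext.createDataFrame([('Alice', 1), ('Bob', 2)], ['name', 'age'])

# 2. From an existing pandas DataFrame
df2 = sqlContext.createDataFrame(pd.DataFrame({'name': ['Carol'], 'age': [3]}))

# 3. From files in HDFS or another storage system
df3 = sqlContext.read.json("logs.json")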
Using DataFrame
>>> data = [('Alice', 1), ('Bob', 2), ('Bob', 2)]
>>> df1 = sqlContext.createDataFrame(data, ['name', 'age'])
[Row(name=u'Alice', age=1),
 Row(name=u'Bob', age=2),
 Row(name=u'Bob', age=2)]
Transformations
• Create new DataFrame from an existing one
• Use lazy evaluation
• Nothing executes; Spark saves a recipe for
transforming the source
Transformation: Description
select(*cols): selects columns from this DataFrame
drop(col): returns a new DataFrame that drops the specified column
sort(*cols, **kw): returns a new DataFrame sorted by the specified columns, in the sort order specified by kw
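A brief doctest-style illustration of these transformations on the df1 created on the next slide; nothing executes, so only the resulting schema is shown:

>>> df1.select('name')                 # lazy: selects the name column
DataFrame[name: string]
>>> df1.drop('age')                    # new DataFrame without the age column
DataFrame[name: string]
>>> df1.sort('age', ascending=False)   # new DataFrame sorted by age, descending
DataFrame[name: string, age: bigint]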
Using Transformations
>>> data = [('Alice', 1), ('Bob', 2), ('Bob', 2)]
>>> df1 = sqlContext.createDataFrame(data, ['name', 'age'])
>>> df2 = df1.distinct()
[Row(name=u'Alice', age=1), Row(name=u'Bob', age=2)]
>>> df3 = df2.sort('age', ascending=False)
[Row(name=u'Bob', age=2), Row(name=u'Alice', age=1)]
Actions
• Cause Spark to execute the recipe that transforms the source
• Mechanisms for getting results out of Spark
Action: Description
show(n, truncate): prints the first n rows of this DataFrame
take(n): returns the first n rows as a list of Row
collect(): returns all the records as a list of Row (*)
count(): returns the number of rows in this DataFrame
Using Actions
>>> data = [('Alice', 1), ('Bob', 2)]
>>> df = sqlContext.createDataFrame(data, ['name', 'age'])
>>> df.collect()
[Row(name=u'Alice', age=1), Row(name=u'Bob', age=2)]
>>> df.count()
2
>>> df.show()
+-----+---+
| name|age|
+-----+---+
|Alice|  1|
|  Bob|  2|
+-----+---+
Caching
>>> linesDF = sqlContext.read.text('…')
>>> linesDF.cache()
>>> commentsDF = linesDF.filter(isComment)
>>> print(linesDF.count(), commentsDF.count())
>>> commentsDF.cache()
df.write.
  format("parquet").
  mode("append").
  partitionBy("year").
  saveAsTable("faster-stuff")
read and write functions create new builders for doing I/O
Builder methods specify:
• format
• partitioning
• handling of existing data

val df = sqlContext.
  read.
  format("json").
  option("samplingRatio", "0.1").
  load("/Users/spark/data/stuff.json")

df.write.
  format("parquet").
  mode("append").
  partitionBy("year").
  saveAsTable("faster-stuff")
Data sources, built-in and external: JDBC, JSON, and more …
Architecture
• A master-worker type architecture
• A driver or master node
• Worker nodes
Architecture (2)
• A Spark program first creates a SparkContext object
• SparkContext tells Spark how and where to access a
cluster
• The master parameter for a SparkContext determines
which type and size of cluster to use

Master parameter: Description
local: run Spark locally with one worker thread (no parallelism)
local[K]: run Spark locally with K worker threads (ideally set to the number of cores)
spark://HOST:PORT: connect to a Spark standalone cluster
mesos://HOST:PORT: connect to a Mesos cluster
yarn: connect to a YARN cluster
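A minimal sketch of choosing a master when creating a SparkContext; the app name and thread count are arbitrary:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[4]").setAppName("lecture4-demo")
sc = SparkContext(conf=conf)   # on a cluster: spark://HOST:PORT, mesos://HOST:PORT, or yarn
print(sc.master)               # local[4]
sc.stop()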
Books:
Q&A
Thank you for your attention!