Note to other teachers and users of these slides: We would be delighted if you found our
material useful in giving your own lectures. Feel free to use these slides verbatim, or to modify
them to fit your own needs. If you make use of a significant portion of these slides in your own
lecture, please include this message, or a link to our web site: http://www.mmds.org
Map-Reduce and the New Software Stack
Mining of Massive Datasets
Jure Leskovec, Anand Rajaraman, Jeff Ullman
Stanford University
http://www.mmds.org
MapReduce
Much of the course will be devoted to large-scale computing for data mining
Challenges:
How to distribute computation?
Distributed/parallel programming is hard
Map-reduce addresses all of the above:
Google's computational/data manipulation model
Elegant way to work with big data
Single Node Architecture
[Diagram: a single node with CPU, Memory, and Disk; Machine Learning/Statistics work on data in memory, Classical Data Mining on data on disk]
Motivation: Google Example
20+ billion web pages x 20 KB = 400+ TB
1 computer reads 30-35 MB/sec from disk
~4 months to read the web
~1,000 hard drives to store the web
Takes even more to do something useful with the data!
Today, a standard architecture for such problems is emerging:
Cluster of commodity Linux nodes
Commodity network (Ethernet) to connect them
Cluster Architecture
2-10 Gbps backbone between racks
1 Gbps between any pair of nodes in a rack
Each rack contains 16-64 nodes
[Diagram: racks of nodes (CPU, Mem, Disk) connected by an in-rack switch, with a backbone switch between racks]
In 2011 it was estimated that Google had ~1M machines, http://bit.ly/Shh0RO
Large-scale Computing
Large-scale computing for data mining problems on commodity hardware
Challenges:
How do you distribute computation?
How can we make it easy to write distributed programs?
Machines fail:
One server may stay up 3 years (1,000 days)
If you have 1,000 servers, expect to lose 1/day
People estimated Google had ~1M machines in 2011
1,000 machines fail every day!
Idea and Solution
Issue: Copying data over a network takes time
Idea:
Bring computation close to the data
Store files multiple times for reliability
Map-reduce addresses these problems
Google's computational/data manipulation model
Elegant way to work with big data
Storage infrastructure: file system
Google: GFS. Hadoop: HDFS
Programming model: Map-Reduce
Storage Infrastructure
Problem: If nodes fail, how to store data persistently?
Answer: Distributed File System:
Provides global file namespace
Google GFS; Hadoop HDFS
Typical usage pattern:
Huge files (100s of GB to TB)
Data is rarely updated in place
Reads and appends are common
Distributed File System
Chunk servers
File is split into contiguous chunks
Typically each chunk is 16-64 MB
Each chunk replicated (usually 2x or 3x)
Try to keep replicas in different racks
Master node
a.k.a. Name Node in Hadoop's HDFS
Stores metadata about where files are stored
Might be replicated
Client library for file access
Talks to master to find chunk servers
Connects directly to chunk servers to access data
Distributed File System
Reliable distributed file system
Data kept in chunks spread across machines
Each chunk replicated on different machines
Seamless recovery from disk or machine failure
[Diagram: chunks C0, C1, ... and D0, D1, ... replicated across Chunk servers 1 through N]
Bring computation directly to the data!
Chunk servers also serve as compute servers
Programming Model: MapReduce
Warm-up task:
We have a huge text document
Count the number of times each distinct word appears in the file
Sample application:
Analyze web server logs to find popular URLs
Task: Word Count
Case 1:
File too large for memory, but all <word, count> pairs fit in memory
Case 2:
Count occurrences of words:
words(doc.txt) | sort | uniq -c
where words takes a file and outputs the words in it, one per line (a sketch of words follows below)
Case 2 captures the essence of MapReduce
Great thing is that it is naturally parallelizable
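The words helper in the pipeline above is not a standard Unix tool; a minimal sketch of it, assuming whitespace/punctuation tokenization and lowercasing, could look like this (Python; the script name words.py is ours):

# words.py -- hypothetical helper: print one word per line
import re
import fileinput

for line in fileinput.input():  # reads the files named on the command line (or stdin)
    # split on non-alphanumeric characters; lowercase so counts are case-insensitive
    for w in re.findall(r"[A-Za-z0-9']+", line.lower()):
        print(w)

Usage: python3 words.py doc.txt | sort | uniq -c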
MapReduce: Overview
Sequentially read a lot of data
Map: Extract something you care about
Group by key: Sort and Shuffle
Reduce: Aggregate, summarize, filter or transform
Write the result
Outline stays the same, Map and Reduce change to fit the problem
MapReduce: The Map Step
[Diagram: input key-value pairs (k, v) are fed to map tasks, each producing a set of intermediate key-value pairs (k', v')]
MapReduce: The Reduce Step
[Diagram: intermediate key-value pairs (k', v') are grouped by key into key-value groups (k', <v'>*); reduce turns each group into output key-value pairs]
More Specifically
Input: a set of key-value pairs
Programmer specifies two methods:
Map(k, v) -> <k', v'>*
Takes a key-value pair and outputs a set of key-value pairs
E.g., key is the filename, value is a single line in the file
There is one Map call for every (k, v) pair
Reduce(k', <v'>*) -> <k', v''>*
All values v' with same key k' are reduced together and processed in v' order
There is one Reduce function call per unique key k'
MapReduce: Word Counting
Big document (input): "The crew of the space shuttle Endeavor recently returned to Earth as ambassadors, harbingers of a new era of space exploration. Scientists at NASA are saying that the recent assembly of the Dextre bot is the first step in a long-term space-based man/machine partnership. 'The work we're doing now -- the robotics we're doing -- is what we're going to need ...'"
MAP (provided by the programmer): sequentially read the data and produce a set of (key, value) pairs:
(The, 1), (crew, 1), (of, 1), (the, 1), (space, 1), (shuttle, 1), (Endeavor, 1), (recently, 1), ...
Group by key: collect all pairs with the same key:
(crew, 1), (crew, 1), (space, 1), (the, 1), (the, 1), (the, 1), (shuttle, 1), (recently, 1), ...
Reduce (provided by the programmer): collect all values belonging to the key and output:
(crew, 2), (space, 1), (the, 3), (shuttle, 1), (recently, 1), ...
Note: only sequential reads are needed.
Word Count Using MapReduce

map(key, value):
    // key: document name; value: text of the document
    for each word w in value:
        emit(w, 1)

reduce(key, values):
    // key: a word; values: an iterator over counts
    result = 0
    for each count v in values:
        result += v
    emit(key, result)
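For concreteness, here is a minimal self-contained Python sketch that simulates the same map, group-by-key, and reduce steps in memory; no Hadoop is involved, and names such as map_fn, reduce_fn, and word_count are ours, not part of any framework:

from collections import defaultdict

def map_fn(doc_name, text):
    # one (word, 1) pair per word occurrence
    for word in text.split():
        yield word, 1

def reduce_fn(word, counts):
    # sum all counts observed for this word
    yield word, sum(counts)

def word_count(docs):
    # shuffle: group intermediate values by key
    groups = defaultdict(list)
    for name, text in docs.items():
        for k, v in map_fn(name, text):
            groups[k].append(v)
    # reduce: one call per distinct key
    return dict(kv for k, vs in sorted(groups.items())
                   for kv in reduce_fn(k, vs))

print(word_count({"doc.txt": "the crew of the space shuttle"}))
# {'crew': 1, 'of': 1, 'shuttle': 1, 'space': 1, 'the': 2}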
Map-Reduce: Environment
The Map-Reduce environment takes care of:
Partitioning the input data
Scheduling the program's execution across a set of machines
Performing the group-by-key step
Handling machine failures
Managing required inter-machine communication
Map-Reduce: A diagram
Big document
MAP: read input and produce a set of key-value pairs
Group by key: collect all pairs with the same key (hash merge, shuffle, sort, partition)
Reduce: collect all values belonging to the key and output
Map-Reduce: In Parallel
All phases are distributed with many tasks
Map-Reduce
Programmer specifies:
Map and Reduce and input files
Workflow:
Read inputs as a set of key-value pairs
Map transforms input (k, v) pairs into a new set of (k', v') pairs
Sort & Shuffle the (k', v') pairs to output nodes
All (k', v') pairs with a given k' are sent to the same Reduce
Reduce processes all (k', v') pairs grouped by key into new (k'', v'') pairs
Write the resulting pairs to files
All phases are distributed with many tasks doing the work
[Diagram: Input 0/1/2 -> Map 0/1/2 -> Shuffle -> Reduce 0/1 -> Out 0/1]
Data Flow
Input and final output are stored on a distributed file system (FS):
Scheduler tries to schedule map tasks close to physical storage location of input data
Intermediate results are stored on local FS of Map and Reduce workers
Output is often input to another MapReduce task
Coordination: Master
Master node takes care of coordination:
Task status: (idle, in-progress, completed)
Idle tasks get scheduled as workers become available
When a map task completes, it sends the master the location and sizes of its R intermediate files, one for each reducer
Master pushes this info to reducers
Master pings workers periodically to detect failures
Dealing with Failures
Map worker failure:
Map tasks completed or in-progress at worker are reset to idle
Reduce workers are notified when task is rescheduled on another worker
Reduce worker failure:
Only in-progress tasks are reset to idle
Reduce task is restarted
Master failure:
MapReduce task is aborted and client is notified
How many Map and Reduce jobs?
M map tasks, R reduce tasks
Rule of thumb:
Make M much larger than the number of nodes in the cluster
One DFS chunk per map is common
Improves dynamic load balancing and speeds up recovery from worker failures
Usually R is smaller than M
Because output is spread across R files
Task Granularity & Pipelining
Fine-granularity tasks: map tasks >> machines
Minimizes time for fault recovery
Can pipeline shuffling with map execution
Better dynamic load balancing
Refinements: Backup Tasks
Problem:
Slow workers significantly lengthen the job completion time:
Other jobs on the machine
Bad disks
Weird things
Solution:
Near end of phase, spawn backup copies of tasks
Whichever one finishes first wins
Effect:
Dramatically shortens job completion time
Refinement: Combiners
Often a Map task will produce many pairs of the form (k, v1), (k, v2), ... for the same key k
E.g., popular words in the word count example
Can save network time by pre-aggregating values in the mapper:
combine(k, list(v1)) -> v2
Combiner is usually the same as the reduce function
Works only if the reduce function is commutative and associative
Refinement: Combiners
Back to our word counting example:
Combiner combines the values of all keys of a single mapper (single machine):
Much less data needs to be copied and shuffled!
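As an illustrative sketch (plain Python, not framework code), a word-count combiner simply runs the reducer's summation on each mapper's local output before anything is sent over the network:

from collections import Counter

def map_fn(text):
    # raw mapper output: one (word, 1) pair per occurrence
    return [(w, 1) for w in text.split()]

def combine(pairs):
    # pre-aggregate locally; same logic as the reducer,
    # valid because addition is commutative and associative
    local = Counter()
    for word, count in pairs:
        local[word] += count
    return list(local.items())

mapper_output = map_fn("the crew of the space shuttle the")
print(len(mapper_output), "pairs before combining")   # 7 pairs before combining
print(len(combine(mapper_output)), "pairs after")     # 5 pairs after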
Refinement: Partition Function
Want to control how keys get partitioned
Inputs to map tasks are created by contiguous splits of input file
Reduce needs to ensure that records with the same intermediate key end up at the same worker
System uses a default partition function:
hash(key) mod R
Sometimes useful to override the hash function:
E.g., hash(hostname(URL)) mod R ensures URLs from a host end up in the same output file
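A minimal sketch of such a custom partition function in Python; the name partition and the choice R = 8 are ours, and crc32 is used instead of Python's built-in hash() because the built-in is salted per process:

from urllib.parse import urlparse
from zlib import crc32

R = 8  # number of reduce tasks (assumed)

def partition(key, r=R):
    # route all URLs from the same host to the same reducer;
    # crc32 is stable across runs, unlike Python's salted hash()
    host = urlparse(key).netloc
    return crc32(host.encode("utf-8")) % r

print(partition("http://example.com/a"))   # same reducer as ...
print(partition("http://example.com/b"))   # ... this one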
Problems Suited for Map-Reduce
Example: Host size
Suppose we have a large web corpus
Look at the metadata file
Lines of the form: (URL, size, date, ...)
For each host, find the total number of bytes
That is, the sum of the page sizes for all URLs from that particular host (see the sketch after this list)
Other examples:
Link analysis and graph processing
Machine Learning algorithms
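A sketch of the host-size job in the same in-memory style as the word-count example; the comma-separated metadata format and the helper names are assumptions for illustration:

from collections import defaultdict
from urllib.parse import urlparse

def map_fn(line):
    # line format assumed: "URL,size,date,..."
    url, size, *_ = line.split(",")
    yield urlparse(url).netloc, int(size)

def reduce_fn(host, sizes):
    # total bytes served from this host
    yield host, sum(sizes)

def host_size(lines):
    groups = defaultdict(list)
    for line in lines:
        for host, size in map_fn(line):
            groups[host].append(size)
    return {h: total for h, vs in groups.items() for _, total in reduce_fn(h, vs)}

print(host_size(["http://a.com/x,1000,2024-01-01",
                 "http://a.com/y,2500,2024-01-02",
                 "http://b.com/z,700,2024-01-03"]))
# {'a.com': 3500, 'b.com': 700}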
Example: Language Model
Statistical machine translation:
Need to count number of times every 5-word sequence occurs in a large corpus of documents
Very easy with MapReduce:
Map:
Extract (5-word sequence, count) from document
Reduce:
Combine the counts
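A sketch of the map step for this job (whitespace tokenization into 5-grams is an assumption; the reduce step is the same summation as in word count):

def map_fn(doc_text, n=5):
    # emit (5-word sequence, 1) for every position in the document
    words = doc_text.split()
    for i in range(len(words) - n + 1):
        yield " ".join(words[i:i + n]), 1

for gram, c in map_fn("to be or not to be that is the question"):
    print(gram, c)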
Example: Join By MapReduce
Compute the natural join R(A,B) ⋈ S(B,C)
R and S are each stored in files
Tuples are pairs (a,b) or (b,c)
R(A,B) = {(a1,b1), (a2,b1), (a3,b2), (a4,b3)}
S(B,C) = {(b1,c1), (b2,c2), (b3,c3)}
R ⋈ S = {(a1,b1,c1), (a2,b1,c1), (a3,b2,c2), (a4,b3,c3)}
Map-Reduce Join
Use a hash function h from B-values to 1...k
A Map process turns:
Each input tuple R(a,b) into key-value pair (b, (a, R))
Each input tuple S(b,c) into (b, (c, S))
Map processes send each key-value pair with key b to Reduce process h(b)
Hadoop does this automatically; just tell it what k is.
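A minimal in-memory sketch of this reduce-side join in Python, using the example relations from the previous slide; the R/S tags follow the slide, and the grouping dictionary stands in for the shuffle to Reduce process h(b):

from collections import defaultdict

R = [("a1", "b1"), ("a2", "b1"), ("a3", "b2"), ("a4", "b3")]
S = [("b1", "c1"), ("b2", "c2"), ("b3", "c3")]

# Map: tag each tuple with its relation and key it by the B-value
groups = defaultdict(list)
for a, b in R:
    groups[b].append(("R", a))
for b, c in S:
    groups[b].append(("S", c))

# Reduce (one call per B-value): pair every R-side a with every S-side c
join = []
for b, tagged in groups.items():
    r_side = [x for tag, x in tagged if tag == "R"]
    s_side = [x for tag, x in tagged if tag == "S"]
    join += [(a, b, c) for a in r_side for c in s_side]

print(sorted(join))
# [('a1', 'b1', 'c1'), ('a2', 'b1', 'c1'), ('a3', 'b2', 'c2'), ('a4', 'b3', 'c3')]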
Cost Measures for Algorithms
In MapReduce we quantify the cost of an algorithm using:
1. Communication cost = total I/O of all processes
2. Elapsed communication cost = max of I/O along any path
3. (Elapsed) computation cost: analogous, but count only running time of processes
Note that here the big-O notation is not the most useful (adding more machines is always an option)
Example: Cost Measures
For a map-reduce algorithm:
Communication cost = input file size + 2 × (sum of the sizes of all files passed from Map processes to Reduce processes) + the sum of the output sizes of the Reduce processes
Elapsed communication cost is the sum of the largest input + output for any map process, plus the same for any reduce process
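A tiny worked example with made-up sizes (all numbers are illustrative, not from the slides):

# Hypothetical sizes, in GB
input_size = 100          # total input read by all Map processes
intermediate = 40         # total size of files passed from Map to Reduce
output_size = 10          # total output written by Reduce processes

communication_cost = input_size + 2 * intermediate + output_size
print(communication_cost, "GB")   # 190 GB: the intermediate files are
                                  # counted twice (written once, read once)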
What Cost Measures Mean
Either the I/O (communication) or processing (computation) cost dominates
Ignore one or the other
Total cost tells what you pay in rent from your friendly neighborhood cloud
Elapsed cost is wall-clock time using parallelism
Cost of Map-Reduce Join
Total communication cost = O(|R| + |S| + |R ⋈ S|)
Elapsed communication cost = O(s)
We're going to pick k and the number of Map processes so that the I/O limit s is respected
We put a limit s on the amount of input or output that any one process can have. s could be:
What fits in main memory
What fits on local disk
With proper indexes, computation cost is linear in the input + output size
So computation cost is like communication cost
Pointers and Further Reading
Implementations:
Google
Not available outside Google
Hadoop
An open-source implementation in Java
Uses HDFS for stable storage
Download: http://lucene.apache.org/hadoop/
Aster Data
Cluster-optimized SQL database that also implements MapReduce
Cloud Computing
Ability to rent computing by the hour
Additional services, e.g., persistent storage
Amazon's Elastic Compute Cloud (EC2)
Aster Data and Hadoop can both be run on EC2
For CS341 (offered next quarter)
Reading
Jeffrey Dean and Sanjay Ghemawat: MapReduce: Simplified Data Processing on Large Clusters
http://labs.google.com/papers/mapreduce.html
Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung: The Google File System
http://labs.google.com/papers/gfs.html
Resources
Hadoop Wiki:
Introduction
http://wiki.apache.org/lucene-hadoop/
Getting Started
http://wiki.apache.org/lucene-hadoop/GettingStartedWithHadoop
Map/Reduce Overview
http://wiki.apache.org/lucene-hadoop/HadoopMapReduce
http://wiki.apache.org/lucene-hadoop/HadoopMapRedClasses
Eclipse Environment
http://wiki.apache.org/lucene-hadoop/EclipseEnvironment
Resources
Releases from Apache download mirrors
http://www.apache.org/dyn/closer.cgi/lucene/hadoop/
Nightly builds of source
http://people.apache.org/dist/lucene/hadoop/nightly/
Source code from subversion
http://lucene.apache.org/hadoop/version_control.html
Further Reading
Programming model inspired by functional language primitives
Partitioning/shuffling similar to many large-scale sorting systems
NOW-Sort ['97]
Re-execution for fault tolerance
BAD-FS ['04] and TACC ['97]
Locality optimization has parallels with Active Disks/Diamond work
Active Disks ['01], Diamond ['04]
Backup tasks similar to Eager Scheduling in Charlotte system
Charlotte ['96]
Dynamic load balancing solves similar problem as River's distributed queues
River ['99]