CS246: Mining Massive Datasets Jure Leskovec, Stanford University
http://cs246.stanford.edu
[Diagram: single-machine architecture with CPU, Memory, and Disk; labels: "Machine Learning, Statistics" and "Classical Data Mining"]
20+ billion web pages x 20 KB = 400+ TB
One computer reads 30-35 MB/sec from disk
  ~4 months to read the web
~1,000 hard drives to store the web
Even more to do something with the data
Web data sets are massive:
  Tens to hundreds of terabytes
  Cannot mine on a single server
Standard architecture emerging:
  Cluster of commodity Linux nodes
  Gigabit Ethernet interconnect
How to organize computations on this architecture?
  Mask issues such as hardware failure
Traditional big-iron box (circa 2003):
  8 2 GHz Xeons, 64 GB RAM, 8 TB disk, 758,000 USD
Prototypical Google rack (circa 2003):
  176 2 GHz Xeons, 176 GB RAM, ~7 TB disk, 278,000 USD
In Aug 2006 Google had ~450,000 machines
Each rack contains 16-64 nodes
[Diagram: nodes (CPU, memory, disk) within a rack connect through a switch at 1 Gbps between any pair of nodes; a 2-10 Gbps backbone switch connects the racks]
Yahoo M45 cluster:
  Datacenter in a Box (DiB)
  1000 nodes, 4000 cores, 3 TB RAM, 1.5 PB disk
  High-bandwidth connection to the Internet
  Located on the Yahoo! campus
  One of the world's top-50 supercomputers
Large-scale computing for data mining problems on commodity hardware:
  PCs connected in a network
  Process huge datasets on many computers
Challenges:
  How do you distribute computation?
  Distributed/parallel programming is hard
  Machines fail
Map-Reduce addresses all of the above:
  Google's computational/data manipulation model
  Elegant way to work with big data
Implications of such a computing environment:
Single machine performance does not matter
Add more machines
Machines break:
One server may stay up 3 years (1,000 days). If you have 1,000 servers, expect to lose one per day.
How can we make it easy to write distributed programs?
Idea:
  Bring computation close to the data
  Store files multiple times for reliability
Need:
  Programming model: Map-Reduce
  Infrastructure: file system (Google: GFS, Hadoop: HDFS)
Problem: if nodes fail, how do we store data persistently?
Answer: Distributed File System
  Provides a global file namespace
  Google GFS; Hadoop HDFS; Kosmix KFS
Typical usage pattern:
  Huge files (100s of GB to TB)
  Data is rarely updated in place
  Reads and appends are common
Chunk servers:
  File is split into contiguous chunks
  Typically each chunk is 16-64 MB
  Each chunk is replicated (usually 2x or 3x)
  Try to keep replicas in different racks
Master node:
  a.k.a. Name Node in Hadoop's HDFS
  Stores metadata
  Might be replicated
Client library for file access:
  Talks to the master to find chunk servers
  Connects directly to chunk servers to access data
Reliable distributed file system for petabyte scale
  Data kept in chunks spread across thousands of machines
  Each chunk replicated on different machines
  Seamless recovery from disk or machine failure
[Diagram: chunks C0-C5, D0, D1 replicated across Chunk server 1, Chunk server 2, Chunk server 3, ..., Chunk server N]
Bring computation directly to the data!
We have a large file of words:
one word per line
Count the number of times each distinct word appears in the file
Sample application:
  Analyze web server logs to find popular URLs
Case 1: entire file fits in memory
Case 2: file too large for memory, but all <word, count> pairs fit in memory (a sketch of this case follows below)
Case 3: file on disk, too many distinct words to fit in memory:
  sort datafile | uniq -c
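For Case 2, a minimal single-machine sketch in Python; the input path datafile is an assumed example, with one word per line as above:

  # Case 2 sketch: the file does not fit in memory, but the <word, count>
  # table does, so one pass over the file with a hash table is enough.
  from collections import Counter

  counts = Counter()
  with open("datafile") as f:        # assumed example path, one word per line
      for line in f:
          word = line.strip()
          if word:
              counts[word] += 1

  for word, count in counts.most_common():
      print(count, word)             # same output shape as `sort | uniq -c`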
Suppose we have a large corpus of documents
Count occurrences of words:
words(docs/*) | sort | uniq -c
where words takes a file and outputs the words in it, one per line
Captures the essence of MapReduce
The great thing is that it is naturally parallelizable
Read a lot of data
Map: extract something you care about
Shuffle and Sort
Reduce: aggregate, summarize, filter or transform
Write the result
Outline stays the same, map and reduce change to fit the problem
Programmer specifies two primary methods:
  Map(k, v) → <k', v'>*
  Reduce(k', <v'>*) → <k', v''>*
    All values v' with the same key k' are reduced together and processed in v' order
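Read concretely for the word-count example that follows, the two signatures could be written as Python type aliases; this is only a reading aid, not an API from the lecture:

  from typing import Callable, Iterable, Iterator, Tuple

  # Map(k, v) -> <k', v'>* : one input record in, zero or more intermediate pairs out
  MapFn = Callable[[str, str], Iterator[Tuple[str, int]]]

  # Reduce(k', <v'>*) -> <k', v''>* : one intermediate key plus all of its values
  # (iterated in order) in, aggregated output pairs out
  ReduceFn = Callable[[str, Iterable[int]], Iterator[Tuple[str, int]]]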
MAP (provided by the programmer):
  Reads input and produces a set of key-value pairs
Group by key:
  Collect all pairs with the same key
Reduce (provided by the programmer):
  Collect all values belonging to the key and output

Example on a big document:
  Input text: "The crew of the space shuttle Endeavor recently returned to Earth as ambassadors, harbingers of a new era of space exploration. Scientists at NASA are saying that the recent assembly of the Dextre bot is the first step in a long-term space-based man/machine partnership. 'The work we're doing now -- the robotics we're doing -- is what we're going to need to do to build any work station or habitat structure on the moon or Mars,' said Allard Beutel."
  Map output (key, value): (the, 1) (crew, 1) (of, 1) (the, 1) (space, 1) (shuttle, 1) (Endeavor, 1) (recently, 1) ...
  Grouped by key (key, value): (crew, 1) (crew, 1) (space, 1) (the, 1) (the, 1) (the, 1) (shuttle, 1) (recently, 1) ...
  Reduce output (key, value): (crew, 2) (space, 1) (the, 3) (shuttle, 1) (recently, 1) ...
Note: the big document is read sequentially; map involves only sequential reads.
map(key, value):
  // key: document name; value: text of document
  for each word w in value:
    emit(w, 1)

reduce(key, values):
  // key: a word; values: an iterator over counts
  result = 0
  for each count v in values:
    result += v
  emit(key, result)
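To make the dataflow concrete, here is a minimal single-machine sketch in Python that mimics the map, group-by-key, and reduce steps for word count; it is a toy stand-in for illustration, not the distributed implementation:

  from itertools import groupby
  from operator import itemgetter

  def map_fn(doc_name, text):
      # Emit (word, 1) for every word in the document.
      for word in text.split():
          yield (word, 1)

  def reduce_fn(word, counts):
      # Sum all the counts emitted for one word.
      yield (word, sum(counts))

  def mapreduce(inputs, map_fn, reduce_fn):
      # Map phase: apply map_fn to every input (key, value) pair.
      intermediate = []
      for key, value in inputs:
          intermediate.extend(map_fn(key, value))
      # Shuffle/sort phase: bring pairs with the same key together.
      intermediate.sort(key=itemgetter(0))
      # Reduce phase: one reduce_fn call per distinct key.
      output = []
      for key, group in groupby(intermediate, key=itemgetter(0)):
          output.extend(reduce_fn(key, (v for _, v in group)))
      return output

  print(mapreduce([("doc1", "the crew of the space shuttle")], map_fn, reduce_fn))
  # [('crew', 1), ('of', 1), ('shuttle', 1), ('space', 1), ('the', 2)]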
Map-Reduce environment takes care of:
  Partitioning the input data
  Scheduling the program's execution across a set of machines
  Handling machine failures
  Managing required inter-machine communication
Allows programmers without a PhD in parallel and distributed systems to use large distributed clusters
Programmer specifies:
  Map and Reduce and input files
Workflow:
  Read inputs as a set of key-value pairs
  Map transforms input kv-pairs into a new set of k'v'-pairs
  Sort & shuffle the k'v'-pairs to output nodes
  All k'v'-pairs with a given k' are sent to the same reduce
  Reduce processes all k'v'-pairs grouped by key into new k''v''-pairs
  Write the resulting pairs to files
[Diagram: Input 0, Input 1, Input 2 feed Map 0, Map 1, Map 2; the shuffle routes intermediate pairs to Reduce 0 and Reduce 1, which write Out 0 and Out 1]
All phases are distributed, with many tasks doing the work
Input and final output are stored on a distributed file system:
Scheduler tries to schedule map tasks close to physical storage location of input data
Intermediate results are stored on the local FS of map and reduce workers
Output is often the input to another MapReduce task
Master data structures:
  Task status: (idle, in-progress, completed)
  Idle tasks get scheduled as workers become available
  When a map task completes, it sends the master the location and sizes of its R intermediate files, one for each reducer
  Master pushes this info to reducers
Master pings workers periodically to detect failures
Map worker failure:
  Map tasks completed or in-progress at the worker are reset to idle
  Reduce workers are notified when the task is rescheduled on another worker
Reduce worker failure:
  Only in-progress tasks are reset to idle
Master failure:
  MapReduce task is aborted and the client is notified
M map tasks, R reduce tasks
Rule of thumb:
  Make M and R much larger than the number of nodes in the cluster
  One DFS chunk per map task is common
  Improves dynamic load balancing and speeds recovery from worker failure
Usually R is smaller than M, because output is spread across R files
Fine granularity tasks: map tasks >> machines
  Minimizes time for fault recovery
  Can pipeline shuffling with map execution
  Better dynamic load balancing
Want to simulate disease spreading in a network
  Input: each line is a node id and virus parameters
  Map: reads a line of input and simulates the virus; outputs triplets (node id, virus id, hit time)
  Reduce: collects the node ids and sees which nodes are most vulnerable
(A sketch of this map/reduce pair follows below.)
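A hedged sketch of what that Map and Reduce could look like in Python; the input line format and the simulate_virus placeholder are illustrative assumptions, not code from the lecture:

  import random

  def simulate_virus(node_id, virus_id, params):
      # Placeholder for the real epidemic simulation over the network:
      # here we just draw a pseudo-random hit time from the first parameter.
      rng = random.Random(f"{node_id}:{virus_id}")
      return rng.expovariate(params[0] if params else 1.0)

  def map_fn(_, line):
      # Assumed input line format: "node_id virus_id param1 param2 ..."
      node_id, virus_id, *params = line.split()
      hit_time = simulate_virus(node_id, virus_id, [float(p) for p in params])
      yield (node_id, (virus_id, hit_time))

  def reduce_fn(node_id, values):
      # Vulnerability proxy: how many viruses reached this node, and how early.
      hits = list(values)
      yield (node_id, (len(hits), min(t for _, t in hits)))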
Statistical machine translation:
Need to count the number of times every 5-word sequence occurs in a large corpus of documents
Easy with MapReduce:
Map:
Extract (5-word sequence, count) from document
Reduce:
Combine counts
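For instance, a hedged sketch of that Map and Reduce pair in Python (whitespace tokenization is a simplifying assumption):

  def map_fn(doc_id, text):
      # Emit every 5-word sequence in the document with a count of 1.
      words = text.split()
      for i in range(len(words) - 4):
          yield (" ".join(words[i:i + 5]), 1)

  def reduce_fn(five_gram, counts):
      # Combine the partial counts for one 5-word sequence.
      yield (five_gram, sum(counts))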
Suppose we have a large web corpus
Look at the metadata file:
  Lines of the form (URL, size, date, ...)
For each host, find the total number of bytes
  i.e., the sum of the page sizes for all URLs from that host (a sketch follows below)
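A hedged sketch of this job in Python; the whitespace-separated line format and the urlparse-based host extraction are assumptions for illustration:

  from urllib.parse import urlparse

  def map_fn(_, line):
      # Assumed metadata line: "URL size date ..."; emit (host, size).
      url, size = line.split()[:2]
      yield (urlparse(url).netloc, int(size))

  def reduce_fn(host, sizes):
      # Total number of bytes across all URLs from this host.
      yield (host, sum(sizes))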
Other examples:
  Link analysis and graph processing
  Machine learning algorithms
Google MapReduce:
  Not available outside Google
Hadoop:
  An open-source implementation in Java
  Uses HDFS for stable storage
  Download: http://lucene.apache.org/hadoop/
Aster Data:
  Cluster-optimized SQL database that also implements MapReduce
Ability to rent computing by the hour
  Additional services, e.g., persistent storage
Amazon's Elastic Compute Cloud (EC2):
  Aster Data and Hadoop can both be run on EC2
  For CS345 (offered next quarter), Amazon will provide free access for the class
Problem:
Slow workers significantly lengthen the job completion time:
  Other jobs on the machine
  Bad disks
  Weird things
Solution:
  Near end of phase, spawn backup copies of tasks
  Whichever one finishes first wins
Effect:
  Dramatically shortens job completion time
Backup tasks reduce job time
System deals with failures
Often a map task will produce many pairs of the form (k,v1), (k,v2), ... for the same key k
E.g., popular words in Word Count
Can save network time by pre-aggregating at mapper:
  combine(k1, list(v1)) → v2
  Usually the same as the reduce function
Works only if reduce function is commutative and associative
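For word count the combiner can simply reuse the reduce function, since summing counts is commutative and associative. A minimal sketch of pre-aggregating one mapper's local output (the combine helper is hypothetical, not the Hadoop API):

  from collections import defaultdict

  def reduce_fn(word, counts):
      # The same function serves as both combiner and reducer for word count.
      yield (word, sum(counts))

  def combine(map_output, reduce_fn):
      # Group this mapper's local pairs by key and pre-aggregate them
      # before anything is sent over the network.
      grouped = defaultdict(list)
      for key, value in map_output:
          grouped[key].append(value)
      combined = []
      for key, values in grouped.items():
          combined.extend(reduce_fn(key, values))
      return combined

  print(combine([("the", 1), ("crew", 1), ("the", 1), ("the", 1)], reduce_fn))
  # [('the', 3), ('crew', 1)]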
Inputs to map tasks are created by contiguous splits of the input file
For reduce, we need to ensure that records with the same intermediate key end up at the same worker
System uses a default partition function:
hash(key) mod R
Sometimes useful to override:
E.g., hash(hostname(URL)) mod R ensures URLs from a host end up in the same output file
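A hedged sketch of the default and overridden partition functions in Python; crc32 stands in for the system's stable hash, and R is an illustrative reducer count:

  from urllib.parse import urlparse
  from zlib import crc32

  def default_partition(key, R):
      # Default: hash(key) mod R spreads intermediate keys across R reducers.
      return crc32(key.encode()) % R

  def url_host_partition(url, R):
      # Override: hash(hostname(URL)) mod R sends all URLs from one host
      # to the same reducer, hence the same output file.
      return crc32(urlparse(url).netloc.encode()) % R

  R = 4
  print(url_host_partition("http://cs246.stanford.edu/a", R),
        url_host_partition("http://cs246.stanford.edu/b", R))   # same value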
Jeffrey Dean and Sanjay Ghemawat, MapReduce: Simplified Data Processing on Large Clusters http://labs.google.com/papers/mapreduce.html
Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung, The Google File System http://labs.google.com/papers/gfs.html
Hadoop Wiki
Introduction
http://wiki.apache.org/lucene-hadoop/
Getting Started
http://wiki.apache.org/lucene-hadoop/GettingStartedWithHadoop
Map/Reduce Overview
http://wiki.apache.org/lucene-hadoop/HadoopMapReduce http://wiki.apache.org/lucene-hadoop/HadoopMapRedClasses
Eclipse Environment
http://wiki.apache.org/lucene-hadoop/EclipseEnvironment
Javadoc
http://lucene.apache.org/hadoop/docs/api/
Releases from Apache download mirrors
  http://www.apache.org/dyn/closer.cgi/lucene/hadoop/
Nightly builds of source
  http://people.apache.org/dist/lucene/hadoop/nightly/
Source code from subversion
  http://lucene.apache.org/hadoop/version_control.html
Programming model inspired by functional language primitives
Partitioning/shuffling similar to many large-scale sorting systems
  NOW-Sort ['97]
Re-execution for fault tolerance
  BAD-FS ['04] and TACC ['97]
Locality optimization has parallels with Active Disks/Diamond work
  Active Disks ['01], Diamond ['04]
Backup tasks similar to Eager Scheduling in the Charlotte system
  Charlotte ['96]
Dynamic load balancing solves a similar problem as River's distributed queues
  River ['99]