MapReduce - A Programming Model
Agenda
Introduction to MapReduce
Overview
Features of MapReduce
MapReduce: Terminology
Hadoop Cluster
Job Tracker and Task Tracker
MapReduce Data Flow
InputFormat
InputSplits
Record Reader
Mapper
Partition, Shuffle, and Sort
OutputFormat
Record Writer
Agenda
Anatomy of a MapReduce Job Run
Overview
Job Submission
Job Initialization
Task Assignment
Task Execution
Progress and Status updates
Job Completion
Failures in MapReduce
Task Failure
Tasktracker Failure
Jobtracker Failure
Job Scheduler
FIFO Scheduler
Fair Scheduler
Agenda
Shuffle and Sort
The Map Side
The Reduce Side
Configuration Tuning
Troubleshooting
Debugging MapReduce
Listing and Killing Jobs
Hadoop Streaming
Introduction to MapReduce
Overview
MapReduce is a method for distributing a task across multiple nodes
Each node processes data stored on that node where possible
It consists of two phases:
• Map
• Reduce
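As a minimal sketch of the two phases (class names are illustrative), here is a word count written with the classic org.apache.hadoop.mapred API: the Mapper emits a (word, 1) pair per token, and the Reducer sums the counts for each word.

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;

    // Map phase: (byte offset, line) -> (word, 1)
    public class WordCountMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
      private static final IntWritable ONE = new IntWritable(1);
      private final Text word = new Text();
      public void map(LongWritable key, Text value,
          OutputCollector<Text, IntWritable> out, Reporter reporter) throws IOException {
        for (String token : value.toString().split("\\s+")) {
          word.set(token);
          out.collect(word, ONE);
        }
      }
    }

    // Reduce phase: (word, [1, 1, ...]) -> (word, count)
    class WordCountReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
      public void reduce(Text key, Iterator<IntWritable> values,
          OutputCollector<Text, IntWritable> out, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) sum += values.next().get();  // add up the 1s for this word
        out.collect(key, new IntWritable(sum));
      }
    }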
Features of MapReduce
Automatic parallelization and distribution
Fault tolerance
A clean abstraction for programmers
• MapReduce programs are usually written in Java
• Can be written in any language using Hadoop Streaming (see later)
• All of Hadoop is written in Java
MapReduce abstracts all the ‘housekeeping’ away from the developer
• Developer can concentrate simply on writing the Map and Reduce functions
Introduction to MapReduce
MapReduce: Terminology
A job is a ‘full program’
• A complete execution of Mappers and Reducers over a dataset
A task is the execution of a single Mapper or Reducer over a slice of data
A task attempt is a particular instance of an attempt to execute a task
• There will be at least as many task attempts as there are tasks
• If a task attempt fails, another will be started by the JobTracker
• Speculative execution (see later) can also result in more task attempts than completed
tasks
Hadoop Cluster
MapReduce Data Flow
InputFormat
How input files are split up and read is defined by the InputFormat. An InputFormat
is a class that provides the following functionality:
• Selects the files or other objects that should be used for input
• Defines the InputSplits that break a file into tasks
• Provides a factory for RecordReader objects that read the file
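As a hedged sketch of wiring an InputFormat into a job with the classic API (the input path is illustrative, and the fragment assumes org.apache.hadoop.fs.Path and org.apache.hadoop.mapred.* imports); TextInputFormat is the default and reads each line of a file as a record:

    // JobConf carries the job configuration in the classic API
    JobConf conf = new JobConf(WordCountMapper.class);
    // Selects input files, defines splits, and supplies RecordReaders
    conf.setInputFormat(TextInputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path("/user/hadoop/input"));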
Anatomy of a MapReduce Job Run
Anatomy of a MapReduce Job Run
A job run in MapReduce 1 is illustrated in the previous figure. At
the highest level, there are four independent entities:
The client, which submits the MapReduce job.
The jobtracker, which coordinates the job run. The jobtracker is a Java
application whose main class is JobTracker.
The tasktrackers, which run the tasks that the job has been split into.
Tasktrackers are Java applications whose main class is TaskTracker.
The distributed filesystem, which is used for sharing job files between
the other entities.
Anatomy of a MapReduce Job Run
Job Submission
The client-side submission process does the following:
Asks the jobtracker for a new job ID (by calling getNewJobId() on JobTracker) (step 2).
Checks the output specification of the job. For example, if the output directory has not
been specified or it already exists, the job is not submitted and an error is thrown to the
MapReduce program.
Computes the input splits for the job. If the splits cannot be computed, because the input
paths don’t exist, for example, then the job is not submitted and an error is thrown to the
MapReduce program.
Copies the resources needed to run the job, including the job JAR file, the configuration
file, and the computed input splits, to the jobtracker’s filesystem in a directory named
after the job ID. The job JAR is copied with a high replication factor (controlled by the
mapred.submit.replication property, which defaults to 10) so that there are lots of copies
across the cluster for the tasktrackers to access when they run tasks for the job (step 3).
Tells the jobtracker that the job is ready for execution (by calling submitJob() on
JobTracker) (step 4).
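All of the steps above are triggered by a single client call; as a sketch of a classic-API driver (paths and class names are illustrative):

    JobConf conf = new JobConf(WordCountMapper.class);
    conf.setJobName("wordcount");
    FileInputFormat.setInputPaths(conf, new Path("/user/hadoop/input"));
    FileOutputFormat.setOutputPath(conf, new Path("/user/hadoop/output"));  // must not already exist
    conf.setMapperClass(WordCountMapper.class);
    conf.setReducerClass(WordCountReducer.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    JobClient.runJob(conf);  // performs the submission steps, then polls until the job completes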
Anatomy of a MapReduce Job Run
Job Initialization
When the JobTracker receives a call to its submitJob() method, it puts the job into an internal
queue from where the job scheduler will pick it up and initialize it. Initialization involves
creating an object to represent the job being run, which encapsulates its tasks, and
bookkeeping information to keep track of the tasks’ status and progress (step 5).
To create the list of tasks to run, the job scheduler first retrieves the input splits computed
by the client from HDFS (step 6).
It then creates one map task for each split. The number of reduce tasks to create is
determined by the mapred.reduce.tasks property in the Job, which is set by the
setNumReduceTasks() method, and the scheduler simply creates this number of reduce
tasks to be run. Tasks are given IDs at this point.
In addition to the map and reduce tasks, two further tasks are created: a job setup task
and a job cleanup task. These are run by tasktrackers and are used to run code to set up
the job before any map tasks run, and to clean up after all the reduce tasks are complete.
The OutputCommitter that is configured for the job determines the code to be run, and by
default this is a FileOutputCommitter. For the job setup task it will create the final output
directory for the job and the temporary working space for the task output, and for the job
cleanup task it will delete the temporary working space for the task output.
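Of these, only the number of reduce tasks is chosen directly by the developer; a one-line sketch:

    // One map task is created per input split; reduce tasks are set explicitly.
    // Equivalent to setting mapred.reduce.tasks in the configuration.
    conf.setNumReduceTasks(2);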
Anatomy of a MapReduce Job Run
Task Assignment
Tasktrackers run a simple loop that periodically sends heartbeat method calls to the jobtracker.
Heartbeats tell the jobtracker that a tasktracker is alive, but they also double as a channel for messages.
As a part of the heartbeat, a tasktracker will indicate whether it is ready to run a new task, and if it is, the
jobtracker will allocate it a task, which it communicates to the tasktracker using the heartbeat return
value (step 7).
Before it can choose a task for the tasktracker, the jobtracker must choose a job to select the task from.
There are various scheduling algorithms, as explained later in the Job Scheduling section, but the default one
simply maintains a priority list of jobs. Having chosen a job, the jobtracker now chooses a task for the job.
Tasktrackers have a fixed number of slots for map tasks and for reduce tasks: for example, a tasktracker
may be able to run two map tasks and two reduce tasks simultaneously. (The precise number depends on
the number of cores and the amount of memory on the tasktracker.) The default scheduler fills empty
map task slots before reduce task slots, so if the tasktracker has at least one empty map task slot, the
jobtracker will select a map task; otherwise, it will select a reduce task.
To choose a reduce task, the jobtracker simply takes the next in its list of yet-to-be-run reduce tasks,
since there are no data locality considerations. For a map task, however, it takes account of the
tasktracker’s network location and picks a task whose input split is as close as possible to the tasktracker.
In the optimal case, the task is data-local, that is, running on the same node that the split resides on.
Alternatively, the task may be rack-local: on the same rack, but not the same node, as the split. Some
tasks are neither data-local nor rack-local and retrieve their data from a different rack from the one they
are running on. You can tell the proportion of each type of task by looking at a job’s counters.
Anatomy of a MapReduce Job Run
Task Execution
Once the tasktracker has been assigned a task, the next step for it is to run the task.
First, it localizes the job JAR by copying it from the shared filesystem to the tasktracker's
filesystem. It also copies any files needed from the distributed cache by the application to
the local disk. Second, it creates a local working directory for the task, and un-jars the
contents of the JAR into this directory. Third, it creates an instance of TaskRunner to run
the task.
TaskRunner launches a new Java Virtual Machine (step 9) to run each task in (step 10), so
that any bugs in the user-defined map and reduce functions don't affect the
tasktracker (by causing it to crash or hang, for example). It is, however, possible to reuse
the JVM between tasks.
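JVM reuse is enabled per job; a hedged sketch using the classic API:

    // mapred.job.reuse.jvm.num.tasks (default 1, a fresh JVM per task) limits
    // how many tasks of the same job one JVM may run; -1 removes the limit.
    conf.setNumTasksToExecutePerJvm(-1);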
The child process communicates with its parent through the umbilical interface. This way
it informs the parent of the task’s progress every few seconds until the task is complete.
Each task can perform setup and cleanup actions, which are run in the same JVM as the
task itself, and are determined by the OutputCommitter for the job. The cleanup action is
used to commit the task, which in the case of file-based jobs means that its output is
written to the final location for that task. The commit protocol ensures that when
speculative execution is enabled only one of the duplicate tasks is committed and the
other is aborted.
Anatomy of a MapReduce Job Run
Progress and Status Updates
MapReduce jobs are long-running batch jobs, taking anything from minutes to hours
to run. Because this is a significant length of time, it’s important for the user to get
feedback on how the job is progressing.
A job and each of its tasks have a status, which includes such things as the state of
the job or task (e.g., running, successfully completed, failed), the progress of maps
and reduces, the values of the job's counters, and a status message or description
(which may be set by user code). These statuses change over the course of the job,
so how do they get communicated back to the client?
When a task is running, it keeps track of its progress, that is, the proportion of the
task completed. For map tasks, this is the proportion of the input that has been
processed. For reduce tasks, it's a little more complex, but the system can still
estimate the proportion of the reduce input processed. It does this by dividing the
total progress into three parts, corresponding to the three phases of the shuffle. For
example, if the task has run the reducer on half its input, then the task's progress is
⅚, since it has completed the copy and sort phases (⅓ each) and is halfway through
the reduce phase (⅙).
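The arithmetic can be written out as a tiny helper (purely illustrative, not the framework's actual code):

    // Each shuffle phase of a reduce task (copy, sort, reduce) contributes
    // one third of the total progress; each argument is a fraction in [0, 1].
    static float reduceProgress(float copy, float sort, float reduce) {
      return (copy + sort + reduce) / 3.0f;
    }
    // reduceProgress(1.0f, 1.0f, 0.5f) ≈ 0.833, i.e. 5/6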
Anatomy of a MapReduce Job Run
What constitutes progress in MapReduce?
Progress is not always measurable, but nevertheless it tells Hadoop that a task is doing
something. For example, a task writing output records is making progress, even though it
cannot be expressed as a percentage of the total number that will be written, since the
latter figure may not be known, even by the task producing the output.
Progress reporting is important, as it means Hadoop will not fail a task that’s making
progress.
All of the following operations constitute progress:
• Reading an input record (in a mapper or reducer)
• Writing an output record (in a mapper or reducer)
• Setting the status description on a reporter (using Reporter’s setStatus() method)
• Incrementing a counter (using Reporter’s incrCounter() method)
• Calling Reporter’s progress() method
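A hedged sketch of a Mapper doing long per-record work and reporting progress through the classic Reporter interface (the counter group and name are illustrative):

    public void map(LongWritable key, Text value,
        OutputCollector<Text, IntWritable> out, Reporter reporter) throws IOException {
      reporter.setStatus("processing record at offset " + key);  // status description
      reporter.incrCounter("MyApp", "RecordsSeen", 1);           // user-defined counter
      // ... expensive per-record computation ...
      reporter.progress();  // tells Hadoop the task is alive, resetting the timeout
    }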
Anatomy of a MapReduce Job Run
Progress and Status Updates (Contd)
Tasks also have a set of counters that count various events as the task runs, either
those built into the framework, such as the number of map output records written,
or ones defined by users.
If a task reports progress, it sets a flag to indicate that the status change should be
sent to the tasktracker. The flag is checked in a separate thread every three seconds,
and if set it notifies the tasktracker of the current task status. Meanwhile, the
tasktracker is sending heartbeats to the jobtracker every five seconds (this is a
minimum, as the heartbeat interval is actually dependent on the size of the cluster:
for larger clusters,the interval is longer), and the status of all the tasks being run by
the tasktracker is sent in the call.
Counters are sent less frequently than every five seconds, because they can be
relatively high-bandwidth. The jobtracker combines these updates to produce a
global view of the status of all the jobs being run and their constituent tasks. Finally,
as mentioned earlier, the Job receives the latest status by polling the jobtracker
every second.
Clients can also use Job’s getStatus() method to obtain a JobStatus instance, which
contains all of the status information for the job.
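A client can also poll progress itself; a hedged sketch with the classic JobClient API (exception handling omitted):

    JobClient jobClient = new JobClient(conf);
    RunningJob running = jobClient.submitJob(conf);  // submit without blocking
    while (!running.isComplete()) {
      // mapProgress()/reduceProgress() return a fraction between 0.0 and 1.0
      System.out.printf("map %.0f%% reduce %.0f%%%n",
          running.mapProgress() * 100, running.reduceProgress() * 100);
      Thread.sleep(5000);  // poll every few seconds
    }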
Anatomy of a MapReduce Job Run
Job Completion
When the jobtracker receives a notification that the last task for a job is complete (this
will be the special job cleanup task), it changes the status for the job to “successful.”
Then, when the Job polls for status, it learns that the job has completed successfully, so it
prints a message to tell the user and then returns from the waitForCompletion()
method. The jobtracker also sends an HTTP job notification if it is configured to do so. This
can be configured by clients wishing to receive callbacks, via the job.end.notification.url
property.
Last, the jobtracker cleans up its working state for the job and instructs tasktrackers to do
the same (so intermediate output is deleted, for example).
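A hedged sketch of configuring the completion callback (the URL is illustrative; $jobId and $jobStatus are substituted by the framework):

    conf.set("job.end.notification.url",
        "http://myserver.example.com/jobdone?id=$jobId&status=$jobStatus");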
Failures
Task Failure
Consider first the case of the child task failing. The most common way that this happens is when user code in the map or
reduce task throws a runtime exception. If this happens, the child JVM reports the error back to its parent tasktracker, before
it exits. The error ultimately makes it into the user logs. The tasktracker marks the task attempt as failed, freeing up a slot to
run another task.
For Streaming tasks, if the Streaming process exits with a nonzero exit code, it is marked as failed. This behavior is governed
by the stream.non.zero.exit.is.failure property (the default is true).
Another failure mode is the sudden exit of the child JVM—perhaps there is a JVM bug that causes the JVM to exit for a
particular set of circumstances exposed by the Map-Reduce user code. In this case, the tasktracker notices that the process
has exited and marks the attempt as failed.
Hanging tasks are dealt with differently. The tasktracker notices that it hasn’t received a progress update for a while and
proceeds to mark the task as failed. The child JVM process will be automatically killed after this period.
The timeout period after which tasks are considered failed is normally 10 minutes and can be configured on a per-job basis (or
a cluster basis) by setting the mapred.task.timeout property to a value in milliseconds.
Setting the timeout to a value of zero disables the timeout, so long-running tasks are never marked as failed. In this case, a
hanging task will never free up its slot, and over time there may be cluster slowdown as a result. This approach should
therefore be avoided, and making sure that a task is reporting progress periodically will suffice.
When the jobtracker is notified of a task attempt that has failed (by the tasktracker’s heartbeat call), it will reschedule
execution of the task. The jobtracker will try to avoid rescheduling the task on a tasktracker where it has previously failed.
Furthermore, if a task fails four times (or more), it will not be retried further. This value is configurable:
• The maximum number of attempts to run a task is controlled by the mapred.map.max.attempts property for map tasks
• and by mapred.reduce.max.attempts for reduce tasks.
By default, if any task fails four times (or whatever the maximum number of attempts is configured to), the whole job fails.
For some applications, it is undesirable to abort the job if a few tasks fail, as it may be possible to use the results of the job
despite some failures. In this case, the maximum percentage of tasks that are allowed to fail without triggering job failure can
be set for the job. Map tasks and reduce tasks are controlled independently, using the mapred.max.map.failures.percent and
mapred.max.reduce.failures.percent properties.
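A hedged sketch of setting these failure-handling knobs with the classic API (the values shown are illustrative):

    conf.setInt("mapred.task.timeout", 600000);  // task timeout in ms (10 minutes)
    conf.setMaxMapAttempts(4);                   // mapred.map.max.attempts
    conf.setMaxReduceAttempts(4);                // mapred.reduce.max.attempts
    conf.setMaxMapTaskFailuresPercent(5);        // tolerate up to 5% failed map tasks
    conf.setMaxReduceTaskFailuresPercent(5);     // tolerate up to 5% failed reduce tasks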
A task attempt may also be killed, which is different from it failing. A task attempt may be killed because it is a speculative
duplicate, or because the tasktracker it was running on failed, and the jobtracker marked all the task attempts running on it as
killed. Killed task attempts do not count against the number of attempts to run the task (as set by mapred.map.max.attempts
and mapred.reduce.max.attempts), since it wasn’t the task’s fault that an attempt was killed.
Users may also kill or fail task attempts using the web UI or the command line (type hadoop job to see the options). Jobs may
also be killed by the same mechanisms.
Failures
Tasktracker Failure
Failure of a tasktracker is another failure mode. If a tasktracker fails by crashing, or running very
slowly, it will stop sending heartbeats to the jobtracker (or send them very infrequently). The
jobtracker will notice a tasktracker that has stopped sending heartbeats (if it hasn’t received one
for 10 minutes, configured via the mapred.tasktracker.expiry.interval property, in milliseconds)
and remove it from its pool of tasktrackers to schedule tasks on.
The jobtracker arranges for map tasks that were run and completed successfully on that
tasktracker to be rerun if they belong to incomplete jobs, since their intermediate output
residing on the failed tasktracker’s local filesystem may not be accessible to the reduce task. Any
tasks in progress are also rescheduled.
A tasktracker can also be blacklisted by the jobtracker, even if the tasktracker has not failed. If
more than four tasks from the same job fail on a particular tasktracker (set by
mapred.max.tracker.failures), then the jobtracker records this as a fault. A tasktracker is
blacklisted if the number of faults is over some minimum threshold (four, set by
mapred.max.tracker.blacklists) and is significantly higher than the average number of faults for
tasktrackers in the cluster.
Blacklisted tasktrackers are not assigned tasks, but they continue to communicate with the
jobtracker. Faults expire over time (at the rate of one per day), so tasktrackers get the chance to
run jobs again simply by leaving them running. Alternatively, if there is an underlying fault that
can be fixed (by replacing hardware, for example), the tasktracker will be removed from the
jobtracker’s blacklist after it restarts and rejoins the cluster.
Failures
Jobtracker Failure
Failure of the jobtracker is the most serious failure mode. Hadoop has no mechanism for
dealing with failure of the jobtracker: it is a single point of failure, so in this case the job
fails. However, this failure mode has a low chance of occurring, since the chance of a
particular machine failing is low. The good news is that the situation is improved in YARN,
since one of its design goals is to eliminate single points of failure in MapReduce.
After restarting a jobtracker, any jobs that were running at the time it was stopped will
need to be re-submitted. There is a configuration option that attempts to recover any
running jobs (mapred.jobtracker.restart.recover, turned off by default), however it is
known not to work reliably, so should not be used.
Job Scheduling
FIFO Scheduler
Early versions of Hadoop had a very simple approach to scheduling users’ jobs: they ran in
order of submission, using a FIFO scheduler. Typically, each job would use the whole
cluster, so jobs had to wait their turn. Although a shared cluster offers great potential for
offering large resources to many users, the problem of sharing resources fairly between
users requires a better scheduler.
Production jobs need to complete in a timely manner, while allowing users who are
making smaller ad hoc queries to get results back in a reasonable time.
Later on, the ability to set a job’s priority was added, via the mapred.job.priority property
or the setJobPriority() method on JobClient (both of which take one of the values
VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW). When the job scheduler is choosing the
next job to run, it selects one with the highest priority. However, with the FIFO scheduler,
priorities do not support preemption, so a high-priority job can still be blocked by a
long-running, low-priority job that started before the high-priority job was scheduled.
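Setting a priority in code, as a sketch with the classic API:

    // Equivalent to setting mapred.job.priority; no preemption under FIFO.
    conf.setJobPriority(JobPriority.HIGH);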
MapReduce in Hadoop comes with a choice of schedulers. The default in MapReduce 1 is
the original FIFO queue-based scheduler, and there are also multiuser schedulers called
the Fair Scheduler and the Capacity Scheduler. MapReduce 2 comes with the Capacity
Scheduler (the default), and the FIFO scheduler.
Job Scheduling
Fair Scheduler
The Fair Scheduler aims to give every user a fair share of the cluster capacity over time.
If a single job is running, it gets all of the cluster. As more jobs are submitted, free task
slots are given to the jobs in such a way as to give each user a fair share of the cluster. A
short job belonging to one user will complete in a reasonable time even while another
user’s long job is running, and the long job will still make progress.
Jobs are placed in pools, and by default, each user gets their own pool. A user who
submits more jobs than a second user will not get any more cluster resources than the
second, on average. It is also possible to define custom pools with guaranteed minimum
capacities defined in terms of the number of map and reduce slots, and to set weightings
for each pool.
The Fair Scheduler supports preemption, so if a pool has not received its fair share for a
certain period of time, then the scheduler will kill tasks in pools running over capacity in
order to give the slots to the pool running under capacity.
The Fair Scheduler is a “contrib” module. To enable it, place its JAR file on Hadoop’s
classpath, by copying it from Hadoop’s contrib/fairscheduler directory to the lib directory.
Then set the mapred.jobtracker.taskScheduler property to:
org.apache.hadoop.mapred.FairScheduler
Troubleshooting
Debugging MapReduce
Log Files: Hadoop keeps logs of important events during program execution. By default, these
are stored in the logs/ subdirectory of the hadoop-version/ directory where you run Hadoop
from. Log files are named hadoop-username-service-hostname.log. The most recent data is in
the .log file; older logs have their date appended to them. The username in the log filename
refers to the username under which Hadoop was started -- this is not necessarily the same
username you are using to run programs. The service name refers to which of the several
Hadoop programs are writing the log; these can be jobtracker, namenode, datanode,
secondarynamenode, or tasktracker. All of these are important for debugging a whole Hadoop
installation. But for individual programs, the tasktracker logs will be the most relevant. Any
exceptions thrown by your program will be recorded in the tasktracker logs.
The log directory will also have a subdirectory called userlogs. Here there is another subdirectory
for every task run. Each task records its stdout and stderr to two files in this directory. Note that
on a multi-node Hadoop cluster, these logs are not centrally aggregated -- you should check each
TaskNode's logs/userlogs/ directory for their output.
Troubleshooting
Listing and Killing Jobs
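Jobs and task attempts can be listed and killed from the command line with the hadoop job tool mentioned earlier (the IDs are placeholders):

    hadoop job -list                         # jobs currently running
    hadoop job -list all                     # all jobs, including completed ones
    hadoop job -status <job-id>              # completion percentage and counters
    hadoop job -kill <job-id>                # kill a whole job
    hadoop job -kill-task <task-attempt-id>  # kill one task attempt (counts as killed, not failed)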
Hadoop Streaming
Overview
Streaming is a generic API that allows programs written in virtually any language to be used as
Hadoop Mapper and Reducer implementations.
Hadoop Streaming allows you to use arbitrary programs for the Mapper and Reducer phases of a
MapReduce job. Both Mappers and Reducers receive their input on stdin and emit output (key,
value) pairs on stdout.
Input and output are always represented textually in Streaming. The input (key, value) pairs are
written to stdin for a Mapper or Reducer, with a 'tab' character separating the key from the
value. The Streaming programs should split the input on the first tab character on the line to
recover the key and the value. Streaming programs write their output to stdout in the same
format: key \t value \n.
The inputs to the reducer are sorted so that while each line contains only a single (key, value)
pair, all the values for the same key are adjacent to one another.
Provided it can handle its input in the text format described above, any Linux program or tool
can be used as the mapper or reducer in Streaming. You can also write your own scripts in bash,
Python, Perl, or another language of your choice, provided that the necessary interpreter is
present on all nodes in your cluster.
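For example, a typical Streaming invocation under MRv1 looks like this (the jar path and the mapper/reducer scripts are illustrative):

    hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
        -input /user/hadoop/input \
        -output /user/hadoop/output \
        -mapper mymapper.py \
        -reducer myreducer.py \
        -file mymapper.py -file myreducer.py   # ship the scripts to every node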
Thank You