Module 2
MANAGE HADOOP WORKFLOWS WITH APACHE OOZIE

Oozie is a workflow director system designed to run and manage multiple related Apache Hadoop jobs. For instance, complete data input and analysis may require several discrete Hadoop jobs to be run as a workflow in which the output of one job serves as the input for a successive job. Oozie is designed to construct and manage these workflows.

Oozie is not a substitute for the YARN scheduler. That is, YARN manages resources for individual Hadoop jobs, and Oozie provides a way to connect and control Hadoop jobs on the cluster.

Oozie workflow jobs are represented as directed acyclic graphs (DAGs) of actions. (DAGs are basically graphs that cannot have directed loops.) Three types of Oozie jobs are permitted:

1. Workflow: a specified sequence of Hadoop jobs with outcome-based decision points and control dependency. Progress from one action to another cannot happen until the first action is complete.
2. Coordinator: a scheduled workflow job that can run at various time intervals or when data become available.
3. Bundle: a higher-level Oozie abstraction that batches a set of coordinator jobs.

Oozie is integrated with the rest of the Hadoop stack, supporting several types of Hadoop jobs out of the box (e.g., Java MapReduce, Streaming MapReduce, Pig, Hive, and Sqoop) as well as system-specific jobs (e.g., Java programs and shell scripts). Oozie also provides a CLI and a web UI for monitoring jobs.

Figure 7.6 A simple Oozie DAG workflow (a map-reduce wordcount action defined in workflow.xml that proceeds to end on OK or is killed on ERROR).

Figure 7.6 depicts a simple Oozie workflow. In this case, Oozie runs a basic MapReduce operation. If the application was successful, the job ends; if an error occurred, the job is killed.

Oozie workflow definitions are written in hPDL (an XML Process Definition Language). Such workflows contain several types of nodes:

1. Control flow nodes define the beginning and the end of a workflow. They include start, end, and optional fail nodes.
2. Action nodes are where the actual processing tasks are defined. When an action node finishes, the remote systems notify Oozie and the next node in the workflow is executed. Action nodes can also include HDFS commands.
3. Fork/join nodes enable parallel execution of tasks in the workflow. The fork node enables two or more tasks to run at the same time. A join node represents a rendezvous point that must wait until all forked tasks complete.
4. Decision nodes enable decisions to be made about the previous task. Control decisions are based on the results of the previous action (e.g., file size or file existence). Decision nodes are essentially switch-case statements that use JSP EL (Java Server Pages Expression Language) expressions that evaluate to either true or false.

Figure 7.7 A more complex Oozie DAG workflow.
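To show how a client hands such an hPDL workflow definition to Oozie, the following is a minimal sketch that uses the Oozie Java client API to submit and monitor a wordcount workflow. The server URL, the HDFS application path, and the nameNode and queueName parameters are assumptions made for this example rather than values given in the text.

import java.util.Properties;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;

public class SubmitWordCountWorkflow {
    public static void main(String[] args) throws Exception {
        // Point the client at the Oozie server (hypothetical URL and port).
        OozieClient oozie = new OozieClient("http://oozie-host:11000/oozie");

        // Job properties; APP_PATH must point at the HDFS directory that holds workflow.xml.
        Properties conf = oozie.createConfiguration();
        conf.setProperty(OozieClient.APP_PATH, "hdfs://namenode:8020/user/hadoop/wordcount-wf");
        conf.setProperty("nameNode", "hdfs://namenode:8020"); // assumed workflow parameter
        conf.setProperty("queueName", "default");             // assumed workflow parameter

        // Submit and start the workflow; Oozie then walks the DAG from the start node.
        String jobId = oozie.run(conf);
        System.out.println("Submitted workflow " + jobId);

        // Poll until the workflow leaves the RUNNING state (it finishes at the end node or a kill node).
        while (oozie.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
            Thread.sleep(10_000);
        }
        System.out.println("Final status: " + oozie.getJobInfo(jobId).getStatus());
    }
}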
USING APACHE HBASE

Apache HBase is an open source, distributed, versioned, nonrelational database modeled after Google's Bigtable. Like Bigtable, HBase leverages the distributed data storage provided by the underlying distributed file systems spread across commodity servers. Apache HBase provides Bigtable-like capabilities on top of Hadoop and HDFS.

Some of the more important features include the following capabilities:

• Linear and modular scalability
• Strictly consistent reads and writes
• Automatic and configurable sharding of tables
• Automatic failover support between RegionServers
• Convenient base classes for backing Hadoop MapReduce jobs with Apache HBase tables
• Easy-to-use Java API for client access

HBase Data Model Overview

A table in HBase is similar to other databases, having rows and columns. Columns in HBase are grouped into column families, all with the same prefix. For example, consider a table of daily stock prices. There may be a column family called "price" that has four members: price:open, price:close, price:low, and price:high. A column does not need to belong to a family. For instance, the stock table may have a column named "volume" indicating how many shares were traded. All column family members are stored together in the physical file system.

Specific HBase cell values are identified by a row key, a column (column family and column qualifier), and a version (timestamp). It is possible to have many versions of data within an HBase cell. A version is specified as a timestamp and is created each time data are written to a cell.

Almost anything can serve as a row key, from strings to binary representations of longs to serialized data structures. Rows are lexicographically sorted, with the lowest order appearing first in a table. The empty byte array denotes both the start and the end of a table's namespace. All table accesses are via the table row key, which is considered its primary key.
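To make the row key, column family, qualifier, and timestamp terminology concrete, here is a minimal sketch using the HBase Java client API against the daily stock price table described above. The table name "stocks", the row key convention, and the sample values are assumptions made for this example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class StockTableExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create(); // reads hbase-site.xml from the classpath
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table stocks = connection.getTable(TableName.valueOf("stocks"))) { // assumed table name

            // Row key: symbol plus date (assumed convention); rows are sorted lexicographically by this key.
            byte[] rowKey = Bytes.toBytes("IBM-2015-04-29");

            // Each value is addressed by column family and qualifier; every write creates a new version (timestamp).
            Put put = new Put(rowKey);
            put.addColumn(Bytes.toBytes("price"), Bytes.toBytes("open"), Bytes.toBytes("168.20"));
            put.addColumn(Bytes.toBytes("price"), Bytes.toBytes("close"), Bytes.toBytes("169.45"));
            put.addColumn(Bytes.toBytes("volume"), Bytes.toBytes(""), Bytes.toBytes("3471200")); // single-member family
            stocks.put(put);

            // Read one cell back by row key, column family, and qualifier.
            Get get = new Get(rowKey);
            get.addColumn(Bytes.toBytes("price"), Bytes.toBytes("close"));
            Result result = stocks.get(get);
            System.out.println("close = " +
                Bytes.toString(result.getValue(Bytes.toBytes("price"), Bytes.toBytes("close"))));
        }
    }
}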
YARN DISTRIBUTED-SHELL

The Hadoop YARN project includes the Distributed-Shell application, which is an example of a Hadoop non-MapReduce application built on top of YARN. Distributed-Shell is a simple mechanism for running shell commands and scripts in containers on multiple nodes in a Hadoop cluster.

This application is not meant to be a production administration tool, but rather a demonstration of the non-MapReduce capability that can be implemented on top of YARN. There are multiple mature implementations of a distributed shell that administrators typically use to manage a cluster of machines. In addition, Distributed-Shell can be used as a starting point for exploring and building Hadoop YARN applications.

STRUCTURE OF YARN APPLICATIONS

The YARN ResourceManager runs as a scheduling daemon on a dedicated machine and acts as the central authority for allocating resources to the various competing applications in the cluster. The ResourceManager has a central and global view of all cluster resources and, therefore, can ensure fairness, capacity, and locality are shared across all users. Depending on the application demand, scheduling priorities, and resource availability, the ResourceManager dynamically allocates resource containers to applications to run on particular nodes. A container is a logical bundle of resources (e.g., memory, cores) bound to a particular cluster node.

To enforce and track such assignments, the ResourceManager interacts with a special system daemon running on each node called the NodeManager. Communications between the ResourceManager and NodeManagers are heartbeat based for scalability. NodeManagers are responsible for local monitoring of resource availability, fault reporting, and container life-cycle management (e.g., starting and killing jobs). The ResourceManager depends on the NodeManagers for its "global view" of the cluster.

User applications are submitted to the ResourceManager via a public protocol and go through an admission control phase during which security credentials are validated and various operational and administrative checks are performed. Those applications that are accepted pass to the scheduler and are allowed to run. Once the scheduler has enough resources to satisfy the request, the application is moved from an accepted state to a running state. Aside from internal bookkeeping, this process involves allocating a container for the single ApplicationMaster and spawning it on a node in the cluster. Often called container0, the ApplicationMaster does not have any additional resources at this point, but rather must request additional resources from the ResourceManager.

The ApplicationMaster is the "master" user job that manages all application life-cycle aspects, including dynamically increasing and decreasing resource consumption (i.e., containers), managing the flow of execution (e.g., in the case of MapReduce jobs, running reducers against the output of maps), handling faults and computation skew, and performing other local optimizations. The ApplicationMaster is designed to run arbitrary user code that can be written in any programming language, as all communication with the ResourceManager and NodeManager is encoded using extensible network protocols. YARN makes few assumptions about the ApplicationMaster, although in practice it expects most jobs will use a higher-level programming framework. By delegating all these functions to ApplicationMasters, YARN's architecture gains a great deal of scalability, programming model flexibility, and improved user agility. For example, upgrading and testing a new MapReduce framework can be done independently of other running MapReduce frameworks.

Typically, an ApplicationMaster will need to harness the processing power of multiple servers to complete a job. To achieve this, the ApplicationMaster issues resource requests to the ResourceManager. The form of these requests includes specification of locality preferences (e.g., to accommodate HDFS use) and properties of the containers. The ResourceManager will attempt to satisfy the resource requests coming from each application according to availability and scheduling policies. When a resource is scheduled on behalf of an ApplicationMaster, the ResourceManager generates a lease for the resource, which is acquired by a subsequent ApplicationMaster heartbeat. The ApplicationMaster then works with the NodeManagers to start the resource. A token-based security mechanism guarantees its authenticity when the ApplicationMaster presents the container lease to the NodeManager.

In a typical situation, running containers will communicate with the ApplicationMaster through an application-specific protocol to report status and health information and to receive framework-specific commands. In this way, YARN provides a basic infrastructure for monitoring and life-cycle management of containers, while each framework manages application-specific semantics independently.
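The request-and-lease cycle described above can be sketched with the YARN client libraries. The following is a minimal, hedged sketch of the resource-request portion of an ApplicationMaster written with AMRMClient; the container sizes, the preferred node name, and the surrounding application logic are hypothetical, and the step of launching work on the granted containers through the NodeManagers (NMClient) is omitted.

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class SimpleAppMaster {
    public static void main(String[] args) throws Exception {
        // The ApplicationMaster first registers itself with the ResourceManager.
        AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
        rmClient.init(new YarnConfiguration());
        rmClient.start();
        rmClient.registerApplicationMaster("", 0, ""); // host, port, and tracking URL omitted in this sketch

        // Ask for two containers of 1 GB / 1 core each, with a (hypothetical) node-locality preference.
        Resource capability = Resource.newInstance(1024, 1);
        Priority priority = Priority.newInstance(0);
        String[] preferredNodes = {"datanode01"}; // assumed node name, e.g., to sit near HDFS blocks
        for (int i = 0; i < 2; i++) {
            rmClient.addContainerRequest(new ContainerRequest(capability, preferredNodes, null, priority));
        }

        // Allocations arrive asynchronously through the heartbeat (allocate) calls;
        // each granted Container carries the lease/token the AM later presents to a NodeManager.
        List<Container> granted = new ArrayList<>();
        while (granted.size() < 2) {
            AllocateResponse response = rmClient.allocate(0.1f); // heartbeat with a progress value
            granted.addAll(response.getAllocatedContainers());
            Thread.sleep(1000);
        }

        rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "done", "");
        rmClient.stop();
    }
}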
Figure 8.1 YARN architecture with two clients (MapReduce and MPI). The darker client (MPI AM) is running an MPI application, and the lighter client (MR AM) is running a MapReduce application.

This design stands in sharp contrast to the original Hadoop version 1 design, in which scheduling was designed and integrated around managing only MapReduce tasks.

Figure 8.1 illustrates the relationship between the application and YARN components. The YARN components appear as the large outer boxes (ResourceManager and NodeManagers), and the two applications appear as smaller boxes (containers), one dark and one light. Each application uses a different ApplicationMaster; the darker client is running a Message Passing Interface (MPI) application and the lighter client is running a traditional MapReduce application.

YARN APPLICATION FRAMEWORKS

One of the most exciting aspects of Hadoop version 2 is the capability to run all types of applications on a Hadoop cluster. In Hadoop version 1, the only processing model available to users is MapReduce. In Hadoop version 2, MapReduce is separated from the resource management layer of Hadoop and placed into its own application framework. Indeed, the growing number of YARN applications offers a high-level and multifaceted interface to the Hadoop data lake.

YARN presents a resource management platform, which provides services such as scheduling, fault monitoring, data locality, and more to MapReduce and other frameworks. Figure 8.2 illustrates some of the various frameworks that will run under YARN.

Figure 8.2 Example of the Hadoop version 2 ecosystem. Hadoop version 1 supports batch MapReduce applications only. (The figure shows frameworks such as Apache Tez, Apache HBase, Apache Giraph, Apache Spark, Apache Storm, and MPI running on the YARN Resource Manager over the Hadoop Distributed File System, HDFS.)

Distributed-Shell

Distributed-Shell is an example application included with the Hadoop core components that demonstrates how to write applications on top of YARN. It provides a simple method for running shell commands and scripts in containers in parallel on a Hadoop YARN cluster.

Hadoop MapReduce

MapReduce was the first YARN framework and drove many of YARN's requirements. It is integrated tightly with the rest of the Hadoop ecosystem projects, such as Apache Pig, Apache Hive, and Apache Oozie.

Apache Tez

Many Hadoop jobs involve the execution of a complex directed acyclic graph (DAG) of tasks using separate MapReduce stages. Apache Tez generalizes this process and enables these tasks to be spread across stages so that they can be run as a single, all-encompassing job. Tez can be used as a MapReduce replacement for projects such as Apache Hive and Apache Pig.

Apache Giraph

Apache Giraph is an iterative graph processing system built for high scalability. Facebook, Twitter, and LinkedIn use it to create social graphs of users. Giraph was originally written to run on standard Hadoop V1 using the MapReduce framework, but that approach proved inefficient and unnatural for various reasons. The native Giraph implementation under YARN provides the user with an iterative processing model that is not directly available with MapReduce. Support for YARN has been present in Giraph since its own version 1.0 release. In addition, using the flexibility of YARN, the Giraph developers plan on implementing their own web interface to monitor job progress.
Hoya: HBase on YARN

• The Hoya project creates dynamic and elastic Apache HBase clusters on top of YARN.
• A client application creates the persistent configuration files, sets up the HBase cluster XML files, and then asks YARN to create an ApplicationMaster.
• YARN copies all files listed in the client's application-launch request from HDFS into the local file system of the chosen server, and then executes the command to start the Hoya ApplicationMaster.
• Hoya also asks YARN for the number of containers matching the number of HBase region servers it needs.

Dryad on YARN

• Similar to Apache Tez, Microsoft's Dryad provides a DAG as the abstraction of execution flow.
• This framework is ported to run natively on YARN and is fully compatible with its non-YARN version.
• The code is written completely in native C++ and C# for worker nodes and uses a thin layer of Java within the application.

Apache Spark

Spark was initially developed for applications in which keeping data in memory improves performance, such as iterative algorithms, which are common in machine learning, and interactive data mining. Spark differs from classic MapReduce in two important ways. First, Spark holds intermediate results in memory rather than writing them to disk. Second, Spark supports more than just MapReduce functions; that is, it greatly expands the set of possible analyses that can be executed over HDFS data stores. It also provides APIs in Scala, Java, and Python.

Since 2013, Spark has been running on production YARN clusters at Yahoo!. The advantage of porting and running Spark on top of YARN is the common resource management and a single underlying file system.

Apache Storm

Traditional MapReduce jobs are expected to eventually finish, but Apache Storm continuously processes messages until it is stopped. This framework is designed to process unbounded streams of data in real time. It can be used in any programming language. The basic Storm use-cases include real-time analytics, online machine learning, continuous computation, distributed RPC (remote procedure calls), ETL (extract, transform, and load), and more. Storm provides fast performance, is scalable, is fault tolerant, and provides processing guarantees. It works directly under YARN and takes advantage of the common data and resource management substrate.

Apache REEF: Retainable Evaluator Execution Framework

YARN's flexibility sometimes requires significant effort on the part of application implementers. The steps involved in writing a custom application on YARN include building your own ApplicationMaster, performing client and container management, and handling aspects of fault tolerance, execution flow, coordination, and other concerns. The REEF project by Microsoft recognizes this challenge and factors out several components that are common to many applications, such as storage management, data caching, fault detection, and checkpoints. Framework designers can build their applications on top of REEF more easily than they can build those same applications directly on YARN, and can reuse these common services/libraries. REEF's design makes it suitable for both MapReduce and DAG-like executions as well as iterative and interactive computations.