
ST. ANDREWS INSTITUTE OF TECHNOLOGY & MANAGEMENT
Gurgaon, Delhi (NCR)

Approved by AICTE, Govt. of India, New Delhi
Affiliated to Maharshi Dayanand University, an 'A' Grade State University accredited by NAAC

Session: 2024 – 2025

Bachelor of Technology

Computer Science & Engineering

A Practical File on

Big Data Analytics Lab

Subject Code-LC-CSE-421G

Submitted To: Submitted by:


NAME:
(Assistant Professor)
SEM:

ROLL NO.:
St. Andrews Institute of Technology &
Management, Gurugram
Department of……………………………

Practical Lab Evaluation Sheet

Columns: S.No | Program | Date Performed | CO | Viva-Voce (05) | Attendance (05) | Practical File (10) | Practical (05) | Overall (25) | Remarks & Signature

1.  Install Apache Hadoop. (CO 2)
2.  Develop a MapReduce program to calculate the frequency of a given word in a given file. (CO 1)
3.  Develop a MapReduce program to find the maximum temperature in each year. (CO 4)
4.  Develop a MapReduce program to find the grades of students. (CO 4)
5.  Develop a MapReduce program to implement Matrix Multiplication. (CO 2)
6.  Develop a MapReduce program to find the maximum electrical consumption in each year, given the electrical consumption for each month in each year. (CO 2)
7.  Develop a MapReduce program to analyze a weather data set and print whether the day is sunny or cool. (CO 2)
8.  Develop a program to calculate the maximum recorded temperature year-wise for the weather dataset in Pig Latin. (CO 4)
9.  Develop a program to implement Pig Latin Modes, Programs. (CO 2)
10. Develop a Java application to find the maximum temperature using Spark. (CO 4)

Average Marks

Approved & Verified by (Faculty Name)

(Faculty Sign.)
PROGRAM NO - 1
 Install Apache Hadoop

AIM: To Install Apache Hadoop.

Hadoop software can be installed in three modes: stand-alone (local), pseudo-distributed, and fully distributed.

Hadoop is a Java-based programming framework that supports the processing and storage of
extremely large datasets on a cluster of inexpensive machines. It was the first major open-source
project in the big data field and is sponsored by the Apache Software Foundation. Hadoop
comprises four main layers:

 Hadoop Common is the collection of utilities and libraries that support the other Hadoop modules.
 HDFS, which stands for Hadoop Distributed File System, is responsible for persisting data to disk.
 YARN, short for Yet Another Resource Negotiator, is the "operating system" for HDFS: it schedules and manages the cluster's compute resources.
 MapReduce is the original processing model for Hadoop clusters. It distributes work within the
cluster (the map step), then organizes and reduces the results from the nodes into a response to a query (the reduce step).
Many other processing models are available for recent versions of Hadoop.
Hadoop clusters are relatively complex to set up, so the project includes a stand-alone mode which
is suitable for learning about Hadoop, performing simple operations, and debugging.

Procedure:
We'll install Hadoop in stand-alone mode and run one of the example MapReduce programs it
includes to verify the installation.

Prerequisites:

Step 1: Installing Java 8.

Check the installation with java -version; the output should look similar to:

openjdk version "1.8.0_91"
OpenJDK Runtime Environment (build 1.8.0_91-8u91-b14-3ubuntu1~16.04.1-b14)
OpenJDK 64-Bit Server VM (build 25.91-b14, mixed mode)

This output verifies that OpenJDK has been successfully installed.
Note: Set the JAVA_HOME environment variable to the JDK installation path.

Step 2: Installing Hadoop

With Java in place, visit the Apache Hadoop Releases page to find the most recent
stable release and follow the link to the binary for the current release.

Download Hadoop from www.hadoop.apache.org

Procedure to Run Hadoop

1. Install Apache Hadoop 3.3.6 on macOS.

If Apache Hadoop 3.3.6 is not already installed, follow the post "Build, Install, Configure
and Run Apache Hadoop 3.3.6 on macOS".

2. Start HDFS (NameNode and DataNode) and YARN (ResourceManager and NodeManager).
Result: We've installed Hadoop in stand-alone mode and verified it by running an example
program it provided.
PROGRAM 2
 Develop a MapReduce program to calculate the frequency of a given word in a given
file.

AIM: To Develop a MapReduce program to calculate the frequency of a given word in a given file.

Map Function – It takes a set of data and converts it into another set of data, where individual
elements are broken down into tuples (Key-Value pair).

Example – (Map function in Word Count)

Input
Set of data
Bus, Car, bus, car, train, car, bus, car, train, bus, TRAIN,BUS, buS, caR, CAR, car, BUS, TRAIN

Output
Convert into another set of data
(Key,Value)
(Bus,1), (Car,1), (bus,1), (car,1), (train,1), (car,1), (bus,1), (car,1), (train,1), (bus,1),
(TRAIN,1),(BUS,1), (buS,1), (caR,1), (CAR,1), (car,1), (BUS,1), (TRAIN,1)
Reduce Function – Takes the output from Map as an input and combines those data tuples into
a smaller set of tuples.
Example – (Reduce function in Word Count)

Input (set of tuples – the output of the Map function)
(Bus,1), (Car,1), (bus,1), (car,1), (train,1), (car,1), (bus,1), (car,1), (train,1), (bus,1),
(TRAIN,1), (BUS,1), (buS,1), (caR,1), (CAR,1), (car,1), (BUS,1), (TRAIN,1)

Output
Converts into a smaller set of tuples

(BUS,7), (CAR,7), (TRAIN,4)
Work Flow of Program

Workflow of MapReduce consists of 5 steps


1. Splitting – the splitting parameter can be anything, e.g. splitting by space, comma,
semicolon, or even by a new line ('\n').
2. Mapping – as explained above.
3. Intermediate splitting (shuffling) – the entire process runs in parallel on different nodes; in order to
group records in the Reduce phase, records with the same KEY must end up on the same node.
4. Reduce – essentially a group-by-key phase that aggregates the values.
5. Combining – the last phase, where all the data (the individual result sets from each node) is
combined together to form the final result (see the combiner note below).
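In Hadoop, combining is implemented with an optional combiner that merges each mapper's partial counts locally before they are shuffled to the reducers; for word count the reducer itself can be reused because summing counts is associative and commutative. A minimal sketch, assuming the Job object j and the ReduceForWordCount class from the listing that follows:

// Optional: run the reducer as a combiner on each mapper's local output,
// so partial counts are summed before being sent over the network.
j.setCombinerClass(ReduceForWordCount.class);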

Now Let’s See the Word Count Program in Java

Make sure that Hadoop is installed on your system along with the Java JDK.

Steps to follow

Step 1. Open Eclipse > File > New > Java Project > (Name it – MRProgramsDemo) > Finish
Step 2. Right Click > New > Package (Name it – PackageDemo) > Finish
Step 3. Right Click on Package > New > Class (Name it – WordCount)
Step 4. Add the following reference libraries:

Right Click on Project > Build Path > Add External Archives

• /usr/lib/hadoop-3.3.6/hadoop-core.jar
• /usr/lib/hadoop-3.3.6/lib/commons-cli-1.2.jar

Step 5. Type the following program:

package PackageDemo;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static void main(String[] args) throws Exception {
        Configuration c = new Configuration();
        String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
        Path input = new Path(files[0]);
        Path output = new Path(files[1]);
        Job j = new Job(c, "wordcount");
        j.setJarByClass(WordCount.class);
        j.setMapperClass(MapForWordCount.class);
        j.setReducerClass(ReduceForWordCount.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(j, input);
        FileOutputFormat.setOutputPath(j, output);
        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }

    public static class MapForWordCount extends Mapper<LongWritable, Text, Text, IntWritable> {
        public void map(LongWritable key, Text value, Context con)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(",");
            for (String word : words) {
                Text outputKey = new Text(word.toUpperCase().trim());
                IntWritable outputValue = new IntWritable(1);
                con.write(outputKey, outputValue);
            }
        }
    }

    public static class ReduceForWordCount extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text word, Iterable<IntWritable> values, Context con)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            con.write(word, new IntWritable(sum));
        }
    }
}

Make Jar File

Right Click on Project > Export > Select export destination as Jar File > Next > Finish

To move the input file into HDFS, open the terminal and enter the following command:

[training@localhost ~]$ hadoop fs -put wordcountFile wordCountFile

Run the Jar file

(hadoop jar jarfilename.jar packageName.ClassName PathToInputTextFile PathToOutputDirectory)

[training@localhost ~]$ hadoop jar MRProgramsDemo.jar PackageDemo.WordCount wordCountFile MRDir1

Result: Open Result

[training@localhost ~]$ hadoop fs -ls MRDir1

Found 3 items
-rw-r--r--   1 training supergroup    0 2023-10-23 03:36 /user/training/MRDir1/_SUCCESS
drwxr-xr-x   - training supergroup    0 2023-10-23 03:36 /user/training/MRDir1/_logs
-rw-r--r--   1 training supergroup   20 2023-10-23 03:36 /user/training/MRDir1/part-r-00000

[training@localhost ~]$ hadoop fs -cat MRDir1/part-r-00000
BUS 7
CAR 7
TRAIN 4
PROGRAM 3
 Develop a MapReduce program to find the maximum temperature in each year.

AIM: To Develop a MapReduce program to find the maximum temperature in each year.

Description: MapReduce is a programming model designed for processing large volumes of data
in parallel by dividing the work into a set of independent tasks. The previous program gave an
introduction to MapReduce; this one explains how to design a MapReduce program whose aim is to
find the maximum temperature recorded in each year of the NCDC data.
The input for our program is the weather data files for each year. This weather data is collected by
the National Climatic Data Center (NCDC) from weather sensors all over the world. You can find
weather data for each year at ftp://ftp.ncdc.noaa.gov/pub/data/noaa/. All files are zipped by year
and weather station, and for each year there are multiple files for different weather stations. Here
is an example for 1990 (ftp://ftp.ncdc.noaa.gov/pub/data/noaa/1901/):

• 010080-99999-1990.gz • 010100-99999-1990.gz
• 010150-99999-1990.gz
• …………………………………

MapReduce is based on sets of key-value pairs, so first we have to decide on the types of the
key/value pairs for the input.
Map Phase: The input for the Map phase is the set of weather data files. The types of the input
key-value pairs are LongWritable and Text, and the types of the output key-value pairs are
Text and IntWritable. Each Map task extracts the temperature data from the given year's file. The
output of the Map phase is a set of key-value pairs: the keys are the years and the values are the
temperatures recorded for each year.
Reduce Phase: The Reduce phase receives all the values associated with a particular key; that is, all the
temperature values belonging to a particular year are fed to the same reducer. Each reducer then finds
the highest recorded temperature for that year. The types of the output key-value pairs of the Map phase
are the same as the types of the input key-value pairs of the Reduce phase (Text and IntWritable), and the
types of the output key-value pairs of the Reduce phase are also Text and IntWritable. So, in this example we write
three Java classes:

• HighestMapper.java
• HighestReducer.java
• HighestDriver.java
Program: HighestMapper.java

import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class HighestMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    public static final int MISSING = 9999;

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
                    Reporter reporter) throws IOException {
        String line = value.toString();
        String year = line.substring(15, 19);
        int temperature;
        if (line.charAt(87) == '+')
            temperature = Integer.parseInt(line.substring(88, 92));
        else
            temperature = Integer.parseInt(line.substring(87, 92));
        String quality = line.substring(92, 93);
        if (temperature != MISSING && quality.matches("[01459]"))
            output.collect(new Text(year), new IntWritable(temperature));
    }
}

HighestReducer.java

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class HighestReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int max_temp = 0;
        while (values.hasNext()) {
            int current = values.next().get();
            if (max_temp < current)
                max_temp = current;
        }
        output.collect(key, new IntWritable(max_temp / 10));
    }
}

HighestDriver.java

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class HighestDriver extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), HighestDriver.class);
        conf.setJobName("HighestDriver");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(HighestMapper.class);
        conf.setReducerClass(HighestReducer.class);
        Path inp = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.addInputPath(conf, inp);
        FileOutputFormat.setOutputPath(conf, out);
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new HighestDriver(), args);
        System.exit(res);
    }
}

Commands for execution:

hadoop com.sun.tools.javac.Main MaxMonTem.java
jar cf mmt.jar *.class
hadoop fs -mkdir -p /user/hadoop/MaxMonTem/input
hadoop fs -put -f data_tem.txt /user/hadoop/MaxMonTem/input/
hadoop fs -ls /user/hadoop/MaxMonTem/input/
hadoop fs -cat /user/hadoop/MaxMonTem/input/data_tem.txt
hadoop fs -cat /user/hadoop/MaxMonTem/output/part-r-00000
Output:

2005 90
2006 100
2007 100
PROGRAM 4
 Develop a MapReduce program to find the grades of students.

AIM: To Develop a MapReduce program to find the grades of students.

import java.util.Scanner;

public class JavaExample {
    public static void main(String args[]) {
        /* This program assumes that the student has 6 subjects,
         * that is why an array of size 6 is created. You can
         * change this as per the requirement.
         */
        int marks[] = new int[6];
        int i;
        float total = 0, avg;
        Scanner scanner = new Scanner(System.in);
        for (i = 0; i < 6; i++) {
            System.out.print("Enter Marks of Subject" + (i + 1) + ":");
            marks[i] = scanner.nextInt();
            total = total + marks[i];
        }
        scanner.close();
        // Calculating the average here
        avg = total / 6;
        System.out.print("The student Grade is: ");
        if (avg >= 80) {
            System.out.print("A");
        } else if (avg >= 60 && avg < 80) {
            System.out.print("B");
        } else if (avg >= 40 && avg < 60) {
            System.out.print("C");
        } else {
            System.out.print("D");
        }
    }
}

Output:

Enter Marks of Subject1:40


Enter Marks of Subject2:80
Enter Marks of Subject3:80
Enter Marks of Subject4:40
Enter Marks of Subject5:60
Enter Marks of Subject6:60
The student Grade is: B
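The listing above is a plain console Java program rather than a MapReduce job. A minimal MapReduce sketch of the same grading logic is given below; the class name StudentGrade and the input format are assumptions (a hypothetical file in which each line holds a student name followed by six comma-separated marks, e.g. RAVI,40,80,80,40,60,60), and the same average/grade thresholds are reused.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class StudentGrade {
    // Map-only job: each input line is independent, so no reducer is needed.
    public static class GradeMapper extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context con)
                throws IOException, InterruptedException {
            // Hypothetical line layout: name,mark1,mark2,...,mark6
            String[] parts = value.toString().split(",");
            float total = 0;
            for (int i = 1; i < parts.length; i++)
                total += Integer.parseInt(parts[i].trim());
            float avg = total / (parts.length - 1);
            String grade;
            if (avg >= 80) grade = "A";
            else if (avg >= 60) grade = "B";
            else if (avg >= 40) grade = "C";
            else grade = "D";
            con.write(new Text(parts[0]), new Text(grade));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "studentgrade");
        job.setJarByClass(StudentGrade.class);
        job.setMapperClass(GradeMapper.class);
        job.setNumReduceTasks(0);              // map-only job
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}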
PROGRAM 5
 Develop a MapReduce program to implement Matrix Multiplication.

AIM: To Develop a MapReduce program to implement Matrix Multiplication.

In mathematics, matrix multiplication or the matrix product is a binary operation


that produces a matrix from two matrices. The definition is motivated by linear
equations and linear transformations on vectors, which have numerous applications
in applied mathematics, physics, and engineering. In more detail, if A is an n × m
matrix and B is an m × p matrix, their matrix product AB is an n × p matrix, in which
the m entries across a row of A are multiplied with the m entries down a column of
B and summed to produce an entry of AB. When two linear transformations are
represented by matrices, then the matrix product represents the composition of the
two transformations.
Algorithm for the Map Function

a. For each element mij of M, produce (key, value) pairs ((i,k), (M, j, mij)) for
k = 1, 2, 3, ... up to the number of columns of N.
b. For each element njk of N, produce (key, value) pairs ((i,k), (N, j, njk)) for
i = 1, 2, 3, ... up to the number of rows of M.
c. Return the set of (key, value) pairs in which each key (i,k) has a list of values (M, j, mij)
and (N, j, njk) for all possible values of j.

Algorithm for the Reduce Function

a) For each key (i,k):
b) sort the values beginning with M by j into listM and the values beginning with N
by j into listN, and multiply mij and njk for the jth value of each list;
c) sum up the products mij x njk and return ((i,k), Σj mij x njk).
Step 1. Download the hadoop jar files with these links.

Download Hadoop Common Jar files: https://goo.gl/G4MyHp


$ wget https://goo.gl/G4MyHp -O hadoop-common-3.3.6.jar
Download Hadoop Mapreduce Jar File: https://goo.gl/KT8yfB
$ wget https://goo.gl/KT8yfB -O hadoop-mapreduce-client-core-2.7.1.jar

Step 2. Creating Mapper file for Matrix Multiplication.


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.ReflectionUtils;

class Element implements Writable {

    int tag;
    int index;
    double value;

    Element() {
        tag = 0;
        index = 0;
        value = 0.0;
    }

    Element(int tag, int index, double value) {
        this.tag = tag;
        this.index = index;
        this.value = value;
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        tag = input.readInt();
        index = input.readInt();
        value = input.readDouble();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        output.writeInt(tag);
        output.writeInt(index);
        output.writeDouble(value);
    }
}
class Pair implements WritableComparable<Pair> {

    int i;
    int j;

    Pair() {
        i = 0;
        j = 0;
    }

    Pair(int i, int j) {
        this.i = i;
        this.j = j;
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        i = input.readInt();
        j = input.readInt();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        output.writeInt(i);
        output.writeInt(j);
    }

    @Override
    public int compareTo(Pair compare) {
        if (i > compare.i) {
            return 1;
        } else if (i < compare.i) {
            return -1;
        } else {
            if (j > compare.j) {
                return 1;
            } else if (j < compare.j) {
                return -1;
            }
        }
        return 0;
    }

    public String toString() {
        return i + " " + j + " ";
    }
}
public class Multiply {

    // Tags each element of M with its column index j as the join key.
    public static class MatriceMapperM extends Mapper<Object, Text, IntWritable, Element> {
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String readLine = value.toString();
            String[] stringTokens = readLine.split(",");
            int index = Integer.parseInt(stringTokens[0]);
            double elementValue = Double.parseDouble(stringTokens[2]);
            Element e = new Element(0, index, elementValue);
            IntWritable keyValue = new IntWritable(Integer.parseInt(stringTokens[1]));
            context.write(keyValue, e);
        }
    }

    // Tags each element of N with its row index j as the join key.
    public static class MatriceMapperN extends Mapper<Object, Text, IntWritable, Element> {
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String readLine = value.toString();
            String[] stringTokens = readLine.split(",");
            int index = Integer.parseInt(stringTokens[1]);
            double elementValue = Double.parseDouble(stringTokens[2]);
            Element e = new Element(1, index, elementValue);
            IntWritable keyValue = new IntWritable(Integer.parseInt(stringTokens[0]));
            context.write(keyValue, e);
        }
    }

    // Joins M and N elements on the shared index j and emits the partial products.
    public static class ReducerMxN extends Reducer<IntWritable, Element, Pair, DoubleWritable> {
        @Override
        public void reduce(IntWritable key, Iterable<Element> values, Context context)
                throws IOException, InterruptedException {
            ArrayList<Element> M = new ArrayList<Element>();
            ArrayList<Element> N = new ArrayList<Element>();
            Configuration conf = context.getConfiguration();
            for (Element element : values) {
                Element tempElement = ReflectionUtils.newInstance(Element.class, conf);
                ReflectionUtils.copy(conf, element, tempElement);
                if (tempElement.tag == 0) {
                    M.add(tempElement);
                } else if (tempElement.tag == 1) {
                    N.add(tempElement);
                }
            }
            for (int i = 0; i < M.size(); i++) {
                for (int j = 0; j < N.size(); j++) {
                    Pair p = new Pair(M.get(i).index, N.get(j).index);
                    double multiplyOutput = M.get(i).value * N.get(j).value;
                    context.write(p, new DoubleWritable(multiplyOutput));
                }
            }
        }
    }

    // Second job: re-reads the partial products and groups them by (i,k).
    public static class MapMxN extends Mapper<Object, Text, Pair, DoubleWritable> {
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String readLine = value.toString();
            String[] pairValue = readLine.split(" ");
            Pair p = new Pair(Integer.parseInt(pairValue[0]), Integer.parseInt(pairValue[1]));
            DoubleWritable val = new DoubleWritable(Double.parseDouble(pairValue[2]));
            context.write(p, val);
        }
    }

    // Sums the partial products for each (i,k) cell of the result matrix.
    public static class ReduceMxN extends Reducer<Pair, DoubleWritable, Pair, DoubleWritable> {
        @Override
        public void reduce(Pair key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0.0;
            for (DoubleWritable value : values) {
                sum += value.get();
            }
            context.write(key, new DoubleWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJobName("MapIntermediate");
        job.setJarByClass(Multiply.class);
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MatriceMapperM.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MatriceMapperN.class);
        job.setReducerClass(ReducerMxN.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Element.class);
        job.setOutputKeyClass(Pair.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.waitForCompletion(true);

        Job job2 = Job.getInstance();
        job2.setJobName("MapFinalOutput");
        job2.setJarByClass(Multiply.class);
        job2.setMapperClass(MapMxN.class);
        job2.setReducerClass(ReduceMxN.class);
        job2.setMapOutputKeyClass(Pair.class);
        job2.setMapOutputValueClass(DoubleWritable.class);
        job2.setOutputKeyClass(Pair.class);
        job2.setOutputValueClass(DoubleWritable.class);
        job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job2, new Path(args[2]));
        FileOutputFormat.setOutputPath(job2, new Path(args[3]));
        job2.waitForCompletion(true);
    }
}
Step 5. Compiling the program in a particular folder named operation

#!/bin/bash
rm -rf multiply.jar classes

module load hadoop/3.3.6

mkdir -p classes
javac -d classes -cp classes:`$HADOOP_HOME/bin/hadoop classpath` Multiply.java
jar cf multiply.jar -C classes .

echo "end"

Step 6. Running the program in a particular folder named operation

export HADOOP_CONF_DIR=/home/$USER/cometcluster
module load hadoop/3.3.6
myhadoop-configure.sh
start-dfs.sh
start-yarn.sh

hdfs dfs -mkdir -p /user/$USER
hdfs dfs -put M-matrix-large.txt /user/$USER/M-matrix-large.txt
hdfs dfs -put N-matrix-large.txt /user/$USER/N-matrix-large.txt
hadoop jar multiply.jar edu.uta.cse6331.Multiply /user/$USER/M-matrix-large.txt /user/$USER/N-matrix-large.txt /user/$USER/intermediate /user/$USER/output
rm -rf output-distr
mkdir output-distr
hdfs dfs -get /user/$USER/output/part* output-distr

stop-yarn.sh
stop-dfs.sh
myhadoop-cleanup.sh

Data set:M
M,1,2,10
M,3,2,9
M,6,3,9

Data set:N
N,2,2,9
N,6,7,8
N,8,8,10
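As a quick check, assuming only the two small files above are used as input: the first job joins M and N entries on the shared middle index j, so M(1,2)=10 and M(3,2)=9 each pair with N(2,2)=9, while M(6,3), N(6,7) and N(8,8) find no partner and contribute nothing. After the second job sums the partial products, the final output would therefore contain only:

1 2     90.0
3 2     81.0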
Command for execution:

hadoop com.sun.tools.javac.Main *.java
jar cf mm.jar *.class
hadoop fs -mkdir -p /user/hadoop/mm/input/
hadoop fs -put -f M /user/hadoop/mm/input
hadoop fs -put -f N /user/hadoop/mm/input
hadoop fs -ls /user/hadoop/mm/input/
hadoop fs -cat /user/hadoop/mm/input/M
hadoop fs -cat /user/hadoop/mm/input/N
hadoop jar mm.jar Multiply /user/hadoop/mm/input/M /user/hadoop/mm/input/N /user/hadoop/mm/intermediate /user/hadoop/mm/output
hadoop fs -cat /user/hadoop/mm/output/part-r-00000

Output:

999,970,493.0

999,971,586.0

999,972,763.0

999,973,717.0

999,974,236.0

999,975,532.0
PROGRAM 6
 Develop a MapReduce program to find the maximum electrical consumption in each year,
given the electrical consumption for each month in each year.

AIM: To Develop a MapReduce program to find the maximum electrical consumption in each year,
given the electrical consumption for each month in each year.
Given below is the data regarding the electrical consumption of an organization. It contains the
monthly electrical consumption and the annual average for various years.
If the above data is given as input, we have to write applications to process it and produce results
such as finding the year of maximum usage, the year of minimum usage, and so on. This is a walkover
for programmers when the number of records is finite: they simply write the logic to produce
the required output and pass the data to the application.
But think of the data representing the electrical consumption of all the large-scale industries of a
particular state since its formation.
When we write applications to process such bulk data,
• they will take a lot of time to execute, and
• there will be heavy network traffic when we move data from the source to the network server.
To solve these problems, we have the MapReduce framework.

Input Data
The above data is saved as sample.txt and given as input. The input file looks as shown below.

1979 23 23 2 43 24 25 26 26 26 26 25 26 25

1980 26 27 28 28 28 30 31 31 31 30 30 30 29

1981 31 32 32 32 33 34 35 36 36 34 34 34 34

1984 39 38 39 39 39 41 42 43 40 39 38 38 40

1985 38 39 39 39 39 41 41 41 00 40 39 39 45
Source code:
import java.util.*;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits {

    // Mapper class
    public static class E_EMapper extends MapReduceBase implements
            Mapper<LongWritable, /* Input key type */
                   Text,         /* Input value type */
                   Text,         /* Output key type */
                   IntWritable>  /* Output value type */
    {
        // Map function: emits (year, annual average) taken from the last column of each line
        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
                        Reporter reporter) throws IOException {
            String line = value.toString();
            String lasttoken = null;
            StringTokenizer s = new StringTokenizer(line, "\t");
            String year = s.nextToken();
            while (s.hasMoreTokens()) {
                lasttoken = s.nextToken();
            }
            int avgprice = Integer.parseInt(lasttoken);
            output.collect(new Text(year), new IntWritable(avgprice));
        }
    }

    // Reducer class
    public static class E_EReduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {
        // Reduce function: emits the values that exceed the threshold maxavg
        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int maxavg = 30;
            int val = Integer.MIN_VALUE;
            while (values.hasNext()) {
                if ((val = values.next().get()) > maxavg) {
                    output.collect(key, new IntWritable(val));
                }
            }
        }
    }

    // Main function
    public static void main(String args[]) throws Exception {
        JobConf conf = new JobConf(ProcessUnits.class);
        conf.setJobName("max_eletricityunits");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(E_EMapper.class);
        conf.setCombinerClass(E_EReduce.class);
        conf.setReducerClass(E_EReduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
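Note that E_EReduce emits every value above the fixed threshold maxavg = 30 rather than reporting a single maximum per year, and it is also registered as the combiner. A sketch of a variant reducer that emits only the true yearly maximum is shown below; the class name E_EMaxReduce is hypothetical, and it uses the same old mapred API as the listing above.

// Variant reducer (hypothetical): emit one record per year containing the maximum value seen.
public static class E_EMaxReduce extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int max = Integer.MIN_VALUE;
        while (values.hasNext()) {
            max = Math.max(max, values.next().get());   // keep the largest reading
        }
        output.collect(key, new IntWritable(max));      // one record per year
    }
}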

Output:

1981	34
1984	40
1985	45

PROGRAM 7
 Develop a MapReduce program to analyze a weather data set and print whether the day is
sunny or cool.

AIM: To Develop a MapReduce program to analyze a weather data set and print whether each day
was a hot day or a cold day.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MyMaxMin {

    public static class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, Text> {
        /**
         * @method map
         * This method takes the input as a line of text.
         * Leaving the first few fields, the 6th field is taken as temp_max and
         * the 7th field as temp_min. Records with temp_max > 35 or temp_min < 10
         * are passed to the reducer.
         */
        @Override
        public void map(LongWritable arg0, Text Value, Context context)
                throws IOException, InterruptedException {

            // Converting the record (single line) to String and storing it in a String variable line
            String line = Value.toString();
            // Checking if the line is not empty
            if (!(line.length() == 0)) {

                // date
                String date = line.substring(6, 14);
                // maximum temperature
                float temp_Max = Float.parseFloat(line.substring(39, 45).trim());
                // minimum temperature
                float temp_Min = Float.parseFloat(line.substring(47, 53).trim());

                // if the maximum temperature is greater than 35, it is a hot day
                if (temp_Max > 35.0) {
                    // Hot day
                    context.write(new Text("Hot Day " + date),
                            new Text(String.valueOf(temp_Max)));
                }
                // if the minimum temperature is less than 10, it is a cold day
                if (temp_Min < 10) {
                    // Cold day
                    context.write(new Text("Cold Day " + date),
                            new Text(String.valueOf(temp_Min)));
                }
            }
        }
    }

    /**
     * MaxTemperatureReducer class is static and extends Reducer with the four Hadoop
     * generic types Text, Text, Text, Text.
     */
    public static class MaxTemperatureReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text Key, Iterable<Text> Values, Context context)
                throws IOException, InterruptedException {
            // Write the first temperature recorded for this key
            String temperature = Values.iterator().next().toString();
            context.write(Key, new Text(temperature));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "weather example");

        job.setJarByClass(MyMaxMin.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path OutputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        OutputPath.getFileSystem(conf).delete(OutputPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Import the project into the Eclipse IDE in the same way as in the earlier guide and
change the jar paths to the jar files present in the lib directory of this project.

When the project no longer shows any errors, export it as a jar file, the same as we
did in the WordCount MapReduce guide: right-click on the project file, click on
Export and select Jar file.

Give the path where you want to save the file.

Click on Finish to export.
Output:
PROGRAM 8
 Develop a program to calculate the maximum recorded temperature year-wise for the
weather dataset in Pig Latin

AIM: To Develop a program to calculate the maximum recorded temperature year-wise for the
weather dataset in Pig Latin.

Description:

The National Climatic Data Center (NCDC) is the world's largest active archive of weather data. I
downloaded the NCDC data for the year 1930 and loaded it into HDFS. I implemented a MapReduce
program, and Pig and Hive scripts, to find the min, max and average temperature for different stations.

Compile the Java files:
javac -classpath /home/student3/hadoop-common-2.6.1.jar:/home/student3/hadoop-mapreduce-client-core-2.6.1.jar:/home/student3/commons-cli-2.0.jar -d . MaxTemperature.java MaxTemperatureMapper.java MaxTemperatureReducer.java

Create the JAR file:
jar -cvf hadoop-project.jar *.class

Execute the JAR file:
hadoop jar hadoop-project.jar MaxTemperature /home/student3/Project/ /home/student3/Project_output111

Copy the output file to the local file system:
hdfs dfs -copyToLocal /home/student3/Project_output111/part-r-00000

PIG Script

pig -x local

grunt> records = LOAD '/home/student3/Project/Project_Output/output111.txt' AS (year:chararray, temperature:int);
grunt> DUMP records;
grunt> grouped_records = GROUP records BY year;
grunt> DUMP grouped_records;
grunt> max_temp = FOREACH grouped_records GENERATE group, MAX(records.temperature);
grunt> DUMP max_temp;

Hive Script

Commands to create a table in Hive and to find the average temperature:

DROP TABLE IF EXISTS w_hd9467;

CREATE TABLE w_hd9467 (year STRING, temperature INT) ROW FORMAT DELIMITED FIELDS
TERMINATED BY '\t';

LOAD DATA LOCAL INPATH '/home/student3/Project/Project_Output/output1.txt'
OVERWRITE INTO TABLE w_hd9467;

SELECT count(*) FROM w_hd9467;

SELECT * FROM w_hd9467 LIMIT 5;

Query to find the average temperature:
SELECT year, AVG(temperature) FROM w_hd9467 GROUP BY year;
MaxTemperature.java

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

MaxTemperatureMapper.java

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}

MaxTemperatureReducer.java

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            context.write(key, value);
            // maxValue = Math.max(maxValue, value.get());
        }
        // context.write(key, new IntWritable(maxValue));
    }
}
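As written, MaxTemperatureReducer writes every temperature it receives and the maximum logic is commented out, which is why the output below lists all readings for 1921. If only the yearly maximum is wanted, restoring the commented lines gives the following reduce method (a sketch, otherwise unchanged):

@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
        maxValue = Math.max(maxValue, value.get());   // keep the largest reading
    }
    context.write(key, new IntWritable(maxValue));    // one record per year
}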

Output:

1921 -222
1921 -144
1921 -122
1921 -139
1921 -122
1921 -89
1921 -72
1921 -61
1921 -56
1921 -44
1921 -61
1921 -72
1921 -67
1921 -78
1921 -78
1921 -133
1921 -189
1921 -250
1921 -200
1921 -150
1921 -156
1921 -144
1921 -133
1921 -139
1921 -161
1921 -233
1921 -139
1921 -94
1921 -89
1921 -122
1921 -100
1921 -100
1921 -106
1921 -117
1921 -144
1921 -128
1921 -139
1921 -106
1921 -100
1921 -94
1921 -83
1921 -83
1921 -106
1921 -150
1921 -200
1921 -178
1921 -72
1921 -156
PROGRAM 9

 Develop a program to implement Pig Latin Modes, Programs.

OBJECTIVE:

a. Run the Pig Latin Scripts to find Word Count.


b. Run the Pig Latin Scripts to find a max temp for each and every year.

PROGRAM LOGIC:

Run the Pig Latin script to find Word Count:

lines = LOAD '/user/hadoop/HDFS_File.txt' AS (line:chararray);
words = FOREACH lines GENERATE FLATTEN(TOKENIZE(line)) AS word;
grouped = GROUP words BY word;
wordcount = FOREACH grouped GENERATE group, COUNT(words);
DUMP wordcount;

Run the Pig Latin script to find the maximum temperature for each year:

-- max_temp.pig: Finds the maximum temperature by year
records = LOAD 'input/ncdc/micro-tab/sample.txt'
    AS (year:chararray, temperature:int, quality:int);
filtered_records = FILTER records BY temperature != 9999 AND
    (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
grouped_records = GROUP filtered_records BY year;
max_temp = FOREACH grouped_records GENERATE group, MAX(filtered_records.temperature);
DUMP max_temp;

OUTPUT:

DUMP records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)

DUMP max_temp;
(1949,111)
(1950,22)
PROGRAM 10
 Develop a Java application to find the maximum temperature using Spark.

AIM: To Develop a Java application to find the maximum temperature using Spark.

Source code:

import org.apache.spark._

object testfilter extends App {
  val conf = new SparkConf().setMaster("local[2]").setAppName("testfilter")
  val sc = new SparkContext(conf)
  System.setProperty("hadoop.home.dir", "c://winutil//")
  val input = sc.textFile("file:///D://sparkprog//temp//stats.txt")
  val line = input.map(x => x.split("\t"))
  val city = line.map(x => (x(3) + "\t" + x(4)))
  val rdd3 = city.map(x => x.split("\t"))
  val maintemp = rdd3.map(x => (x(0), x(1)))
  val grp = maintemp.groupByKey()
  val main = grp.map { case (x, iter) => (x, iter.toList.max) }
  for (i <- main) {
    print(i)
    print("\n")
  }
}
OUTPUT:

(Jammu and Kashmir,20) (Madhya Pradesh,32) (Bihar,31) and so on ..
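The listing above is written in Scala. Since the aim asks for a Java application, a rough Java equivalent using the Spark Java API is sketched below; the class name MaxTempSpark is hypothetical, and it assumes the same tab-separated stats.txt file with the state name in column 4 (index 3) and the temperature in column 5 (index 4).

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class MaxTempSpark {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("MaxTempSpark");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read the hypothetical tab-separated weather statistics file.
        JavaRDD<String> input = sc.textFile("file:///D:/sparkprog/temp/stats.txt");

        // Key every record by state and keep the temperature as the value.
        JavaPairRDD<String, Integer> stateTemp = input.mapToPair(line -> {
            String[] cols = line.split("\t");
            return new Tuple2<>(cols[3], Integer.parseInt(cols[4].trim()));
        });

        // Keep only the maximum temperature per state.
        JavaPairRDD<String, Integer> maxTemp = stateTemp.reduceByKey(Math::max);

        maxTemp.collect().forEach(t -> System.out.println(t._1() + "," + t._2()));
        sc.close();
    }
}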
