
ATRIA INSTITUTE OF TECHNOLOGY

(An Autonomous Institution under VTU)

LAB MANUAL

BIG DATA ANALYTICS


SEMESTER: 6

DEPARTMENT OF INFORMATION SCIENCE & ENGINEERING
Syllabus

Module 1: Classification of data, Characteristics, Evolution and definition of Big Data, What is Big Data, Why Big Data, Traditional Business Intelligence vs. Big Data, Typical data warehouse and Hadoop environment. Big Data Analytics: What is Big Data Analytics, Classification of Analytics, Importance of Big Data Analytics, Technologies used in Big Data environments, A few top analytical tools, NoSQL, Hadoop.

Module 2: Introduction to Hadoop: Introducing Hadoop, Why Hadoop, Why not RDBMS, RDBMS vs. Hadoop, History of Hadoop, Hadoop overview, Use case of Hadoop, HDFS (Hadoop Distributed File System), Processing data with Hadoop, Managing resources and applications with Hadoop YARN (Yet Another Resource Negotiator). Introduction to MapReduce Programming: Introduction, Mapper, Reducer, Combiner, Partitioner, Searching, Sorting, Compression.

Module 3: Introduction to MongoDB: What is MongoDB, Why MongoDB, Terms used in RDBMS
and MongoDB, Data Types in MongoDB, MongoDB Query Language.

Module 4: Introduction to Hive: What is Hive, Hive Architecture, Hive data types, Hive file formats,
Hive Query Language (HQL), RC File implementation, User Defined Function (UDF). Introduction
to Pig: What is Pig, Anatomy of Pig, Pig on Hadoop, Pig Philosophy, Use case for Pig, Pig Latin
Overview, Data types in Pig, Running Pig, Execution Modes of Pig, HDFS Commands, Relational
Operators, Eval Function, Complex Data Types, Piggy Bank, User Defined Function, Pig Vs Hive.

Module 5: Spark and Big Data Analytics: Spark, Introduction to Data Analysis with Spark.

CIE for the practical component of the IPCC


● 15 marks for the conduction of the experiment and preparation of laboratory record, and 10 marks for
the test to be conducted after the completion of all the laboratory sessions.
● On completion of every experiment/program in the laboratory, the students shall be evaluated including
viva-voce and marks shall be awarded on the same day.
● The CIE marks awarded in the case of the Practical component shall be based on the continuous
evaluation of the laboratory report. Each experiment report can be evaluated for 10 marks. Marks of
all experiments’ write-ups are added and scaled down to 15 marks.
● The laboratory test (duration 02/03 hours) after completion of all the experiments shall be conducted
for 50 marks and scaled down to 10 marks.
● The scaled-down marks of the write-up evaluations and the test, added together, form the CIE marks for the laboratory component of the IPCC (25 marks).
● The student has to secure 40% of 25 marks to qualify in the CIE of the practical component of the
IPCC.



Note: The theory portion of the IPCC shall be for both CIE and SEE, whereas the practical portion will have a CIE component only. Questions in the SEE paper may include questions from the practical component.



Experiments (Java/Python/R)

1. Install Hadoop and implement the following file management tasks in Hadoop: adding files and directories, retrieving files, deleting files and directories. Hint: A typical Hadoop workflow creates data files (such as log files) elsewhere and copies them into HDFS using the HDFS command line utilities.
2. Develop a MapReduce program to implement matrix multiplication.
3. Develop a MapReduce program that mines weather data and displays appropriate messages indicating the weather conditions of the day.
4. Develop a MapReduce program to find the tags associated with each movie by analyzing MovieLens data.
5. Implement functions: Count, Sort, Limit, Skip, and Aggregate using MongoDB.
6. Develop Pig Latin scripts to sort, group, join, project, and filter the data.
7. Use Hive to create, alter, and drop databases, tables, views, functions, and indexes.
8. Implement a word count program in Hadoop and Spark.
9. Use CDH (Cloudera Distribution for Hadoop) and HUE (Hadoop User Interface) to analyze data and generate reports for sample datasets.



Out of Syllabus Experiments (Java/Python/R)

1. Develop a MapReduce program to find the maximum electrical consumption in each year, given the electrical consumption for each month of each year.
2. Develop a MapReduce program to find the maximum temperature in each year.
3. Visualize data using basic plotting techniques in Python.
4. Implement a MapReduce program that processes a dataset.
5. Develop a MapReduce program to analyze an Uber dataset (four columns) to find the days on which each base has more trips.
6. Write queries to sort and aggregate the data in a table using HiveQL.
7. Develop a MapReduce program to find the number of products sold in each country by considering sales data containing fields such as the country of sale.
8. Implement clustering techniques using Spark.
9. Implement an application that stores big data in MongoDB / Pig using Hadoop / R.



1. Install Hadoop and implement the following file management tasks in Hadoop: adding files and directories, retrieving files, deleting files and directories. Hint: A typical Hadoop workflow creates data files (such as log files) elsewhere and copies them into HDFS using the HDFS command line utilities.

To install Hadoop and implement file management tasks (adding, retrieving, and deleting files and
directories) using Python, follow these steps:

----------------------------------------------------------------------------------------------------------------------------------------
### **Step 1: Install Hadoop**
1. **Prerequisites**:
- Install Java Development Kit (JDK) 8 or later.
- Ensure SSH is installed and configured.

2. **Download and Extract Hadoop**:


- Download Hadoop from the [Apache Hadoop website](https://hadoop.apache.org/releases.html).
- Extract the tarball:
```bash
tar -xzvf hadoop-<version>.tar.gz
```

3. **Configure Hadoop**:
- Set environment variables in `~/.bashrc` or `~/.bash_profile`:
```bash
export HADOOP_HOME=/path/to/hadoop-<version>
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export JAVA_HOME=/path/to/java
```
- Reload the shell configuration:
```bash
source ~/.bashrc
```

4. **Configure Hadoop for Pseudo-Distributed Mode**:


- Edit the Hadoop configuration files (`core-site.xml`, `hdfs-site.xml`, `mapred-site.xml`, and `yarn-site.xml`)
to set up pseudo-distributed mode.
- Format the HDFS filesystem:
```bash
hdfs namenode -format
```
- Start Hadoop services:
```bash

start-dfs.sh
start-yarn.sh
```
----------------------------------------------------------------------------------------------------------------------------------------

### **Step 2: Install Python Libraries**


To interact with Hadoop using Python, install the `hdfs` library:
```bash
pip install hdfs
```
----------------------------------------------------------------------------------------------------------------------------------------

### **Step 3: Implement File Management Tasks in Python**


Below is a Python script to perform file management tasks in Hadoop HDFS:

```python
from hdfs import InsecureClient

# Connect to HDFS through the NameNode's WebHDFS endpoint
client = InsecureClient('http://localhost:9870', user='your-username')

# 1. Adding files and directories to HDFS

def add_file_to_hdfs(local_path, hdfs_path):
    """Upload a file from the local filesystem to HDFS."""
    client.upload(hdfs_path, local_path)
    print(f"File {local_path} uploaded to HDFS at {hdfs_path}")

def create_directory_in_hdfs(hdfs_path):
    """Create a directory in HDFS."""
    client.makedirs(hdfs_path)
    print(f"Directory {hdfs_path} created in HDFS")

# 2. Retrieving files from HDFS

def retrieve_file_from_hdfs(hdfs_path, local_path):
    """Download a file from HDFS to the local filesystem."""
    client.download(hdfs_path, local_path, overwrite=True)
    print(f"File {hdfs_path} downloaded to {local_path}")

# 3. Deleting files and directories from HDFS

def delete_file_or_directory_in_hdfs(hdfs_path):
    """Delete a file or directory from HDFS."""
    client.delete(hdfs_path, recursive=True)
    print(f"File/Directory {hdfs_path} deleted from HDFS")

# Example usage
if __name__ == "__main__":
    # Add a file to HDFS
    add_file_to_hdfs('local_file.txt', '/user/hadoop/input/local_file.txt')

    # Create a directory in HDFS
    create_directory_in_hdfs('/user/hadoop/input/new_directory')

    # Retrieve a file from HDFS
    retrieve_file_from_hdfs('/user/hadoop/input/local_file.txt', 'downloaded_file.txt')

    # Delete a file or directory from HDFS
    delete_file_or_directory_in_hdfs('/user/hadoop/input/local_file.txt')
```
----------------------------------------------------------------------------------------------------------------------------------------
### **Step 4: Run the Python Script**
1. Save the script as `hdfs_operations.py`.
2. Run the script:
```bash
python hdfs_operations.py
```

### **Explanation of Tasks**


1. **Adding Files and Directories**:
- Use `client.upload()` to upload files from the local filesystem to HDFS.
- Use `client.makedirs()` to create directories in HDFS.

2. **Retrieving Files**:
- Use `client.download()` to download files from HDFS to the local filesystem.

3. **Deleting Files and Directories**:


- Use `client.delete()` to delete files or directories from HDFS.

----------------------------------------------------------------------------------------------------------------------------------------
### **Notes**
- Replace `your-username` with your Hadoop username.
- Ensure Hadoop services (NameNode, DataNode, etc.) are running before executing the script.
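
For reference, the same file management tasks can also be performed with the HDFS shell, which is what the experiment hint calls the command line utilities (the paths below are illustrative):

```bash
hdfs dfs -mkdir -p /user/hadoop/input                                 # create a directory
hdfs dfs -put local_file.txt /user/hadoop/input/                      # add a file to HDFS
hdfs dfs -ls /user/hadoop/input                                       # list the directory
hdfs dfs -get /user/hadoop/input/local_file.txt downloaded_file.txt   # retrieve a file
hdfs dfs -rm -r /user/hadoop/input/new_directory                      # delete a file or directory
```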



2. Develop a MapReduce program to implement Matrix Multiplication

Matrix multiplication is a classic problem that can be efficiently parallelized using the MapReduce
programming model. Below is a Python implementation of matrix multiplication using the MapReduce
paradigm. We'll use the mrjob library, which allows us to write MapReduce programs in Python.

-----------------------------------------------------------------------------------------------------------------------------

### **Step 1: Install `mrjob`**


Install the `mrjob` library using pip:
```bash
pip install mrjob
```
-----------------------------------------------------------------------------------------------------------------------------
### **Step 2: Matrix Multiplication Using MapReduce**

#### **Input Format**


Assume both matrices are stored in a single input file, one element per line, in the format `matrix,i,j,value`, where `matrix` names the matrix (`A` or `B`), `i` is the row index, `j` is the column index, and `value` is the matrix element.

For example, for a 2x2 matrix A:

```
A,0,0,1
A,0,1,2
A,1,0,3
A,1,1,4
```
#### **MapReduce Program**
Here is the Python code for matrix multiplication using MapReduce:

```python
from mrjob.job import MRJob
import sys

class MatrixMultiplication(MRJob):

    def configure_args(self):
        super(MatrixMultiplication, self).configure_args()
        self.add_passthru_arg('--size', type=int, help='Size of the matrices (N x N)')

    def mapper(self, _, line):
        # Parse the input line: matrix name, row index, column index, value
        matrix, i, j, value = line.strip().split(',')
        i, j, value = int(i), int(j), int(value)

        if self.options.size is None:
            sys.stderr.write("Error: Matrix size not specified. Use --size=N.\n")
            sys.exit(1)

        N = self.options.size

        if matrix == 'A':
            # A[i][j] contributes to every cell (i, k) of the result
            for k in range(N):
                yield (i, k), ('A', j, value)
        else:
            # B[i][j] contributes to every cell (k, j) of the result
            for k in range(N):
                yield (k, j), ('B', i, value)

    def reducer(self, key, values):
        # Separate the values coming from matrix A and matrix B
        A = {}
        B = {}
        for matrix, index, val in values:
            if matrix == 'A':
                A[index] = val
            elif matrix == 'B':
                B[index] = val

        # Compute the dot product for the cell (i, j)
        result = 0
        for k in A:
            if k in B:
                result += A[k] * B[k]

        # Emit the result
        yield key, result

if __name__ == '__main__':
    MatrixMultiplication.run()
```

-----------------------------------------------------------------------------------------------------------------------------



### **Step 3: Input File**
Create an input file `matrices.txt` with the following content (example for two 2x2 matrices):
```
A,0,0,1
A,0,1,2
A,1,0,3
A,1,1,4
B,0,0,5
B,0,1,6
B,1,0,7
B,1,1,8
```

Here, the lines prefixed with `A` represent matrix A, and the lines prefixed with `B` represent matrix B.

### **Step 4: Run the MapReduce Job**


Run the MapReduce job using the following command:
```bash
python matrix_multiplication.py --size=2 matrices.txt
```
-------------------------------------------------------------------------------------------------------------------------------
### **Explanation of the Code**
1. **Mapper**:
   - For each element of matrix A at `(i, j)`, emit `(i, k)` as the key and `('A', j, value)` as the value for every column `k` of the resulting matrix.
   - For each element of matrix B at `(i, j)`, emit `(k, j)` as the key and `('B', i, value)` as the value for every row `k` of the resulting matrix.

2. **Reducer**:
   - Collect all values for a given key `(i, j)`.
   - Separate the values coming from matrix A and matrix B.
   - Compute the dot product of the corresponding row of matrix A and column of matrix B.
   - Emit the result for the cell `(i, j)`.
-----------------------------------------------------------------------------------------------------------------------------

### **Output**
The output will be the resulting matrix in the format:
```
(i, j) value
```

For the example input, the output will be:


```
(0, 0) 19
(0, 1) 22
(1, 0) 43
(1, 1) 50
```

-----------------------------------------------------------------------------------------------------------------------------
### **Notes**
- The `--size` argument specifies the size of the matrices (N x N).
- This implementation assumes square matrices. For non-square matrices, additional adjustments are
needed.
- The `mrjob` library can run locally for testing or on a Hadoop cluster for distributed processing.
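
For example, after testing locally, the same job can typically be submitted to a Hadoop cluster with mrjob's Hadoop runner (the HDFS input path below is illustrative):

```bash
python matrix_multiplication.py -r hadoop --size=2 hdfs:///user/hadoop/input/matrices.txt
```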
-------------------------------------------------------------------------------------------------------------------------------

3. Develop a Map Reduce program that mines weather data and displays
appropriate messages indicating the weather conditions of the day.

To develop a MapReduce program that mines weather data and displays appropriate messages
indicating the weather conditions of the day, we can use the `mrjob` library in Python. The program
will analyze weather data (e.g., temperature, humidity, precipitation) and generate messages like "Hot
day", "Rainy day", "Cold day", etc.

------------------------------------------------------------------------------------------------------------------------
### **Step 1: Install `mrjob`**
Install the `mrjob` library using pip:
```bash
pip install mrjob
```
------------------------------------------------------------------------------------------------------------------------
### **Step 2: Weather Data Format**
Assume the weather data is stored in a CSV file with the following columns:
- `Date`: The date of the weather record.
- `Temperature`: The temperature in Celsius.
- `Humidity`: The humidity percentage.
- `Precipitation`: The amount of precipitation in millimeters.

Example input file (`weather_data.csv`):


```
2023-10-01,25,60,0
2023-10-02,30,70,5
2023-10-03,15,80,10
2023-10-04,10,90,15
2023-10-05,35,50,0
```
------------------------------------------------------------------------------------------------------------------------

### **Step 3: MapReduce Program**


Here is the Python code for the MapReduce program:

```python
from mrjob.job import MRJob

class WeatherAnalysis(MRJob):

    def mapper(self, _, line):
        # Split the CSV line into fields
        fields = line.strip().split(',')

        # Extract the relevant fields
        date = fields[0]
        temperature = float(fields[1])
        humidity = float(fields[2])
        precipitation = float(fields[3])

        # Determine the weather condition
        if temperature >= 30:
            condition = "Hot day"
        elif temperature <= 10:
            condition = "Cold day"
        else:
            condition = "Moderate day"

        if precipitation > 0:
            condition += ", Rainy day"
        if humidity >= 80:
            condition += ", Humid day"

        # Emit the date and the weather condition
        yield date, condition

    def reducer(self, date, conditions):
        # Combine all conditions reported for the same date
        unique_conditions = set(conditions)
        yield date, ", ".join(unique_conditions)

if __name__ == '__main__':
    WeatherAnalysis.run()
```

### **Step 4: Run the MapReduce Job**


Run the MapReduce job using the following command:
```bash
python weather_analysis.py weather_data.csv
```

------------------------------------------------------------------------------------------------------------------------
### **Explanation of the Code**
1. **Mapper**:
- Reads each line of the input file and extracts the date, temperature, humidity, and precipitation.
- Determines the weather condition based on the following rules:
- If temperature >= 30°C: "Hot day"
- If temperature <= 10°C: "Cold day"
- Otherwise: "Moderate day"
- Adds additional conditions if precipitation > 0 ("Rainy day") or humidity >= 80% ("Humid day").
- Emits the date as the key and the weather condition as the value.

2. **Reducer**:
- Combines all weather conditions for the same date.
- Emits the date and the combined weather conditions.

------------------------------------------------------------------------------------------------------------------------

### **Example Output**


For the input file `weather_data.csv`, the output will be:
```
"2023-10-01" "Moderate day"
"2023-10-02" "Hot day, Rainy day"
"2023-10-03" "Moderate day, Rainy day, Humid day"
"2023-10-04" "Cold day, Rainy day, Humid day"
"2023-10-05" "Hot day"
```
------------------------------------------------------------------------------------------------------------------------
### **Customization**
- You can modify the rules in the mapper to define different weather conditions based on your
requirements.



- Add more fields (e.g., wind speed, visibility) to the input data and extend the mapper logic to analyze
them.

------------------------------------------------------------------------------------------------------------------------

4. Develop a MapReduce program to find the tags associated with each movie
by analyzing movie lens data.

To develop a MapReduce program to find the tags associated with each movie using MovieLens data, we
need to follow these steps:

1. **Understand the Data**: MovieLens dataset typically contains files like `movies.csv`, `tags.csv`, etc.
The `tags.csv` file contains user-generated tags for movies, with columns like `userId`, `movieId`, `tag`,
and `timestamp`.

2. **MapReduce Overview**:
- **Mapper**: The mapper will process each line of the `tags.csv` file and emit key-value pairs where
the key is the `movieId` and the value is the `tag`.
- **Reducer**: The reducer will collect all tags for each `movieId` and output the `movieId` along with
the list of associated tags.

3. **Implementation**:

Below is a Python implementation using the `mrjob` library, which is a Python package for writing
MapReduce jobs.

### Step 1: Install `mrjob`


```bash
pip install mrjob
```

### Step 2: Write the MapReduce Program

```python
from mrjob.job import MRJob
from mrjob.step import MRStep

class MovieTags(MRJob):

    def mapper(self, _, line):
        # Split the line into fields: userId, movieId, tag, timestamp
        fields = line.strip().split(',')

        # Skip the CSV header line, if present
        if fields[0] == 'userId':
            return

        # Extract movieId and tag
        movieId = fields[1]
        tag = fields[2]

        # Emit movieId and tag
        yield movieId, tag

    def reducer(self, movieId, tags):
        # Collect all tags for a movieId
        tag_list = list(tags)

        # Emit movieId and the list of tags
        yield movieId, tag_list

    def steps(self):
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer)
        ]

if __name__ == '__main__':
    MovieTags.run()
```

### Step 3: Run the MapReduce Job

Save the above code in a file, say `movie_tags.py`, and run it using the following command:

```bash
python movie_tags.py tags.csv > output.txt
```

### Explanation:

- **Mapper**:
- The mapper reads each line from the `tags.csv` file.
- It splits the line into fields and extracts `movieId` and `tag`.
- It emits a key-value pair where the key is `movieId` and the value is `tag`.

- **Reducer**:
- The reducer receives all tags for a particular `movieId`.
- It collects these tags into a list and emits the `movieId` along with the list of tags.

### Step 4: Analyze the Output

The output will be in the form:

```
movieId1 [tag1, tag2, tag3, ...]
movieId2 [tag4, tag5, ...]
...
```

Each line represents a movie and the list of tags associated with it.

### Step 5: Optional Enhancements

- **Sorting**: You can sort the tags alphabetically or by frequency.


- **Filtering**: You can filter out less frequent tags or apply other criteria.
- **Integration with Movie Titles**: You can join the output with `movies.csv` to include movie titles in
the final output.

This MapReduce program will efficiently process large datasets and provide the tags associated with each
movie in the MovieLens dataset.
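
As a sketch of the sorting enhancement mentioned above, the reducer of the `MovieTags` class could emit an alphabetically sorted, de-duplicated tag list instead:

```python
    def reducer(self, movieId, tags):
        # Sort unique tags alphabetically before emitting (replaces the reducer shown earlier)
        yield movieId, sorted(set(tags))
```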



5. Implement Functions: Count – Sort – Limit – Skip – Aggregate using
MongoDB

To implement MongoDB functions like **Count**, **Sort**, **Limit**, **Skip**, and **Aggregate**
using Python, you can use the `pymongo` library. Below is a step-by-step guide with examples.

---

### **Step 1: Install `pymongo`**


Install the `pymongo` library if you haven't already:
```bash
pip install pymongo
```

---

### **Step 2: Connect to MongoDB**


First, establish a connection to your MongoDB instance and select the database and collection.

```python
from pymongo import MongoClient

# Connect to MongoDB
client = MongoClient("mongodb://localhost:27017/")

# Select database and collection


db = client["movie_lens"]
collection = db["movies"]
```
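
The examples below assume the `movies` collection already holds MovieLens-style documents. If you are starting from an empty database, a minimal sketch for seeding a few illustrative documents (the field values are made up for demonstration) is:

```python
# Hypothetical sample documents so the queries below return results
sample_movies = [
    {"movieId": 1, "title": "Movie One", "genres": ["Action", "Comedy"], "year": 1995, "rating": 4.0},
    {"movieId": 2, "title": "Movie Two", "genres": ["Drama"], "year": 2001, "rating": 3.5},
]
collection.insert_many(sample_movies)
```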

---

### **Step 3: Implement Functions**

#### **1. Count**


Count the number of documents in a collection or documents that match a query.

```python
# Count all documents
total_movies = collection.count_documents({})
print(f"Total movies: {total_movies}")

# Count documents that match a query
action_movies_count = collection.count_documents({"genres": "Action"})
print(f"Action movies: {action_movies_count}")
```

---

#### **2. Sort**


Sort documents based on a specific field.

```python
# Sort movies by release year in descending order
sorted_movies = collection.find().sort("year", -1)

# Print the first 5 sorted movies


for movie in sorted_movies.limit(5):
    print(movie)
```

---

#### **3. Limit**


Limit the number of documents returned by a query.

```python
# Get the first 10 movies
limited_movies = collection.find().limit(10)

for movie in limited_movies:
    print(movie)
```

---

#### **4. Skip**


Skip a specified number of documents in the result set.

```python
# Skip the first 5 movies and return the rest
skipped_movies = collection.find().skip(5)

for movie in skipped_movies.limit(5):  # Print the next 5 movies
    print(movie)
```

---



#### **5. Aggregate**
Perform complex data transformations using the aggregation pipeline.

```python
# Find the average rating for each movie and sort by average rating
pipeline = [
{
"$group": {
"_id": "$movieId",
"averageRating": {"$avg": "$rating"}
}
},
{
"$sort": {"averageRating": -1}
},
{
"$limit": 10 # Limit to top 10 results
}
]

# Execute the aggregation pipeline


results = collection.aggregate(pipeline)

for result in results:
    print(result)
```

---

### **Step 4: Combining Functions**


You can combine these functions in a single query or aggregation pipeline.

**Example**:
Rank movies by average rating, skip the top 2 results, and return the next 5.

```python
pipeline = [
{
"$group": {
"_id": "$movieId",
"averageRating": {"$avg": "$rating"}
}
},
{

"$sort": {"averageRating": -1}
},
{
"$skip": 2 # Skip the first 2 results
},
{
"$limit": 5 # Limit to the next 5 results
}
]

# Execute the aggregation pipeline


results = collection.aggregate(pipeline)

for result in results:
    print(result)
```

---
### **Step 5: Additional Aggregation Example**

#### **Count Documents in a Group**


Count the number of movies in each genre.

```python
pipeline = [
{
"$unwind": "$genres" # Unwind the genres array
},
{
"$group": {
"_id": "$genres",
"count": {"$sum": 1}
}
},
{
"$sort": {"count": -1} # Sort by count in descending order
}
]

# Execute the aggregation pipeline


results = collection.aggregate(pipeline)

for result in results:
    print(result)
```



---
### **Step 6: Filter and Aggregate**
Find the total number of ratings for each movie and filter movies with more than 100 ratings.

```python
pipeline = [
{
"$group": {
"_id": "$movieId",
"totalRatings": {"$sum": 1}
}
},
{
"$match": {
"totalRatings": {"$gt": 100} # Filter movies with more than 100 ratings
}
},
{
"$sort": {"totalRatings": -1} # Sort by totalRatings in descending order
}
]

# Execute the aggregation pipeline


results = collection.aggregate(pipeline)

for result in results:
    print(result)
```
---
### **Summary**
- Use `count_documents()` for counting.
- Use `sort()` for sorting.
- Use `limit()` and `skip()` for pagination.
- Use `aggregate()` for complex data transformations.

These examples demonstrate how to use MongoDB functions in Python with the `pymongo` library. You
can adapt these examples to your specific dataset and requirements.
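
As a small illustration of the pagination point in the summary, a helper like the one below combines `sort()`, `skip()`, and `limit()` on the same `collection` (the `get_page` function is a sketch, not part of the original manual):

```python
# Hypothetical pagination helper combining sort(), skip(), and limit()
def get_page(collection, page, page_size=10):
    """Return one page of movies, newest first (page numbers start at 1)."""
    cursor = (collection.find()
              .sort("year", -1)
              .skip((page - 1) * page_size)
              .limit(page_size))
    return list(cursor)

# Example: the second page of 5 results
for movie in get_page(collection, page=2, page_size=5):
    print(movie)
```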



6. Develop Pig Latin scripts to sort, group, join, project, and filter the data

employees.csv
1,John Doe,101,50000
2,Jane Smith,102,60000
3,Jim Brown,101,55000
4,Jake White,103,70000

departments.csv
101,HR
102,Engineering
103,Marketing

-- Load the employees data


employees = LOAD 'employees.csv' USING PigStorage(',') AS (employee_id:int, name:chararray,
department_id:int, salary:float);

-- Load the departments data


departments = LOAD 'departments.csv' USING PigStorage(',') AS (department_id:int,
department_name:chararray);

-- Filter employees with salary greater than 55000


high_salary_employees = FILTER employees BY salary > 55000;

-- Project only the name and salary of high salary employees


projected_employees = FOREACH high_salary_employees GENERATE name, salary;

-- Group employees by department_id


grouped_employees = GROUP employees BY department_id;

-- Join employees with departments to get department names


joined_data = JOIN employees BY department_id, departments BY department_id;

-- Sort employees by salary in descending order


sorted_employees = ORDER employees BY salary DESC;

-- Store the results of the sorted employees


STORE sorted_employees INTO 'sorted_employees_output' USING PigStorage(',');



Notes
• Make sure to replace the file paths with the actual paths where your data files are located.
• Adjust the schema in the LOAD statements according to your actual data structure.
• You can run these scripts in a Pig environment, such as Apache Pig on Hadoop.
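
For example, assuming the script above is saved as employees_analysis.pig, it can typically be run in local mode for testing or in MapReduce mode on a Hadoop cluster:

pig -x local employees_analysis.pig
pig -x mapreduce employees_analysis.pig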

7. Use Hive to create, alter, and drop databases, tables, views, functions, and
indexes.

-- Create a new database


CREATE DATABASE IF NOT EXISTS company_db;

-- Switch to the newly created database


USE company_db;

-- Create an employees table


CREATE TABLE IF NOT EXISTS employees (
employee_id INT,
name STRING,
department_id INT,
salary FLOAT
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

-- Create a departments table


CREATE TABLE IF NOT EXISTS departments (
department_id INT,
department_name STRING
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

-- Add a new column to the employees table


ALTER TABLE employees ADD COLUMNS (hire_date DATE);

-- Create a view to show high salary employees


CREATE VIEW IF NOT EXISTS high_salary_employees AS
SELECT name, salary
FROM employees
WHERE salary > 60000;

-- Create an index on the employees table for the department_id column


CREATE INDEX idx_department_id
ON TABLE employees (department_id)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
WITH DEFERRED REBUILD;
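
The experiment also asks for functions; the statements below are a sketch of registering and later dropping a simple Hive UDF. The jar path and class name are placeholders (see the Notes at the end of this section).

-- Register a user defined function (placeholder jar path and class name)

ADD JAR /path/to/my_udfs.jar;
CREATE TEMPORARY FUNCTION my_upper AS 'com.example.hive.udf.MyUpper';

-- Use the function in a query

SELECT my_upper(name) FROM employees;

-- Drop the function

DROP TEMPORARY FUNCTION IF EXISTS my_upper;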

-- Drop the view

DROP VIEW IF EXISTS high_salary_employees;

-- Drop the index (drop it before dropping the table it belongs to)

DROP INDEX IF EXISTS idx_department_id ON employees;

-- Drop the employees table

DROP TABLE IF EXISTS employees;

-- Drop the departments table

DROP TABLE IF EXISTS departments;

-- Drop the database (CASCADE also removes any remaining tables and views)

DROP DATABASE IF EXISTS company_db CASCADE;

Notes
• Make sure to replace the paths and class names with the actual paths and class names for your
UDFs.
• The CASCADE option in the DROP DATABASE command will drop all tables and views in the
database.
• You can run these commands in a Hive shell or through a Hive client that supports HiveQL.



8. Implement a word count program in Hadoop and Spark

Word Count in Hadoop Using Python (Hadoop Streaming)

Step 1: Create the Mapper and Reducer Scripts

Create two Python scripts: mapper.py and reducer.py.

mapper.py
#!/usr/bin/env python3

import sys

# Read from standard input
for line in sys.stdin:
    # Strip whitespace and split the line into words
    words = line.strip().split()
    for word in words:
        # Output each word with a count of 1
        print(f"{word}\t1")

reducer.py

#!/usr/bin/env python3

import sys

current_word = None
current_count = 0

# Read from standard input (Hadoop streaming delivers the mapper output sorted by key)
for line in sys.stdin:
    word, count = line.strip().split('\t')
    count = int(count)

    if current_word == word:
        current_count += count
    else:
        if current_word:
            # Output the count for the previous word
            print(f"{current_word}\t{current_count}")
        current_word = word
        current_count = count

# Output the count for the last word
if current_word == word:
    print(f"{current_word}\t{current_count}")

Step 2: Make the Scripts Executable


Make sure both scripts are executable:

chmod +x mapper.py reducer.py
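
Before submitting the job to Hadoop, the two scripts can be sanity-checked with a local shell pipe (a quick test, not part of the original steps; sort stands in for Hadoop's shuffle phase):

cat input.txt | python3 mapper.py | sort | python3 reducer.py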

Step 3: Run the Hadoop Streaming Job


You can run the Hadoop Streaming job using the following command:

hadoop jar /path/to/hadoop-streaming.jar \
    -input <input_directory> \
    -output <output_directory> \
    -mapper ./mapper.py \
    -reducer ./reducer.py \
    -file ./mapper.py \
    -file ./reducer.py

Word Count in Apache Spark Using Python (PySpark)

Step 1: Create the Spark Program


Create a Python file named word_count.py.
from pyspark import SparkContext

def main():
    # Create a SparkContext
    sc = SparkContext("local", "Word Count")

    # Read the input file
    input_file = "input.txt"  # Change this to your input file path
    text_file = sc.textFile(input_file)

    # Split the lines into words and count them
    word_counts = text_file.flatMap(lambda line: line.split()) \
                           .map(lambda word: (word, 1)) \
                           .reduceByKey(lambda a, b: a + b)

    # Save the output
    output_file = "output"  # Change this to your desired output path
    word_counts.saveAsTextFile(output_file)

    # Stop the SparkContext
    sc.stop()

if __name__ == "__main__":
    main()

Step 2: Run the Program


Run the Spark job using the following command:

spark-submit word_count.py

Notes
• For the Hadoop Streaming example, ensure you have Hadoop installed and configured properly.
• For the Spark example, ensure you have Apache Spark installed and configured.
• The input file for both examples should be a text file containing the text you want to analyze.
• The output will be saved in the specified output directory, which will contain the word counts in
text format. If the output directory already exists, you will need to delete it before running the job
again.

Sample Input: input.txt

Hello world
Hello Hadoop
Hello Spark
Hadoop is great
Spark is great

hadoop jar /path/to/hadoop-streaming.jar \
    -input input.txt \
    -output output_directory \
    -mapper ./mapper.py \
    -reducer ./reducer.py \
    -file ./mapper.py \
    -file ./reducer.py

Sample Output
Hadoop 2
Hello 3
Spark 2
great 2
is 2
world 1

Running the Spark Job: spark-submit word_count.py



9. Use CDH (Cloudera Distribution for Hadoop) and HUE (Hadoop User
Interface) to analyze data and generate reports for sample datasets

Prerequisites
1. CDH Installation: Ensure that you have CDH installed and running on your cluster.
2. Hue Installation: Ensure that Hue is installed and configured to connect to your Hadoop cluster.
3. Sample Dataset: For this example, we will use a sample dataset, such as a CSV file containing
employee data.
Sample Dataset
Let's assume we have a CSV file named employees.csv with the following content:
employee_id,name,department,salary
1,John Doe,Engineering,70000
2,Jane Smith,Marketing,60000
3,Jim Brown,Engineering,80000
4,Jake White,Sales,50000
5,Emily Davis,Marketing,75000

Step 1: Upload the Dataset to HDFS


1. Access Hue: Open your web browser and navigate to the Hue interface (usually at http://<your-hue-server>:8888).
2. Login: Log in with your credentials.
3. File Browser: Navigate to the "File Browser" application in Hue.
4. Upload File: Click on the "Upload" button and select the employees.csv file from your local machine. Choose the target directory in HDFS where you want to upload the file (e.g., /user/hue/employees/, the directory the Hive table below points to).

Step 2: Create a Hive Table


1. Navigate to Hive: In the Hue interface, go to the "Hive" application.
2. Create Table: Use the following HiveQL command to create a table for the uploaded dataset:

CREATE TABLE IF NOT EXISTS employees (
    employee_id INT,
    name STRING,
    department STRING,
    salary FLOAT
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/hue/employees/'                -- directory containing employees.csv (LOCATION must be a directory, not a file)
TBLPROPERTIES ('skip.header.line.count'='1');  -- skip the CSV header row

3. Run the Query: Click on the "Execute" button to run the query.

Step 3: Analyze Data


Now that we have the data in Hive, we can perform various analyses. Here are a few example queries:
1. Count Employees by Department



SELECT department, COUNT(*) AS employee_count
FROM employees
GROUP BY department;

2. Average Salary by Department


SELECT department, AVG(salary) AS average_salary
FROM employees
GROUP BY department;

3. List Employees with Salary Greater than 60000


SELECT name, salary
FROM employees
WHERE salary > 60000;

Step 4: Generate Reports


1. Run Queries: Execute the above queries in the Hive interface.
2. View Results: After executing each query, you will see the results displayed in a tabular format
below the query editor.
3. Export Results: You can export the results to CSV or other formats by clicking on the "Download"
button (usually found in the results section).

Step 5: Visualize Data (Optional)


If you want to create visualizations:
1. Navigate to the "Dashboard": In Hue, you can create dashboards to visualize your data.
2. Create a New Dashboard: Click on "Dashboards" and create a new dashboard.
3. Add Charts: You can add charts based on the results of your queries. For example, you can create
a bar chart to show the number of employees by department.

Using CDH and Hue, you can easily upload datasets, analyze them using Hive, and generate reports. The
Hue interface provides a user-friendly way to interact with Hadoop components, making it easier to perform
data analysis without needing to write complex code.

• Ensure that your CDH and Hue installations are properly configured and that you have the
necessary permissions to create tables and run queries.
• You can also explore other applications in Hue, such as Pig, Impala, and Oozie, for more advanced
data processing and scheduling tasks.



VIVA QUESTIONS
1. **What is Big Data?**
- Explain the characteristics of Big Data (Volume, Variety, Velocity, Veracity, Value).

2. **What are the different types of Big Data?**


- Discuss structured, semi-structured, and unstructured data.

3. **What is the difference between data and information?**


- Explain how data is processed to become information.

4. **What are the challenges associated with Big Data?**


- Discuss issues like data storage, processing, analysis, and privacy.

### Tools and Technologies


5. **What is Hadoop?**
- Explain the architecture of Hadoop and its components (HDFS, MapReduce, YARN).

6. **What is the role of HDFS in Hadoop?**


- Discuss how HDFS stores data and its advantages.

7. **What is MapReduce?**
- Explain the MapReduce programming model and its phases (Map, Shuffle, Reduce).

8. **What are some alternatives to Hadoop?**


- Discuss tools like Apache Spark, Apache Flink, and Apache Storm.

9. **What is Apache Spark?**


- Explain the architecture of Spark and how it differs from Hadoop MapReduce.

10. **What is a Data Lake?**


- Discuss the concept of a data lake and how it differs from a data warehouse.

### Data Processing and Analysis


11. **What is ETL?**
- Explain the Extract, Transform, Load process in data warehousing.

12. **What are some common data processing frameworks?**


- Discuss frameworks like Apache Spark, Apache Flink, and Apache Beam.

13. **What is the difference between batch processing and stream processing?**



- Explain the use cases for each type of processing.

14. **What is a NoSQL database?**


- Discuss the types of NoSQL databases (document, key-value, column-family, graph).

15. **What is SQL-on-Hadoop?**


- Explain tools like Apache Hive and Impala that allow SQL queries on Hadoop.

### Machine Learning and Analytics


16. **What is the role of machine learning in Big Data analytics?**
- Discuss how machine learning algorithms can be applied to large datasets.

17. **What are some common machine learning algorithms used in Big Data?**
- Explain algorithms like linear regression, decision trees, clustering, etc.

18. **What is the difference between supervised and unsupervised learning?**


- Discuss examples of each type of learning.

19. **What is feature engineering?**


- Explain the importance of feature selection and transformation in machine learning.

20. **What is the significance of data visualization in Big Data analytics?**


- Discuss tools and techniques for visualizing large datasets.

### Practical Applications


21. **Can you describe a project you worked on in the lab?**
- Be prepared to discuss the objectives, tools used, and outcomes of your project.

22. **What challenges did you face during your project, and how did you overcome
them?**
- Discuss specific problems and your problem-solving approach.

23. **How do you ensure data quality in your analyses?**


- Explain techniques for data cleaning and validation.

24. **What are some best practices for working with Big Data?**
- Discuss practices related to data governance, security, and performance optimization.

25. **How do you interpret the results of your data analysis?**


- Explain how you derive insights and make decisions based on your findings.
