
EXERCISE-9

Perform an efficient semi-join in MapReduce. Hint: perform the semi-join by having the mappers load a Bloom filter from the Distributed Cache, and then filter results from the actual MapReduce data source by performing membership queries against the Bloom filter to determine which data source records should be emitted to the reducers.
Description:
A semi-join is a join operation that returns only the rows from one table that have matching records in another table. In MapReduce, one efficient way to perform a semi-join is the map-side join technique.

The map-side join technique distributes the smaller table to every Map task (typically through the Distributed Cache) and holds it in memory, so that each mapper performs the join locally. This reduces the amount of data that must be shuffled across the network, and therefore the overall processing time.
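As a hedged sketch of this technique (separate from the Bloom-filter solution developed below), a mapper might preload the smaller table in setup() and probe it for every input record. The file name small_table.csv, the comma-separated key,payload layout, and the assumption that the file was shipped via the Distributed Cache with a symlink of that name are all illustrative:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    // In-memory copy of the smaller table, keyed by the join attribute
    private final Map<String, String> smallTable = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException {
        // "small_table.csv" is assumed to have been added to the Distributed
        // Cache and symlinked into the task's working directory
        try (BufferedReader reader = new BufferedReader(new FileReader("small_table.csv"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split(",", 2);
                if (fields.length == 2) {
                    smallTable.put(fields[0], fields[1]);
                }
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Join locally: emit only records whose key appears in the small table
        String[] fields = value.toString().split(",", 2);
        if (smallTable.containsKey(fields[0])) {
            context.write(new Text(fields[0]), value);
        }
    }
}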

Another efficient way to perform a semi-join in MapReduce is with a Bloom filter. A Bloom filter is a compact probabilistic data structure used to test whether an element is a member of a set. By loading a Bloom filter in the mapper phase, we can discard non-matching records early, so that far fewer records ever reach the reduce phase.
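For intuition, here is a minimal, self-contained sketch using Hadoop's bundled org.apache.hadoop.util.bloom.BloomFilter class; the 10,000-bit size, 5 hash functions, and the example keys are arbitrary choices for the demo:

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class BloomFilterDemo {
    public static void main(String[] args) {
        // 10,000-bit vector with 5 hash functions (arbitrary demo sizes)
        BloomFilter filter = new BloomFilter(10_000, 5, Hash.MURMUR_HASH);
        filter.add(new Key("user42".getBytes()));

        // Guaranteed true: Bloom filters never produce false negatives
        System.out.println(filter.membershipTest(new Key("user42".getBytes())));

        // Usually false, but may be true: false positives are possible
        System.out.println(filter.membershipTest(new Key("user99".getBytes())));
    }
}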

Here are the steps involved in performing a semi-join using a Bloom filter:

1. Create a Bloom filter over the join attribute of the smaller dataset, i.e., the dataset whose keys determine which records survive (a build sketch follows this list).
2. Load the Bloom filter in the mapper phase using the Distributed Cache.
3. Map each record of the other dataset and test whether its join attribute exists in the Bloom filter.
4. If the join attribute is present in the Bloom filter, emit the record to the reducer phase.
5. In the reducer phase, process only the records from the first dataset and ignore the records from the second dataset.
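The exercise does not prescribe how the filter in step 1 is built. One possibility, sketched below under the assumption that the smaller dataset is a comma-separated text file on HDFS whose first field is the join key, is a small standalone driver that populates the filter and serializes it to HDFS, so that step 2 can ship it through the Distributed Cache:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class BloomFilterBuilder {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        Path smallDataset = new Path(args[0]); // the dataset whose keys define the join
        Path filterFile = new Path(args[1]);   // where the serialized filter is written

        // Sizes are illustrative; see the sizing discussion at the end of this exercise
        BloomFilter filter = new BloomFilter(1_000_000, 5, Hash.MURMUR_HASH);

        // Add the join attribute (first comma-separated field) of every record
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(fs.open(smallDataset)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                filter.add(new Key(line.split(",")[0].getBytes()));
            }
        }

        // BloomFilter implements Writable, so it can be serialized directly
        try (FSDataOutputStream out = fs.create(filterFile)) {
            filter.write(out);
        }
    }
}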

Here is the pseudo-code to perform a semi-join using a Bloom filter:

Setup:
    // Load the Bloom filter from the Distributed Cache
    bloomFilter = loadBloomFilter()

Map(key, value):
    // value is a record from the dataset
    joinAttribute = extractJoinAttribute(value)
    if bloomFilter.contains(joinAttribute):
        emit(joinAttribute, value)

Reduce(key, values):
    // values is the list of records grouped under this join key
    for each value in values:
        // Check whether the record is from the first dataset
        if value is from the first dataset:
            // Process the record
            ...

Cleanup:
    // Release the resources used by the Bloom filter
    bloomFilter.cleanup()

By performing the semi-join in this way, we can significantly reduce the amount of data that needs to be shuffled and sorted, leading to faster processing times. However, Bloom filters have a nonzero false-positive probability, which means that some non-matching records may be erroneously included in the output. Bloom filters should therefore be used with care, and the false-positive rate should be controlled, by sizing the bit vector and hash count appropriately, to ensure the correctness of the output.
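The standard sizing formulas make this trade-off concrete: for n expected keys and a target false-positive rate p, a Bloom filter needs roughly m = -n ln p / (ln 2)^2 bits and k = (m / n) ln 2 hash functions. A small helper sketch, where n and p are illustrative inputs:

public class BloomSizing {
    // Standard Bloom filter sizing: given n expected keys and a target
    // false-positive rate p, compute the bit-vector size m and hash count k.
    public static void main(String[] args) {
        long n = 1_000_000; // expected number of join keys (assumption)
        double p = 0.01;    // target false-positive rate (assumption)

        long m = (long) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
        int k = (int) Math.round((double) m / n * Math.log(2));

        System.out.printf("bits m = %d (%.1f MB), hash functions k = %d%n",
                m, m / 8.0 / 1_000_000, k);
    }
}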

Program:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class SemiJoinMapReduce {

    // The size of the Bloom filter's bit vector
    private static final int BLOOM_FILTER_SIZE = 1000000;

    // The number of hash functions to use in the Bloom filter
    private static final int NUM_HASH_FUNCTIONS = 5;

    public static class BloomFilterMapper extends Mapper<LongWritable, Text, Text, Text> {

        // Initial sizes are superseded when the serialized filter is read in setup()
        private final BloomFilter bloomFilter =
                new BloomFilter(BLOOM_FILTER_SIZE, NUM_HASH_FUNCTIONS, Hash.MURMUR_HASH);

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Load the serialized Bloom filter from the Distributed Cache
            URI[] cacheFiles = context.getCacheFiles();
            if (cacheFiles == null || cacheFiles.length == 0) {
                throw new IOException("No Bloom filter file found in the Distributed Cache");
            }
            FileSystem fs = FileSystem.get(cacheFiles[0], context.getConfiguration());
            try (FSDataInputStream in = fs.open(new Path(cacheFiles[0]))) {
                bloomFilter.readFields(in);
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the input record into comma-separated fields
            String[] fields = value.toString().split(",");

            // The join attribute is assumed to be the first field
            String joinAttribute = fields[0];

            // Emit the record only if the join attribute passes the
            // Bloom filter's membership test
            if (bloomFilter.membershipTest(new Key(joinAttribute.getBytes()))) {
                context.write(new Text(joinAttribute), value);
            }
        }
    }

    public static class SemiJoinReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Collect the payload fields (everything after the join attribute);
            // records with more than one field are taken to be from the first
            // dataset, and the set removes duplicate payloads
            Set<String> firstDataset = new HashSet<>();
            for (Text value : values) {
                String[] fields = value.toString().split(",");
                if (fields.length > 1) {
                    firstDataset.add(Arrays.toString(Arrays.copyOfRange(fields, 1, fields.length)));
                }
            }

            // Emit one output record per distinct payload
            for (String value : firstDataset) {
                context.write(key, new Text(value));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Semi Join MapReduce");
        job.setJarByClass(SemiJoinMapReduce.class);
        job.setMapperClass(BloomFilterMapper.class);
        job.setReducerClass(SemiJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Ship the pre-built, serialized Bloom filter to every mapper
        // through the Distributed Cache
        job.addCacheFile(new Path("/path/to/bloomfilter/file").toUri());

        // Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job to the cluster and wait for it to finish
        boolean success = job.waitForCompletion(true);

        // Print a message indicating whether the job was successful or not
        if (success) {
            System.out.println("Semi Join MapReduce job completed successfully!");
        } else {
            System.out.println("Semi Join MapReduce job failed!");
        }
    }
}
