Semi-join
Here are the steps involved in performing a semi-join using a Bloom filter:

Setup: load (or build) a Bloom filter over the join keys of the smaller dataset
(a sketch of this step follows the list).
    bloomFilter = loadBloomFilter()

Per record: extract the join attribute from each record of the larger dataset and
keep the record only if the filter reports a possible match.
    joinAttribute = extractJoinAttribute(value)
    if (bloomFilter.contains(joinAttribute)):
        ...

Cleanup: release the filter once all records have been processed.
    bloomFilter.cleanup()
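The loadBloomFilter() step is left abstract above; the exercise program further below instead rebuilds the filter from join keys passed through the job configuration. As a complementary sketch, assuming the smaller dataset is a comma-separated text file on HDFS whose first field is the join key, the filter could also be built offline and serialized so that it can be shipped to the mappers (for example via the distributed cache). The class name BuildBloomFilter, the sizing parameters, and the argument layout are illustrative, not part of the exercise.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;
import java.io.BufferedReader;
import java.io.InputStreamReader;

public class BuildBloomFilter {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Size and hash count are illustrative; tune them to the expected key count.
        BloomFilter bloomFilter = new BloomFilter(1000000, 7, Hash.MURMUR_HASH);
        // Read the smaller dataset and add each record's join key (assumed to be the
        // first comma-separated field) to the filter.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(new Path(args[0]))))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String joinKey = line.split(",")[0];
                bloomFilter.add(new Key(joinKey.getBytes()));
            }
        }
        // Serialize the filter so it can be distributed to the mappers.
        try (FSDataOutputStream out = fs.create(new Path(args[1]))) {
            bloomFilter.write(out);
        }
    }
}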
By performing the semi-join in this way, we can significantly reduce the amount
of data that needs to be shuffled and sorted, leading to faster processing times.
However, Bloom filters admit false positives, so some records whose join key does
not actually appear in the other dataset may pass the filter and be included in the
output. The false-positive probability should therefore be kept low by sizing the
filter appropriately, and if exact results are required, an exact match check should
still be applied, for example on the reduce side.
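With Hadoop's org.apache.hadoop.util.bloom.BloomFilter, the false-positive probability is governed by the bit-vector size and the number of hash functions passed to its constructor. The following is a minimal sketch of how these parameters could be derived from an expected key count n and a target false-positive rate p, using the standard formulas m = -n ln(p) / (ln 2)^2 and k = (m / n) ln 2; the class and method names are illustrative.

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.hash.Hash;

public class BloomFilterSizing {
    // Returns a filter sized for expectedKeys elements and a target false-positive rate:
    // m = -n * ln(p) / (ln 2)^2 bits and k = (m / n) * ln 2 hash functions.
    public static BloomFilter sizedFor(int expectedKeys, double falsePositiveRate) {
        int vectorSize = (int) Math.ceil(
                -expectedKeys * Math.log(falsePositiveRate) / (Math.log(2) * Math.log(2)));
        int numHashFunctions = Math.max(1,
                (int) Math.round((double) vectorSize / expectedKeys * Math.log(2)));
        return new BloomFilter(vectorSize, numHashFunctions, Hash.MURMUR_HASH);
    }
}

For example, 100,000 join keys at a 1% target rate yields a bit vector of roughly 960,000 bits and 7 hash functions.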
Program:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class SemiJoinMapReduce {
    // Mapper: rebuilds the Bloom filter of join keys and forwards only records that may match.
    public static class BloomFilterMapper extends Mapper<LongWritable, Text, Text, Text> {
        private final BloomFilter bloomFilter = new BloomFilter(1000000, 7, Hash.MURMUR_HASH); // sizing is illustrative
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Join keys of the smaller dataset are passed in through the job configuration.
            for (String bloomFilterString : context.getConfiguration().getStrings("bloomFilter")) {
                bloomFilter.add(new Key(bloomFilterString.getBytes()));
            }
        }
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException,
                InterruptedException {
            // Assumed record layout: comma-separated fields with the join attribute first.
            String[] fields = value.toString().split(",");
            String joinAttribute = fields[0];
            if (bloomFilter.membershipTest(new Key(joinAttribute.getBytes()))) {
                context.write(new Text(joinAttribute), value);
            }
        }
    }
    // Reducer: collects the non-key fields of the surviving records under their join key.
    public static class SemiJoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
                InterruptedException {
            Set<String> firstDataset = new HashSet<String>();
            for (Text value : values) {
                String[] fields = value.toString().split(",");
                if (fields.length > 1) {
                    firstDataset.add(Arrays.toString(Arrays.copyOfRange(fields, 1, fields.length)));
                }
            }
            for (String record : firstDataset) {
                context.write(key, new Text(record));
            }
        }
    }
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "semi-join");
        job.setJarByClass(SemiJoinMapReduce.class);
        job.setMapperClass(BloomFilterMapper.class);
        job.setReducerClass(SemiJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Assumed here: the join keys of the smaller dataset arrive as extra command-line arguments.
        String[] bloomFilterStrings = Arrays.copyOfRange(args, 2, args.length);
        job.getConfiguration().setStrings("bloomFilter", bloomFilterStrings);
        // A pre-built Bloom filter file can also be shipped to the mappers (not read in this sketch).
        DistributedCache.addCacheFile(new Path("/path/to/bloomfilter/file").toUri(),
                job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean success = job.waitForCompletion(true);
        if (success) {
            System.exit(0);
        }
        System.exit(1);
    }
}
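Assuming the driver as reconstructed above, the job could be launched with something like hadoop jar semijoin.jar SemiJoinMapReduce <input path> <output path> <join key 1> <join key 2> ..., where the trailing arguments are the join keys of the smaller dataset (the jar name is illustrative). On success, the output directory should contain one line per surviving record: the join key followed by the record's remaining fields.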