
How to find top N elements using MapReduce.

MapReduce is a programming model that lets a cluster of commodity machines process enormous amounts of data in parallel. We will see how MapReduce works by implementing a job that finds the top N elements.

In this blog, you will learn how to find the top N elements with a MapReduce program.

Block diagram to find Top N using MapReduce

Explanation of the diagram

Let’s look at the diagram and see how we can find the top N elements from a file.

  • First, the input.txt file is used as input. Because we’re using Hadoop, the work is spread over several nodes, so the data is partitioned into several pieces. In Hadoop, each piece is referred to as a block.
  • There are three blocks in the diagram: A, B, and C. There are multiple mappers, and each mapper is assigned to a distinct block.
  • The count of each unique element is determined at the mapper level, so each mapper ends up with its own local top N elements. These are sent to the reducer.
  • Finally, we calculate the overall top N elements at the reducer level using the outputs from the mapper side.
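The flow described in the bullets above can be sketched in plain Java, without any Hadoop classes. This is only an illustration under stated assumptions: the class name TopNFlowSketch, the helper names topN and run, and the sample element names and counts are all made up for this example, and each element is assumed to appear in only one block.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java simulation of the diagram; no Hadoop classes are used.
// All names and sample values here are hypothetical.
class TopNFlowSketch {

    // Keep the n entries with the largest counts, largest first.
    static Map<String, Long> topN(Map<String, Long> counts, int n) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(n)
                .collect(Collectors.toMap(
                        Map.Entry::getKey, Map.Entry::getValue,
                        (a, b) -> a, LinkedHashMap::new));
    }

    // One "mapper" per block emits its local top n; one "reducer" merges them.
    static Map<String, Long> run(List<Map<String, Long>> blocks, int n) {
        Map<String, Long> candidates = new HashMap<>();
        for (Map<String, Long> block : blocks) {
            topN(block, n).forEach(
                    (name, count) -> candidates.merge(name, count, Long::sum));
        }
        return topN(candidates, n);
    }

    public static void main(String[] args) {
        List<Map<String, Long>> blocks = List.of(
                Map.of("a", 5L, "b", 9L, "c", 1L),   // block A
                Map.of("d", 7L, "e", 3L),            // block B
                Map.of("f", 8L, "g", 2L));           // block C
        System.out.println(run(blocks, 2));
    }
}
```

Because every block forwards only its own best n candidates, the merging step never has to look at more than n entries per block, however large the blocks are.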

Implementation

To calculate the top N elements, follow the steps listed below. Besides the mapper and the reducer, we must develop a driver class in which we specify which class is the mapper and which is the reducer, the key and value types, the input and output paths, and other details.

In the figure below, the movie.txt file is given as input, and the output will be the top 10 movies based on popularity.

Input txt file

Let’s take a look at the implementation and see how it works:

Step 1- Implementation of the Mapper class

import java.io.*;
import java.util.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
public class top_10_Movies_Mapper extends Mapper<Object,
							Text, Text, LongWritable> {

	private TreeMap<Long, String> tmap;

	@Override
	public void setup(Context context) throws IOException,
									InterruptedException
	{
		tmap = new TreeMap<Long, String>();
	}

	@Override
	public void map(Object key, Text value,
	Context context) throws IOException,
					InterruptedException
	{

		// input data format => movie_name
		// no_of_views (tab separated)
		// we split the input data
		String[] tokens = value.toString().split("\t");

		String movie_name = tokens[0];
		long no_of_views = Long.parseLong(tokens[1]);

		// insert data into the TreeMap;
		// we want the top 10 viewed movies,
		// so we use no_of_views as the key
		// (movies with the same no_of_views overwrite each other)
		tmap.put(no_of_views, movie_name);

		// remove the entry with the smallest key
		// once the size exceeds 10
		if (tmap.size() > 10)
		{
			tmap.remove(tmap.firstKey());
		}
	}

	@Override
	public void cleanup(Context context) throws IOException,
									InterruptedException
	{
		for (Map.Entry<Long, String> entry : tmap.entrySet())
		{

			long count = entry.getKey();
			String name = entry.getValue();

			context.write(new Text(name), new LongWritable(count));
		}
	}
}

The Mapper class has three methods: setup, map, and cleanup. The setup and cleanup methods are called only once per map task, whereas the map method is invoked once for every input record.

  • In the setup method, we create a TreeMap whose key is the number of views and whose value is the movie name.
  • In the map method, we read the file block line by line and split each line on the tab character. The first field is the movie name, and the second field is the number of views. The pair is added to the TreeMap object, and the entry with the fewest views is removed whenever the map holds more than ten entries.
  • In the cleanup method, we iterate through the TreeMap and write its entries to the reducer.
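The trimming logic from the map method can be tried out on its own. The sketch below is a minimal stand-alone version, assuming a hypothetical helper called offer and made-up movie names; it also shows a property of TreeMap worth knowing here: put replaces the value when the key already exists, so two movies with the same number of views cannot both be kept.

```java
import java.util.TreeMap;

// Stand-alone sketch of the bounded TreeMap used in the mapper.
// The helper name `offer` and the sample data are hypothetical.
class BoundedTreeMapSketch {

    // Insert one record, then evict the smallest key if more than
    // `limit` entries are held.
    static void offer(TreeMap<Long, String> top, long views, String name, int limit) {
        top.put(views, name);
        if (top.size() > limit) {
            top.remove(top.firstKey());
        }
    }

    public static void main(String[] args) {
        TreeMap<Long, String> top = new TreeMap<>();
        offer(top, 50L, "Movie 1", 2);
        offer(top, 10L, "Movie 2", 2);
        offer(top, 70L, "Movie 3", 2);   // evicts the entry with 10 views
        System.out.println(top);

        // Same view count as "Movie 3": put() replaces the stored name.
        offer(top, 70L, "Movie 4", 2);
        System.out.println(top);
    }
}
```

If ties matter for your data, one option is to store a list of names per view count instead of a single name, at the cost of slightly more bookkeeping when trimming.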

Step 2- Implementation of the Reducer class

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class top_10_Movies_Reducer extends Reducer<Text,
					LongWritable, LongWritable, Text> {

	private TreeMap<Long, String> tmap2;

	@Override
	public void setup(Context context) throws IOException,
									InterruptedException
	{
		tmap2 = new TreeMap<Long, String>();
	}

	@Override
	public void reduce(Text key, Iterable<LongWritable> values,
	Context context) throws IOException, InterruptedException
	{

		// input data from mapper
		// key			 values
		// movie_name		 [ count ]
		String name = key.toString();
		long count = 0;

		// add up the counts received for this movie
		for (LongWritable val : values)
		{
			count += val.get();
		}

		// insert data into the TreeMap;
		// we want the top 10 viewed movies,
		// so we use count as the key
		tmap2.put(count, name);

		// remove the entry with the smallest key
		// once the size exceeds 10
		if (tmap2.size() > 10)
		{
			tmap2.remove(tmap2.firstKey());
		}
	}

	@Override
	public void cleanup(Context context) throws IOException,
									InterruptedException
	{

		for (Map.Entry<Long, String> entry : tmap2.entrySet())
		{

			long count = entry.getKey();
			String name = entry.getValue();
			context.write(new LongWritable(count), new Text(name));
		}
	}
}

Setup, reduce, and cleanup are the three methods of the Reducer class. The setup and cleanup methods are called just once, and the reduce method is called once for each key.

  • In the setup method, we initialize the TreeMap.
  • In the reduce method, we sum the views for each movie name and add the result to the TreeMap. Because a TreeMap sorts its entries by key, whenever it holds more than ten entries we remove the first entry, which is the one with the fewest views.
  • In the cleanup method, we iterate through the entries and write them to the output location.
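The reduce step described above can be sketched without Hadoop as well. The example assumes the values have already been grouped by movie name (as the framework does before calling reduce); the class name, the method names, and the sample data are hypothetical. It also shows that descendingMap() can be used if you prefer to list the most viewed movie first, since plain iteration over a TreeMap goes from the smallest key to the largest.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the reduce logic; names and data are hypothetical.
class ReduceStepSketch {

    // Mimics reduce(): add up all counts received for one movie name.
    static long sum(List<Long> counts) {
        long total = 0;
        for (long c : counts) {
            total += c;
        }
        return total;
    }

    // Mimics reduce() over every key, plus the trimming to `limit` entries.
    static TreeMap<Long, String> topByViews(Map<String, List<Long>> grouped, int limit) {
        TreeMap<Long, String> top = new TreeMap<>();
        for (Map.Entry<String, List<Long>> e : grouped.entrySet()) {
            top.put(sum(e.getValue()), e.getKey());
            if (top.size() > limit) {
                top.remove(top.firstKey()); // drop the smallest total
            }
        }
        return top;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> grouped = Map.of(
                "Movie A", List.of(3L, 4L),
                "Movie B", List.of(5L),
                "Movie C", List.of(1L, 1L));
        TreeMap<Long, String> top = topByViews(grouped, 2);
        // descendingMap() walks the same entries from the largest key down.
        top.descendingMap().forEach(
                (views, name) -> System.out.println(views + "\t" + name));
    }
}
```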

Step 3- Implementation of the driver class

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Driver {

	public static void main(String[] args) throws Exception
	{
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf,
								args).getRemainingArgs();

		// show an error and exit if fewer
		// than two paths are provided
		if (otherArgs.length < 2)
		{
			System.err.println("Error: please provide two paths");
			System.exit(2);
		}

		Job job = Job.getInstance(conf, "top 10 elements");
		job.setJarByClass(Driver.class);

		job.setMapperClass(top_10_Movies_Mapper.class);
		job.setReducerClass(top_10_Movies_Reducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);

		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Here, we have introduced the driver class, which registers the mapper class, where the mapping is implemented, and the reducer class, where the reducing is implemented.

The driver sets the key and value classes for the mapper output and for the final job output, and then sets the input and output paths taken from the command-line arguments.

Finally, the exit code reports whether the job completed successfully.
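One detail the driver relies on implicitly is that the job runs with a single reduce task, which is Hadoop's default unless the configuration says otherwise (it can be pinned with job.setNumReduceTasks(1)). The sketch below is a plain-Java illustration of why this matters, not Hadoop code: the partition method only imitates the shape of hash partitioning, and the class name, method names, and keys are hypothetical.

```java
import java.util.List;

// Illustrates why a global top N needs one reduce task.
// Everything here is a simplified, hypothetical model.
class SingleReducerSketch {

    // Hash partitioning in miniature: non-negative hash modulo task count.
    static int partition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Number of records written when every reduce task emits its own top n.
    static int emittedRecords(List<String> keys, int numReduceTasks, int n) {
        int[] perTask = new int[numReduceTasks];
        for (String key : keys) {
            perTask[partition(key, numReduceTasks)]++;
        }
        int total = 0;
        for (int count : perTask) {
            total += Math.min(count, n);
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c", "d");
        System.out.println("1 reduce task:  " + emittedRecords(keys, 1, 1));
        System.out.println("2 reduce tasks: " + emittedRecords(keys, 2, 1));
    }
}
```

With more than one reduce task, each task would write its own local top N, so the output would need one more merging pass to become a true global top N.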

Output

Let’s look at the output produced for the input above.

Output txt file

As you can see, it shows the top 10 movies based on popularity.

Conclusion

I hope you find this useful in understanding how a MapReduce job finds the top N elements.

If you like this blog article, you can like and comment on it. If you have any questions about the implementation, feel free to comment, and share the blog with your friends and colleagues.

You may find me on LinkedIn, Twitter, and Instagram, among other social media platforms.

LinkedIn – https://www.linkedin.com/in/abhishek-kumar-singh-8a6326148

Twitter – https://twitter.com/Abhi007si

Instagram – www.instagram.com/dataspoof