Hadoop Map-Reduce Word Count Java Example

This Hadoop tutorial aims to give developers a solid start in the world of Hadoop MapReduce programming through hands-on experience developing their first Hadoop-based WordCount application. The Hadoop MapReduce WordCount example is the standard example with which Hadoop developers begin their hands-on programming. This tutorial will help Hadoop developers learn how to implement the WordCount example in MapReduce to count the number of occurrences of each word in an input file.

Prerequisites to follow this Hadoop WordCount example tutorial


  1. Hadoop must be installed, or you should have a sandbox running on VirtualBox (or VMware).
  2. If you have installed Hadoop on your machine, a single-node Hadoop cluster must be configured and running.
  3. Optional - an IDE such as IntelliJ IDEA or Eclipse must be installed.
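
If you want to confirm that the second prerequisite holds before starting, the two commands below are a quick check (the exact daemon names vary slightly across Hadoop versions):

hadoop version
jps

hadoop version prints the installed Hadoop version, and jps should list the running Hadoop daemons (for example NameNode and DataNode) if the single-node cluster is up.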


Hadoop MapReduce Example - Word Count: How It Works

The Hadoop WordCount operation occurs in 3 stages –


  1. Mapper Phase
  2. Shuffle Phase
  3. Reducer Phase


Mapper Phase Execution


The text from the input file is tokenized into words, forming a key-value pair for every word present in the file. The key is the word itself and the value is '1'.

For instance, consider the sentence -
Sachin is the god of cricket and cricket is incomplete without Sachin.
The mapper phase in the WordCount example splits the string into individual tokens, i.e. words. In this case, the sentence is split into 12 tokens (one per word), each paired with the value 1, as shown below (a plain-Java sketch of this tokenization step follows the list) –

Key-value pairs from Hadoop map phase execution –
(Sachin,1)
(is,1)
(the,1)
(god,1)
(of,1)
(cricket,1)
(and,1)
(cricket,1)
(is,1)
(incomplete,1)
(without,1)
(Sachin,1)
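
The following is a minimal sketch, in plain Java outside Hadoop, of what the tokenization step conceptually does. The class name is illustrative, not part of the Hadoop API:

import java.util.StringTokenizer;

// Illustrative only: emits a (word, 1) pair for each token in one line of input,
// mirroring what the Hadoop mapper does for every line it receives.
public class TokenizeSketch {
    public static void main(String[] args) {
        String line = "Sachin is the god of cricket and cricket is incomplete without Sachin";
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            System.out.println("(" + tokenizer.nextToken() + ",1)");
        }
    }
}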

Shuffle Phase Execution

After the map phase completes successfully, the shuffle phase runs automatically: the key-value pairs generated in the map phase are taken as input and sorted by key. (Strictly speaking, Hadoop's default Text comparator sorts by raw bytes, so the capitalized "Sachin" would actually sort before the lowercase words; the listing below uses simple alphabetical order for readability.) After the shuffle phase of the WordCount example executes, the output looks like this -

(and,1)
(cricket,1)
(cricket,1)
(god,1)
(incomplete,1)
(is,1)
(is,1)
(of,1)
(Sachin,1)
(Sachin,1)
(the,1)
(without,1)

Reducer Phase Execution


In the reduce phase, all values belonging to the same key are grouped together and added up to find the number of occurrences of each word. It is an aggregation phase for the keys generated by the map phase: the reducer takes the output of the shuffle phase as input and reduces the key-value pairs to unique keys, with their values summed. In our example, "Sachin is the god of cricket and cricket is incomplete without Sachin", several words appear twice. After the reduce phase of the MapReduce WordCount example program executes, it produces the result below -

(and,1)
(cricket,2)
(god,1)
(incomplete,1)
(is,2)
(of,1)
(Sachin,2)
(the,1)
(without,1)


This is how the MapReduce word count program executes and outputs the number of occurrences of each word in any given input file. An important point to note is that the mapper class in the WordCount program runs over the entire input file, not just a single sentence. If the input file has 15 lines, the mapper class splits the words of all 15 lines and forms the initial key-value pairs for the entire dataset. The reducer execution begins only after the mapper phase has completed successfully. A plain-Java sketch that simulates all three stages end to end follows.
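
To make the three stages concrete, here is a minimal plain-Java simulation with no Hadoop involved; all names are illustrative. The TreeMap's merge() call plays the combined role of the shuffle phase (sorting and grouping by key) and the reduce phase (summing the 1s):

import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class WordCountSimulation {
    public static void main(String[] args) {
        String input = "Sachin is the god of cricket and cricket is incomplete without Sachin";

        // Map phase: tokenize the input and emit (word, 1) for each token.
        // The TreeMap keeps keys sorted (like the shuffle phase) and merge()
        // sums the 1s per key (like the reduce phase).
        Map<String, Integer> counts = new TreeMap<>();
        StringTokenizer tokenizer = new StringTokenizer(input);
        while (tokenizer.hasMoreTokens()) {
            counts.merge(tokenizer.nextToken(), 1, Integer::sum);
        }

        // Equivalent of the reducer output: one (word, count) pair per unique key
        counts.forEach((word, count) -> System.out.println("(" + word + "," + count + ")"));
    }
}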


Now let's write the Java MapReduce WordCount code

Step 1:

Open IntelliJ and create a new project with three class files – WordCount.java, WordCountMapper.java and WordCountReducer.java

Step 2:

Open WordCount.java and paste the following code.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        // Create a JobConf object and assign a job name for identification purposes
        JobConf conf = new JobConf(getConf(), WordCount.class);
        conf.setJobName("WordCount");

        // Set the data types of the output key and value on the configuration object
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        // Provide the mapper and reducer class names
        conf.setMapperClass(WordCountMapper.class);
        conf.setReducerClass(WordCountReducer.class);

        // We will pass two arguments at run time: the HDFS input path and the output path,
        // both fetched from the command line
        Path inp = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.addInputPath(conf, inp);
        FileOutputFormat.setOutputPath(conf, out);

        // Submit the job and wait for it to complete
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // main delegates to the run method defined above via ToolRunner
        int res = ToolRunner.run(new Configuration(), new WordCount(), args);
        System.exit(res);
    }
}
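
The driver above uses the classic org.apache.hadoop.mapred API, as do the mapper and reducer in the next steps. For reference, a sketch of an equivalent self-contained WordCount written against the newer org.apache.hadoop.mapreduce API might look like the following; the class names here are illustrative, and this assumes a Hadoop 2.x-era distribution where these classes are standard:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountNewApi {

    // Mapper: tokenizes each input line and emits (word, 1)
    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // Reducer: sums the 1s emitted for each word
    public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "WordCount");
        job.setJarByClass(WordCountNewApi.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}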

Step 3: 

Open WordCountMapper.java and paste the following code.

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class WordCountMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

    // Hadoop-supported data types, created once and reused for every emitted pair
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // map method that performs the tokenizer job and frames the initial key-value pairs;
    // the reducers run only after all input lines have been converted into key-value pairs
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // take one line at a time from the input file and tokenize it
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);

        // iterate through all the words in the line, forming a key-value pair for each
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            // send the pair to the output collector, which in turn passes it on toward the reducer
            output.collect(word, one);
        }
    }
}
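
A note on a design choice in this mapper: the one and word objects are created once and reused for every emitted pair. This is safe because output.collect() serializes the pair immediately, and it avoids allocating millions of short-lived objects when the input is large.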


Step 4:

Open WordCountReducer.java and paste the following code.

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    // reduce method accepts the key-value pairs from the mappers, aggregates the values
    // by key, and produces the final output
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;

        /* iterate through all the values available for a key, add them together, and emit
           the key with the sum of its values as the final result */
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}
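
Because word counting is associative and commutative, this same class can optionally be registered as a combiner in the driver's run() method, so partial sums are computed on the map side and less data crosses the network during the shuffle. This is a standard optimization rather than part of the original code:

conf.setCombinerClass(WordCountReducer.class);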

Step 5:

Now add the external JARs that the code compiles against (for example, hadoop-core for the classic API, and hadoop-common on newer distributions) to the project's build path.


Step 6:

Now export a JAR file of the Java code and copy it to the sandbox cluster (using scp).

Step 7:

Run the job on the cluster, passing the HDFS input and output locations as the two runtime arguments:

hadoop jar {name_of_jar_file_generated} hdfs_input_file_location hdfs_output_file_location
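
For example, if the exported JAR were named wordcount.jar (a hypothetical name) and the driver class is WordCount, an invocation could look like this; note that args[0] is the input path, args[1] is the output path, and the output directory must not already exist or the job will fail:

hadoop jar wordcount.jar WordCount /user/hadoop/wordcount/input /user/hadoop/wordcount/output

If the JAR's manifest already names WordCount as its main class, the class-name argument can be omitted.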



Step 8:

You can check whether the output files were generated using the command below -
hdfs dfs -ls hdfs_output_file_location
To view the results -
hdfs dfs -cat hdfs_output_file_location/*
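
For the example sentence used earlier, the cat output would contain one word per line with its count, separated by a tab (TextOutputFormat's default key-value separator). It is shown here in the same simplified alphabetical order as above; on a real cluster "Sachin" would sort first because of the byte-order comparison:

and	1
cricket	2
god	1
incomplete	1
is	2
of	1
Sachin	2
the	1
without	1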


