Hadoop: Calculating the Maximum Temperature, Explained

Analyzing the Data with Hadoop Using MapReduce



To take advantage of the parallel processing that Hadoop provides, we need to express
our query as a MapReduce job; the MapReduce framework then manages the parallel processing by itself.

MapReduce divides the processing into two phases:

  1. the map phase 
  2. the reduce phase 

Each phase takes its input in the form of key-value pairs, and both phases produce key-value pairs as output.
The output generated by the map phase is given to the reduce phase as its input.
It is the programmer's responsibility to specify two functions:

  1. the map function 
  2. the reduce function
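
A common way to write the types of these two functions is the following general form, where the map output key and value types must match the reduce input types:

map:    (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)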

Let's take an example where the input to the map phase comes from the link below, which contains the NCDC weather data:

https://raw.githubusercontent.com/lmsamarawickrama/Hadoop-MapReduce/master/NCDC%20weather%20files/1901

Using the above data, we need to calculate the maximum temperature per year.
While writing the MapReduce code, we choose a text input format (which is also Hadoop's default) that
gives us each line in the dataset as a text value. The key is the offset of the beginning of
the line from the beginning of the file, but as we have no need for this, we ignore it.

Since we are only concerned with the year and the temperature, our map function is simple: we pull out the year and the air temperature. In this case, the map function is just a data
preparation phase, setting up the data in such a way that the reduce function can do its
work on it: finding the maximum temperature for each year. The map function is also
a good place to drop bad records, such as missing or invalid temperature readings.
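
As a minimal sketch of that extraction (the character positions follow the NCDC fixed-width record layout, and this mirrors the full mapper shown at the end of the post):

String line = value.toString();
String year = line.substring(15, 19);        // the year occupies characters 15-18
int temperature;
if (line.charAt(87) == '+') {                // signed field starts at 87; skip an explicit '+'
    temperature = Integer.parseInt(line.substring(88, 92));
} else {                                     // a leading '-' is parsed as part of the number
    temperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);     // quality code used to filter suspect readings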

Consider that the map function receives the following lines as input:

0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...

These lines are presented to the map function as the key-value pairs:

(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)

The keys (0, 106, 212, and so on in our example) are the line offsets within the file, which we ignore in our map function. The
map function simply extracts the year and the air temperature from each line
and emits them as its output.

Below is the sample output:

(1950, 0)
(1950, 22)
(1950, −11)
(1949, 111)
(1949, 78)

Now the MapReduce framework takes the output generated by the map function and processes it before sending it to the reducer: it sorts the records by key, and after sorting it groups all the values associated with each key into a single record for the reducer.
So our reduce function sees the following input:

(1949, [111, 78])
(1950, [0, 22, −11])

Here we get only 2 records instead of the 5 that the map function produced, and they arrive sorted by key.

Each year appears with a list of all its air temperature readings. All the reduce function
has to do now is iterate through the list and pick out the maximum value, as the sketch below shows.
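
The heart of the reduce function is just a max-over-a-list loop; a minimal sketch (the full class appears at the end of the post):

int maxTemp = Integer.MIN_VALUE;             // start below any valid reading so negatives work
for (IntWritable value : values) {
    maxTemp = Math.max(maxTemp, value.get());
}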
Below is the output of the reduce function:

(1949, 111)
(1950, 22)

This data flow is illustrated graphically in the figure below.



Full Java MapReduce Code for Max Temperature



import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemp {

    public static class MaxTempMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        // NCDC records encode a missing temperature reading as 9999.
        private static final int MISSING = 9999;

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String line = value.toString();
            String year = line.substring(15, 19);
            int temperature;
            // The signed temperature field starts at position 87. If the sign
            // is '+', skip it and parse the digits from position 88; a '-' is
            // kept so negative readings parse correctly.
            if (line.charAt(87) == '+') {
                temperature = Integer.parseInt(line.substring(88, 92));
            } else {
                temperature = Integer.parseInt(line.substring(87, 92));
            }
            String quality = line.substring(92, 93);
            // Send the record to the reducer only if the reading is present
            // and the quality code marks it as valid; bad records are dropped.
            if (temperature != MISSING && quality.matches("[01459]")) {
                context.write(new Text(year), new IntWritable(temperature));
            }
        }
    }

    public static class MaxTempReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable maxTempResult = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {

            // Start below any valid reading so negative temperatures work too.
            int maxTemp = Integer.MIN_VALUE;
            for (IntWritable val : values) {   // find the maximum temperature for this year
                maxTemp = Math.max(maxTemp, val.get());
            }
            maxTempResult.set(maxTemp);
            context.write(key, maxTempResult);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Max Temp");
        job.setJarByClass(MaxTemp.class);
        job.setMapperClass(MaxTempMapper.class);
        // The reducer also serves as a combiner: taking a maximum is
        // commutative and associative, so map-side partial maxima are safe.
        job.setCombinerClass(MaxTempReducer.class);
        job.setReducerClass(MaxTempReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
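
To run the job, package the class into a jar and submit it with the hadoop command; the two arguments become args[0] (the input path) and args[1] (the output path) in main(). The jar name and HDFS paths below are only examples:

hadoop jar maxtemp.jar MaxTemp /user/hadoop/ncdc/input /user/hadoop/ncdc/output

Note that the output directory must not already exist; Hadoop creates it and will fail the job if it is already there.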
