Hadoop: Calculating the Maximum Temperature, Explained
Analyzing the Data with Hadoop Using MapReduce
To take advantage of the parallel processing that Hadoop provides, we need to express
our query as a MapReduce job; the MapReduce framework then manages the parallel processing by itself.
MapReduce divides the processing into two phases -
- the map phase
- the reduce phase
Each phase takes its input in the form of key-value pairs, and both phases produce their output as key-value pairs.
The output generated by the map phase is passed to the reduce phase as its input.
It is the programmer's responsibility to specify two functions:
- the map function
- the reduce function
Let's take an example where the input to the map phase is the NCDC weather data from the link below -
https://raw.githubusercontent.com/lmsamarawickrama/Hadoop-MapReduce/master/NCDC%20weather%20files/1901
Using the above data we need to calculate maximum temperature per year.
While writing MapReduce code, we choose a text input format (which is also the default format in Hadoop) that
gives us each line in the dataset as a text value. The key is the offset of the beginning of
the line from the beginning of the file, but as we have no need for this, we ignore it.
Since we are only concerned with the year and the temperature, our map function is simple: we pull out the year and the air temperature. In this case, the map function is just a data
preparation phase, setting up the data in such a way that the reduce function can do its
work on it: finding the maximum temperature for each year. The map function is also
a good place to drop bad records, for example those with missing or invalid temperature readings.
Suppose the map function receives the following lines as input -
0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...
These lines are presented to the map function as the key-value pairs:
(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
(412, 0043012650999991949032418004...0500001N9+00781+99999999999...)
The keys (0, 106, 212, etc. in our example) are the line offsets within the file, which we ignore in our map function. The
map function simply extracts the year and the temperature
and emits them as its output.
Below is the sample map output -
(1950, 0)
(1950, 22)
(1950, −11)
(1949, 111)
(1949, 78)
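As a quick sanity check of the record layout, the year field can be pulled out of the first sample line with a plain `String.substring` call, using the same offset (15-19) as the full job code later in this post. This is a standalone sketch, not part of the job; the temperature offsets (87-92) cannot be demonstrated here because the sample lines above are truncated with "...".

```java
public class NcdcYearDemo {
    public static void main(String[] args) {
        // Leading fields of the first sample record shown above.
        String line = "0067011990999991950051507004";

        // In the NCDC fixed-width format, the year occupies characters 15..18.
        String year = line.substring(15, 19);
        System.out.println(year); // prints 1950
    }
}
```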
Now the MapReduce framework takes the output generated by the map function and processes it before sending it to the reducer: it sorts the records by the keys the mapper generated, and then groups all the values associated with each key into a single record for the reducer.
So our reduce function sees the following input:
(1949, [111, 78])
(1950, [0, 22, −11])
Here we get only 2 records instead of 5 (the map function produced 5 output records), and they are in sorted key order.
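This sort-and-group (shuffle) step can be mimicked locally with a sorted map that collects all values per key. This is only a sketch of the idea, not how the framework actually implements it:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class ShuffleDemo {
    public static void main(String[] args) {
        // Map output as (year, temperature) pairs, in emission order.
        int[][] pairs = {{1950, 0}, {1950, 22}, {1950, -11}, {1949, 111}, {1949, 78}};

        // TreeMap keeps keys sorted (like the sort phase); each key
        // accumulates all of its values (like the grouping phase).
        TreeMap<Integer, List<Integer>> grouped = new TreeMap<>();
        for (int[] p : pairs) {
            grouped.computeIfAbsent(p[0], k -> new ArrayList<>()).add(p[1]);
        }
        System.out.println(grouped); // prints {1949=[111, 78], 1950=[0, 22, -11]}
    }
}
```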
Each year appears with a list of all its air temperature readings. All the reduce function
has to do now is iterate through the list and pick out the maximum value.
Below is the output of the reducer function.
(1949, 111)
(1950, 22)
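The reducer's per-key work is just a maximum over a list, which can be checked in isolation (a standalone sketch using the 1950 readings from the reduce input above; `Collections.max` performs the same scan the reducer loop does):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MaxDemo {
    public static void main(String[] args) {
        // The grouped reduce input for 1950, as shown above.
        List<Integer> readings1950 = Arrays.asList(0, 22, -11);

        // Same result as looping over the list and tracking the largest value.
        System.out.println(Collections.max(readings1950)); // prints 22
    }
}
```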
The data flow is illustrated graphically in the figure below -
Full Java MapReduce Code for Max Temperature
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MaxTemp {

  public static class MaxTempMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999; // NCDC sentinel value for a missing reading

    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String line = value.toString();
      String year = line.substring(15, 19);
      int temperature;
      /*
       * Extract the temperature field. If there is a plus sign (+) at
       * position 87, the digits start at position 88; otherwise the sign
       * is part of the field and parsing starts at position 87.
       */
      if (line.charAt(87) == '+')
        temperature = Integer.parseInt(line.substring(88, 92));
      else
        temperature = Integer.parseInt(line.substring(87, 92));
      String quality = line.substring(92, 93);
      // Send the record to the reducer only if the reading is present and
      // its quality code marks it as valid.
      if (temperature != MISSING && quality.matches("[01459]"))
        context.write(new Text(year), new IntWritable(temperature));
    }
  }

  public static class MaxTempReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable maxTempResult = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      // Start below any possible reading so negative temperatures are handled.
      int maxTemp = Integer.MIN_VALUE;
      for (IntWritable val : values) { // find the maximum temperature for this year
        maxTemp = Math.max(maxTemp, val.get());
      }
      maxTempResult.set(maxTemp);
      context.write(key, maxTempResult);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Max Temp");
    job.setJarByClass(MaxTemp.class);
    job.setMapperClass(MaxTempMapper.class);
    job.setCombinerClass(MaxTempReducer.class);
    job.setReducerClass(MaxTempReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}