Implementing RawComparator will speed up your Hadoop Map/Reduce (MR) Jobs

Introduction

Implementing the org.apache.hadoop.io.RawComparator interface will definitely help speed up your Map/Reduce (MR) Jobs. As you may recall, an MR Job receives and emits key-value pairs at each stage. The process looks like the following.

  • (K1,V1) –> Map –> (K2,V2)
  • (K2,List[V2]) –> Reduce –> (K3,V3)

The key-value pairs (K2,V2) are called the intermediary key-value pairs. They are passed from the mapper to the reducer. Before these intermediary key-value pairs reach the reducer, a shuffle and sort step is performed. The shuffle is the assignment of the intermediary keys (K2) to reducers, and the sort is the sorting of these keys. In this blog post, I show that implementing a RawComparator to compare the intermediary keys greatly improves sorting. Sorting is improved because the RawComparator compares the keys byte by byte, directly on their serialized form; without a RawComparator, the intermediary keys would have to be completely deserialized into objects before each comparison.
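
For reference, here is the RawComparator interface itself (paraphrased from the Hadoop source). It extends java.util.Comparator with a single method that compares two keys directly on their serialized bytes:

public interface RawComparator<T> extends Comparator<T> {
	/** Compare two serialized keys given their byte arrays, start offsets, and lengths. */
	int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}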

Background

Two ways you may compare your keys are by implementing the org.apache.hadoop.io.WritableComparable interface or by implementing the RawComparator interface. In the former approach, you compare (deserialized) objects; in the latter, you compare the keys using their corresponding raw bytes.

I conducted an empirical test to demonstrate the advantage of RawComparator over WritableComparable. Let’s say we are processing a file that has a list of pairs of indexes {i,j}. These pairs of indexes could refer to the i-th and j-th matrix element. The input data (file) will look something like the following.

1, 2
3, 4
5, 6
...
...
...
0, 0

What we want to do is simply count the occurrences of the {i,j} pair of indexes. Our MR Job will look like the following.

  • (LongWritable,Text) –> Map –> ({i,j},IntWritable)
  • ({i,j},List[IntWritable]) –> Reduce –> ({i,j},IntWritable)

Method

The first thing we have to do is model our intermediary key K2={i,j}. Below is a snippet of IndexPair. As you can see, it implements WritableComparable. Also, we sort the keys in ascending order, first by the i-th and then by the j-th index.

public class IndexPair implements WritableComparable<IndexPair> {
	private IntWritable i;
	private IntWritable j;
	//....
	/**
	 * Constructor.
	 * @param i i.
	 * @param j j.
	 */
	public IndexPair(int i, int j) {
		this.i = new IntWritable(i);
		this.j = new IntWritable(j);
	}
	//....
	@Override
	public int compareTo(IndexPair o) {
		int cmp = i.compareTo(o.i);
		if(0 != cmp)
			return cmp;
		return j.compareTo(o.j);
	}
	//....
}
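
The //.... elisions above hide the Writable plumbing. Since the raw comparator below depends on the exact byte layout of the serialized key, here is a minimal sketch of what those elided methods would look like (your actual source may differ); note that Hadoop also requires a no-arg constructor so it can instantiate keys via reflection:

	/** No-arg constructor, required by Hadoop for deserialization. */
	public IndexPair() {
		this(0, 0);
	}
	
	@Override
	public void write(DataOutput out) throws IOException {
		i.write(out); // 4 bytes, big-endian
		j.write(out); // 4 bytes, big-endian
	}
	
	@Override
	public void readFields(DataInput in) throws IOException {
		i.readFields(in);
		j.readFields(in);
	}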

Below is a snippet of the raw comparator. As you may notice, it does not implement RawComparator directly. Rather, it extends WritableComparator (which implements RawComparator). We could have implemented RawComparator directly, but by extending WritableComparator we inherit its utility methods (such as readInt, used below), which come in handy depending on the complexity of the intermediary key.

public class IndexPairComparator extends WritableComparator {
	protected IndexPairComparator() {
		super(IndexPair.class);
	}
	
	@Override
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
		// compare the i-th indexes: the first 4 bytes of each serialized key
		int i1 = readInt(b1, s1);
		int i2 = readInt(b2, s2);
		
		int comp = (i1 < i2) ? -1 : (i1 == i2) ? 0 : 1;
		if(0 != comp)
			return comp;
		
		// the i-th indexes are equal; compare the j-th indexes, the next 4 bytes
		int j1 = readInt(b1, s1+4);
		int j2 = readInt(b2, s2+4);
		comp = (j1 < j2) ? -1 : (j1 == j2) ? 0 : 1;
		
		return comp;
	}
}

As you can see in the above code, for the two objects we are comparing, there are two corresponding byte arrays (b1 and b2), the starting positions of the objects in the byte arrays, and the lengths of the bytes they occupy. Please note that the byte arrays themselves may represent other things and not only the objects we are comparing; that is why the starting position and length are also passed in as arguments. Since we want to sort in ascending order by i then j, we first compare the bytes representing the i-th indexes, and only if they are equal do we compare the j-th indexes. You can also see that we use the utility method readInt(byte[], start), inherited from WritableComparator. This method simply converts the 4 consecutive bytes beginning at start into a primitive int (a primitive int in Java is 4 bytes). If the i-th indexes are equal, we shift the starting point by 4, read in the j-th indexes, and then compare them.
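
To make the offsets concrete: assuming the key is serialized as i followed by j, each a 4-byte big-endian int (as in the write sketch above), the key {i=1, j=2} occupies 8 bytes laid out as follows:

offset:  s     s+1   s+2   s+3   s+4   s+5   s+6   s+7
byte:    0x00  0x00  0x00  0x01  0x00  0x00  0x00  0x02
         |-------- i = 1 -------|-------- j = 2 -------|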

A snippet of the mapper is shown below.

	// the constant value emitted for every {i,j} pair observed
	private static final IntWritable ONE = new IntWritable(1);
	
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String[] tokens = value.toString().split(",");
		int i = Integer.parseInt(tokens[0].trim());
		int j = Integer.parseInt(tokens[1].trim());
		
		IndexPair indexPair = new IndexPair(i, j);
		context.write(indexPair, ONE);
	}

A snippet of the reducer is shown below.

	public void reduce(IndexPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		int sum = 0;
		for(IntWritable value : values) {
			sum += value.get();
		}
		
		context.write(key, new IntWritable(sum));
	}

The snippet of code below shows how I wired up the MR Job that does NOT use raw byte comparison.

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Job job = new Job(conf, "raw comparator example");
		
		job.setJarByClass(RcJob1.class);
		
		job.setMapOutputKeyClass(IndexPair.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		job.setOutputKeyClass(IndexPair.class);
		job.setOutputValueClass(IntWritable.class);
		
		job.setMapperClass(RcMapper.class);
		job.setReducerClass(RcReducer.class);
		
		job.waitForCompletion(true);
		
		return 0;
	}
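
(A small aside: for brevity, neither driver snippet shows the input/output formats or paths. A complete driver would also contain something along these lines, assuming the input and output paths arrive as command-line arguments:)

		// hypothetical wiring: args[0] is the input path, args[1] the output path
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));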

The snippet of code below shows how I wired up the MR Job using the raw byte comparator.

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Job job = new Job(conf, "raw comparator example");
		
		job.setJarByClass(RcJob1.class);
		job.setSortComparatorClass(IndexPairComparator.class);

		job.setMapOutputKeyClass(IndexPair.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		job.setOutputKeyClass(IndexPair.class);
		job.setOutputValueClass(IntWritable.class);
		
		job.setMapperClass(RcMapper.class);
		job.setReducerClass(RcReducer.class);
		
		job.waitForCompletion(true);
		
		return 0;
	}

As you can see, the only difference is that in the MR Job using the raw comparator, we explicitly set its sort comparator class.
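
A side note: instead of calling setSortComparatorClass on every job, the raw comparator can also be registered once in a static initializer of the key class, which is the idiom Hadoop's own Writable classes use; Hadoop then picks it up as the default comparator for that key type. A minimal sketch (this assumes the comparator's constructor is visible to IndexPair, e.g. they share a package):

public class IndexPair implements WritableComparable<IndexPair> {
	static {
		// register as the default comparator for IndexPair keys
		WritableComparator.define(IndexPair.class, new IndexPairComparator());
	}
	//....
}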

Results

I ran the MR Jobs (without and with raw byte comparison) 10 times each on a dataset of 4 million rows of {i,j} pairs. The runs were against Hadoop v0.20 in standalone mode on Cygwin. The average running time for the MR Job without raw byte comparison was 60.6 seconds, and the average running time for the job with raw byte comparison was 31.1 seconds. A two-tail paired t-test showed p < 0.001, meaning that there is a statistically significant difference between the two implementations in terms of empirical running time.

I then ran each implementation on datasets of increasing size, from 1 million to 10 million records in increments of 1 million. At 10 million records, the job without raw byte comparison took 127 seconds (over 2 minutes) to complete, while the job using raw byte comparison took 75 seconds (1 minute and 15 seconds). Below is a line graph of the running times.

Summary and conclusion

In this blog post, I showed how implementing raw byte comparison of intermediary keys improves the speed of your MR Jobs, as opposed to relying on a comparison based on deserialized objects. Lately, I have been blogging about the intermediary key-value pairs. In an earlier post, I talked about how these intermediary key-value pairs can become a bottleneck in your MR Job if many of them are emitted from the mapper, and how to mitigate this issue using certain design patterns. In this post, I talked about another optimization dealing with the intermediary key-value pairs: comparing the intermediary keys on their raw bytes so as to improve sorting.

As usual, here is the link to the source code for this blog post.

Until we meet again. Cheers and happy programming!
