The “in-mapper combining” design pattern for Map/Reduce programming in Java

Introduction

I am reading a book by Lin and Dyer (2010). It is very informative about designing efficient algorithms under the Map/Reduce (M/R) programming paradigm. Of particular interest is the “in-mapper combining” design pattern. As if engineers and data miners did not have to change their way of thinking enough while adapting to M/R, our thinking and development must also be sensitive to the particular M/R framework in use. The in-mapper combining design pattern is meant to address some issues with M/R programming, in particular under the Hadoop platform. In this blog post I will discuss this design pattern and show some examples. It also seems to me an excellent technical screening problem, if you are so (un)fortunate. 🙂 Hereafter, I will refer to the in-mapper combining design pattern by the acronym IMCDP.

Need for the design pattern

IMCDP may result in a more efficient algorithm implementation. I would not say that it results in a more efficient algorithm per se, since it does not necessarily change the running-time complexity. In Lin’s book, the driving factor for IMCDP is to “substantially reduce both the number and size of key-value pairs that need to be shuffled from the mappers to the reducers.”

In the canonical example of word counting, a key-value pair is emitted for every word found. For example, if we had 1,000 words, then 1,000 key-value pairs would be emitted from the mappers to the reducer(s). Between the mappers and the reducer(s), a shuffle and sort step runs over these “intermediary” key-value pairs. If the number of intermediary key-value pairs is extremely high, the sheer amount of data can become a pain point for the speed of the overall M/R job. With IMCDP, the idea is to reduce the number of intermediary key-value pairs being sent from the mappers to the reducer(s).
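To make the data flow concrete, here is a minimal, Hadoop-free sketch of the map, shuffle/sort, and reduce steps for word count. The class `ShuffleSketch` and its methods are hypothetical names I made up for illustration; they only imitate what the framework does and are not Hadoop APIs.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical simulation of map -> shuffle/sort -> reduce (not Hadoop code).
class ShuffleSketch {

    // Naive map step: one (word, 1) pair per token, like the canonical mapper.
    static List<Map.Entry<String, Integer>> mapNaive(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            pairs.add(new AbstractMap.SimpleImmutableEntry<>(itr.nextToken(), 1));
        }
        return pairs;
    }

    // Shuffle and sort: group the values by key, keys kept in sorted order.
    static TreeMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    // Reduce step: sum the grouped values for each key.
    static TreeMap<String, Integer> reduce(TreeMap<String, List<Integer>> grouped) {
        TreeMap<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) {
                sum += v;
            }
            totals.put(e.getKey(), sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = mapNaive("to be or not to be");
        System.out.println("intermediary pairs: " + pairs.size());
        System.out.println("totals: " + reduce(shuffle(pairs)));
    }
}
```

Every token becomes one intermediary pair here, so the list that goes through `shuffle` grows linearly with the input. That list is what IMCDP tries to shrink.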

Of course, you may be wondering, why not just use a combiner (the mini-reducer)? The reason is that even if we explicitly set one for an M/R job, Hadoop treats it as an optional optimization and may run it zero, one, or several times. With IMCDP, the engineer can explicitly and deterministically control how to reduce the number of intermediary key-value pairs.
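A small sketch may help show what “may or may not run” means in practice. For word count, the final sum is the same whether the combiner ran zero, one, or two times, but the number of values handed to the reducer is not. The class and method names below are hypothetical and the code does not use Hadoop; it only imitates the arithmetic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration: a sum combiner is safe to skip or repeat,
// but only running it actually shrinks the data sent to the reducer.
class CombinerOptionalSketch {

    // Combiner for word count: collapse the partial counts of one key into one value.
    static List<Integer> combine(List<Integer> values) {
        List<Integer> out = new ArrayList<>();
        out.add(sum(values));
        return out;
    }

    // Reducer for word count: sum whatever values arrive for the key.
    static int sum(List<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Integer> mapOutput = Arrays.asList(1, 1, 1, 1, 1); // five (word, 1) pairs
        List<Integer> once = combine(mapOutput);
        List<Integer> twice = combine(once);
        System.out.println("no combiner:  " + mapOutput.size() + " values, sum " + sum(mapOutput));
        System.out.println("combiner x1: " + once.size() + " values, sum " + sum(once));
        System.out.println("combiner x2: " + twice.size() + " values, sum " + sum(twice));
    }
}
```

The result is correct in all three cases, which is why the framework is free to skip the combiner. The savings, however, only show up when it actually runs, and that is the part IMCDP takes into our own hands.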

Canonical word count mapper

The canonical word count mapper program is shown below. As stated before, for every word found, a corresponding intermediary key-value pair is emitted from the mapper to the reducer; with 1,000,000 words, 1,000,000 key-value pairs will be emitted. We should try to optimize the implementation of this word count mapper by reducing the amount of data that needs to be passed from the mapper to the reducer.

public static class TokenizerMapper 
 extends Mapper<Object, Text, Text, IntWritable> {
 private final static IntWritable one = new IntWritable(1);
 private Text word = new Text();
 
 public void map(Object key, Text value, Context context) 
  throws IOException, InterruptedException {
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
   word.set(itr.nextToken());
   context.write(word, one);
  }
 }
}

A local IMCDP

In Lin’s book, he suggests that we use an associative array (i.e. in Java, this is a Map) to store the words and their associated frequency. After we have counted all the words in the incoming text, then we emit each word and its associated frequency. I have modified the canonical word count mapper as below.

public static class TokenizerMapper 
 extends Mapper<Object, Text, Text, IntWritable> {

 public void map(Object key, Text value, Context context) 
  throws IOException, InterruptedException {
  Map<String, Integer> map = new HashMap<String, Integer>();
  StringTokenizer itr = new StringTokenizer(value.toString());
  
  while (itr.hasMoreTokens()) {
   String token = itr.nextToken();
   if(map.containsKey(token)) {
    int total = map.get(token) + 1; // Integer auto-unboxes; it has no get() method
    map.put(token, total);
   } else {
    map.put(token, 1);
   }
  }
  
  Iterator<Map.Entry<String, Integer>> it = map.entrySet().iterator();
  while(it.hasNext()) {
   Map.Entry<String, Integer> entry = it.next();
   String sKey = entry.getKey();
   int total = entry.getValue().intValue();
   context.write(new Text(sKey), new IntWritable(total));
  }
 }
}

I refer to this IMCDP approach as a local one because the associative array is local to the map method. As you can see, instead of emitting one key-value pair per token, we now emit one key-value pair per distinct word in each input record.
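To get a feel for the savings, here is a rough, Hadoop-free sketch that just counts how many pairs each variant would emit for the same input. The class `LocalCombiningSketch` and the sample records are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

// Hypothetical pair counter: canonical mapper vs. local IMCDP (not Hadoop code).
class LocalCombiningSketch {

    // Pairs the canonical mapper would emit: one per token.
    static int naivePairs(String[] records) {
        int emitted = 0;
        for (String record : records) {
            emitted += new StringTokenizer(record).countTokens();
        }
        return emitted;
    }

    // Pairs the local variant would emit: one per distinct word in each record.
    static int localPairs(String[] records) {
        int emitted = 0;
        for (String record : records) {
            // A fresh map per record mirrors the map created inside map().
            Map<String, Integer> counts = new HashMap<>();
            StringTokenizer itr = new StringTokenizer(record);
            while (itr.hasMoreTokens()) {
                counts.merge(itr.nextToken(), 1, Integer::sum);
            }
            emitted += counts.size();
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[] records = { "the cat and the hat", "the hat" };
        System.out.println("canonical: " + naivePairs(records));
        System.out.println("local:     " + localPairs(records));
    }
}
```

With these two records the canonical mapper emits 7 pairs and the local variant emits 6. The gain only comes from words repeated within a single record, so short records benefit little.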

A global IMCDP

Lin suggests we may do even better than the local IMCDP with a global IMCDP approach. Instead of using an associative array per key-value input, we use an associative array per mapper. The associative array lives outside of the map method, and so I refer to this approach as global. However, no matter the strategy or name, both approaches, local and global IMCDP, are still considered local aggregation techniques with respect to the mapper. The code below shows the global IMCDP approach.

public static class TokenizerMapper 
 extends Mapper<Object, Text, Text, IntWritable> {

 private Map<String, Integer> map;

 public void map(Object key, Text value, Context context) 
  throws IOException, InterruptedException {
  Map<String, Integer> map = getMap();
  StringTokenizer itr = new StringTokenizer(value.toString());
  
  while (itr.hasMoreTokens()) {
   String token = itr.nextToken();
   if(map.containsKey(token)) {
    int total = map.get(token) + 1; // Integer auto-unboxes; it has no get() method
    map.put(token, total);
   } else {
    map.put(token, 1);
   }
  }  
 }
 
 protected void cleanup(Context context)
  throws IOException, InterruptedException {
  Map<String, Integer> map = getMap();
  Iterator<Map.Entry<String, Integer>> it = map.entrySet().iterator();
  while(it.hasNext()) {
   Map.Entry<String, Integer> entry = it.next();
   String sKey = entry.getKey();
   int total = entry.getValue().intValue();
   context.write(new Text(sKey), new IntWritable(total));
  }
 }

 public Map<String, Integer> getMap() {
  if(null == map) //lazy loading
   map = new HashMap<String, Integer>();
  return map;
 }
}

As you can see, we override Mapper’s cleanup method, which is called only once, after the mapper task has processed all of its input records, to emit each word and its associated frequency. With the global IMCDP, we are emitting potentially even fewer key-value pairs to the reducer.
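The reason this works is the order in which the framework drives a mapper: map is called once per record, and cleanup once at the very end. Below is a simplified, Hadoop-free imitation of that lifecycle. `MapperLifecycleSketch`, its `emitted` list (a stand-in for context.write), and the `run` loop are hypothetical names for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

// Hypothetical imitation of a mapper task's lifecycle (not Hadoop code).
class MapperLifecycleSketch {

    // State that survives across map() calls, like the field in the global IMCDP.
    private final Map<String, Integer> counts = new HashMap<>();

    // Stand-in for context.write: collects "word=count" strings.
    final List<String> emitted = new ArrayList<>();

    // Called once per input record; only updates the in-memory counts.
    void map(String record) {
        StringTokenizer itr = new StringTokenizer(record);
        while (itr.hasMoreTokens()) {
            counts.merge(itr.nextToken(), 1, Integer::sum);
        }
    }

    // Called once after the last record; emits one pair per distinct word.
    void cleanup() {
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            emitted.add(e.getKey() + "=" + e.getValue());
        }
    }

    // Simplified driver loop: every record goes through map(), then cleanup() runs once.
    static MapperLifecycleSketch run(String[] split) {
        MapperLifecycleSketch mapper = new MapperLifecycleSketch();
        for (String record : split) {
            mapper.map(record);
        }
        mapper.cleanup();
        return mapper;
    }

    public static void main(String[] args) {
        String[] split = { "the cat and the hat", "the hat" };
        System.out.println(run(split).emitted);
    }
}
```

For the two sample records this emits 4 pairs (one per distinct word across the whole split) instead of 7 from the canonical mapper. The order of the emitted entries depends on HashMap iteration order, so I would not rely on it.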

The global IMCDP approach may run into a memory limitation issue. If the associative array grows to the point where memory runs out, your mapper task will crash. Lin suggests “flushing” the associative array every so often. The trade-off is that a word seen both before and after a flush is emitted more than once, so the reducer still has to sum the partial counts. Below, I show a way to flush the associative array.

public static class TokenizerMapper 
 extends Mapper<Object, Text, Text, IntWritable> {

 private static final int FLUSH_SIZE = 1000;
 private Map<String, Integer> map;

 public void map(Object key, Text value, Context context) 
  throws IOException, InterruptedException {
  Map<String, Integer> map = getMap();
  StringTokenizer itr = new StringTokenizer(value.toString());
  
  while (itr.hasMoreTokens()) {
   String token = itr.nextToken();
   if(map.containsKey(token)) {
    int total = map.get(token) + 1; // Integer auto-unboxes; it has no get() method
    map.put(token, total);
   } else {
    map.put(token, 1);
   }
  }  

  flush(context, false);
 }

 private void flush(Context context, boolean force) 
  throws IOException, InterruptedException {
  Map<String, Integer> map = getMap();
  if(!force) {
   int size = map.size();
   if(size < FLUSH_SIZE)
    return;
  }

  Iterator<Map.Entry<String, Integer>> it = map.entrySet().iterator();
  while(it.hasNext()) {
   Map.Entry<String, Integer> entry = it.next();
   String sKey = entry.getKey();
   int total = entry.getValue().intValue();
   context.write(new Text(sKey), new IntWritable(total));
  }

  map.clear(); //make sure to empty map
 }
 
 protected void cleanup(Context context)
  throws IOException, InterruptedException {
  flush(context, true); //force flush no matter what at the end
 }

 public Map<String, Integer> getMap() {
  if(null == map) //lazy loading
   map = new HashMap<String, Integer>();
  return map;
 }
}
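To check that flushing does not change the final answer, here is a small Hadoop-free sketch of the same idea. The names are hypothetical, and the threshold of 2 is deliberately tiny so that flushes actually happen on a toy input.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical simulation of global IMCDP with periodic flushing (not Hadoop code).
class FlushSketch {

    static final int FLUSH_SIZE = 2; // assumption: tiny on purpose, for the demo

    // Imitates the mapper: count per record, flush once the map holds FLUSH_SIZE or more keys.
    static List<Map.Entry<String, Integer>> mapWithFlush(String[] records) {
        Map<String, Integer> counts = new HashMap<>();
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (String record : records) {
            StringTokenizer itr = new StringTokenizer(record);
            while (itr.hasMoreTokens()) {
                counts.merge(itr.nextToken(), 1, Integer::sum);
            }
            if (counts.size() >= FLUSH_SIZE) {
                flush(counts, emitted);
            }
        }
        flush(counts, emitted); // forced flush at the end, as cleanup() does
        return emitted;
    }

    // Emits every partial count and empties the map.
    static void flush(Map<String, Integer> counts, List<Map.Entry<String, Integer>> emitted) {
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            emitted.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        counts.clear();
    }

    // Imitates the reducer: sums the partial counts per word.
    static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> emitted) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, Integer> e : emitted) {
            totals.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        String[] records = { "a a b", "a a c", "a" };
        List<Map.Entry<String, Integer>> emitted = mapWithFlush(records);
        System.out.println("pairs emitted: " + emitted.size());
        System.out.println("totals: " + reduce(emitted));
    }
}
```

For this input the word "a" is emitted three times with partial counts, yet the reducer still arrives at a=5, b=1, c=1. Five pairs are emitted instead of the seven a canonical mapper would produce, so a smaller flush size trades some of the savings for bounded memory.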

Summary and conclusion

IMCDP is a way to possibly improve the speed of an M/R job by reducing the number of intermediary key-value pairs emitted from mappers to reducers. Unlike a combiner, which may or may not run at all, IMCDP can be controlled and will always run when implemented. There are several ways to implement IMCDP, which I refer to as local and global IMCDP, but they are all still local aggregation design patterns with respect to the mapper.

As always, I hope you enjoyed reading and that this blog post helped you. Cheers! Sib ntsib dua nawb mog! שלום!

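Reference

Lin, J. and Dyer, C. (2010). Data-Intensive Text Processing with MapReduce. Morgan & Claypool Publishers.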
