Last time I analyzed Mahout’s collaborative filtering algorithm. In this blog post, I will be writing about computing the canonical Pearson correlation between two variables for a set of data using Hadoop’s M/R (MapReduce) paradigm. If you have already written your own Map and Reduce tasks for Hadoop Jobs, this tutorial is not for you. If you are just starting out, this article might help you get started.

# The Data Set and Computing Pearson Correlation using Plain Old Java Out-of-the-Box

Let’s talk about the way a data set might come to us. Ideally, we want to handle a data set that is a matrix where the columns correspond to variables and the rows correspond to instances. The data look like the following, where each row’s values are delimited by commas; call this a comma-separated value (CSV) file.

```
1, 1, 3, -1
2, 2, 1, -2
3, 3, 8, -3
...
```

What we want to do is compute the canonical Pearson correlation between all unique pairs of variables. This link provides a computational formula for the Pearson correlation between two continuous variables.
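Written out, that computational formula is the following, in the same terms the code in this post accumulates (sums of each variable, sums of squares, and the sum of products over the n rows):

$$
r = \frac{\sum xy - \frac{(\sum x)(\sum y)}{n}}{\sqrt{\left(\sum x^2 - \frac{(\sum x)^2}{n}\right)\left(\sum y^2 - \frac{(\sum y)^2}{n}\right)}}
$$

The appeal of this form is that every term is a running sum, so it can be computed in a single pass over the data, which is exactly what makes it a good fit for M/R later.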

In a traditional Java approach without M/R, one might compute the pairwise correlations like the following (assuming you have read in the CSV file and put the data into a matrix).

```java
public void computeAllPairwiseCorrelations(double[][] data) {
    int numCols = data[0].length;
    for (int i = 0; i < numCols; i++) {
        for (int j = i + 1; j < numCols; j++) {
            // compute the correlation between the i-th and j-th columns/variables
            computeCorrelation(i, j, data);
        }
    }
}
```

The method to compute the pairwise correlation might look like the following.

```java
public double computeCorrelation(int i, int j, double[][] data) {
    double x = 0.0d, y = 0.0d, xx = 0.0d, yy = 0.0d, xy = 0.0d;
    double n = data.length;
    for (int row = 0; row < data.length; row++) {
        x += data[row][i];
        y += data[row][j];
        xx += Math.pow(data[row][i], 2.0d);
        yy += Math.pow(data[row][j], 2.0d);
        xy += data[row][i] * data[row][j];
    }
    double numerator = xy - ((x * y) / n);
    double denominator1 = xx - (Math.pow(x, 2.0d) / n);
    double denominator2 = yy - (Math.pow(y, 2.0d) / n);
    double denominator = Math.sqrt(denominator1 * denominator2);
    double corr = numerator / denominator;
    return corr;
}
```
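As a sanity check, here is a small self-contained sketch (the class name is mine, not from the source download) that applies the same computational formula to the three sample rows from above. Columns 0 and 1 are identical, so their correlation should be exactly 1.0, and column 3 is the negation of column 0, so their correlation should be -1.0.

```java
// Sanity check: the computational formula applied to the three sample rows.
public class CorrelationCheck {

    // Same single-pass computational formula as in the text.
    static double correlation(int i, int j, double[][] data) {
        double x = 0, y = 0, xx = 0, yy = 0, xy = 0;
        double n = data.length;
        for (double[] row : data) {
            x  += row[i];
            y  += row[j];
            xx += row[i] * row[i];
            yy += row[j] * row[j];
            xy += row[i] * row[j];
        }
        double numerator = xy - (x * y) / n;
        double d1 = xx - (x * x) / n; // sum of squared deviations of column i
        double d2 = yy - (y * y) / n; // sum of squared deviations of column j
        return numerator / Math.sqrt(d1 * d2);
    }

    public static void main(String[] args) {
        double[][] data = {
            { 1, 1, 3, -1 },
            { 2, 2, 1, -2 },
            { 3, 3, 8, -3 }
        };
        System.out.println(correlation(0, 1, data)); // prints 1.0
        System.out.println(correlation(0, 3, data)); // prints -1.0
    }
}
```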

# Using M/R

So how would you do something similar using the M/R framework? First, the Map task receives a key-value pair as input, <K1,V1>, and outputs another key-value pair, <K2,V2>. Second, the Reduce task receives as input a key and its grouped values, <K2, List<V2>>, and outputs <K3,V3>. Assuming you just want to parse the CSV file as described above, how would you use M/R tasks to compute the Pearson correlation for all pairs of variables?

My approach was rather simple. My Mapper.map(…) method looks like the following.

```java
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String line = value.toString();
    String[] tokens = line.split(DELIM);
    double[] arr = toDouble(tokens);
    for (int i = 0; i < arr.length; i++) {
        for (int j = i + 1; j < arr.length; j++) {
            VariableIndexPairsWritable k2 = new VariableIndexPairsWritable(i, j);
            VariableValuePairsWritable v2 = new VariableValuePairsWritable(arr[i], arr[j]);
            context.write(k2, v2);
        }
    }
}

public double[] toDouble(String[] tokens) {
    double[] arr = new double[tokens.length];
    for (int i = 0; i < tokens.length; i++) {
        arr[i] = Double.parseDouble(tokens[i]);
    }
    return arr;
}
```

First, I take the line of Text passed in and split it into its component parts. I then convert the array of String tokens into an array of double. Finally, I iterate with two nested for loops over every unique pair of values and emit each pair. So, for the first line of the example above, I emit the following.

- K2=(0,1) and V2=(1,1)
- K2=(0,2) and V2=(1,3)
- K2=(0,3) and V2=(1,-1)
- K2=(1,2) and V2=(1,3)
- K2=(1,3) and V2=(1,-1)
- K2=(2,3) and V2=(3,-1)

I emit the pair of column indices as the key, and the corresponding pair of values as the value.
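For a key to survive the trip between Map and Reduce, Hadoop requires it to know how to serialize, deserialize, and compare itself. The actual VariableIndexPairsWritable is in the downloadable source; the sketch below is a self-contained stand-in (the class name IndexPair is mine) that uses plain java.io interfaces, where the real class would implement Hadoop’s WritableComparable with methods of exactly these shapes.

```java
import java.io.*;

// Self-contained stand-in for a key like VariableIndexPairsWritable.
// The real class would implement WritableComparable<...>; the method
// names below mirror that interface.
public class IndexPair implements Comparable<IndexPair> {
    private int i;
    private int j;

    public IndexPair() { }                        // Hadoop needs a no-arg constructor
    public IndexPair(int i, int j) { this.i = i; this.j = j; }

    // Serialize the key for the shuffle between Map and Reduce.
    public void write(DataOutput out) throws IOException {
        out.writeInt(i);
        out.writeInt(j);
    }

    // Deserialize the key on the Reduce side.
    public void readFields(DataInput in) throws IOException {
        i = in.readInt();
        j = in.readInt();
    }

    // Sort by first index, then second; this is the order Reducers see keys in.
    @Override
    public int compareTo(IndexPair o) {
        return (i != o.i) ? Integer.compare(i, o.i) : Integer.compare(j, o.j);
    }

    @Override
    public String toString() { return "(" + i + "," + j + ")"; }
}
```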

For the first 3 lines, I emit the following pairs.

- K2=(0,1) and V2=(1,1)
- K2=(0,2) and V2=(1,3)
- K2=(0,3) and V2=(1,-1)
- K2=(1,2) and V2=(1,3)
- K2=(1,3) and V2=(1,-1)
- K2=(2,3) and V2=(3,-1)
- K2=(0,1) and V2=(2,2)
- K2=(0,2) and V2=(2,1)
- K2=(0,3) and V2=(2,-2)
- K2=(1,2) and V2=(2,1)
- K2=(1,3) and V2=(2,-2)
- K2=(2,3) and V2=(1,-2)
- K2=(0,1) and V2=(3,3)
- K2=(0,2) and V2=(3,8)
- K2=(0,3) and V2=(3,-3)
- K2=(1,2) and V2=(3,8)
- K2=(1,3) and V2=(3,-3)
- K2=(2,3) and V2=(8,-3)

The keys are sorted before they are passed into a Reducer. Also, the Reducer does not receive individual pairs of <K2,V2> but rather <K2, List<V2>>: a key and the list of values associated with that key. Although I wrote List, the actual Java type is an Iterable, so it looks like <K2, Iterable<V2>>. So, in the example above, we have 6 unique keys, each corresponding to a unique pair of columns/variables for which we are trying to compute the Pearson correlation.

- K2=(0,1)
- K2=(0,2)
- K2=(0,3)
- K2=(1,2)
- K2=(1,3)
- K2=(2,3)
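This grouping is exactly what the framework does between the map and reduce phases. A plain-Java simulation (no Hadoop, just a sorted map; the class and method names are mine) makes the behavior concrete for the three sample rows.

```java
import java.util.*;

// Plain-Java simulation of what Hadoop does between map and reduce:
// every emitted (key, value) pair is grouped by key, and keys are
// handed to the Reducer in sorted order.
public class ShuffleSimulation {

    public static SortedMap<String, List<double[]>> mapAndGroup(double[][] rows) {
        // TreeMap keeps keys sorted, mimicking the sort before the Reduce phase.
        SortedMap<String, List<double[]>> grouped = new TreeMap<>();
        for (double[] row : rows) {
            for (int i = 0; i < row.length; i++) {
                for (int j = i + 1; j < row.length; j++) {
                    grouped.computeIfAbsent("(" + i + "," + j + ")", k -> new ArrayList<>())
                           .add(new double[] { row[i], row[j] });
                }
            }
        }
        return grouped;
    }

    public static void main(String[] args) {
        double[][] rows = { { 1, 1, 3, -1 }, { 2, 2, 1, -2 }, { 3, 3, 8, -3 } };
        SortedMap<String, List<double[]>> grouped = mapAndGroup(rows);
        System.out.println(grouped.keySet());            // [(0,1), (0,2), (0,3), (1,2), (1,3), (2,3)]
        System.out.println(grouped.get("(0,1)").size()); // 3 value pairs, one per row
    }
}
```

The 6 sorted keys and their value lists are precisely what the Reducer below receives, one key (with its Iterable of values) per reduce call.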

Now, my Reducer looks like the following.

```java
public void reduce(VariableIndexPairsWritable key, Iterable<VariableValuePairsWritable> values, Context context)
        throws IOException, InterruptedException {
    double x = 0.0d;
    double y = 0.0d;
    double xx = 0.0d;
    double yy = 0.0d;
    double xy = 0.0d;
    double n = 0.0d;
    for (VariableValuePairsWritable pairs : values) {
        x += pairs.getI();
        y += pairs.getJ();
        xx += Math.pow(pairs.getI(), 2.0d);
        yy += Math.pow(pairs.getJ(), 2.0d);
        xy += (pairs.getI() * pairs.getJ());
        n += 1.0d;
    }
    PearsonComputationWritable pearson = new PearsonComputationWritable(x, y, xx, yy, xy, n);
    context.write(key, pearson);
}
```

It may get a little confusing at this point, but at the Reducer, we only get the pairs of values associated with the i-th and j-th columns/variables. Since we get all these pairs, we simply iterate over them and accumulate the sums needed to compute the Pearson correlation as before. Our Reducer emits <K3,V3> where K3 is the pair of i-th and j-th column indices and V3 is the Pearson correlation.
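The Reducer above only accumulates the six running sums; presumably PearsonComputationWritable performs the final division (an assumption on my part; the class itself lives in the source download). That last step is just arithmetic, sketched here with my own class and method names. For key (0,1) of the sample data the sums are x=6, y=6, xx=14, yy=14, xy=14, n=3, which should give exactly 1.0.

```java
// The last step hidden inside the output Writable: folding the six
// accumulated sums into a single correlation value.
public class PearsonFromSums {

    public static double pearson(double x, double y, double xx, double yy, double xy, double n) {
        double numerator = xy - (x * y) / n;
        double d1 = xx - (x * x) / n;
        double d2 = yy - (y * y) / n;
        return numerator / Math.sqrt(d1 * d2);
    }

    public static void main(String[] args) {
        // Sums for key (0,1): columns 0 and 1 are identical in the sample data.
        System.out.println(pearson(6, 6, 14, 14, 14, 3)); // prints 1.0
    }
}
```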

So, what did we gain? I do not think we gained too much computationally. I am a little bit cautious of the Mapper, since we do O(c^2) operations per line, where c is the number of columns. What if we scale out (i.e. add more data/task nodes)? I need to experiment with scaling some more, but, theoretically, we might expect to see linear speed improvement with multiple Mappers and Reducers processing the data (says the book, anyway). The Mapper’s running time is O(c^2) in the number of columns, and the Reducer’s is O(r) in the number of rows. If we had multiple Reducers running in parallel (let’s say we have 20 variables and 20 Reducer tasks running this Job), we should definitely see speed improvement from scale. For the strong-minded and spirited tinkerer, all of these approaches (traditional Java, M/R, or Java’s newer concurrency APIs such as Fork/Join and ExecutorService) should be compared and contrasted not only in terms of speed (running time complexity) but also in terms of complexity of code design (how natural and easy it is to code a solution).

# Code You May Use

Of course, there’s actual code you may use to study this blog post. Download the source code here. This code is licensed under the Apache 2.0 license. You will find classes to generate dummy data in CSV format and to convert the CSV files into Sequence Files. A Sequence File is a binary file format that stores data row by row; each row is a binary key-value pair, <K,V>.

To generate dummy data consisting of 2 files, each with 10 data points, type in the following.

```
java -cp demo-0.1.jar corr.util.DummyDataGen 2 10
```

To convert the dummy data above to sequence files, type in the following. Note we specify the “.” (dot) value to look in the current directory. All *.csv files will be converted to *.seq files.

```
java -cp demo-0.1.jar;hadoop-0.20.2-core.jar;commons-logging-1.0.4.jar corr.util.DummyDataToSeqFile .
```

Assuming you have Cygwin + Hadoop installed and configured correctly, type in the following command to run a correlation analysis against the data you generated.

```
hadoop jar demo-0.1.jar corr.job.SfPearsonJob /path/to/toydata10000.seq
```

Alternatively, for the CSV file, type in the following.

```
hadoop jar demo-0.1.jar corr.job.PearsonJob /path/to/toydata10000.csv
```

# Summary and Conclusion

So, there are a lot of different ways to do one thing now. Even for something like computing the Pearson correlation between pairs of continuous variables, there are many ways to go about it. We can use a traditional Java approach (no threading, no frameworks), which may work well for comparatively small data sets (say, a couple of thousand rows and tens of columns). But when there are billions of records and hundreds (or more) of columns, we might need something like M/R + Hadoop. If we can transform the problem into M/R form, and if we can throw “commodity” hardware at the problem (well, that’s like throwing money at a problem, since even commodity hardware costs a lot of money), we should expect to see improvement in speed due to scaling out.

Hope you enjoyed this post and the code, and that it helped you “think” a little bit differently with M/R + Hadoop. See you soon! Sayonara! See you again next time!