A secondary sort problem relates to sorting values associated with a key in the reduce phase. Sometimes, it is called value-to-key conversion. The secondary sorting technique will enable us to sort the values (in ascending or descending order) passed to each reducer. Concrete examples will be provided of how to achieve secondary sorting in ascending or descending order.
The goal is to implement the Secondary Sort design pattern in MapReduce/Hadoop and Spark. In software design and programming, a design pattern is a reusable algorithm that is used to solve a commonly occurring problem. Typically, a design pattern can be implemented by many programming languages.
The MapReduce framework automatically sorts the keys generated by mappers. This means that, before starting reducers, all intermediate key-value pairs generated by mappers must be sorted by key (and not by value). Values passed to each reducer are not sorted at all; they can be in any order. What if you also want to sort a reducer’s values? MapReduce/Hadoop and Spark do not sort values for a reducer. So, for those applications (such as time series data) in which one wants to sort the reducer data, the Secondary Sort design pattern enables one to do so.
First let’s focus on the MapReduce/Hadoop solution. Let’s look at the MapReduce paradigm and then unpack the concept of the secondary sort:
map(key1, value1) → list(key2, value2)
reduce(key2, list(value2)) → list(key3, value3)
First, the map() function receives a key-value pair input, (key1, value1). Then it outputs any number of key-value pairs, (key2, value2). Next, the reduce() function receives as input another key-value pair, (key2, list(value2)), and outputs any number of (key3, value3) pairs.
Now consider the following key-value pair, (key2, list(value2)), as an input for a reducer:
list(value2) = (V1, V2, …, Vn)
where there is no ordering between reducer values (V1, V2, …, Vn).
The goal of the Secondary Sort pattern is to give some ordering to the values received by a reducer. So, once we apply the pattern to our MapReduce paradigm, then we will have:
SORT(V1, V2, …, Vn) = (S1, S2, …, Sn)
list(value2) = (S1, S2, …, Sn), where either:
- S1 < S2 < … < Sn (ascending order), or
- S1 > S2 > … > Sn (descending order)
Here is an example of a secondary sorting problem: consider the temperature data from a scientific experiment. A dump of the temperature data might look something like the following (columns are year, month, day, and daily temperature, respectively):
Suppose we want to output the temperature for every year-month with the values sorted in ascending order. Essentially, we want the reducer values iterator to be sorted. Therefore, we want to generate something like this output (the first column is year-month and the second column is the sorted temperatures):
Solutions to the secondary sort problem
There are at least two possible approaches for sorting the reducer values. These solutions may be applied to both the MapReduce/Hadoop and Spark frameworks:
- The first approach involves having the reducer read and buffer all of the values for a given key (in an array data structure, for example) and then do an in-reducer sort on the values. This approach does not scale: since the reducer receives all values for a given key, it might run out of memory (java.lang.OutOfMemoryError). On the other hand, the approach works well when the number of values is small enough not to cause an out-of-memory error.
- The second approach involves using the MapReduce framework for sorting the reducer values (this does not require in-reducer sorting of values passed to the reducer). This approach consists of “creating a composite key by adding a part of, or the entire value to, the natural key to achieve your sorting objectives.” For the details on this approach, see Java Code Geeks. This option is scalable and will not generate out-of-memory errors. Here, we basically offload the sorting to the MapReduce framework (sorting is a paramount feature of the MapReduce/Hadoop framework).
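The first approach can be sketched in plain Java, outside Hadoop: buffer all of one key's values and sort them in memory. The class name and the sample temperatures below are hypothetical; they only illustrate the buffering that makes this approach memory-bound.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class InReducerSort {
    // Simulates approach 1: buffer all of a key's values, then sort in memory.
    // Simple, but one key with too many values can exhaust the heap.
    public static List<Integer> reduce(Iterable<Integer> values) {
        List<Integer> buffer = new ArrayList<>();
        for (int v : values) {
            buffer.add(v); // buffering step: every value is held in memory
        }
        Collections.sort(buffer); // in-reducer ascending sort
        return buffer;
    }

    public static void main(String[] args) {
        // Hypothetical temperatures for one (year, month) key.
        System.out.println(reduce(List.of(25, 10, 60, 40, 1))); // [1, 10, 25, 40, 60]
    }
}
```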
To implement the secondary sort feature, we need additional plug-in Java classes. We have to tell the MapReduce/Hadoop framework:
- How to sort reducer keys
- How to partition keys passed to reducers (custom partitioner)
- How to group data that has arrived at each reducer
Sort order of intermediate keys
To accomplish secondary sorting, we need to take control of the sort order of intermediate keys and of the order in which reducers process keys. First, we inject a value (the temperature data) into the composite key, and then we take control of the sort order of intermediate keys. The relationships between the natural key, composite key, and key-value pairs are depicted in Figure 1.
Figure 1. Secondary sorting keys
The main question is what value we should add to the natural key to accomplish the secondary sort. The answer is the temperature data field (because we want the reducers’ values to be sorted by temperature). So, we have to indicate how DateTemperaturePair objects should be sorted using the compareTo() method. We need to define a proper data structure for holding our key and value, while also providing the sort order of intermediate keys. In Hadoop, for custom data types (such as DateTemperaturePair) to be persisted, they have to implement the Writable interface; and if we are going to compare custom data types, then they have to implement an additional interface called WritableComparable (see Example 1).
Example 1. DateTemperaturePair class
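The original listing is not reproduced here; the following is a simplified, Hadoop-free sketch of the composite key's compareTo() logic, which is what drives the secondary sort. The real class would also implement Hadoop's Writable/WritableComparable for serialization; the field names are assumptions.

```java
// Simplified sketch of the composite key (no Hadoop serialization).
public class DateTemperaturePair implements Comparable<DateTemperaturePair> {
    private final String yearMonth;  // natural key, e.g. "2012-01"
    private final int temperature;   // the value injected into the key

    public DateTemperaturePair(String yearMonth, int temperature) {
        this.yearMonth = yearMonth;
        this.temperature = temperature;
    }

    public String getYearMonth() { return yearMonth; }
    public int getTemperature() { return temperature; }

    // Sort first by the natural key, then by temperature (ascending).
    @Override
    public int compareTo(DateTemperaturePair other) {
        int cmp = this.yearMonth.compareTo(other.yearMonth);
        if (cmp != 0) {
            return cmp;
        }
        return Integer.compare(this.temperature, other.temperature);
    }

    public static void main(String[] args) {
        java.util.List<DateTemperaturePair> pairs = new java.util.ArrayList<>(java.util.List.of(
            new DateTemperaturePair("2012-01", 40),
            new DateTemperaturePair("2012-01", 10)));
        java.util.Collections.sort(pairs);
        System.out.println(pairs.get(0).getTemperature()); // 10
    }
}
```

For descending order, negate the temperature comparison (or reverse the comparator).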
In a nutshell, the partitioner decides which mapper output goes to which reducer, based on the mapper’s output key. For this, we need two plug-in classes: a custom partitioner to control which reducer processes which keys, and a custom comparator to control how data is grouped for each reducer. The custom partitioner ensures that all data with the same natural key (year-month), ignoring the temperature part of the composite key, is sent to the same reducer. The custom comparator then groups the arriving data by the natural key, so that all values for one year-month are handed to a single reduce() call.
Example 2. DateTemperaturePartitioner class
Hadoop provides a plug-in architecture for injecting the custom partitioner code into the framework. This is how we do so inside the driver class (which submits the MapReduce job to Hadoop):
import org.apache.hadoop.mapreduce.Job;
…
Job job = …;
…
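The registration step elided above is presumably a call to Job.setPartitionerClass(), which is part of the standard org.apache.hadoop.mapreduce.Job API; a sketch, assuming the partitioner class named in Example 2:

```java
// In the driver: register the custom partitioner with the job.
job.setPartitionerClass(DateTemperaturePartitioner.class);
```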
In Example 3, we define the comparator (DateTemperatureGroupingComparator class) that controls which keys are grouped together for a single call to the Reducer.reduce() function.
Example 3. DateTemperatureGroupingComparator class
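The original listing is not reproduced here; below is a Hadoop-free sketch of the grouping logic. The real class would extend Hadoop's WritableComparator; the nested Pair record stands in for the composite key of Example 1.

```java
import java.util.Comparator;

public class DateTemperatureGroupingComparator {
    // Minimal stand-in for the composite key of Example 1.
    record Pair(String yearMonth, int temperature) {}

    // Grouping compares only the natural key and ignores the temperature,
    // so every temperature of one year-month reaches a single reduce() call.
    static final Comparator<Pair> GROUPING = Comparator.comparing(Pair::yearMonth);

    public static void main(String[] args) {
        System.out.println(GROUPING.compare(new Pair("2012-01", 10), new Pair("2012-01", 90)) == 0); // true
        System.out.println(GROUPING.compare(new Pair("2012-01", 10), new Pair("2012-02", 10)) == 0); // false
    }
}
```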
Hadoop provides a plug-in architecture for injecting the grouping comparator code into the framework. This is how we do so inside the driver class (which submits the MapReduce job to Hadoop):
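Analogously to the partitioner, the grouping comparator is registered on the Job object; a sketch, assuming the driver's Job variable is named job as above (Job.setGroupingComparatorClass() is part of the standard org.apache.hadoop.mapreduce.Job API):

```java
// In the driver: register the grouping comparator with the job.
job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class);
```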
Data flow using plug-in classes
In order for the reader to understand the map() and reduce() functions and custom plug-in classes, Figure 2 illustrates the data flow for a portion of input.
Figure 2. Secondary sorting data flow
The mappers create (K,V) pairs, where K is a composite key of (year, month, temperature) and V is a temperature. The (year, month) part of the composite key is the natural key. The partitioner plug-in class sends all records with the same natural key to the same reducer, and the grouping comparator plug-in class ensures that the temperatures arrive at each reduce() call grouped by natural key and already sorted by the framework. The Secondary Sort design pattern thus uses the MapReduce framework itself to sort the reducers’ values, rather than collecting them all and sorting them in memory, which enables us to “scale out” no matter how many reducer values we want to sort.
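The data flow above can be simulated in plain Java, outside Hadoop: sort the composite keys the way the shuffle would, then group consecutive records by the natural key. The sample records are hypothetical; the point is that each group's temperatures emerge already sorted without any in-reducer sort.

```java
import java.util.*;

public class SecondarySortSimulation {
    // Composite key: natural key (yearMonth) plus the injected temperature.
    record Key(String yearMonth, int temperature) {}

    // Simulates shuffle-sort plus grouping for a single simulated reducer.
    static Map<String, List<Integer>> run(List<Key> mapperOutput) {
        List<Key> sorted = new ArrayList<>(mapperOutput);
        // Shuffle-sort stage: order by natural key, then by temperature.
        sorted.sort(Comparator.comparing(Key::yearMonth)
                              .thenComparingInt(Key::temperature));
        // Grouping stage: consecutive records with the same natural key form
        // one reduce() call, so each value list is already sorted.
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        for (Key k : sorted) {
            out.computeIfAbsent(k.yearMonth(), ym -> new ArrayList<>())
               .add(k.temperature());
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical mapper output.
        System.out.println(run(List.of(
            new Key("2012-01", 40), new Key("2012-01", 10),
            new Key("2012-02", 60), new Key("2012-01", 25),
            new Key("2012-02", -5))));
        // {2012-01=[10, 25, 40], 2012-02=[-5, 60]}
    }
}
```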