This blog has moved to a new address: http://www.trongkhoanguyen.com.
Shuffle is one of the most expensive operations and strongly affects job performance. Even though Spark tries to avoid shuffles as much as possible, some operations need a shuffle step to produce their result (e.g. groupByKey, sortByKey, reduceByKey, distinct, …). In these operations a node must fetch data from many other nodes so that it has all the data needed to compute the result. Fetching data over the network involves data partitioning, sorting, serialization, and disk and network I/O, all of which are expensive for the application. The shuffle module has kept evolving across Spark versions, bringing significant improvements over time.
As in Hadoop MapReduce, the Spark shuffle includes an aggregation step (a combiner) before writing map outputs (intermediate values) to buckets. Spark also writes to small buffers (the buffer size is configurable via spark.shuffle.file.buffer.kb) before writing to physical files, to speed up disk I/O.
1. The input aggregation
The aggregation is implemented in ShuffleMapTask, which combines input using two special data structures: AppendOnlyMap (an in-memory hash table combiner) and ExternalAppendOnlyMap (a hash table combiner that uses both memory and disk).
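The combining idea can be sketched in a few lines of Python. This is a minimal illustration, not Spark's code: the class names, the `max_in_memory_keys` spill threshold, and keeping spilled runs in Python lists instead of disk files are all assumptions. Spark's own classes are written in Scala, decide when to spill from estimated memory usage, and order spilled keys by hash code rather than by the key itself.

```python
import heapq
from itertools import groupby
from operator import itemgetter


class AppendOnlyMapSketch:
    """Hypothetical in-memory combiner: keys are inserted or updated, never removed."""

    def __init__(self):
        self._data = {}

    def change_value(self, key, create, merge_value, value):
        # New key: build a combiner from the value; existing key: fold the value in.
        if key in self._data:
            self._data[key] = merge_value(self._data[key], value)
        else:
            self._data[key] = create(value)

    def __len__(self):
        return len(self._data)

    def sorted_items(self):
        return sorted(self._data.items(), key=itemgetter(0))


class ExternalAppendOnlyMapSketch:
    """Hypothetical combiner that spills sorted runs when the in-memory map is full.

    Spilled runs are plain lists here; a real implementation writes them to disk.
    """

    def __init__(self, create, merge_value, merge_combiners, max_in_memory_keys=2):
        self.create = create
        self.merge_value = merge_value
        self.merge_combiners = merge_combiners
        self.max_keys = max_in_memory_keys
        self.current = AppendOnlyMapSketch()
        self.spills = []

    def insert(self, key, value):
        self.current.change_value(key, self.create, self.merge_value, value)
        if len(self.current) >= self.max_keys:
            # "Spill": freeze the current map as a key-sorted run and start a new one.
            self.spills.append(self.current.sorted_items())
            self.current = AppendOnlyMapSketch()

    def iterator(self):
        # K-way merge of all sorted runs; partial combiners of one key are merged.
        runs = self.spills + [self.current.sorted_items()]
        merged = heapq.merge(*runs, key=itemgetter(0))
        for key, group in groupby(merged, key=itemgetter(0)):
            combiners = [c for _, c in group]
            acc = combiners[0]
            for c in combiners[1:]:
                acc = self.merge_combiners(acc, c)
            yield key, acc


combiner = ExternalAppendOnlyMapSketch(
    create=lambda v: v,
    merge_value=lambda acc, v: acc + v,
    merge_combiners=lambda a, b: a + b,
    max_in_memory_keys=2,
)
for word in ["a", "b", "a", "c", "b", "a"]:
    combiner.insert(word, 1)
print(list(combiner.iterator()))  # [('a', 3), ('b', 2), ('c', 1)]
```

The three functions mirror the createCombiner / mergeValue / mergeCombiners split used by Spark's combineByKey: values are folded into combiners inside one map, and combiners from different spills are merged at the end.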
Spark provides two shuffle implementations, hash and sort, selected via spark.shuffle.manager. Sort has been the default since Spark 1.2.
2. The basic hash shuffle
In the basic hash shuffle, each map task writes its output into multiple files, one per reduce task.
Suppose we have:
#MapTasks = M
#ReduceTasks = R
=> #ShuffleFiles = M * R and #In-memoryBuffers = M * R.
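The M * R count can be checked with a small Python model of the basic hash shuffle, in which each map task creates one bucket per reducer and routes every record by `hash(key) % R`. The helpers `hash_shuffle_write`, `run_stage` and `reducer_read` are hypothetical, and each bucket is an in-memory list standing in for a separate file with its own write buffer.

```python
def hash_shuffle_write(map_task_id, records, num_reducers):
    # One bucket per reducer; each stands in for a separate file with its own buffer.
    buckets = {(map_task_id, r): [] for r in range(num_reducers)}
    for key, value in records:
        r = hash(key) % num_reducers  # hash partitioning of keys to reducers
        buckets[(map_task_id, r)].append((key, value))
    return buckets


def run_stage(map_inputs, num_reducers):
    # Every map task writes its own R buckets, so the stage produces M * R files.
    all_files = {}
    for map_task_id, records in enumerate(map_inputs):
        all_files.update(hash_shuffle_write(map_task_id, records, num_reducers))
    return all_files


def reducer_read(all_files, reducer_id, num_mappers):
    # Reducer r fetches its bucket from every one of the M map outputs.
    return [rec for m in range(num_mappers) for rec in all_files[(m, reducer_id)]]


files = run_stage([[("a", 1), ("b", 2)], [("c", 3)], [("a", 4)]], num_reducers=4)
print(len(files))  # 12 = M * R with M = 3, R = 4
```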
This implementation leads to a problem: if each buffer is 100 KB, then with 10,000 reducers and 10 mappers per executor, the total in-memory buffer size is 100 KB * 10,000 * 10 = 10 GB per executor. Spending 10 GB per executor on buffers alone is unacceptable, so this implementation cannot support around 10,000 reducers and something has to change. The easiest and fastest fix is to lower the buffer size (but not too far). SPARK-2503 reduced the default value of spark.shuffle.file.buffer.kb to 32 KB: https://issues.apache.org/jira/browse/SPARK-2503.
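The buffer arithmetic above fits in a short Python function. It uses decimal units (1 KB = 1000 bytes) to match the 10 GB figure in the text; passing `kb=1024` gives binary kilobytes, which makes the totals about 2.4% higher. The function name and parameters are illustrative only.

```python
def executor_buffer_bytes(buffer_kb, num_reducers, concurrent_map_tasks, kb=1000):
    """Write-buffer memory on one executor in the basic hash shuffle.

    Each concurrently running map task keeps one buffer open per reducer,
    so the executor holds concurrent_map_tasks * num_reducers buffers.
    """
    return buffer_kb * kb * num_reducers * concurrent_map_tasks


for size_kb in (100, 32):
    total = executor_buffer_bytes(size_kb, num_reducers=10_000, concurrent_map_tasks=10)
    print(f"{size_kb} KB buffers -> {total / 1e9:.1f} GB per executor")
# 100 KB buffers -> 10.0 GB per executor
# 32 KB buffers -> 3.2 GB per executor
```

Even the smaller 32 KB default still leaves 3.2 GB of buffers in this scenario. Shrinking the buffers only divides the total by a constant, because the buffer count still grows with mappers times reducers.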
3. The consolidated hash shuffle version
Naively decreasing the buffer size, as above, is clearly not a satisfying solution. Can we do better? Yes, by consolidating the shuffle files (https://issues.apache.org/jira/browse/SPARK-751): within an executor, each bucket is mapped to a segment of a file (as shown in the figure). As a result, for each executor, #ShuffleFiles = R and #In-memoryBuffers = R. Thus, if each worker node runs C executors on its C cores, #ShuffleFiles = C * R per worker node.
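Here is a rough Python model of file consolidation on one worker node. Each of the C cores owns a group of R bucket files, and map tasks that run one after another on the same core append a new segment to every file in that group. A reducer then reads only its segment, located through a recorded (offset, length). The class, the round-robin assignment of tasks to cores, and the use of lists for files are all assumptions for illustration.

```python
from collections import defaultdict


class ConsolidatedShuffleSketch:
    """Hypothetical model of the consolidated hash shuffle on one worker node.

    Each of the C cores owns a group of R bucket "files" (lists here). Map tasks
    that run one after another on the same core append a new segment to every
    file in that core's group instead of creating new files.
    """

    def __init__(self, num_cores, num_reducers):
        self.num_reducers = num_reducers
        self.files = {(c, r): [] for c in range(num_cores) for r in range(num_reducers)}
        # (map_task_id, reducer_id) -> (core, offset, length) of that task's segment
        self.segments = {}

    def run_map_task(self, map_task_id, core, records):
        per_reducer = defaultdict(list)
        for key, value in records:
            per_reducer[hash(key) % self.num_reducers].append((key, value))
        for r in range(self.num_reducers):
            shared_file = self.files[(core, r)]
            offset = len(shared_file)
            shared_file.extend(per_reducer[r])
            self.segments[(map_task_id, r)] = (core, offset, len(per_reducer[r]))

    def fetch(self, map_task_id, reducer_id):
        # A reducer reads only its segment of the shared file, not the whole file.
        core, offset, length = self.segments[(map_task_id, reducer_id)]
        return self.files[(core, reducer_id)][offset:offset + length]


M, R, C = 10, 3, 2
node = ConsolidatedShuffleSketch(num_cores=C, num_reducers=R)
for m in range(M):
    node.run_map_task(m, core=m % C, records=[(f"k{m}", m)])
print(len(node.files), "files instead of", M * R)  # 6 files instead of 30
```

The number of files no longer depends on M: ten map tasks on two cores share the same C * R = 6 files.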
You can enable this feature by setting spark.shuffle.consolidateFiles=true.
4. The sort-based shuffle
With the consolidated hash shuffle, we reduced the number of shuffle files from M * R to R files per executor. Why not do as Hadoop MR does, where each MapTask spills only one shuffle file containing segments, plus one index file? This saves a significant amount of memory for compression and serialization buffers and results in more sequential disk I/O.
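The following Python sketch shows the "one data file plus one index file" layout. Records are sorted by partition id and written back to back. The index holds R + 1 offsets, so partition p occupies `data[index[p]:index[p + 1]]`. The `key=value` text encoding, the 8-byte big-endian offsets, and the helper names are assumptions made for this illustration, not a description of Spark's exact on-disk format.

```python
import struct


def write_sorted_map_output(records, num_partitions, partition_fn):
    """Hypothetical sort-based writer: returns (data bytes, index bytes).

    Assumes partition_fn maps every key into range(num_partitions).
    """
    # Stable sort by partition id only (no map-side combine in this sketch).
    tagged = sorted(((partition_fn(k), k, v) for k, v in records), key=lambda t: t[0])
    data = bytearray()
    offsets = [0]
    i = 0
    for p in range(num_partitions):
        while i < len(tagged) and tagged[i][0] == p:
            _, k, v = tagged[i]
            data += f"{k}={v}\n".encode()
            i += 1
        offsets.append(len(data))  # end of partition p == start of partition p + 1
    index = struct.pack(f">{len(offsets)}q", *offsets)  # R + 1 big-endian 8-byte ints
    return bytes(data), index


def read_partition(data, index, partition_id):
    # A reducer needs just two offsets from the index to locate its segment.
    start, end = struct.unpack_from(">2q", index, 8 * partition_id)
    return data[start:end].decode().splitlines()


records = [("a", 1), ("b", 2), ("c", 3), ("d", 4)]
data, index = write_sorted_map_output(records, 2, partition_fn=lambda k: ord(k) % 2)
print(read_partition(data, index, 0))  # ['b=2', 'd=4']
print(read_partition(data, index, 1))  # ['a=1', 'c=3']
```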
Sort-based shuffle implementation – https://issues.apache.org/jira/browse/SPARK-2045
Each MapTask generates one shuffle data file and one index file. Output is sorted using an ExternalSorter:
– if map-side combine is required, data is sorted by partition and then by key so that it can be aggregated; otherwise, it is sorted only by partition.
– if #reducers <= 200 (spark.shuffle.sort.bypassMergeThreshold = 200) and there is no aggregation or ordering, it works the hash way (without sorting): records are written to one file per partition, and these files are then merged into the single output file.
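The decision rules in the list above can be summarized in one Python function. In this simplification, "aggregation or ordering" is represented only by whether a `combine` function is given, keys are assumed to be comparable, and records are grouped into per-partition lists instead of files. The function name and return labels are hypothetical.

```python
from itertools import groupby
from operator import itemgetter

BYPASS_MERGE_THRESHOLD = 200  # default of spark.shuffle.sort.bypassMergeThreshold


def sort_shuffle_map_output(records, num_partitions, partition_fn, combine=None,
                            threshold=BYPASS_MERGE_THRESHOLD):
    """Return (path, partitions) where partitions[p] lists the records of partition p."""
    if combine is None and num_partitions <= threshold:
        # Bypass: append each record to its partition's "file" in arrival order,
        # then concatenate the files in partition order; no sorting at all.
        files = [[] for _ in range(num_partitions)]
        for k, v in records:
            files[partition_fn(k)].append((k, v))
        return "bypass", files

    if combine is not None:
        # Sort by (partition, key) so equal keys are adjacent and can be combined.
        keyed = sorted(records, key=lambda kv: (partition_fn(kv[0]), kv[0]))
        ordered = []
        for k, group in groupby(keyed, key=itemgetter(0)):
            values = [v for _, v in group]
            acc = values[0]
            for v in values[1:]:
                acc = combine(acc, v)
            ordered.append((k, acc))
        path = "sort_by_partition_and_key"
    else:
        # Too many partitions to bypass and no combine: sort by partition id only.
        ordered = sorted(records, key=lambda kv: partition_fn(kv[0]))
        path = "sort_by_partition"

    out = [[] for _ in range(num_partitions)]
    for k, v in ordered:
        out[partition_fn(k)].append((k, v))
    return path, out


recs = [("b", 1), ("a", 2), ("b", 3), ("c", 4)]
pf = lambda k: ord(k) % 2  # hypothetical partitioner: "b" -> 0, "a" and "c" -> 1
print(sort_shuffle_map_output(recs, 2, pf)[0])                           # bypass
print(sort_shuffle_map_output(recs, 2, pf, combine=lambda x, y: x + y))  # combined
```

Skipping the sort pays off only when the partition count is small. Bypass keeps one open file per partition, which reintroduces the per-reducer buffer cost discussed for the hash shuffle.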
Make sort-based shuffle write files directly when there is no sorting / aggregation and # of partitions is small – https://issues.apache.org/jira/browse/SPARK-2787
Shuffle behavior configuration – https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior
Sort based shuffle in Spark – https://issues.apache.org/jira/secure/attachment/12655884/Sort-basedshuffledesign.pdf