Understand the shuffle component in spark-core


Shuffle is one of the most expensive operations affecting job performance. Even though Spark avoids shuffling as much as it can, some operations require a shuffle step to produce their result (e.g. groupByKey, sortByKey, reduceByKey, distinct, …). These operations require one node to fetch data from many other nodes so that it has all the data needed to compute the result. Fetching data over the network involves data partitioning, sorting, serialization, and disk and network I/O, all of which are expensive for the application. The shuffle module in Spark has been evolving and has brought significant improvements over time.
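To see why such operations force a shuffle, consider how records are routed to reducers. Spark's default HashPartitioner sends each key to partition hash(key) % numPartitions, so records with the same key, produced by different map tasks, must all travel to the same reducer. A minimal sketch of this routing in plain Python (not Spark's actual implementation, which also normalizes negative hashes):

```python
def partition_for(key, num_partitions):
    # Route a key to a reducer partition, HashPartitioner-style
    return hash(key) % num_partitions

# Records produced by two different map tasks
map_task_1 = [("apple", 1), ("banana", 1)]
map_task_2 = [("apple", 1), ("cherry", 1)]

num_reducers = 4
buckets = {p: [] for p in range(num_reducers)}
for key, value in map_task_1 + map_task_2:
    buckets[partition_for(key, num_reducers)].append((key, value))

# Both ("apple", 1) records land in the same bucket regardless of which
# map task produced them -- fetching that bucket is the shuffle read.
```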

(Figure: shuffle overview – write phase & read phase)

Just as in Hadoop MapReduce, Spark's shuffle involves an aggregate step (the combiner) before writing map outputs (intermediate values) to buckets. Spark also writes to small in-memory buffers (the buffer size is configurable via spark.shuffle.file.buffer.kb) before writing to physical files, to improve disk I/O speed.

1. The input aggregation

Aggregation is handled by the implementation in ShuffleMapTask, which combines input using two special data structures: AppendOnlyMap (an in-memory hash-table combiner) and ExternalAppendOnlyMap (a hash-table combiner that spills from memory to disk).
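The idea behind these combiners can be sketched in plain Python (class and parameter names here are hypothetical; Spark's actual classes are written in Scala and track memory usage far more precisely). A plain AppendOnlyMap merges each incoming (key, value) pair into a hash table; the External variant additionally spills the table to disk when it grows too large and merges the spills at the end:

```python
import os
import pickle
import tempfile

class ExternalCombiner:
    """Toy combiner: merge values in memory, spill to disk past a threshold."""

    def __init__(self, merge, max_entries=1000):
        self.merge = merge             # function combining two values for a key
        self.max_entries = max_entries
        self.table = {}                # the in-memory "AppendOnlyMap"
        self.spills = []               # paths of spilled tables

    def insert(self, key, value):
        if key in self.table:
            self.table[key] = self.merge(self.table[key], value)
        else:
            self.table[key] = value
        if len(self.table) > self.max_entries:
            self._spill()

    def _spill(self):
        # Persist the current table to a temp file and start a fresh one
        fd, path = tempfile.mkstemp()
        with os.fdopen(fd, "wb") as f:
            pickle.dump(self.table, f)
        self.spills.append(path)
        self.table = {}

    def result(self):
        # Merge every spilled table back into the in-memory one
        for path in self.spills:
            with open(path, "rb") as f:
                for key, value in pickle.load(f).items():
                    if key in self.table:
                        self.table[key] = self.merge(self.table[key], value)
                    else:
                        self.table[key] = value
            os.remove(path)
        return self.table

combiner = ExternalCombiner(merge=lambda a, b: a + b, max_entries=2)
for key, value in [("a", 1), ("b", 1), ("a", 1), ("c", 1), ("a", 1)]:
    combiner.insert(key, value)
totals = combiner.result()   # {'a': 3, 'b': 1, 'c': 1}
```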

The available implementations of Spark shuffle are hash and sort (selected via spark.shuffle.manager). Sort has been the default since Spark 1.2.

2. The basic hash shuffle

(Figure: basic hash shuffle)

In basic hash shuffle, each map task will write outputs into multiple files.
Suppose we have:
#MapTasks = M
#ReduceTasks = R
=> #ShuffleFiles = M * R, and #In-memoryBuffers = M*R respectively.
This implementation has a problem: if each buffer is 100 KB, then with 10,000 reducers and 10 mappers per executor, the total size of the in-memory buffers would be 100 KB * 10,000 * 10 = 10 GB per executor. 10 GB per executor used only for buffers is unacceptable! Thus, this implementation cannot support ~10,000 reducers; something has to change. The easiest and fastest fix is to lower the buffer size (but not too much): SPARK-2503 (https://issues.apache.org/jira/browse/SPARK-2503) decreased the default spark.shuffle.file.buffer.kb to 32 KB.
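The arithmetic behind that estimate, for concreteness (using decimal units, as in the figure above):

```python
buffer_size_kb = 100          # spark.shuffle.file.buffer.kb before SPARK-2503
reducers = 10_000             # R
mappers_per_executor = 10     # concurrent map tasks on one executor

# Basic hash shuffle keeps one open buffer per (map task, reducer) pair
total_kb = buffer_size_kb * reducers * mappers_per_executor
total_gb = total_kb / 1_000_000   # decimal KB -> GB
print(total_gb)  # 10.0 -> roughly 10 GB of buffer memory per executor
```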

3. The consolidate hash shuffle version

(Figure: consolidated shuffle)

Naively decreasing the buffer size, as above, is clearly not a satisfying solution. Can we do better? Yes: consolidated shuffle files – https://issues.apache.org/jira/browse/SPARK-751. Within an executor, each bucket is mapped to a segment of a file (as shown in the figure). As a result, each executor has #ShuffleFiles = R and #In-memoryBuffers = R, so a worker node running C executors on its C cores has #ShuffleFiles = C*R.

You can enable this feature by setting spark.shuffle.consolidateFiles=true.
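A quick comparison of file counts per worker node under the two schemes (the numbers here are hypothetical, chosen only for illustration):

```python
M = 10    # map tasks per executor
R = 100   # reduce tasks
C = 4     # executors (one per core) on the worker node

basic_hash_files = C * M * R    # one file per (map task, reducer) pair
consolidated_files = C * R      # one segment file per reducer, per executor

print(basic_hash_files, consolidated_files)  # 4000 vs 400
```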

4. The sort based shuffle

(Figure: sort-based shuffle)

With consolidated hash shuffle, we reduced the number of shuffle files from M*R to R per executor. Why not do as Hadoop MapReduce does: have each MapTask spill only one shuffle file containing segments, plus one index file? This saves significant memory otherwise spent on compression and serialization buffers and results in more sequential disk I/O.

Sort-based shuffle implementation – https://issues.apache.org/jira/browse/SPARK-2045

Each MapTask generates one shuffle data file and one index file. Output is sorted using an ExternalSorter.
– If a map-side combine is required, data is sorted by key and partition for aggregation; otherwise it is sorted by partition only.
– If #reducers <= 200 (spark.shuffle.sort.bypassMergeThreshold, default 200) and there is no aggregation or ordering, it behaves like the hash path (without sorting): it spills per-partition files and then merges them.
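The single-data-file-plus-index layout can be sketched as follows (a toy illustration, not Spark's actual on-disk format): records are sorted by partition id and written back-to-back into one file, while the index records the byte offset where each partition's segment starts, so a reducer can seek directly to its own slice:

```python
import io

def write_sorted_shuffle(records, num_partitions, partition_for):
    """Write all records into one 'data file', sorted by partition id,
    and build an index of segment start offsets (toy format)."""
    data = io.BytesIO()
    index = []                               # index[p] = start offset of partition p
    by_partition = sorted(records, key=lambda kv: partition_for(kv[0]))
    for p in range(num_partitions):
        index.append(data.tell())
        for key, value in by_partition:
            if partition_for(key) == p:
                data.write(f"{key}={value}\n".encode())
    index.append(data.tell())                # end offset: segment p is index[p]:index[p+1]
    return data.getvalue(), index

def read_partition(data, index, p):
    # What a reducer does: seek straight to its segment and read only that
    return data[index[p]:index[p + 1]].decode()

records = [("a", 1), ("b", 2), ("c", 3), ("d", 4)]
data, index = write_sorted_shuffle(records, 2, lambda k: (ord(k) - ord("a")) % 2)
print(read_partition(data, index, 1))  # the records of partition 1 only
```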

Make sort-based shuffle write files directly when there is no sorting/aggregation and the number of partitions is small – https://issues.apache.org/jira/browse/SPARK-2787

