Understand the shuffle component in spark-core

This blog has moved to a new address: http://www.trongkhoanguyen.com.

Shuffle is one of the most expensive operations and can strongly affect the performance of a job. Even though Spark tries to avoid shuffling as much as possible, some operations require a shuffle step to produce their result (e.g. groupByKey, sortByKey, reduceByKey, distinct, …). These operations require one node to fetch data from many other nodes in order to have enough data to compute the result. Fetching data over the network involves data partitioning, sorting, serialization, and disk and network I/O, all of which are expensive for the application. The shuffle module has evolved over Spark versions and brought significant improvements over time.

[Figure: shuffle overview — write phase & read phase]

Just as in Hadoop MapReduce, the Spark shuffle involves an aggregation step (the combiner) before writing map outputs (intermediate values) to buckets. Spark also writes to small in-memory buffers (the buffer size is configurable via spark.shuffle.file.buffer.kb) before writing to physical files, in order to improve disk I/O speed.

1. The input aggregation

Aggregation is implemented in ShuffleMapTask, which combines the input using two special data structures: AppendOnlyMap (an in-memory hash-table combiner) and ExternalAppendOnlyMap (a combiner that uses memory and spills to disk).
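To get a feel for what these combiners do conceptually, here is a minimal sketch in plain Scala — not Spark's actual AppendOnlyMap, just an in-memory hash-map combiner; the names createCombiner and mergeValue are my own and simply mirror the usual combiner callbacks:

```scala
import scala.collection.mutable

// Merge values for each key in memory before anything is written to shuffle files.
def combineByKey[K, V, C](
    records: Iterator[(K, V)],
    createCombiner: V => C,
    mergeValue: (C, V) => C): Iterator[(K, C)] = {
  val combiners = mutable.HashMap.empty[K, C]
  for ((k, v) <- records) {
    combiners(k) = combiners.get(k) match {
      case Some(c) => mergeValue(c, v)   // key seen before: fold the new value in
      case None    => createCombiner(v)  // first value for this key
    }
  }
  combiners.iterator
}

// Example: word-count-style aggregation before the shuffle write
val input = Iterator(("a", 1), ("b", 1), ("a", 1))
val combined = combineByKey[String, Int, Int](input, v => v, _ + _)
// combined yields ("a", 2) and ("b", 1) (order not guaranteed)
```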

Two shuffle implementations are available: hash and sort (selected via spark.shuffle.manager). Sort has been the default since Spark 1.2.

2. The basic hash shuffle

[Figure: basic hash shuffle]

In basic hash shuffle, each map task will write outputs into multiple files.
Suppose we have:
#MapTasks = M
#ReduceTasks = R
=> #ShuffleFiles = M * R, and #In-memoryBuffers = M*R respectively.
This implementation has a problem: if we use 100 KB as the size of one buffer, then with 10,000 reducers and 10 mappers per executor, the total size of the in-memory buffers would be 100 KB * 10,000 * 10 = 10 GB per executor. 10 GB per executor used only for buffers is unacceptable! So this implementation won't support ~10,000 reducers; we need to change something. The easiest and fastest fix is to lower the buffer size (but not too much). SPARK-2503 decreased the default of spark.shuffle.file.buffer.kb to 32 KB: https://issues.apache.org/jira/browse/SPARK-2503.
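For the record, here is the back-of-the-envelope arithmetic behind that figure (a throwaway snippet; the variable names are just for illustration):

```scala
// Per-executor buffer cost of the basic hash shuffle, using the numbers above.
val bufferKb    = 100.0   // spark.shuffle.file.buffer.kb (old default)
val reducers    = 10000   // R
val mapsPerExec = 10      // concurrent map tasks per executor

val totalGb = bufferKb * reducers * mapsPerExec / 1024 / 1024
println(f"$totalGb%.1f GB of shuffle buffers per executor")  // ≈ 9.5 GB, i.e. roughly 10 GB
```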

3. The consolidate hash shuffle version

[Figure: consolidated hash shuffle]

Naively decreasing the buffer size, as above, is clearly not satisfying. Can we do better? Yes: consolidating shuffle files – https://issues.apache.org/jira/browse/SPARK-751. Within an executor, each bucket is mapped to a segment of a file (as shown in the figure). As a result, for each executor, #ShuffleFiles = R and #In-memoryBuffers = R. Thus #ShuffleFiles = C*R per worker node if each node runs C executors on its C cores.

You can enable this feature by setting spark.shuffle.consolidateFiles=true.
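Putting the shuffle settings mentioned in this post together, here is a hedged sketch of how they could be set on a SparkConf (property names as they existed in the Spark versions discussed here; the application name is a placeholder):

```scala
import org.apache.spark.SparkConf

// Shuffle-related settings discussed in this post.
val conf = new SparkConf()
  .setAppName("shuffle-tuning-example")
  .set("spark.shuffle.manager", "sort")           // "hash" or "sort" (default since 1.2)
  .set("spark.shuffle.file.buffer.kb", "32")      // per-bucket write buffer size
  .set("spark.shuffle.consolidateFiles", "true")  // only relevant for the hash manager
```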

4. The sort based shuffle

[Figure: sort-based shuffle]

With the consolidated hash shuffle, we reduced the number of shuffle files from M*R to R per executor. Why not do what Hadoop MR does: each MapTask spills only one shuffle file containing segments, plus one index file? This saves significant memory for compression and serialization buffers and results in more sequential disk I/O.

Sort-based shuffle implementation – https://issues.apache.org/jira/browse/SPARK-2045

Each MapTask generates one shuffle data file and one index file. The output is sorted using an ExternalSorter:
– if a map-side combine is required, data is sorted by key and partition for aggregation; otherwise it is sorted by partition only;
– if #reducers <= 200 (spark.shuffle.sort.bypassMergeThreshold = 200) and there is no aggregation or ordering, it behaves like the hash way (without sorting): it spills one file per partition and then merges them (see the sketch below).

Make sort-based shuffle write files directly when there is no sorting/aggregation and the number of partitions is small – https://issues.apache.org/jira/browse/SPARK-2787
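As a rough paraphrase of that decision logic — not the actual SortShuffleWriter/ExternalSorter code, and the parameter names are mine — the write path is chosen roughly like this:

```scala
// Illustrative only: which write path the sort-based shuffle takes.
def chooseWritePath(numPartitions: Int,
                    mapSideCombine: Boolean,
                    ordering: Boolean,
                    bypassMergeThreshold: Int = 200): String = {
  if (!mapSideCombine && !ordering && numPartitions <= bypassMergeThreshold)
    "bypass-merge: write one file per partition, then concatenate them"
  else if (mapSideCombine)
    "ExternalSorter: sort by partition and key, aggregating while spilling"
  else
    "ExternalSorter: sort by partition only"
}
```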



Understand the scheduler component in spark-core


1. Introduction

Today, let's try to understand what really happens behind the scenes after we submit a Spark job to the cluster. I promise there will be many interesting things to discover ;).

This is the first post in my series on understanding the core components of the spark-core module: the scheduling implementation. I'd like to explain this component first because it requires us to follow a job from the beginning (job submission) to the end (getting the results back after all tasks have run and finished). Along the way, we will be able to:

– get a broad look at the whole system;
– understand the optimization applied when building the graph of stages (sets of tasks) for a job;
– understand how the TaskScheduler delivers and schedules work on the cluster;
– understand how the different components interact with each other: DAGScheduler, TaskScheduler, CacheManager, BlockManager, ConnectionManager, …

2. Going for details

Starting from an application submitted to the Master, the user's code runs on a driver.
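As a concrete starting point, here is a minimal driver program (my own example, not from the post): creating the SparkContext is what registers the application, and calling an action such as count() is what hands a job to the DAGScheduler.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MinimalJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("scheduler-walkthrough").setMaster("local[2]")
    val sc   = new SparkContext(conf)
    val result = sc.parallelize(1 to 1000)
      .map(_ * 2)
      .count()   // action -> DAGScheduler builds stages -> TaskScheduler runs tasks
    println(s"count = $result")
    sc.stop()
  }
}
```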

[Figure: MasterNode]


Apache Spark modules and their dependencies


[Figure: Apache Spark modules and their dependencies]

As you can see, module spark-core is the foundation framework for all the others. This module provides the implementations of the Spark computing engine: rdd, scheduler, deploy, executor, storage, shuffle, …

Module spark-sql, together with spark-hive and spark-catalyst, lets you query structured data as a distributed dataset using SQL queries. Module spark-hive provides the capability of interacting with Hive, and module spark-catalyst is the query optimization framework for Spark.

Module spark-mllib is a scalable machine learning library that leverages Spark's computing power. It can even run on streaming data or use SQL queries to extract data.

Modules spark-streaming and spark-graphx make it easy to build scalable, fault-tolerant streaming applications and graph-parallel computations, respectively.


Apache Spark 1.3 architecture – module spark-core


After spending a significant amount of time reading the source code of the spark-core project, I can briefly draw the architecture showing the relationships and the flow (messages passed) between the important components in this module:

[Figure: spark-core architecture]

See you in my next posts for more details on them. I believe it is extremely important to understand the following components: scheduler, shuffle and storage.

Update:
Understand the storage module in spark-core
Understand the scheduler component in spark-core
Understand the shuffle component in spark-core


Understand the Spark deployment modes


1. Spark deployment modes

Spark provides several modes to run our application:
– Local mode: runs the application locally. This is really useful for debugging; we can step through our code line by line in an IDE (I use IntelliJ).
– Standalone mode: we can easily deploy a standalone cluster with very few steps and configurations and then play around with it.
– Apache Mesos
– Hadoop YARN

Running an application in cluster mode

In general, the process of running an application on Spark is as follows:

– First, we write a Spark application that creates a SparkContext to initialize the DriverProgram and register the application with the ClusterManager. The DriverProgram, with its pre-configured parameters, then asks the ClusterManager for resources, i.e. a list of Executors. Notice that different applications run in different Executors (in other words, tasks from different applications run in different JVMs).
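In practice, choosing a deployment mode mostly comes down to the master URL the driver registers with. A hedged sketch (host names and ports below are placeholders):

```scala
import org.apache.spark.SparkConf

// Standard master URL forms for the deployment modes listed above.
val local      = new SparkConf().setMaster("local[4]")                  // local mode, 4 threads
val standalone = new SparkConf().setMaster("spark://master-host:7077")  // standalone cluster
val mesos      = new SparkConf().setMaster("mesos://mesos-host:5050")   // Apache Mesos
val yarn       = new SparkConf().setMaster("yarn-client")               // Hadoop YARN (client mode)
```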


[Source code analysis] Narrow dependency and wide dependency implementation in Spark


File: Dependency.scala
As mentioned in the previous post about the different types of RDD dependencies, today I'm going to dive deeper into their implementation.

[Figure: class diagram of the dependency types]

As you can see from the class diagram, dependencies are divided into two types: narrow and shuffle (wide).

– For NarrowDependency, we have three concrete classes: OneToOneDependency, RangeDependency and PruneDependency (a simplified sketch of these classes follows this list):
+ OneToOneDependency: one partition of the parent RDD is used by exactly one partition of the child RDD, so the method getParents(partitionId: Int), which returns the parent partitions of a child partition, simply returns that partitionId.
+ RangeDependency: useful for the union operation, where a range of child partitions depends on a range of parent partitions.
+ PruneDependency: used by PartitionPruningRDD; the child RDD contains a subset of the partitions of the parent.
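A simplified sketch of these classes, modeled on Dependency.scala (the RDD references and Spark's Dependency base class are omitted here to keep the snippet self-contained):

```scala
// For a given child partition, a narrow dependency can list its parent partitions.
abstract class NarrowDependency {
  def getParents(partitionId: Int): Seq[Int]
}

class OneToOneDependency extends NarrowDependency {
  // Child partition i depends only on parent partition i.
  override def getParents(partitionId: Int): Seq[Int] = List(partitionId)
}

// Used by union: child partitions [outStart, outStart + length) map one-to-one
// onto parent partitions starting at inStart.
class RangeDependency(inStart: Int, outStart: Int, length: Int) extends NarrowDependency {
  override def getParents(partitionId: Int): Seq[Int] =
    if (partitionId >= outStart && partitionId < outStart + length)
      List(partitionId - outStart + inStart)
    else
      Nil
}
```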


Top reasons to shift to Spark


– Fast: in memory (up to 100x faster) or on disk (2-10x faster). See the Daytona GraySort contest and the official results.
– Usability: rich APIs (Scala, Java, Python), concise code, an interactive shell.

[Figure: LoC of Spark in comparison with other projects]

[Figure: LoC of the Spark core framework and its integrated libraries]

– Well designed and unified: Spark is a general platform, and SparkSQL, Spark Streaming, GraphX and MLlib are standard libraries included with Spark. These libraries provide a wide range of features that support many use cases.
– Solid foundation: Databricks, UC Berkeley AMPLab and the community.
– Many adopters: Amazon, Yahoo!, Autodesk, Technicolor, Baidu, Celtra, eBay Inc., IBM Almaden, Samsung SDS, Sony, …


Understand the storage module in spark-core


The storage module in Spark provides the data access service for the application, including:

– reading and storing data from various sources: HDFS, local disk, RAM, or even blocks fetched from other nodes over the network;
– caching data (RDDs) at different levels and in different formats: in memory, on disk, …

The most striking difference between Spark and Hadoop is Spark's use of the cluster's RAM to produce fast results. Thus, it is worth going a bit deeper into the caching API.

1. How RDD caching system works

[Figure: how RDD caching works]

– The application first calls one of two methods, cache() or persist(), on the RDD object (a minimal usage example follows below).
– The SparkContext object keeps persistentRDDs, the list of cached RDDs. SparkContext can hold a reference to a cached RDD, or remove it and then trigger garbage collection. It can even periodically drop null or stale cached RDDs thanks to a TimeStampedWeakValueHashMap[A, B].
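A minimal usage example of this caching API (my own snippet; the HDFS path is a placeholder):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("caching-example").setMaster("local[2]"))

// persist() registers the RDD as cached; cache() is shorthand for persist(MEMORY_ONLY).
val logs = sc.textFile("hdfs:///path/to/logs")   // placeholder path
  .filter(_.contains("ERROR"))
  .persist(StorageLevel.MEMORY_AND_DISK)

println(logs.count())   // first action: computes and caches the partitions
println(logs.count())   // second action: served from the block store
logs.unpersist()        // drop the cached blocks
```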


Scala quick reference


Remembering a lot of programming syntax is somewhat painful for me. As I have not graduated yet, memorizing many programming languages for many courses at school really hurts sometimes. So it is better for me to write down here the Scala code that solves common tasks, so I can quickly refer back to it.

1. Scala type hierarchy
[Figure: Scala type hierarchy]

Scala treats everything as objects.

Class — Description
Any — Every class inherits from a common super-class named Any.
AnyVal — The base class of the built-in value classes.
AnyRef — The base class of all reference classes (when running on the JVM, AnyRef is an alias of java.lang.Object). For reference equality, AnyRef has the eq method, which cannot be overridden and behaves like == in Java for reference types; its opposite is ne (example below).
ScalaObject — Scala classes differ from Java classes in that they also inherit from a special marker trait called ScalaObject.
Null — The type of the null reference, a subclass of every reference class.
Nothing — A subclass of every other class (including Null); it has no values and is useful for signaling abnormal termination.
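A quick illustration of reference equality (eq/ne) versus value equality (==) on AnyRef types:

```scala
val a = new String("spark")
val b = new String("spark")

println(a == b)   // true  -> value equality, delegates to equals
println(a eq b)   // false -> two different objects on the heap
println(a ne b)   // true
println(a eq a)   // true  -> same reference
```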

 2. Variable declaration with val & var
val immutableVar = value // declares an immutable variable (a constant)
var mutableVar = value   // declares a mutable variable
Where Scala lets you omit the keyword (for example, function parameters), the variable is treated as a val (examples below).
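A couple of quick val/var examples (my own illustrations):

```scala
val pi = 3.14159    // immutable: reassigning pi is a compile error
var counter = 0     // mutable
counter += 1        // fine

val greeting = "Hello, " + "Scala"   // type String is inferred
// greeting = "Hi"                   // would not compile: reassignment to val
```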


Introduction to SparkSQL


With Spark and the core RDD API, we can do almost everything with our datasets. Developers define the steps of how to retrieve the data by applying functional transformations on RDDs, and they are also the ones who have to optimize that code.

With SparkSQL (the replacement for Shark), we can go even further: we specify what data we want to retrieve over a partitioned collection of tuples with a schema (tables, columns, data types), and SparkSQL does the optimization for us.

So, what does SparkSQL actually do?
It transforms SQL queries into optimal RDD transformations and actions.
Features:
– interactive queries
– can be mixed with machine learning
– supports Hive queries
(a short example follows the module overview below)

Module SparkSQL contains:
– Catalyst: query parser & query optimizer
– Executor
– Hive support
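A small, hedged example of what this looks like with the Spark 1.x SQLContext API (the Person case class and the data are made up for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)

val sc = new SparkContext(new SparkConf().setAppName("sparksql-intro").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Register an RDD of case classes as a table; Catalyst plans the query.
val people = sc.parallelize(Seq(Person("Ann", 30), Person("Bob", 17))).toDF()
people.registerTempTable("people")

val adults = sqlContext.sql("SELECT name FROM people WHERE age >= 18")
adults.collect().foreach(println)
```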

1. Catalyst

[Figure: Catalyst design]

Catalyst provides an execution planning framework:
– SQL parser & analyzer
– logical operators & general expressions
– logical optimizer
– operator tree transformations

Steps:

Parse -> Analyze -> Logical Plan -> Optimize -> Physical Plan -> Execute

(the steps up to the optimized logical plan are handled by Catalyst; the physical plan and execution are handled by the SQL core)
