Introduction to SparkSQL

With Spark and the core RDD API, we can do almost anything with our datasets. Developers define how to retrieve the data by applying functional transformations on RDDs, and they are also the ones responsible for optimizing their own code.

With SparkSQL (the replacement for Shark), we can go even further with data: we specify what data we want to retrieve against a partitioned collection of tuples with a schema (table, columns, data types), and SparkSQL does the optimization for us.

So, what does SparkSQL actually do?
It transforms SQL queries into optimal RDD transformations and actions:
– interactive queries
– can be mixed with machine learning
– supports Hive queries

The SparkSQL module contains:
– Catalyst: query parser & query optimizer
– Executor
– Hive support

1. Catalyst


Catalyst provides an execution planning framework:
– SQL parser & analyzer
– Logical operators & general expressions
– Logical optimizer
– A framework to transform the operator tree


Parse -> Analyze -> Logical Plan -> Optimize -> Physical Plan -> Execute

The first stages (Parse through Optimize) are handled by Catalyst; producing the Physical Plan and executing it belong to the SQL core.
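The pipeline above can be sketched in miniature in Python. This is a hedged toy model, not Spark's real API: the plan representation, the function names (`analyze`, `optimize`, `execute`) and the `PEOPLE` table are all made up for illustration. A hand-built logical plan stands in for the Parse step; it is then analyzed against a schema, optimized, and executed over plain rows.

```python
# Toy Catalyst-style pipeline (hypothetical names, not Spark's API).
# Plans are nested tuples: ("scan", table), ("project", cols, child),
# ("filter", col, value, child).

PEOPLE = [{"id": 1, "name": "Ann"}, {"id": 2, "name": "Bob"}]

def analyze(plan, schema):
    """'Analyze' step: check that every referenced column exists."""
    kind = plan[0]
    if kind == "scan":
        return plan
    if kind == "project":
        _, cols, child = plan
        assert all(c in schema for c in cols), "unresolved column"
        return ("project", cols, analyze(child, schema))
    _, col, value, child = plan  # filter
    assert col in schema, "unresolved column"
    return ("filter", col, value, analyze(child, schema))

def optimize(plan):
    """'Optimize' step with a single rule: push Filter below Project."""
    if plan[0] == "filter" and plan[3][0] == "project":
        _, col, value, (_, cols, child) = plan
        return ("project", cols, optimize(("filter", col, value, child)))
    return plan

def execute(plan):
    """'Execute' step: interpret the physical plan over Python dicts."""
    kind = plan[0]
    if kind == "scan":
        return list(PEOPLE)
    if kind == "project":
        _, cols, child = plan
        return [{c: r[c] for c in cols} for r in execute(child)]
    _, col, value, child = plan  # filter
    return [r for r in execute(child) if r[col] == value]

# SELECT id, name FROM People WHERE id = 1 (parsing skipped)
logical = ("filter", "id", 1, ("project", ["id", "name"], ("scan", "People")))
physical = optimize(analyze(logical, {"id", "name"}))
print(execute(physical))  # [{'id': 1, 'name': 'Ann'}]
```

Real Catalyst plans are Scala trees with many more operators and rules, but the stage boundaries are the same.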

2. Query optimization with simple rule-based transformations

A simple example query:

SELECT name
FROM (SELECT id, name
      FROM People) p
WHERE p.id = 1


[Figure: the usual logical-plan-to-physical-plan translation]

[Figure: performing optimizations on the plan]

[Figure: optimization steps]

[Figure: the 'Filter Push-Down' rule example]

[Figure: Filter Push-Down implementation]
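The 'Filter Push-Down' rule can be sketched as a recursive tree rewrite. The plan classes below (`Scan`, `Project`, `Filter`) are hypothetical stand-ins, not Catalyst's real Scala classes:

```python
from collections import namedtuple

# Hypothetical plan nodes (not Catalyst's real classes).
Scan = namedtuple("Scan", "table")
Project = namedtuple("Project", "cols child")
Filter = namedtuple("Filter", "pred child")

def push_filter_down(plan):
    """Filter($pred, Project($cols, child)) => Project($cols, Filter($pred, child)).

    Only safe when the predicate references projected columns, which we
    assume here for simplicity.
    """
    if isinstance(plan, Filter) and isinstance(plan.child, Project):
        proj = plan.child
        return Project(proj.cols, push_filter_down(Filter(plan.pred, proj.child)))
    if isinstance(plan, Filter):
        return Filter(plan.pred, push_filter_down(plan.child))
    if isinstance(plan, Project):
        return Project(plan.cols, push_filter_down(plan.child))
    return plan

before = Filter("id == 1", Project(["id", "name"], Scan("People")))
after = push_filter_down(before)
print(after)
# Project(cols=['id', 'name'], child=Filter(pred='id == 1', child=Scan(table='People')))
```

Pushing the filter below the projection means fewer rows flow through the projection, which is why the optimizer prefers this shape.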

3. Optimization rules in Spark-SQL

Optimization: Action

– NullPropagation:
1 + null => null
count(null) => 0
– ConstantFolding:
1 + 2 => 3
– BooleanSimplification:
false AND $right => false
true AND $right => $right
true OR $right => true
false OR $right => $right
if(true, $then, $else) => $then
– SimplifyFilters removes trivial filters:
Filter(true, child) => child
Filter(false, child) => empty
– CombineFilters merges two adjacent filters:
Filter($fc, Filter($nc, child)) => Filter(AND($fc, $nc), child)
– PushPredicateThroughProject pushes Filter operators through Project operators:
Filter('i == 1', Project(i, j, child)) => Project(i, j, Filter('i == 1', child))
– PushPredicateThroughJoin pushes Filter operators through Join operators:
Filter('left.i == 1, Join(left, right)) => Join(Filter('i == 1, left), right)
– ColumnPruning eliminates the reading of unused columns:
Join(left, right, leftSemi, "".attr == "".attr) => Join(left, Project('id, right), leftSemi)
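Several of the expression-level rules above (NullPropagation, ConstantFolding, BooleanSimplification) can be mimicked with one bottom-up pass over expressions encoded as nested tuples. This is a sketch of the rule shapes only, not Catalyst's implementation:

```python
# One bottom-up pass applying NullPropagation, ConstantFolding and
# BooleanSimplification to expressions encoded as nested tuples.

def simplify(expr):
    if not isinstance(expr, tuple):
        return expr                          # literal, None, or column name
    op, *args = expr
    args = [simplify(a) for a in args]       # simplify children first
    if op == "+":
        left, right = args
        if left is None or right is None:
            return None                      # NullPropagation: 1 + null => null
        if isinstance(left, int) and isinstance(right, int):
            return left + right              # ConstantFolding: 1 + 2 => 3
    if op == "and":
        left, right = args
        if left is False:
            return False                     # false AND $right => false
        if left is True:
            return right                     # true AND $right => $right
    if op == "or":
        left, right = args
        if left is True:
            return True                      # true OR $right => true
        if left is False:
            return right                     # false OR $right => $right
    if op == "if":
        cond, then, els = args
        if cond is True:
            return then                      # if(true, $then, $else) => $then
        if cond is False:
            return els
    return (op, *args)                       # nothing to simplify

print(simplify(("+", 1, 2)))                          # 3
print(simplify(("+", 1, None)))                       # None
print(simplify(("and", ("or", False, True), "col")))  # col
```

Because children are simplified first, rules compose: the `or` collapses to `true`, which then lets the `and` rule fire in the same pass.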


4. Increasing performance by generating bytecode at run-time

– We know that we are dealing with schema data (rows and columns), so we have to evaluate a huge number of expressions, such as summing columns for each row (e.g. tableA[row_i, column_i] + tableA[row_i, column_j]), subtracting rows, and so on.

– The problem is Java's abstract implementation of such expressions via generics: it reduces the amount of code but hurts performance. A simple call on two integers, a + b, incurs boxing costs, garbage-collection costs, and the cost of virtual function calls (dispatched through the base class).

– By generating bytecode at run-time, we no longer have to worry about boxing costs or virtual function calls, and garbage collection is also reduced.
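The effect can be illustrated by analogy in Python (Spark generates JVM bytecode; this sketch only mirrors the idea): the interpreted path walks an expression tree for every row, paying a dispatch cost per node, while the "generated" path compiles the expression to a code object once with the built-in compile() and reuses it for every row.

```python
# Interpreted vs. "generated" evaluation of a per-row expression a + b.
# Illustrative analogy only; Spark SQL emits JVM bytecode, not Python.

rows = [{"a": i, "b": i * 2} for i in range(1000)]

def interpret(expr, row):
    """Tree-walking evaluator: one dispatch per node, per row."""
    if isinstance(expr, str):
        return row[expr]          # column reference
    if isinstance(expr, int):
        return expr               # literal
    _, left, right = expr         # ("+", left, right)
    return interpret(left, row) + interpret(right, row)

tree = ("+", "a", "b")

# "Code generation": build the expression source once, compile it once,
# then run the compiled code object for every row.
code = compile("row['a'] + row['b']", "<generated>", "eval")

interpreted = [interpret(tree, row) for row in rows]
generated = [eval(code, {"row": row}) for row in rows]
assert interpreted == generated   # same results, no per-node dispatch
```

The compiled version does the expression-shape work once up front, which is the same trade Spark makes when it emits specialized bytecode for a query's expressions.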
