Understand RDD operations: transformations and actions

As we already know, each RDD supports two sets of parallel operations: transformations and actions. Today, let's go through some of them.

1. Transformations

Transformation | Result
map(f: T => U) | Return a MappedRDD[U] by applying the function f to each element
flatMap(f: T => TraversableOnce[U]) | Return a new FlatMappedRDD[U] by first applying a function to all elements and then flattening the results
For example, if we want to obtain all the words in a large text file, we can use the following code:

// create an RDD from a file in HDFS
val lines = sc.textFile("hdfs://input.txt")

// now we obtain an RDD which is a collection of
// the words in the input.txt file
val words = lines.flatMap(line => line.split(" "))
filter(f: T => Boolean) | Return a FilteredRDD[T] containing the elements for which f returns true
mapPartitions(f: Iterator[T] => Iterator[U]) | Return a new MapPartitionsRDD[U] by applying the function to each partition
sample(withReplacement, fraction, seed) | Return a new PartitionwiseSampledRDD[T], a sampled subset of this RDD
union(other: RDD[T]) | Return a new UnionRDD[T], the union of this RDD and the other RDD
intersection(other: RDD[T]) | Return a new RDD[T] containing the intersection of this RDD and the other RDD
distinct() | Return a new RDD[T] containing only the distinct elements of this RDD
groupByKey() | When called on an RDD of (K, V) pairs, return a new RDD[(K, Iterable[V])]
reduceByKey(f: (V, V) => V) | When called on an RDD of (K, V) pairs, return a new RDD[(K, V)] in which the values of each key are aggregated with f, e.g. reduceByKey(_ + _)
sortByKey([ascending]) | When called on an RDD of (K, V) pairs where K implements Ordered, return a new RDD[(K, V)] sorted by K
join(other: RDD[(K, W)]) | When called on an RDD of (K, V) pairs, return a new RDD[(K, (V, W))] by joining the two RDDs on their keys
cogroup(other: RDD[(K, W)]) | When called on an RDD of (K, V) pairs, return a new RDD[(K, (Iterable[V], Iterable[W]))] such that for each key in this or the other RDD we get a tuple with the list of values for that key in both RDDs
cartesian(other: RDD[U]) | Return a new RDD[(T, U)], the Cartesian product of the two RDDs
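
To make the key-based transformations above more concrete, here is a minimal sketch, assuming a spark-shell session where a SparkContext named sc already exists and using made-up data:

// two small pair RDDs built from made-up data
val sales  = sc.parallelize(Seq(("apple", 2), ("banana", 1), ("apple", 3)))
val prices = sc.parallelize(Seq(("apple", 0.5), ("banana", 0.25)))

// reduceByKey aggregates the values of each key: ("apple", 5), ("banana", 1)
val totals = sales.reduceByKey(_ + _)

// join pairs up the values of matching keys: ("apple", (5, 0.5)), ("banana", (1, 0.25))
val joined = totals.join(prices)

Since all of these are transformations, Spark only records the lineage at this point; nothing is computed until an action is called.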

2. Actions

Action | Result
reduce(f: (T, T) => T) | Return a value of type T by reducing the elements with the given commutative and associative binary operator
Example:

val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
rdd.reduce((a, b) => a + b)   // returns 15
collect() | Return an Array[T] containing all elements of the RDD
count() | Return the number of elements in the RDD
first() | Return the first element of the RDD
take(num) | Return an Array[T] with the first num elements
takeSample(withReplacement, num, seed) | Return an Array[T] which is a sampled subset of the RDD
takeOrdered(num)(ordering) | Return an Array[T] with the num smallest (or largest, depending on the ordering) elements
saveAsTextFile(fileName), saveAsSequenceFile(fileName), saveAsObjectFile(fileName) | Save the (serialized) RDD to the given file
countByValue() | Return a Map[T, Long] with the count of each unique value
countByKey() | When called on an RDD of (K, V) pairs, return a Map[K, Long] with the number of elements for each key
foreach(f: T => Unit) | Apply the function f to each element of the RDD
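
As a small illustration, again assuming a SparkContext named sc, here is how a few of these actions behave on a toy RDD; the values in the comments are what a spark-shell session would return:

val nums = sc.parallelize(Array(5, 3, 1, 4, 2))

nums.count()          // 5
nums.first()          // 5
nums.take(3)          // Array(5, 3, 1)
nums.takeOrdered(3)   // Array(1, 2, 3)
nums.reduce(_ + _)    // 15
nums.countByValue()   // each value occurs once: Map(1 -> 1, 2 -> 1, 3 -> 1, 4 -> 1, 5 -> 1)
nums.saveAsTextFile("numbers-out")   // writes the elements to a (hypothetical) output directory

Unlike the transformations, each of these calls immediately triggers a job on the cluster.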

3. Transformations & lazy evaluation give us more chances to optimize our jobs

The figure below gives a quick overview of the flow of a Spark job:

[Figure: Flow of a Spark job [2]]

Suppose we are running a simple application that counts how many lines contain the letter "a":

/* SimpleApp.scala */
import org.apache.spark.{SparkConf, SparkContext}

val logFile = "YOUR_SPARK_HOME/README.md"
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
println("Lines with a: %s".format(numAs))

We start by creating an RDD with the SparkContext, then transform it with the filter transformation, and finally call the count action. When an action is called on an RDD, the SparkContext submits a job to the DAGScheduler, where the very first optimizations happen.

The DAGScheduler receives the target RDDs, the functions to run on each partition (the pipelined transformations plus the action), and a listener for the results. It will:
– build Stages of Task objects (code + preferred locations)
– submit them to the TaskScheduler as soon as they are ready
– resubmit failed Stages if their outputs are lost

The TaskScheduler is responsible for launching the tasks on the executors in our cluster, re-launching failed tasks a number of times, and returning the results to the DAGScheduler.

We can now quickly summarize:
+ We submit a jar application which contains jobs.
+ A job gets submitted to the DAGScheduler via the SparkContext and is split into Stages. The DAGScheduler schedules the run order of these Stages.
+ A Stage contains a set of tasks to run on Executors. The TaskScheduler schedules the run of these tasks.
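
To see these Stages for a concrete job, we can extend the example into an actual word count, which adds a shuffle, and inspect its lineage with toDebugString (a sketch for a spark-shell session; the exact output format differs between Spark versions):

// flatMap and map are narrow, so they are pipelined into one stage;
// reduceByKey needs a shuffle and therefore starts a second stage
val counts = logData.flatMap(line => line.split(" "))
                    .map(word => (word, 1))
                    .reduceByKey(_ + _)

println(counts.toDebugString)   // shows the lineage; in recent versions the indentation marks the shuffle boundary
counts.count()                  // calling this action submits the two-stage job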

4. RDD dependency types and the optimizations at the DAGScheduler

Narrow dependency: each partition of the parent RDD is used by at most one partition of the child RDD. This means the task can be executed locally and we don't have to shuffle. (E.g.: map, flatMap, filter, sample)
Wide dependency: multiple child partitions may depend on one partition of the parent RDD. This means we have to shuffle data, unless the parents are hash-partitioned. (E.g.: sortByKey, reduceByKey, groupByKey, cogroup, join, cartesian)
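
Both kinds of dependency can be observed from the Spark shell through the RDD.dependencies field; in recent Spark versions the sketch below (with made-up data) prints a OneToOneDependency for map and a ShuffleDependency for groupByKey:

val nums = sc.parallelize(1 to 100, 4)

val doubled = nums.map(_ * 2)
println(doubled.dependencies)   // a OneToOneDependency: narrow, no shuffle needed

val grouped = nums.map(n => (n % 3, n)).groupByKey()
println(grouped.dependencies)   // a ShuffleDependency: wide, data must be shuffled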

Thanks to lazy evaluation, the scheduler is able to optimize the stages before submitting the job: it pipelines narrow operations within a stage, picks join algorithms based on partitioning (trying to minimize shuffles), and reuses previously cached data.
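
The application from section 3 already benefits from this: filter and count form a single pipelined stage, and because logData was cached, a second action over it can reuse the cached data instead of re-reading the file. A small sketch of that reuse (the "b" filter is just an illustrative second query):

// logData was cached above; this second action is served from the cache
// and its narrow filter is again pipelined with count into one stage
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with b: %s".format(numBs))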

[Figure: Example of optimized stages in a job [2]]

Further reading:
– Implementation of narrow dependency & wide dependency in Spark
– Understand stage and task scheduler

References

[1] Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma, Murphy McCauley, Michael J. Franklin, Scott Shenker, Ion Stoica. Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. Technical Report UCB/EECS-2011-82, July 2011.
[2] Matei Zaharia. Introduction to AmpLab Spark Internals (presentation). https://www.youtube.com/watch?v=49Hr5xZyTEA
