As we already know, each RDD supports two kinds of parallel operations: transformations and actions. Today, let’s look at some of them.
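Before diving into the tables, here is a minimal sketch of the difference (assuming the spark-shell, where the SparkContext sc is already in scope): transformations are lazy and only record the lineage, while an action triggers the actual computation.

```scala
val numbers = sc.parallelize(1 to 10) // distribute a local collection
val doubled = numbers.map(_ * 2)      // transformation: nothing is computed yet
val total = doubled.reduce(_ + _)     // action: runs the job and returns 110
```

First, the transformations: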
| Transformation | Description |
| --- | --- |
| map(f: T => U) | Return a MappedRDD[U] by applying function f to each element |
| flatMap(f: T => TraversableOnce[U]) | Return a new FlatMappedRDD[U] by first applying a function to all elements and then flattening the results |
For example, to print all the words in a large text file, we can use the following code:
```scala
// create an RDD from an HDFS file
val lines = spark.textFile("hdfs://input.txt")
// words is an RDD containing every word in input.txt
val words = lines.flatMap(line => line.split(" "))
// print all the words
words.collect().foreach(println)
```
| Transformation | Description |
| --- | --- |
| filter(f: T => Boolean) | Return a FilteredRDD[T] containing the elements for which f returns true |
| mapPartitions(f: Iterator[T] => Iterator[U]) | Return a new MapPartitionsRDD[U] by applying a function to each partition |
| sample(withReplacement, fraction, seed) | Return a new PartitionwiseSampledRDD[T] which is a sampled subset |
| union(other: RDD[T]) | Return a new UnionRDD[T] by making a union with another RDD |
| intersection(other: RDD[T]) | Return a new RDD[T] by making an intersection with another RDD |
| distinct() | Return a new RDD[T] containing the distinct elements |
| groupByKey() | Called on an RDD of (K, V) pairs; return a new RDD[(K, Iterable[V])] |
| reduceByKey(f: (V, V) => V) | Called on an RDD of (K, V) pairs; return a new RDD[(K, V)] by aggregating the values of each key with f, e.g. reduceByKey(_ + _) (see the sketch after this table) |
| sortByKey([ascending]) | Called on an RDD of (K, V) pairs where K implements Ordered; return a new RDD[(K, V)] sorted by K |
| join(other: RDD[(K, W)]) | Called on an RDD of (K, V) pairs; return a new RDD[(K, (V, W))] by joining the two RDDs on their keys |
| cogroup(other: RDD[(K, W)]) | Called on an RDD of (K, V) pairs; return a new RDD[(K, (Iterable[V], Iterable[W]))] such that for each key k in this or other we get a tuple with the list of values for k in this as well as in other |
| cartesian(other: RDD[U]) | Return a new RDD[(T, U)], the Cartesian product with another RDD |
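To make the pair-RDD transformations above concrete, here is a minimal sketch (again assuming the spark-shell, where sc and the pair-RDD implicits are in scope):

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val other = sc.parallelize(Seq(("a", "x"), ("b", "y")))

pairs.reduceByKey(_ + _).collect() // Array((a,4), (b,2))
pairs.groupByKey().collect()       // a -> Iterable(1, 3); b -> Iterable(2)
pairs.join(other).collect()        // Array((a,(1,x)), (a,(3,x)), (b,(2,y)))
```

Next, the actions: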
| Action | Description |
| --- | --- |
| reduce(f: (T, T) => T) | Return a T by reducing the elements with the specified commutative and associative binary operator |

For example:

```scala
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
rdd.reduce((a, b) => a + b) // returns 15
```
| Action | Description |
| --- | --- |
| collect() | Return an Array[T] containing all the elements |
| count() | Return the number of elements |
| first() | Return the first element |
| take(num) | Return an Array[T] of the first num elements |
| takeSample(withReplacement, num, [seed]) | Return an Array[T] which is a sampled subset of num elements |
| takeOrdered(num)(ordering) | Return an Array[T] of the num smallest (or largest, depending on the ordering) elements (see the sketch after this table) |
| saveAsObjectFile(path) | Save the (serialized) RDD elements to the given path |
| countByValue() | Return a Map[T, Long] with the count of each unique value |
| countByKey() | Return a Map[K, Long] counting the number of elements for each key |
| foreach(f: T => Unit) | Apply function f to each element |
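A small sketch of takeOrdered, the only row above that takes an explicit ordering (assuming the spark-shell’s sc):

```scala
val rdd = sc.parallelize(Seq(5, 1, 4, 2, 3))
rdd.takeOrdered(3)                        // Array(1, 2, 3): the smallest, by natural order
rdd.takeOrdered(3)(Ordering[Int].reverse) // Array(5, 4, 3): the largest
```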
3. Transformations & lazy evaluation give us more opportunities to optimize our jobs
The figure below gives a quick overview of the flow of a Spark job:

Suppose we are running a simple job that counts the lines containing the letter “a”:
```scala
/* SimpleApp.scala */
val logFile = "YOUR_SPARK_HOME/README.md"
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
println("Lines with a: %s".format(numAs))
```
We start by creating an RDD using the SparkContext, then transform it with the filter transformation, and finally call the count action. When an action is called on an RDD, the SparkContext submits a job to the DAGScheduler, where the very first optimizations happen.
The DAGScheduler receives the target RDDs, the functions to run on each partition (the pipelined transformations and the action), and a listener for results. It will:
– build Stages of Task objects (code + preferred location)
– submit them to the TaskScheduler as they become ready
– resubmit failed Stages if their outputs are lost
The TaskScheduler is responsible for launching tasks on the executors in our cluster, re-launching failed tasks several times, and returning results to the DAGScheduler.
We can now quickly summarize:
+ We submit an application jar which contains one or more jobs.
+ A job is submitted to the DAGScheduler via the SparkContext and is split into Stages. The DAGScheduler schedules the run order of these stages.
+ A Stage contains a set of tasks to run on Executors. The TaskScheduler schedules the execution of these tasks.
Stage boundaries are determined by the dependencies between RDDs:
– Narrow dependency: each partition of the parent RDD is used by at most one partition of the child RDD. This means a task can be executed locally and we don’t have to shuffle (e.g. map, flatMap, filter, sample).
– Wide dependency: multiple child partitions may depend on one partition of the parent RDD. This means we have to shuffle data unless the parents are hash-partitioned (e.g. sortByKey, reduceByKey, groupByKey, cogroup, join, cartesian).
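We can see both kinds of dependency in the lineage that toDebugString prints; here is a minimal sketch (assuming the spark-shell’s sc), where reduceByKey introduces the shuffle and hence the stage boundary:

```scala
val words = sc.parallelize(Seq("a", "b", "a"))
val pairs = words.map(w => (w, 1))    // narrow dependency: pipelined in the same stage
val counts = pairs.reduceByKey(_ + _) // wide dependency: requires a shuffle
println(counts.toDebugString)         // the lineage shows a ShuffledRDD at the stage boundary
```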
Thanks to lazy evaluation, the scheduler can optimize the stages before submitting the job: it pipelines narrow operations within a single stage, picks join algorithms based on partitioning (trying to minimize shuffles), and reuses previously cached data.
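For instance, reusing cached data: a minimal sketch built on the SimpleApp example above, where only the first action pays the cost of reading the file:

```scala
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(_.contains("a")).count() // 1st action: reads the file and caches it
val numBs = logData.filter(_.contains("b")).count() // 2nd action: reuses the cached partitions
```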
– Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma, Murphy McCauley, Michael J. Franklin, Scott Shenker, Ion Stoica. Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. Technical Report UCB/EECS-2011-82, July 2011.
– Matei Zaharia. Introduction to AmpLab Spark Internals (presentation), https://www.youtube.com/watch?v=49Hr5xZyTEA