-
Notifications
You must be signed in to change notification settings - Fork 59
Optimizing with Partitioners
Last lecture, we learned about the hash partitioners and range partitioners, and also learned about what kind of operations may introduce new partitioners or which may discard custom partitioners.
It can bring substantial performance gains, especially in face of shuffles.
We saw how using reduceByKey
instead of groupByKey
localizes data better due to different partitioning strategy and thus reduces latency to deliver performance gains.
By manually repartition the data, for the same example, we can improve the performance even further. By usnig range partitioners we can optimize the use of reduceByKey
in that example so that it does not involve any shuffling over the network at all!
/* Same as before */
// CFF is a Swiss train company
case class CFFPurchase(customerId: Int, destination: String, price: Double)
// Assume that we have an RDD of purchases users of CFFs mobile app have made in the past month
val purchasesRdd: RDD[CFFPurchase] = sc.textFile(...)
val pairs = purchasesRdd.map( p => (p.customerId, p.price) )
/* NEW! */
val tunedPartitioner = new RangePartitioners(8, pairs) // 8 partitions
val partitioned = pairs.partitionBy(tunedPartitioner).persist()
/* Same as before */
val purchasesPerMonth = partitioned.map( p => (p._1, (1, p._2) ) //uses partitioned instead of purchasesRDD
.reduceByKey( (v1, v2) => (v1._1 + v2._1, v1._2 + v2._2) )
.collect()
This gives a 9 times speedup in practical tests.
Consider a media application that keeps a large table of user information in memory. Here users can subscribe to different topics.
-
userData
: BIG Pair RDD, contains(UserID, UserInfo)
pairs.UserInfo
contains a list of topics the user is subscribed to.
The application periodically combines this table with a smaller file representing events that happened in the past five minutes:
-
events
: SMALL Pair RDD, containing(UserID, LinkInfo)
pairs for users who have clicked a link on a website in those five minutes.
For example, we may wish to count how many users visited a link that was not to one of their subscribed topics. We can perform this combination with Spark's join
operation, which can be used to group the 1UserInfoand
LinkInfow.r.t. the
UserID` key.
Here is a program which does that:
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserIF, UserInfo]("hdfs://...").persist()
def processNewLogs(logFileName: String) {
val events = sc.sequenceFile[UserID, LinkInfo](lofFileName)
val joined = userData.join(events) // RDD of (UserId, (UserInfo, LinkInfo))
// find the no.of links that the user clicks on that he is not subscribed to
val offTopicVisits = joined.filter {
case (userId, (userInfo, linkInfo)) => !userInfo.topics.contains(linkInfo.topic)
}.count()
println("No. of visits to non-subscribed topics: " + offTopicVisits)
}
This code works as desired, but it is very inefficient: The join
operation is called everytime processNewLogs
is invoked, and it doe not know anything about how the keys are partitioned in the datasets. By default, this operation will hash all the keys of both datasets (userData
and events
) sending elements with the same key hash across the network to the same machine, and then join the elements with the same key on that machine, although that data already existed since the pairRdd userData
does not change.
Fixing this is easy! Use partitionBy
on the Big userData
PairRDD at the start of the program:
// change this
val userData = sc.sequenceFile[UserIF, UserInfo]("hdfs://...").persist()
// to this
val userData = sc.sequenceFile[UserIF, UserInfo]("hdfs://...")
.partitionBy(new HashPartitioner(100)) // create 100 partitions
.persist()
Spark will now know that userData
is hash-partitioned, and when join
is called on it, Spark will take advantage of this information. So Spark will only shuffle the events
Pair Rdd, sending events with each particular UserId
to the machine that contains the corresponding hash partition of userData
.
Rule of Thumb: a shuffle can occur when the resulting RDD depends on other elements from the same RDD or another RDD.
We can also figure out if a shuffle has been planned or executed via:
-
The return type of certain transformations. E.g.
org.apache.spark.rdd.RDD[(String, Int)] = ShuffleRDD[366] //shuffle has happended or planned
-
Using function
toDebugString
to see its execution plan:partitioned.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2)).toDebugString res9: String = (8) MapPartitionRDD[633] at reduceNyKey at <console>:49 [] | ShuffledRDD[615] at partitionBy at <console>:48 [] | CachedPartitions: 8; MemorySize: 1754.8 MB, DiskSIze: 0.0 B.
Operations that might cause a shuffle:
cogroup
groupWith
join
leftOuterJoin
rightOuterJoin
groupByKey
reduceByKey
combineByKey
distinct
intersection
repartition
coalesce
-
reduceByKey
running on a pre-partitioned RDD will cause the values to be computed locally, requiring only the final reduced value to be sent from worker to the driver. -
join
called on 2 RDDs that are pre-partitioned with the same partitioner and cached on the same machine will cause the join to be computed locally, with no shuffling across the network.
How the data is organized on the cluster, and what operations we are doing on it matters!
Week 1
- Introduction
- Data Parallel to Distributed Data Parallel
- Latency
- RDDs: Spark's Distributed Collection
- RDDs: Transformation and Action
- Evaluation in Spark: Unlike Scala Collections!
- Cluster Topology Matters!
Week 2
- Reduction Operations (fold, foldLeft, aggregate)
- Pair RDDs
- Pair RDDs: Transformations and Actions
- Pair RDDs: Joins
Week 3
- Shuffling: What it is and why it's important
- Partitioning
- Optimizing with Partitioners
- Wide vs Narrow Dependencies
Week 4