Let us understand the concept of Shuffling.
As we have seen, a Spark job will run in multiple stages.
Stages run in a linear fashion. For example, Stage 1 will run only after Stage 0 is completely done.
In each stage, data is processed using tasks.
The output of Stage 0 tasks is passed as input to Stage 1 tasks.
When the output of tasks in earlier stages is passed as input to tasks in later stages, the following happens:
Data is partitioned using a hash mod algorithm (see the sketch below).
Data related to the same key is grouped together.
This data is held in memory and might be spilled to disk as well.
Data related to a particular key from all tasks of earlier stages is passed to a single task in the later stage.
This entire process is called shuffling.
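As an illustration, the hash mod idea works roughly like this (a minimal sketch in plain Python, not Spark's exact internal partitioner):

def assign_partition(key, num_partitions):
    # hash the key and take the modulo to pick the target partition
    return hash(key) % num_partitions

# with 3 partitions, keys 1 and 4 land in partition 1, key 2 in partition 2, key 3 in partition 0
for k in [1, 2, 3, 4]:
    print(k, assign_partition(k, 3))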
When certain APIs such as reduceByKey or aggregateByKey are used, Spark also performs something called combining (a map-side aggregation), which can improve performance significantly.
APIs such as join, reduceByKey, aggregateByKey, groupByKey, etc. result in shuffling.
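For instance, a join shuffles both datasets so that matching keys end up in the same task. This is a sketch only; it assumes an orders dataset at /public/retail_db/orders whose first field is the order id and whose fourth field is the order status:

orders = sc.textFile("/public/retail_db/orders")
ordersMap = orders.map(lambda o: (int(o.split(",")[0]), o.split(",")[3]))
orderItems = sc.textFile("/public/retail_db/order_items")
orderItemsMap = orderItems.map(lambda oi: (int(oi.split(",")[1]), float(oi.split(",")[4])))
ordersJoin = ordersMap.join(orderItemsMap, 3)  # shuffle both RDDs into 3 partitions, match on the key
for i in ordersJoin.take(10): print(i)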
The number of tasks in subsequent stages is determined by one of these (see the sketch after this list):
Number of partitions from the earlier stage
The numTasks or numPartitions argument of APIs that result in shuffling
repartition or coalesce (covered as part of the next topic)
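A quick way to confirm how many tasks a shuffle stage will get is to check the partition count of the resulting RDD. This sketch reuses the order_items data from the examples below:

orderItems = sc.textFile("/public/retail_db/order_items")
orderItemsMap = orderItems.map(lambda oi: (int(oi.split(",")[1]), float(oi.split(",")[4])))
print(orderItemsMap.getNumPartitions())  # partitions inherited from the input files
print(orderItemsMap.reduceByKey(lambda x, y: x + y).getNumPartitions())  # no argument: derived from the parent (or spark.default.parallelism)
print(orderItemsMap.reduceByKey(lambda x, y: x + y, 3).getNumPartitions())  # explicit numPartitions = 3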
The right number of tasks can only be determined after understanding the data behavior in detail. Here are some of the criteria:
Ratio between input data and output (in the case of filtering and aggregations, the output size will be considerably lower)
Keys on which data is shuffled (sparse keys vs. dense keys)
Joins and potential cartesian products
and more
Here are examples of groupByKey, reduceByKey, and aggregateByKey to understand the differences.
# groupByKey: all values for a key are shuffled; the aggregation happens only after the shuffle
orderItems = sc.textFile("/public/retail_db/order_items")
orderItemsMap = orderItems.map(lambda oi: (int(oi.split(",")[1]), float(oi.split(",")[4])))  # (order_id, order_item_subtotal)
orderItemsGBK = orderItemsMap.groupByKey(3)  # 3 shuffle partitions; no map-side combining
orderItemsGBKMap = orderItemsGBK.map(lambda oi: (oi[0], sum(oi[1])))  # sum the subtotals per key
for i in orderItemsGBKMap.take(10): print(i)
# reduceByKey: values are combined per key on the map side before the shuffle (combiner)
orderItems = sc.textFile("/public/retail_db/order_items")
orderItemsMap = orderItems.map(lambda oi: (int(oi.split(",")[1]), float(oi.split(",")[4])))
orderItemsRBK = orderItemsMap.reduceByKey(lambda x, y: x + y, 3)  # sum per key, 3 shuffle partitions
for i in orderItemsRBK.take(10): print(i)
# aggregateByKey: per key, build (sum of subtotals, count of items) starting from the zero value (0.0, 0)
orderItems = sc.textFile("/public/retail_db/order_items")
orderItemsMap = orderItems.map(lambda oi: (int(oi.split(",")[1]), float(oi.split(",")[4])))
orderItemsABK = orderItemsMap.aggregateByKey((0.0, 0),
    lambda x, y: (x[0] + y, x[1] + 1),        # seq op: fold one value into (sum, count) within a partition
    lambda x, y: (x[0] + y[0], x[1] + y[1]),  # comb op: merge (sum, count) pairs across partitions
    3                                         # number of shuffle partitions
)
for i in orderItemsABK.take(10): print(i)
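The aggregateByKey output carries a (sum, count) pair per key. If the goal is the average subtotal per key (an assumption, not stated above), a typical follow-up would be:

orderItemsAvg = orderItemsABK.mapValues(lambda t: t[0] / t[1])  # sum / count per key
for i in orderItemsAvg.take(10): print(i)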