```{r include=FALSE, eval=TRUE}
knitr::opts_chunk$set(eval = FALSE)
library(sparklyr)
library(dplyr)
source("r/render.R")
```
# Tuning {#tuning}
> Chaos isn’t a pit. Chaos is a ladder.
>
> --- Petyr Baelish
In previous chapters, we’ve assumed that computation within a Spark cluster works efficiently. While this is true in some cases, it is often necessary to have some knowledge of the operations Spark runs internally to fine-tune configuration settings that will make computations run efficiently. This chapter explains how Spark computes data over large datasets and provides details on how to optimize its operations.
For instance, in this chapter you'll learn how to request more compute nodes and increase the amount of memory, which, if you remember from [Chapter 2](#starting), defaults to only 2 GB in local instances. You will learn how Spark unifies computation through partitioning, shuffling, and caching. As mentioned a few chapters back, this is the last chapter describing the internals of Spark; after you complete this chapter, we believe that you will have the intermediate Spark skills necessary to be productive at using Spark.
In Chapters [10](#extensions)–[12](#streaming) we explore exciting techniques to deal with specific modeling, scaling, and computation problems. However, we must first understand how Spark performs internal computations, what pieces we can control, and why.
## Overview
Spark<!--((("tuning", "overview of", id="Tover09")))--> performs distributed computation by configuring, partitioning, executing, shuffling, caching, and serializing data, tasks, and resources across multiple machines:
- [*Configuring*](#tuning-configuring) requests<!--((("configuring", "purpose of")))--> the cluster manager for resources: total machines, memory, and so on.
- [*Partitioning*](#tuning-configuring) splits<!--((("partitioning", "purpose of")))--> the data among various machines. Partitions can be either implicit or explicit.
- [*Executing*](#tuning-configuring) means<!--((("execution, defined")))--> running an arbitrary transformation over each partition.
- [*Shuffling*](#tuning-configuring) redistributes<!--((("shuffling")))--> data to the correct machine.
- [*Caching*](#tuning-configuring) preserves<!--((("caching", "purpose of")))--> data in memory across different computation cycles.
- [*Serializing*](#tuning-serializing) transforms<!--((("serialization", "purpose of")))--> data to be sent over the network to other workers or back to the driver node.
To illustrate each concept, let's create three partitions with unordered integers and then sort them using `arrange()`:
```{r echo=FALSE}
library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local")
```
```{r}
data <- copy_to(sc,
data.frame(id = c(4, 9, 1, 8, 2, 3, 5, 7, 6)),
repartition = 3)
data %>% arrange(id) %>% collect()
```
Figure \@ref(fig:tuning-overview) shows<!--((("job execution, process of")))--> how this<!--((("sorting operations")))--> sorting _job_ would conceptually work across a cluster of machines. First, Spark would _configure_ the cluster to use three worker machines. In this example, the numbers `1` through `9` are partitioned across three storage instances. Since the _data_ is already partitioned, each worker node loads this implicit _partition_; for instance, `4`, `9`, and `1` are loaded in the first worker node. Afterward, a _task_ is distributed to each worker to apply a transformation to each data partition in each worker node; this task is denoted by `f(x)`. In this example, `f(x)` _executes_ a sorting operation within a partition. Since Spark is general, execution over a partition can be as simple or complex as needed.
The result is then _shuffled_ to the correct machine to finish the sorting operation across the entire dataset, which completes a stage. A<!--((("stage")))--> _stage_ is a set of operations that Spark can execute without shuffling data between machines. After the data is sorted across the cluster, the sorted results can be optionally _cached_ in memory to avoid rerunning this computation multiple times.
Finally, a small subset of the results is _serialized_, through the network connecting the cluster machines, back to the driver node to print a preview of this sorting example.
Notice that while Figure \@ref(fig:tuning-overview) describes a sorting operation, a similar approach applies to filtering or joining datasets and analyzing and modeling data at scale. Spark provides support to perform custom partitions, custom shuffling, and so on, but most of these lower-level operations are not exposed in `sparklyr`; instead, `sparklyr` makes those operations available through higher-level commands provided by data [analysis](#analysis) tools like `dplyr` or `DBI`, [modeling](#modeling), and by using many [extensions](#extensions). For those few cases in which you might need to implement low-level operations, you can always use Spark’s Scala API through `sparklyr` extensions or run custom [distributed R](#distributed) code.
To effectively tune Spark, we will start by getting familiar with Spark’s computation [graph](#tuning-graph-visualization) and Spark’s event [_timeline_](#tuning-event-timeline). Both are accessible through [Spark’s web interface](#starting-spark-web-interface).
```{r tuning-overview-code, echo=FALSE}
r2d3::r2d3(
c(),
"images/tuning-spark-overview.js",
dependencies = "images/tuning-diagram.js",
css = "images/tuning-spark-overview.css"
)
```
```{r tuning-overview, eval=TRUE, echo=FALSE, fig.cap='Sorting distributed data with Apache Spark', fig.align='center'}
render_image("images/tuning-spark-overview.png", "Sorting Distributed Data with Apache Spark")
```
### Graph {#tuning-graph-visualization}
Spark describes<!--((("acyclic graphs")))((("Apache Spark", "computation graph")))((("Directed Acyclic Graph (DAG)")))--> all computation steps using a Directed Acyclic Graph (DAG), meaning that computation always moves forward without repeating previous steps; this property helps Spark optimize computations effectively.
The best way to understand Spark’s computation graph for a given operation—sorting for our example—is to open the last _completed query_ on the SQL tab in [Spark’s web interface](#starting-spark-web-interface). Figure \@ref(fig:tuning-graph-sql-render) shows the resulting graph for this sorting operation, which contains the following operations:
`WholeStageCodegen`
: This block specifies that the operations it contains were used to generate computer code that was efficiently translated to byte code. There is usually a small cost associated with translating operations into byte code, but this is a worthwhile price to pay since the operations can then be executed much faster by Spark. In general, you can ignore this block and focus on the operations that it contains.
`InMemoryTableScan`
: This means that the original dataset `data` was stored in memory and traversed row by row once.
`Exchange`
: Partitions were exchanged—that is, shuffled—across executors in your cluster.
`Sort`
: Once the records arrived at the right executor, they were sorted in this final stage.
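If you are following along, you can open this web interface directly from R; a minimal sketch, assuming the `sc` connection from the earlier chunks:
```{r}
# Open the Spark web interface in a browser; completed queries are listed under the SQL tab
spark_web(sc)
```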
```{r tuning-graph-sql, echo=FALSE}
webshot::webshot(
"http://localhost:4040/SQL/execution/?id=7",
file = "images/tuning-spark-graph-visualization-sql.png",
selector = c("#plan-viz-graph"))
```
```{r tuning-graph-sql-render, eval=TRUE, echo=FALSE, fig.align = 'center', fig.cap='Spark graph for a sorting query'}
render_image("images/tuning-spark-graph-visualization-sql.png", "Spark Graph for a Sorting Query")
```
From<!--((("DAG Visualization")))--> the query details, you can then open the last Spark job to arrive at the job details page, where you can expand "DAG Visualization" to display a graph similar to Figure \@ref(fig:tuning-graph-render). This graph shows a few additional details and the stages in this job. Notice that there are no arrows pointing back to previous steps, since Spark makes use of acyclic graphs.
```{r tuning-graph, echo=FALSE}
webshot::webshot(
"http://localhost:4040/jobs/job/?id=4",
file = "images/tuning-spark-graph-visualization.png",
selector = c("#plan-viz-graph"))
```
```{r tuning-graph-render, eval=TRUE, echo=FALSE, fig.align = 'center', fig.cap='Spark graph for a sorting job'}
render_image("images/tuning-spark-graph-visualization.png", "Spark Graph for a Sorting Job")
```
Next, we dive into a Spark stage and explore its event timeline.
### Timeline {#tuning-event-timeline}
The _event timeline_ is<!--((("event timeline")))((("timeline")))--> a great summary of how Spark is spending computation cycles over each stage. Ideally, you want this timeline to consist mostly of CPU usage, since other tasks can be considered overhead. You also want to see Spark using all the CPUs across all the cluster nodes available to you.
Select the first stage in the current job and expand the event timeline, which should look similar to Figure \@ref(fig:tuning-timeline-simple). Notice that we explicitly requested three partitions, which are represented by three lanes in this visualization.
```{r tuning-timeline-simple-webshot, echo=FALSE}
webshot::webshot(
"http://localhost:4040/stages/stage/?id=6&attempt=0",
file = "images/tuning-spark-event-timeline.png",
eval = "
casper.waitForSelector(
'#task-assignment-timeline',
function() {
this.click('.expand-task-assignment-timeline');
}
);",
selector = c(".legend-area", ".table-bordered")
)
```
```{r tuning-timeline-simple, eval=TRUE, echo=FALSE, fig.align='center', fig.cap='Spark event timeline'}
render_image("images/tuning-spark-event-timeline.png", "Spark event timeline")
```
Since our machine is equipped with four CPUs, we can<!--((("parallel execution")))--> parallelize this computation even further by explicitly repartitioning data using `sdf_repartition()`, with the result shown in Figure \@ref(fig:tuning-timeline-repartition):
```{r tuning-timeline-sort-repartition}
data %>% sdf_repartition(4) %>% arrange(id) %>% collect()
```
```{r tuning-timeline-webshot, echo=FALSE}
webshot::webshot(
"http://localhost:4040/stages/stage/?id=8&attempt=0",
file = "images/tuning-spark-event-timeline-repartition.png",
eval = "
casper.waitForSelector(
'#task-assignment-timeline',
function() {
this.click('.expand-task-assignment-timeline');
}
);",
selector = c(".legend-area", "#task-summary-table"))
```
```{r tuning-timeline-repartition, eval=TRUE, echo=FALSE, fig.align='center', fig.cap='Spark event timeline with additional partitions'}
render_image("images/tuning-spark-event-timeline-repartition.png", "Spark Event Timeline with Additional Partitions")
```
Figure \@ref(fig:tuning-timeline-repartition) now shows four execution lanes with most time spent under Executor Computing Time, which shows us that this particular operation is making better use of our compute resources. When you are working with clusters, requesting more compute nodes from your cluster should shorten computation time. In contrast, for timelines that show significant time spent shuffling, requesting more compute nodes might not shorten time and might actually make everything slower. There is no concrete set of rules to follow to optimize a stage; however, as you gain experience understanding this timeline over multiple operations, you will develop insights as to how to properly optimize Spark operations.<!--((("", startref="Tover09")))-->
## Configuring {#tuning-configuring}
When<!--((("configuring", "common resources to configure")))((("tuning", "configuring", id="Tconfig09")))--> tuning a Spark application, the most common resources to configure are memory and cores, specifically:
Memory in driver
: The amount of memory required in the driver node
Memory per worker
: The amount of memory required in the worker nodes
Cores per worker
: The number of CPUs required in the worker nodes
Number of workers
: The number of workers required for this session
**Note:** It is recommended to request significantly more memory for the driver than the memory available over each worker node. In most cases, you will want to request one core per worker.
In local mode there are no workers, but we can still configure memory and cores to use through the following:
```{r tuning-config-local}
# Initialize configuration with defaults
config <- spark_config()
# Memory
config["sparklyr.shell.driver-memory"] <- "2g"
# Cores
config["sparklyr.connect.cores.local"] <- 2
# Connect to local cluster with custom configuration
sc <- spark_connect(master = "local", config = config)
```
When<!--((("configuring", "Standalone and Mesos")))((("Mesos")))((("Apache Mesos")))--> using the Spark Standalone and the Mesos cluster managers, all the available memory and cores are assigned by default; therefore, there are no additional configuration changes required, unless you want to restrict resources to allow multiple users to share this cluster. In this case, you can use `total-executor-cores` to restrict the total executors requested. The [_Spark Standalone_](http://bit.ly/307YtM6) and [_Spark on Mesos_](http://bit.ly/31H4LCT) guides provide additional information on sharing clusters.
When<!--((("configuring", "YARN")))--> running under YARN Client, you would configure memory and cores as follows:
```{r}
# Memory in Driver
config["sparklyr.shell.driver-memory"] <- "2g"
# Memory per Worker
config["spark.executor.memory"] <- "2G"
# Cores per Worker
config["sparklyr.shell.executor-cores"] <- 1
# Number of Workers
config["sparklyr.shell.num-executors"] <- 3
```
When using YARN in cluster mode, you can use `sparklyr.shell.driver-cores` to configure the total cores requested for the driver node. The [Spark on YARN](http://bit.ly/306WsQx) guide provides additional configuration settings that can benefit you.
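Building on the previous chunk, a hedged sketch that also requests driver cores and connects in cluster mode; the values and the `yarn-cluster` master string are illustrative assumptions:
```{r}
# Cores in Driver (YARN cluster mode only; illustrative value)
config["sparklyr.shell.driver-cores"] <- 2
# Connect in YARN cluster mode with the accumulated configuration
sc <- spark_connect(master = "yarn-cluster", config = config)
```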
There<!--((("configuring", "types of configuration settings")))--> are a few types of configuration settings:
_Connect_
: These settings are set as parameters to `spark_connect()`. They are common settings used while connecting.
_Submit_
: These settings are set while `sparklyr` is being submitted to Spark through `spark-submit`; some are dependent on the cluster manager being used.
_Runtime_
: These settings configure Spark when the Spark session is created. They are independent of the cluster manager and specific to Spark.
_sparklyr_
: Use these to configure `sparklyr` behavior. These settings are independent of the cluster manager and particular to R.
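To make the distinction concrete, here is a minimal sketch that sets one submit setting, one runtime setting, and two connect settings; the specific values and the `app_name` are illustrative assumptions, and a sparklyr setting is illustrated later in the sparklyr Settings subsection:
```{r}
config <- spark_config()
# Submit setting: passed to spark-submit when the driver is launched
config["sparklyr.shell.driver-memory"] <- "2g"
# Runtime setting: applied when the Spark session is created
config$spark.memory.fraction <- 0.75
# Connect settings: passed directly to spark_connect()
sc <- spark_connect(master = "local", app_name = "tuning", config = config)
```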
The following subsections present extensive lists of all the available settings. It is not required that you fully understand them all while tuning Spark, but skimming through them could prove useful in the future for troubleshooting issues. If you prefer, you can skip these subsections and use them instead as reference material as needed.
### Connect Settings {#connect-settings}
You<!--((("configuring", "connect settings")))((("commands", "spark_connect()")))--> can use the parameters listed in Table \@ref(tab:tuning-connect-table) with `spark_connect()`. They configure high-level settings that define the connection method, Spark’s installation path, and the version of Spark to use.
```{r tuning-connect-table, eval=TRUE, echo=FALSE}
knitr::kable(
data.frame(
name = c("master", "spark_home", "method", "app_name", "version", "config"),
value = c(
"Spark cluster url to connect to. Use \"local\" to connect to a local instance of Spark installed via 'spark_install()'.",
"The path to a Spark installation. Defaults to the path provided by the SPARK_HOME environment variable. If SPARK_HOME is defined, it will always be used unless the version parameter is specified to force the use of a locally installed version.",
"The method used to connect to Spark. Default connection method is \"shell\" to connect using spark-submit, use \"livy\" to perform remote connections using HTTP, \"databricks\" when using a Databricks cluster or \"qubole\" when using a Qubole cluster.",
"The application name to be used while running in the Spark cluster.",
"The version of Spark to use. Only applicable to \"local\" Spark connections.",
"Custom configuration for the generated Spark connection. See spark_config for details."
)
),
booktabs = TRUE,
caption = 'Parameters used when connecting to Spark'
)
```
You can configure additional settings by specifying a list in the `config` parameter. Let's now take a look at what those settings can be.
### Submit Settings {#submit-settings}
Some<!--((("configuring", "submit settings")))((("spark-submit script")))((("sparklyr package", "spark-submit script")))--> settings must be specified when `spark-submit` (the terminal application that launches Spark) is run. For instance, since `spark-submit` launches a driver node that runs as a Java instance, how much memory is allocated needs to be specified as a parameter to `spark-submit`.
You can list all the available `spark-submit` parameters by running the following:
```{r tuning-submit-code}
spark_home_dir() %>% file.path("bin", "spark-submit") %>% system2()
```
For readability, we’ve provided the output of this command in Table \@ref(tab:tuning-submit-table), replacing the `spark-submit` parameter with the appropriate `spark_config()` setting and removing the parameters that are not applicable or already presented in this chapter.
```{r tuning-submit-table, eval=TRUE, echo=FALSE}
knitr::kable(
data.frame(
name = c(
"sparklyr.shell.jars", "sparklyr.shell.packages", "sparklyr.shell.exclude-packages",
"sparklyr.shell.repositories", "sparklyr.shell.files", "sparklyr.shell.conf",
"sparklyr.shell.properties-file", "sparklyr.shell.driver-java-options",
"sparklyr.shell.driver-library-path", "sparklyr.shell.driver-class-path",
"sparklyr.shell.proxy-user", "sparklyr.shell.verbose"
),
value = c(
"Specified as 'jars' parameter in 'spark_connect()'.",
"Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths. Will search the local maven repo, then maven central and any additional remote repositories given by 'sparklyr.shell.repositories'. The format for the coordinates should be groupId:artifactId:version.",
"Comma-separated list of groupId:artifactId, to exclude while resolving the dependencies provided in 'sparklyr.shell.packages' to avoid dependency conflicts.",
"Comma-separated list of additional remote repositories to search for the maven coordinates given with 'sparklyr.shell.packages'",
"Comma-separated list of files to be placed in the working directory of each executor. File paths of these files in executors can be accessed via SparkFiles.get(fileName).",
"Arbitrary Spark configuration property set as PROP=VALUE.",
"Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.conf.",
"Extra Java options to pass to the driver.",
"Extra library path entries to pass to the driver.",
"Extra class path entries to pass to the driver. Note that jars added with 'sparklyr.shell.jars' are automatically included in the classpath.",
"User to impersonate when submitting the application. This argument does not work with 'sparklyr.shell.principal' / 'sparklyr.shell.keytab'.",
"Print additional debug output."
)
),
booktabs = TRUE,
caption = 'Settings available to configure spark-submit'
)
```
The remaining settings, shown in Table \@ref(tab:tuning-yarn-table), are specific to YARN.
```{r tuning-yarn-table, eval=TRUE, echo=FALSE}
knitr::kable(
data.frame(
name = c("sparklyr.shell.queue", "sparklyr.shell.archives",
"sparklyr.shell.principal", "sparklyr.shell.keytab"
),
value = c(
"The YARN queue to submit to (Default: \"default\").",
"Comma separated list of archives to be extracted into the working directory of each executor.",
"Principal to be used to login to KDC, while running on secure HDFS.",
"The full path to the file that contains the keytab for the principal specified above. This keytab will be copied to the node running the Application Master via the Secure Distributed Cache, for renewing the login tickets and the delegation tokens periodically."
)
),
booktabs = TRUE,
caption = 'Settings available to configure spark-submit when using YARN'
)
```
In general, any `spark-submit` setting is configured through `sparklyr.shell.X`, where X is the name of the `spark-submit` parameter without the `--` prefix.
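For instance, `spark-submit`'s `--verbose` flag maps to `sparklyr.shell.verbose`; a minimal sketch:
```{r}
config <- spark_config()
# Equivalent to passing --verbose to spark-submit
config["sparklyr.shell.verbose"] <- TRUE
sc <- spark_connect(master = "local", config = config)
```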
### Runtime Settings
As<!--((("configuring", "runtime settings")))--> mentioned, some Spark settings configure the session runtime. The runtime settings are a superset of the [submit settings](#submit-settings) given that it is usually helpful to retrieve the current configuration even if a setting can’t be changed.
To list the Spark settings set in your current Spark session, you can run the following:
```{r tuning-runtime-config}
spark_session_config(sc)
```
```{r tuning-runtime-config-table-code, echo=FALSE}
sc <- spark_connect(master = "local")
settings <- spark_session_config(sc)
cleaned <- gsub("Frameworks/R.framework/Versions/3.5/Resources/library", "...", settings)
cleaned <- gsub("javierluraschi", "...", cleaned)
cleaned <- gsub("^file:", "", cleaned)
saveRDS(data.frame(name = names(settings), value = unlist(unname(cleaned))), "data/tuning-runtime-config.rds")
```
Table \@ref(tab:tuning-runtime-config-table) describes the runtime settings.
```{r tuning-runtime-config-table, eval=TRUE, echo=FALSE}
knitr::kable(
readRDS("data/tuning-runtime-config.rds"),
booktabs = TRUE,
caption = "Setting available to configure the Spark session"
)
```
However, there are many more configuration settings available in Spark, as described in the [_Spark Configuration_](http://bit.ly/2P0Yalf) guide. It's beyond the scope of this book to describe them all, so, if possible, take some time to identify the ones that might be of interest to your particular use cases.
### sparklyr Settings
Apart<!--((("configuring", "sparklyr settings", id="Csparklyr09")))((("commands", "spark_config_settings()")))((("sparklyr package", "configuration settings", id="SPconfig09")))--> from Spark settings, there are a few settings particular to `sparklyr`. You usually don’t use these settings while tuning Spark; instead, they are helpful while troubleshooting Spark from R. For instance, you can use `sparklyr.log.console = TRUE` to output the Spark logs into the R console; this is ideal while troubleshooting but too noisy otherwise. Here's how to list the settings (results are presented in Table \@ref(tab:tuning-settings-sparklyr-code)):
```{r tuning-settings-sparklyr, eval=FALSE}
spark_config_settings()
```
```{r tuning-settings-sparklyr-code, eval=TRUE, echo=FALSE}
knitr::kable(
spark_config_settings(),
booktabs = TRUE,
caption = "Settings available to configure the sparklyr package"
)
```
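For instance, a minimal sketch that enables console logging while troubleshooting a local connection:
```{r}
config <- spark_config()
# Stream Spark's logs into the R console; useful while troubleshooting
config$sparklyr.log.console <- TRUE
sc <- spark_connect(master = "local", config = config)
```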
## Partitioning {#tuning-partitioning}
As<!--((("", startref="SPconfig09")))((("", startref="Csparklyr09")))((("", startref="Tconfig09")))((("tuning", "partitioning")))--> mentioned in [Chapter 1](#intro), MapReduce and Spark were designed with the purpose of performing computations against data stored across many machines. The subset of the data available for computation over each compute instance is known as a _partition_.
By default, Spark computes over each existing _implicit_ partition since it’s more effective to run computations where the data is already located. However, there are cases for which you will want to set an _explicit_ partition to help Spark make more efficient use of your cluster resources.
### Implicit Partitions
As [Chapter 8](#data) explained, Spark<!--((("partitioning", "implicit partitions")))((("implicit partitions")))--> can read data stored in many formats and different storage systems; however, since shuffling data is an expensive operation, Spark executes tasks reusing the partitions in the storage system. Therefore, these partitions are implicit to Spark since they are already well defined and expensive to rearrange.
There is always an implicit partition for every computation in Spark defined by the distributed storage system, even for operations that you might not expect to create partitions, such as `copy_to()`.
You can explore the number of partitions a computation will require by using `sdf_num_partitions()`:
```{r tuning-partitioning-implicit}
sdf_len(sc, 10) %>% sdf_num_partitions()
```
```
[1] 2
```
While in most cases the default partitions work just fine, there are cases for which you will need to be explicit about the partitions you choose.
### Explicit Partitions
There<!--((("partitioning", "explicit partitions")))((("explicit partitions")))--> will be times when you have many more or far fewer compute instances than data partitions. In both cases, it<!--((("repartitioning")))--> can help to _repartition_ data to match your cluster resources.
Various [data](#data) functions, like `spark_read_csv()`, already support a `repartition` parameter to request that Spark repartition data appropriately. For instance, we can create a sequence of 10 numbers partitioned by 10 as follows:
```{r tuning-partitioning-explicit}
sdf_len(sc, 10, repartition = 10) %>% sdf_num_partitions()
```
```
[1] 10
```
For datasets that are already partitioned, we can also use `sdf_repartition()`:
```{r tuning-partitioning-explicit-repartition}
sdf_len(sc, 10, repartition = 10) %>%
sdf_repartition(4) %>%
sdf_num_partitions()
```
```
[1] 4
```
The number of partitions can significantly change the speed of a computation and the resources it uses; for instance, the following example calculates the mean over 10 million rows using different numbers of partitions:
```{r tuning-partitioning-explicit-code-run, echo=FALSE}
library(microbenchmark)
library(ggplot2)
partitions_plot <- microbenchmark(
"1 Partition(s)" = sdf_len(sc, 10^7, repartition = 1) %>%
summarise(mean(id)) %>% collect(),
"2 Partition(s)" = sdf_len(sc, 10^7, repartition = 2) %>%
summarise(mean(id)) %>% collect(),
times = 10
) %>%
autoplot() + theme_light()
ggsave("images/tuning-partition-explicit.png", partitions_plot, width = 10, height = 6)
```
```{r tuning-partitioning-explicit-code}
library(microbenchmark)
library(ggplot2)
microbenchmark(
"1 Partition(s)" = sdf_len(sc, 10^7, repartition = 1) %>%
summarise(mean(id)) %>% collect(),
"2 Partition(s)" = sdf_len(sc, 10^7, repartition = 2) %>%
summarise(mean(id)) %>% collect(),
times = 10
) %>% autoplot() + theme_light()
```
Figure \@ref(fig:tuning-partitioning-explicit-results) shows that computing the mean with two partitions is almost twice as fast because two CPUs can be used to execute this operation. However, more partitions do not necessarily produce faster computation; instead, the right partitioning depends on your computing cluster and the data analysis operations being performed.
```{r tuning-partitioning-explicit-results, eval=TRUE, echo=FALSE, fig.cap='Computation speed with additional explicit partitions', fig.align = 'center'}
render_image("images/tuning-partition-explicit.png", "Computation Speed with Additional Explicit Partitions")
```
## Caching {#tuning-caching}
Recall<!--((("tuning", "caching")))((("caching", "benefits of")))((("resilient distributed dataset (RDD)")))--> from [Chapter 1](#intro) that Spark was designed to be faster than its predecessors by using memory instead of disk to store data. This is formally known as a Spark _resilient distributed dataset_ (RDD). An RDD distributes copies of the same data across many machines, such that if one machine fails, others can complete the task—hence, the term "resilient." Resiliency is important in distributed systems since, while things will usually work in one machine, when running over thousands of machines the likelihood of something failing is much higher. When a failure happens, it is preferable to be fault tolerant to avoid losing the work of all the other machines. RDDs accomplish this by tracking data lineage information to rebuild lost data automatically on failure.
In `sparklyr`, you<!--((("commands", "tbl_cache()")))((("commands", "tbl_uncache()")))--> can control when an RDD is loaded or unloaded from memory using `tbl_cache()` and `tbl_uncache()`.
Most `sparklyr` operations that retrieve a<!--((("DataFrames", "retrieving")))--> Spark DataFrame cache the results in memory. For instance, running `spark_read_parquet()` or `copy_to()` will provide a Spark DataFrame that is already cached in memory. As a Spark DataFrame, this object can be used in most `sparklyr` functions, including data analysis with `dplyr` or machine learning:
```{r tuning-caching-connect}
library(sparklyr)
sc <- spark_connect(master = "local")
```
```{r tuning-caching-copy}
iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
```
You can inspect which tables are cached by navigating to the Spark UI using `spark_web(sc)`, clicking the Storage tab, and then clicking on a specific RDD, as illustrated in Figure \@ref(fig:tuning-caching-rdd-shot).
```{r tuning-caching-rdd, echo=FALSE}
invisible(webshot::webshot(
"http://localhost:4040/storage/rdd/?id=9",
"images/tuning-cache-rdd-web.png",
cliprect = "viewport"
))
```
```{r tuning-caching-rdd-shot, eval=TRUE, fig.width = 4, fig.align = 'center', echo=FALSE, fig.cap='Cached RDD in the Spark web interface'}
render_image("images/tuning-cache-rdd-web.png", "Cached RDD in Spark Web Interface")
```
Data loaded in memory will be released when the R session terminates, either explicitly or implicitly, with a restart or disconnection; however, to free up resources, you can use `tbl_uncache()`:
```{r tuning-caching-uncache}
tbl_uncache(sc, "iris")
```
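Conversely, if you need the data back in memory later, you can cache it again explicitly with `tbl_cache()`; a minimal sketch, assuming the `iris` table is still registered with the connection:
```{r}
# Load the registered "iris" table back into memory
tbl_cache(sc, "iris")
```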
### Checkpointing
Checkpointing<!--((("checkpointing")))((("caching", "checkpointing")))--> is a slightly different type of caching; while it also saves data, it additionally breaks the computation lineage graph. For example, if a cached partition is lost, it can be recomputed from the computation graph, but this is not possible with checkpointing since the lineage is discarded.
When performing operations that create expensive computation graphs, it can make sense to checkpoint, which saves the data and breaks the lineage, so that Spark does not spend resources optimizing a computation graph that does not benefit from optimization.
You<!--((("commands", "sdf_checkpoint()")))--> can checkpoint explicitly by saving to CSV, Parquet, and other file formats. Or, let Spark checkpoint this for you by using `sdf_checkpoint()` in `sparklyr`, as follows:
```{r tuning-checkpointing}
# set checkpoint path
spark_set_checkpoint_dir(sc, getwd())
# checkpoint the iris dataset
iris_tbl %>% sdf_checkpoint()
```
Notice that checkpointing truncates the computation lineage graph, which can speed up performance if the same intermediate result is used multiple times.
```{r tuning-caching-disconnect, echo=FALSE}
spark_disconnect(sc)
```
### Memory {#tuning-memory}
Memory<!--((("caching", "memory")))((("memory")))((("Apache Spark", "memory categories in")))--> in Spark is categorized into _reserved_, _user_, _execution_, or _storage_:
Reserved
: Reserved memory is the memory Spark needs to function and therefore is overhead that is required and should not be configured. This value defaults to 300 MB.
User
: User memory is the memory used to execute custom code. `sparklyr` makes use of this memory only indirectly when executing `dplyr` expressions or modeling a dataset.
Execution
: Execution memory is used to execute code by Spark, mostly to process the results from the partition and perform shuffling.
Storage
: Storage memory is used to cache RDDs—for instance, when using `compute()` in `sparklyr`.
As part of tuning execution, you can consider tweaking the amount of memory allocated for user, execution, and storage by creating a Spark connection with different values than the defaults provided in Spark:
```{r tuning-memory-fraction}
config <- spark_config()
# define memory available for storage and execution
config$spark.memory.fraction <- 0.75
# define memory available for storage
config$spark.memory.storageFraction <- 0.5
```
For instance, if you want to use Spark to store large amounts of data in memory with the purpose of quickly filtering and retrieving subsets, you can expect Spark to use little execution or user memory. Therefore, to maximize storage memory, you can tune Spark as follows:
```{r tuning-memory-change}
config <- spark_config()
# define memory available for storage and execution
config$spark.memory.fraction <- 0.90
# define memory available for storage
config$spark.memory.storageFraction <- 0.90
```
However, note that Spark will borrow execution memory from storage and vice versa if needed and if possible; therefore, in practice, there should be little need to tune the memory settings.
## Shuffling {#tuning-shuffling}
Shuffling is<!--((("tuning", "shuffling")))((("shuffling")))((("event timeline")))((("timeline")))--> the operation that redistributes data across machines; it is usually expensive and therefore something you should try to minimize. You can easily identify whether significant time is being spent shuffling by looking at the [event timeline](#tuning-event-timeline). It is possible to reduce shuffling by reframing data analysis questions or hinting Spark appropriately.
This<!--((("DataFrames", "joining frames that differ in size")))--> would be relevant, for instance, when joining DataFrames that differ in size significantly; that is, one set is orders of magnitude smaller than the other one. You can consider using `sdf_broadcast()` to mark a DataFrame as small enough for use in broadcast joins, meaning it pushes one of the smaller DataFrames to each of the worker nodes to reduce shuffling the bigger DataFrame. Here's one example for `sdf_broadcast()`:
```{r tuning-shuffling-boardcast}
sdf_len(sc, 10000) %>%
sdf_broadcast() %>%
left_join(sdf_len(sc, 100))
```
## Serialization {#tuning-serialization}
Serialization<!--((("tuning", "serialization")))((("serialization", "tuning")))--> is the process of translating data and tasks into a format that can be transmitted between machines and reconstructed on the receiving end.
It is not that common to need to adjust serialization when tuning Spark; however, it is worth mentioning that there are alternative serialization modules like the [Kryo Serializer](https://oreil.ly/TRbNh) that can provide performance improvements over the default [Java Serializer](https://oreil.ly/0DMsd).
You can turn on the Kryo Serializer in `sparklyr` through the following:
```{r tuning-serialization-kryo}
config <- spark_config()
config$spark.serializer <- "org.apache.spark.serializer.KryoSerializer"
sc <- spark_connect(master = "local", config = config)
```
## Configuration Files
Configuring<!--((("configuration files")))((("tuning", "configuration files")))--> the `spark_config()` settings before connecting is the most common approach while tuning Spark. However, after you identify the parameters in your connection, you should consider switching to use a configuration file since it will remove the clutter in your connection code and also allow you to share the configuration settings across projects and coworkers.
For instance, instead of connecting to Spark like this:
```{r}
config <- spark_config()
config["sparklyr.shell.driver-memory"] <- "2G"
sc <- spark_connect(master = "local", config = config)
```
you can define a _config.yml_ file with the desired settings. This file should be located in the current working directory or in parent directories. For example, you can create the following _config.yml_ file to modify the default driver memory:
```{yml}
default:
sparklyr.shell.driver-memory: 2G
```
Then, connecting with the same configuration settings becomes much cleaner:
```{r}
sc <- spark_connect(master = "local")
```
You can also specify an alternate configuration filename or location by setting the `file` parameter in `spark_config()`. One additional benefit from using configuration files is that a system administrator can change the default configuration by changing the value of the `R_CONFIG_ACTIVE` environment variable. See the GitHub [rstudio/config](https://oreil.ly/74jIL) repo for additional information.
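As a hedged sketch, the following _config.yml_ adds a hypothetical `production` profile that overrides the default driver memory; the profile name and value are illustrative assumptions:
```{yml}
default:
  sparklyr.shell.driver-memory: 2G
production:
  sparklyr.shell.driver-memory: 8G
```
You could then select this profile by setting `R_CONFIG_ACTIVE` to `production`, for instance with `Sys.setenv(R_CONFIG_ACTIVE = "production")`, before calling `spark_connect()`.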
## Recap
This chapter provided a broad overview of Spark internals and detailed configuration settings to help you speed up computation and enable high computation loads. It provided the foundations to understand bottlenecks and guidance on common configuration considerations. However, fine-tuning Spark is a broad topic that would require many more chapters to cover extensively. Therefore, while troubleshooting Spark’s performance and scalability, searching the web and consulting online communities is often necessary to fine-tune your particular environment.
[Chapter 10](#extensions) introduces the ecosystem of Spark extensions that are available in R. Most extensions are highly specialized, but they will prove to be extremely useful in specific cases and for readers with particular needs. For instance, they can process nested data, perform graph analysis, and use different modeling libraries like `rsparkling` from H2O. In addition, the next few chapters introduce many advanced data analysis and modeling topics that are required to master large-scale computing in R.