The SparkR package introduces a custom RPC method that allows calling arbitrary Java/Scala code within the Spark shell process from R. The SparkR package's higher level functions are in turn built upon this RPC layer.
SparkR provides one front-end from R to Spark; however, the development of other front-end packages is desirable (e.g. a front-end package that is compatible with dplyr, which SparkR is not due to its masking of many of dplyr's functions). The sparklyr package is one example of such an alternate front-end package for Spark.
The sparkapi package factors out the RPC protocol from SparkR, with the goal of making the core types used by Spark front-end packages inter-operable. The sparkapi package provides access to the SparkContext and HiveContext as well as enables calling the full Spark Scala API.
There are two ways to use the sparkapi package:
- Standalone. Establish a connection to Spark and call the Spark API. This mode provides a fairly low-level interface to Spark (just invoking arbitrary methods of the Spark API) and would typically be used by packages building full front-end interfaces to Spark (e.g. for data frame manipulation or distributed computation).
- Extension. Write an extension for a front-end package. This is desirable when you want to leverage the capabilities of a front-end package (e.g. for data manipulation) and then provide functions that take the results of that manipulation (e.g. a Spark DataFrame object) and perform further processing or analysis.
The sparklyr package currently supports extensions written with the sparkapi package. We are hopeful that the SparkR package will also support extensions at some point soon.
To use the sparkapi package in standalone mode, you establish a connection to Spark using the start_shell function, then access the SparkContext, HiveContext, or any other part of the Spark API as necessary. Here's an example of connecting to Spark and calling the text file line counting function available via the SparkContext:
library(sparkapi)
# connect to spark shell
sc <- start_shell(master = "local", spark_home = "/opt/spark/spark-1.6.2-bin-hadoop2.6")
# implement a function which counts the lines of a text file
count_lines <- function(sc, file) {
  spark_context(sc) %>%
    invoke("textFile", file, 1L) %>%
    invoke("count")
}
# call the function
count_lines(sc, "hdfs://path/data.csv")
# disconnect from the shell
stop_shell(sc)
Note that you can either explicitly pass the path to spark_home as shown above or alternatively rely on the default behavior (reading the SPARK_HOME environment variable).
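For example, assuming the SPARK_HOME environment variable is already set (e.g. in your shell profile or .Renviron), a minimal sketch of connecting without the spark_home argument looks like this:
library(sparkapi)
# SPARK_HOME is assumed to already point at a Spark installation,
# e.g. /opt/spark/spark-1.6.2-bin-hadoop2.6
sc <- start_shell(master = "local")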
You can use the following functions for initiating and terminating connections to Spark in standalone mode:
Function | Description |
---|---|
start_shell | Start an instance of the Spark shell and establish a connection to it. |
stop_shell | Stop the Spark shell associated with a connection. |
Spark extension packages are add-on R packages that implement R interfaces for Spark services.
Extension packages consist of R functions which don't themselves connect directly to Spark, but rather depend on a connection already made by a front-end package like sparklyr. We can take the same count_lines function defined above and use it with a connection established via sparklyr. For example:
library(sparklyr)
# connect to spark
sc <- spark_connect(master = "local")
# call the function we defined above
count_lines(sc, "hdfs://path/data.csv")
# disconnect from spark
spark_disconnect(sc)
You'd typically create an extension package when you want your users to take advantage of sparklyr's functions for accessing, filtering, and manipulating Spark data frames, and then pass the result of those transformations to your extension functions.
The following sections describe the core mechanisms used to invoke the Spark API from within R. Following that, some simple examples of R functions that might be included in an extension package are provided.
Here are links to some additional examples of extension packages:
Package | Description |
---|---|
spark.sas7bdat | Read in SAS data in parallel into Apache Spark. |
sparklingwater | Demo / proof-of-concept extension for using the Sparkling Water package from H2O. |
The sparkapi package defines 3 classes for representing the fundamental types of the R to Java bridge:
Function | Description |
---|---|
spark_connection | Connection between R and the Spark shell process |
spark_jobj | Instance of a remote Spark object |
spark_dataframe | Instance of a remote Spark DataFrame object |
S3 methods are defined for each of these classes so they can be easily converted to / from objects that contain or wrap them. Note that for any given spark_jobj it's possible to discover the underlying spark_connection.
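For example, given any spark_jobj (such as one returned by the invoke functions described below), a sketch of recovering its connection looks like this (the jobj variable is hypothetical):
# 'jobj' stands in for any spark_jobj previously obtained from the API
sc <- spark_connection(jobj)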
There are several functions available for calling the methods of Java objects and static methods of Java classes:
Function | Description |
---|---|
invoke | Call a method on an object |
invoke_new | Create a new object by invoking a constructor |
invoke_static | Call a static method on a class |
For example, to create a new instance of the java.math.BigInteger class and then call the longValue() method on it you would use code like this:
billionBigInteger <- invoke_new(sc, "java.math.BigInteger", "1000000000")
billion <- invoke(billionBigInteger, "longValue")
Note the sc argument: that's the spark_connection object, which is provided either by a call to start_shell or by a front-end package like sparklyr.
The previous example can be re-written to be more compact and clear using magrittr pipes:
billion <- sc %>%
  invoke_new("java.math.BigInteger", "1000000000") %>%
  invoke("longValue")
This syntax is similar to the method-chaining syntax often used with Scala code, so it is generally preferred.
Calling a static method of a class is also straightforward. For example, to call the Math::hypot() static function you would use this code:
hypot <- sc %>%
  invoke_static("java.lang.Math", "hypot", 10, 20)
Creating an extension typically consists of writing R wrapper functions for a set of Spark services. In this section we'll describe the typical form of these functions as well as how to handle special types like Spark DataFrames.
Let's take another look at the text file line counting function we created earlier:
count_lines <- function(sc, file) {
  spark_context(sc) %>%
    invoke("textFile", file, 1L) %>%
    invoke("count")
}
The count_lines function takes a spark_connection (sc) argument, which enables it to obtain a reference to the SparkContext object.
The following functions are useful for implementing wrapper functions of various kinds:
Function | Description |
---|---|
spark_connection | Get the Spark connection associated with an object (S3) |
spark_jobj | Get the Spark jobj associated with an object (S3) |
spark_dataframe | Get the Spark DataFrame associated with an object (S3) |
spark_context | Get the SparkContext for a spark_connection |
hive_context | Get the HiveContext for a spark_connection |
spark_version | Get the version of Spark (as a numeric_version) for a spark_connection |
The use of these functions is illustrated in this simple example:
analyze <- function(x, features) {
  # normalize whatever we were passed (e.g. a dplyr tbl) into a DataFrame
  df <- spark_dataframe(x)
  # get the underlying connection so we can create new objects
  sc <- spark_connection(df)
  # create an object to do the analysis and call its `analyze` and `summary`
  # methods (note that the df and features are passed to the analyze function)
  summary <- sc %>%
    invoke_new("com.example.tools.Analyzer") %>%
    invoke("analyze", df, features) %>%
    invoke("summary")
  # return the results
  summary
}
The first argument is an object that can be accessed using the Spark DataFrame API (this might be an actual reference to a DataFrame or could rather be a dplyr tbl which has a DataFrame reference inside it).
After using the spark_dataframe function to normalize the reference, we extract the underlying Spark connection associated with the data frame using the spark_connection function. Finally, we create a new Analyzer object, call its analyze method with the DataFrame and list of features, and then call the summary method on the results of the analysis.
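The table above also lists hive_context and spark_version, which the analyze example doesn't use. As a further sketch (the table-counting wrapper below is purely illustrative), they might be combined like this:
# count the tables registered with the HiveContext, checking the Spark
# version first (illustrative only)
count_tables <- function(sc) {
  if (spark_version(sc) < "1.6")
    warning("this sketch was only written with Spark 1.6+ in mind")
  hive_context(sc) %>%
    invoke("tableNames") %>%
    length()
}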
Accepting a spark_jobj or spark_dataframe as the first argument of a function makes it very easy to incorporate into magrittr pipelines, so this pattern is highly recommended when possible.
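For instance, continuing with the hypothetical analyze function above, a sparklyr user could pipe the result of dplyr transformations straight into it (flights_tbl and the feature names are made up for this sketch):
library(sparklyr)
library(dplyr)
# flights_tbl is assumed to be a tbl created earlier with e.g. copy_to()
flights_tbl %>%
  filter(!is.na(dep_delay)) %>%
  analyze(features = c("dep_delay", "distance"))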
When creating R packages which implement interfaces to Spark you may need to include additional dependencies. Your dependencies might be a set of Spark Packages or might be a custom JAR file. In either case, you'll need a way to specify that these dependencies should be included during the initialization of a Spark session. A Spark dependency is defined using the spark_dependency function:
Function | Description |
---|---|
spark_dependency | Define a Spark dependency consisting of JAR files and Spark packages |
Your extension package can specify its dependencies by implementing a function named spark_dependencies within the package (this function should not be publicly exported). For example, let's say you were creating an extension package named sparkds that needs to include a custom JAR as well as the Redshift and Apache Avro packages:
spark_dependencies <- function(spark_version, scala_version, ...) {
  spark_dependency(
    jars = c(
      system.file(sprintf("java/sparkds_%s_%s.jar", spark_version, scala_version),
                  package = "sparkds")
    ),
    packages = c(
      sprintf("com.databricks:spark-redshift_%s:0.6.0", scala_version),
      sprintf("com.databricks:spark-avro_%s:2.0.1", scala_version)
    )
  )
}

.onLoad <- function(libname, pkgname) {
  sparkapi::register_extension(pkgname)
}
The spark_version argument is provided so that a package can support multiple Spark versions for its JARs. Note that the argument will include just the major and minor versions (e.g. 1.6 or 2.0) and will not include the patch level (as JARs built for a given major/minor version are expected to work for all patch levels).
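For example, with Spark 1.6 and Scala 2.10 the sprintf call in the sketch above resolves to a JAR shipped in the package's inst/java directory (the file itself is hypothetical):
# equivalent to the sprintf() call above for spark_version = "1.6" and
# scala_version = "2.10"
system.file("java/sparkds_1.6_2.10.jar", package = "sparkds")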
The scala_version argument is provided so that a single package can support multiple Scala compiler versions for its JARs and packages (currently the downloadable Spark binaries are compiled with Scala 2.10, but at some point Scala 2.11 will also be supported).
The ... argument is unused but nevertheless should be included to ensure compatibility if new arguments are added to spark_dependencies in the future.
The .onLoad function registers your extension package so that its spark_dependencies function will be automatically called when new connections to Spark are made via start_shell or spark_connect:
library(sparkapi)
library(sparkds)
sc <- start_shell(master = "local")
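The equivalent when connecting through sparklyr is to load the extension package before calling spark_connect (a sketch):
library(sparklyr)
library(sparkds)
sc <- spark_connect(master = "local")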