DataStreamReader — Loading Data from Streaming Source

DataStreamReader is the interface for describing how data is loaded into a streaming Dataset from a streaming source.

Table 1. DataStreamReader’s Methods
Method Description

csv

csv(path: String): DataFrame

Sets csv as the format of the data source

format

format(source: String): DataStreamReader

Specifies the format of the data source

The format is used internally as the name (alias) of the streaming source to use to load the data

json

json(path: String): DataFrame

Sets json as the format of the data source

load

load(): DataFrame
load(path: String): DataFrame // (1)
  1. Explicit path (that could also be specified as an option)

"Loads" data as a streaming DataFrame (internally, a logical plan with a StreamingRelationV2 or StreamingRelation leaf logical operator)

option

option(key: String, value: Boolean): DataStreamReader
option(key: String, value: Double): DataStreamReader
option(key: String, value: Long): DataStreamReader
option(key: String, value: String): DataStreamReader

Sets a loading option

options

options(options: Map[String, String]): DataStreamReader

Specifies the configuration options of a data source

Note
Use the option method instead if you prefer to specify options one by one or if there is only one option to set.

orc

orc(path: String): DataFrame

Sets orc as the format of the data source

parquet

parquet(path: String): DataFrame

Sets parquet as the format of the data source

schema

schema(schema: StructType): DataStreamReader
schema(schemaString: String): DataStreamReader // (1)
  1. Uses a DDL-formatted table schema

Specifies the user-defined schema of the streaming data source (as a StructType or DDL-formatted table schema, e.g. a INT, b STRING)

text

text(path: String): DataFrame

Sets text as the format of the data source

textFile

textFile(path: String): Dataset[String]

Sets text as the format of the data source and returns a Dataset[String] rather than a DataFrame

Figure 1. DataStreamReader and The Others (diagram showing DataStreamReader, SparkSession and StreamingRelation)

A Spark developer uses DataStreamReader to describe how Spark Structured Streaming loads datasets from a streaming source (which in the end creates a logical plan for a streaming query).

Note
DataStreamReader is the Spark developer-friendly API to create a StreamingRelation logical operator (that represents a streaming source in a logical plan).

You can access DataStreamReader using the SparkSession.readStream method.

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

val streamReader = spark.readStream

DataStreamReader supports many source formats natively and offers the interface to define custom formats.

Note
By default, DataStreamReader assumes the parquet file format, which you can change using the spark.sql.sources.default property.

Note
The hive source format is not supported.
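
The two notes above can be checked with a small sketch. It assumes a local SparkSession; the object and method names (DefaultFormatSketch, defaultFormat, hiveIsRejected) are illustrative and not part of Spark.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

// Illustrative helper object (hypothetical name, not part of Spark)
object DefaultFormatSketch {

  // Reads the session-wide default data source format
  def defaultFormat(spark: SparkSession): String =
    spark.conf.get("spark.sql.sources.default")

  // True when loading a stream with the hive format fails
  def hiveIsRejected(spark: SparkSession): Boolean =
    Try(spark.readStream.format("hive").load()).isFailure

  def main(args: Array[String]): Unit = {
    // Local session for illustration only
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("default-format-sketch")
      .getOrCreate()

    println(s"default format: ${defaultFormat(spark)}")
    println(s"hive rejected: ${hiveIsRejected(spark)}")

    spark.stop()
  }
}
```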

After you have described the streaming pipeline to read datasets from an external streaming data source, you eventually trigger the loading using format-agnostic load or format-specific (e.g. json, csv) operators.
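
As a minimal sketch of the describe-then-load flow, the following uses the built-in rate source so that no external system is needed. The local master, the application name and the RateSourceSketch object are assumptions made for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Illustrative helper object (hypothetical name, not part of Spark)
object RateSourceSketch {

  // Describes a streaming DataFrame over the built-in rate source.
  // Nothing is read yet; load() only builds the logical plan.
  def rateStream(spark: SparkSession): DataFrame =
    spark.readStream
      .format("rate")               // format-agnostic: name the source...
      .option("rowsPerSecond", 5L)  // ...configure it...
      .load()                       // ...and trigger the "loading"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("rate-source-sketch")
      .getOrCreate()

    val rates = rateStream(spark)
    println(rates.isStreaming)
    rates.printSchema()

    spark.stop()
  }
}
```

The resulting DataFrame only describes the source. Data starts flowing once a streaming query is started with writeStream.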

Table 2. DataStreamReader’s Internal Properties (in alphabetical order)
Name Initial Value Description

extraOptions

(empty)

Collection of key-value configuration options

source

spark.sql.sources.default property

Source format of datasets in a streaming data source

userSpecifiedSchema

(empty)

Optional user-defined schema
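
To make the role of the three properties concrete, here is a toy model in plain Scala. It is not Spark's actual class: ReaderState and its methods are hypothetical, the schema is kept as a DDL string for simplicity, and "parquet" stands in for the value of spark.sql.sources.default.

```scala
// Hypothetical model of the reader's internal state; NOT Spark's actual implementation
final case class ReaderState(
    source: String = "parquet",                  // stands in for spark.sql.sources.default
    userSpecifiedSchema: Option[String] = None,  // empty until schema is called
    extraOptions: Map[String, String] = Map.empty) {

  // Each builder-style call updates exactly one property
  def format(name: String): ReaderState = copy(source = name)

  def schema(ddl: String): ReaderState = copy(userSpecifiedSchema = Some(ddl))

  def option(key: String, value: String): ReaderState =
    copy(extraOptions = extraOptions + (key -> value))
}

object ReaderStateSketch {
  def main(args: Array[String]): Unit = {
    val state = ReaderState()
      .format("json")
      .schema("a INT, b STRING")
      .option("maxFilesPerTrigger", "1")

    println(state)
  }
}
```

The model is immutable to keep the sketch short. It only illustrates which call touches which property.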

Specifying Loading Options — option Method

option(key: String, value: String): DataStreamReader
option(key: String, value: Boolean): DataStreamReader
option(key: String, value: Long): DataStreamReader
option(key: String, value: Double): DataStreamReader

The option family of methods specifies additional options for a streaming data source.

For user convenience, values of String, Boolean, Long, and Double types are supported; internally, they are all converted to String.

Internally, option adds the key-value pair to the extraOptions internal property.

Note
You can also set options in bulk using the options method, but you then have to convert the values to String yourself.
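
The two styles can be compared side by side. This sketch assumes a local SparkSession and uses the built-in rate source with its rowsPerSecond and numPartitions options; the OptionStylesSketch object is a hypothetical name.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Illustrative helper object (hypothetical name, not part of Spark)
object OptionStylesSketch {

  // One option at a time, using the typed (Long) overload
  def withTypedOptions(spark: SparkSession): DataFrame =
    spark.readStream
      .format("rate")
      .option("rowsPerSecond", 10L)
      .option("numPartitions", 2L)
      .load()

  // All options at once; values must already be Strings
  def withBulkOptions(spark: SparkSession): DataFrame =
    spark.readStream
      .format("rate")
      .options(Map("rowsPerSecond" -> "10", "numPartitions" -> "2"))
      .load()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("option-styles-sketch")
      .getOrCreate()

    println(withTypedOptions(spark).schema == withBulkOptions(spark).schema)

    spark.stop()
  }
}
```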

Creating Streaming Dataset (to Represent Loading Data From Streaming Source) — load Method

load(): DataFrame
load(path: String): DataFrame // (1)
  1. Specifies path option before passing the call to parameterless load()

load…​FIXME
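
Based on the callout above, load(path) should behave like setting the path option and then calling load(). The sketch below compares the two for the text format. It assumes a local SparkSession and a temporary directory created up front, since a file-based streaming source may reject a path that does not exist; LoadPathSketch is a hypothetical name.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

// Illustrative helper object (hypothetical name, not part of Spark)
object LoadPathSketch {

  // Path given directly to load
  def viaLoadPath(spark: SparkSession, dir: String): DataFrame =
    spark.readStream.format("text").load(dir)

  // Path given as an option, followed by the parameterless load
  def viaPathOption(spark: SparkSession, dir: String): DataFrame =
    spark.readStream.format("text").option("path", dir).load()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("load-path-sketch")
      .getOrCreate()

    // Empty input directory, for illustration only
    val dir = Files.createTempDirectory("stream-input").toString

    val a = viaLoadPath(spark, dir)
    val b = viaPathOption(spark, dir)
    println(a.schema == b.schema)

    spark.stop()
  }
}
```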

Built-in Formats

json(path: String): DataFrame
csv(path: String): DataFrame
parquet(path: String): DataFrame
text(path: String): DataFrame
textFile(path: String): Dataset[String] // (1)
  1. Returns Dataset[String] not DataFrame

DataStreamReader can load streaming datasets from data sources of the following formats:

  • json

  • csv

  • parquet

  • text

The methods simply set the corresponding format and then call load(path).
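
A short sketch of that equivalence for json follows, together with textFile. It assumes a local SparkSession, an empty temporary input directory and a user-defined schema in DDL form (file formats other than text generally need one for streaming reads); FormatShortcutSketch is a hypothetical name.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Illustrative helper object (hypothetical name, not part of Spark)
object FormatShortcutSketch {

  // DDL-formatted schema, as in the schema method's description
  val ddl = "a INT, b STRING"

  // Format-specific shortcut
  def viaShortcut(spark: SparkSession, dir: String): DataFrame =
    spark.readStream.schema(ddl).json(dir)

  // The longer, format-agnostic spelling
  def viaFormatAndLoad(spark: SparkSession, dir: String): DataFrame =
    spark.readStream.schema(ddl).format("json").load(dir)

  // textFile yields a typed Dataset[String] instead of a DataFrame
  def lines(spark: SparkSession, dir: String): Dataset[String] =
    spark.readStream.textFile(dir)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("format-shortcut-sketch")
      .getOrCreate()

    val dir = Files.createTempDirectory("stream-input").toString

    println(viaShortcut(spark, dir).schema == viaFormatAndLoad(spark, dir).schema)
    lines(spark, dir).printSchema()

    spark.stop()
  }
}
```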