feat(datasets): add SparkStreamingDataSet
#198
@@ -0,0 +1,44 @@
# Spark Streaming

``SparkStreamingDataSet`` loads and saves data to streaming DataFrames.
See [Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) for details.

To work with multiple streaming nodes, two hooks are required:

- one for integrating PySpark (see [Build a Kedro pipeline with PySpark](https://docs.kedro.org/en/stable/integrations/pyspark_integration.html) for details; a sketch is shown after this list)
- one for keeping the streaming query running without termination unless an exception is raised

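#### Example SparkHooks:

The PySpark integration hook below is a sketch based on the ``SparkHooks`` example in Kedro's PySpark documentation; the exact ``config_loader`` access pattern differs between Kedro versions, so treat it as illustrative rather than definitive.

```python
from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession


class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        """Initialises a SparkSession using the config
        defined in the project's conf folder.
        """
        # Load the Spark configuration from conf/base/spark.yml
        parameters = context.config_loader.get("spark*", "spark*/**")
        spark_conf = SparkConf().setAll(parameters.items())

        # Initialise the Spark session for the project
        spark_session_conf = (
            SparkSession.builder.appName(context.project_path.name)
            .enableHiveSupport()
            .config(conf=spark_conf)
        )
        _spark_session = spark_session_conf.getOrCreate()
        _spark_session.sparkContext.setLogLevel("WARN")
```
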
#### Supported file formats

Supported file formats are:

- Text
- CSV
- JSON
- ORC
- Parquet

#### Example SparkStreamsHook:

```python
from kedro.framework.hooks import hook_impl
from pyspark.sql import SparkSession


class SparkStreamsHook:
    @hook_impl
    def after_pipeline_run(self) -> None:
        """Starts a Spark streaming await session
        once the pipeline reaches the last node.
        """
        spark = SparkSession.builder.getOrCreate()
        spark.streams.awaitAnyTermination()
```
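
Both hooks would then be registered in the project's ``settings.py``. A minimal sketch, assuming the hooks live in a ``hooks.py`` module inside a project package named ``my_project`` (both names are illustrative):

```python
# src/my_project/settings.py (package and module names are illustrative)
from my_project.hooks import SparkHooks, SparkStreamsHook

# Kedro picks up hook implementations registered in this tuple
HOOKS = (SparkHooks(), SparkStreamsHook())
```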
To make the application work with the Kafka format, the respective Spark configuration needs to be added to ``conf/base/spark.yml``.

#### Example spark.yml:

```yaml
spark.driver.maxResultSize: 3g
spark.scheduler.mode: FAIR
```
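
For the Kafka format in particular, the Spark SQL Kafka connector has to be on the classpath. A sketch of the corresponding ``spark.yml`` entry, assuming Spark 3.x built against Scala 2.12 (the artifact version must match your Spark and Scala versions):

```yaml
spark.jars.packages: org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0
```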
@@ -0,0 +1,155 @@
"""SparkStreamingDataSet to load and save a PySpark Streaming DataFrame.""" | ||||||
from copy import deepcopy | ||||||
from pathlib import PurePosixPath | ||||||
from typing import Any, Dict | ||||||
|
||||||
from kedro.io.core import AbstractDataSet | ||||||
from pyspark.sql import DataFrame, SparkSession | ||||||
from pyspark.sql.utils import AnalysisException | ||||||
|
||||||
from kedro_datasets.spark.spark_dataset import ( | ||||||
SparkDataSet, | ||||||
_split_filepath, | ||||||
_strip_dbfs_prefix, | ||||||
) | ||||||
|
||||||
|
||||||
class SparkStreamingDataSet(AbstractDataSet):
    """``SparkStreamingDataSet`` loads data into Spark Streaming DataFrame objects.

    Example usage for the
    `YAML API <https://kedro.readthedocs.io/en/stable/data/\
    data_catalog.html#use-the-data-catalog-with-the-yaml-api>`_:

    .. code-block:: yaml

        raw.new_inventory:
          type: streaming.extras.datasets.spark_streaming_dataset.SparkStreamingDataSet
          filepath: data/01_raw/stream/inventory/
          file_format: json
          save_args:
            output_mode: append
            checkpoint: data/04_checkpoint/raw_new_inventory
            header: True
          load_args:
            schema:
              filepath: data/01_raw/schema/inventory_schema.json
    """

    DEFAULT_LOAD_ARGS = {}  # type: Dict[str, Any]
    DEFAULT_SAVE_ARGS = {}  # type: Dict[str, Any]

    def __init__(
        self,
        filepath: str = "",
        file_format: str = "",
        save_args: Dict[str, Any] = None,
        load_args: Dict[str, Any] = None,
    ) -> None:
        """Creates a new instance of SparkStreamingDataSet.

        Args:
            filepath: Filepath in POSIX format to a Spark dataframe. When using Databricks,
                specify ``filepath``s starting with ``/dbfs/``. For message brokers such as
                Kafka, a filepath is not required.
            file_format: File format used during load and save
                operations. These are formats supported by the running
                SparkContext and include parquet, csv, and delta. For a list of supported
                formats please refer to the Apache Spark documentation at
                https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
            load_args: Load args passed to the Spark DataFrameReader load method.
                It is dependent on the selected file format. You can find
                a list of read options for each supported format
                in the Spark DataFrame read documentation:
                https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html.
                Please note that a schema is mandatory for a streaming DataFrame
                if ``schemaInference`` is not True.
            save_args: Save args passed to the Spark DataFrame write options.
                Similar to load_args, this is dependent on the selected file
                format. You can pass ``mode`` and ``partitionBy`` to specify
                your overwrite mode and partitioning respectively. You can find
                a list of options for each format in the Spark DataFrame
                write documentation:
                https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
        """
        self._file_format = file_format

        fs_prefix, filepath = _split_filepath(filepath)

        self._fs_prefix = fs_prefix
        self._filepath = PurePosixPath(filepath)

        # Handle default load and save arguments
        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)
        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._save_args.update(save_args)

        # Handle schema load argument
        self._schema = self._load_args.pop("schema", None)
        if self._schema is not None:
            if isinstance(self._schema, dict):
                self._schema = SparkDataSet._load_schema_from_file(self._schema)

Review comments on lines +90 to +92:

- Nit: wonder why this is a nested if statement rather than using [comment truncated]
- I missed these comments; maybe it should just inherit the SparkDataSet class? #135 I think in general we need to look at all of SparkDataSet. Much of it is weird, but it's quite tricky to remove the code. The path handling is particularly confusing because it's unique to Spark. @deepyaman
- Is it possible to have an empty schema at all, given InferSchema is not enabled?
- An empty schema will throw an error, as structured streaming requires a schema by default.
- The problem is that stream jobs will run initially with an empty schema, but when a job is killed and restarted a schema mismatch error is thrown (not always, mostly when dealing with timestamp columns). Schema inference via a file/struct is enforced to prevent this issue.
- @noklam is it OK from a design perspective that SparkStreamingDataSet uses a private method of SparkDataSet? Feels a bit off to me, but perhaps no clear issues if their requirements are the same.
- Fine with requiring schema; the schema concept is more critical in streaming anyway.
- @deepyaman the requirements are the same as in the case of SparkDataSet; we need to make it mandatory.

    def _describe(self) -> Dict[str, Any]:
        """Returns a dict that describes attributes of the dataset."""
        return {
            "filepath": self._fs_prefix + str(self._filepath),
            "file_format": self._file_format,
            "load_args": self._load_args,
            "save_args": self._save_args,
        }

    @staticmethod
    def _get_spark():
        return SparkSession.builder.getOrCreate()

    def _load(self) -> DataFrame:
        """Loads data from filepath.

        If the connector type is kafka, then no filepath is required and the schema
        needs to be separated from load_args.

        Returns:
            Data from filepath as a PySpark streaming DataFrame.
        """
        load_path = _strip_dbfs_prefix(self._fs_prefix + str(self._filepath))
        data_stream_reader = (
            self._get_spark()
            .readStream.schema(self._schema)
            .format(self._file_format)
            .options(**self._load_args)
        )
        return data_stream_reader.load(load_path)

    def _save(self, data: DataFrame) -> None:
        """Saves a PySpark streaming DataFrame.

        Args:
            data: PySpark streaming DataFrame for saving.
        """
        save_path = _strip_dbfs_prefix(self._fs_prefix + str(self._filepath))
        output_constructor = data.writeStream.format(self._file_format)

        (
            output_constructor.option(
                "checkpointLocation", self._save_args.pop("checkpoint")
            )
            .option("path", save_path)
            .outputMode(self._save_args.pop("output_mode"))
            .options(**self._save_args)
            .start()
        )

    def _exists(self) -> bool:
        load_path = _strip_dbfs_prefix(self._fs_prefix + str(self._filepath))

        try:
            self._get_spark().readStream.schema(self._schema).load(
                load_path, self._file_format
            )
        except AnalysisException as exception:
            if (
                exception.desc.startswith("Path does not exist:")
                or "is not a Streaming data" in exception.desc
            ):
                return False
            raise
        return True
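
For illustration, a hypothetical usage sketch of the dataset's Python API. The import path, file locations, and schema file are assumptions for this example; the schema JSON is expected to be the output of ``StructType.jsonValue()``, as with ``SparkDataSet``:

```python
from kedro_datasets.spark.spark_streaming_dataset import SparkStreamingDataSet

# All paths below are illustrative only
dataset = SparkStreamingDataSet(
    filepath="data/01_raw/stream/inventory/",
    file_format="json",
    load_args={"schema": {"filepath": "data/01_raw/schema/inventory_schema.json"}},
    save_args={"checkpoint": "data/04_checkpoint/raw_new_inventory", "output_mode": "append"},
)

streaming_df = dataset.load()   # returns a streaming pyspark.sql.DataFrame
dataset.save(streaming_df)      # starts the streaming write query
```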
Review comment: Really like how concise this file is!