Skip to content

Commit

Permalink
fix: Fix materialization when running on Spark cluster. (#3166)
Browse files Browse the repository at this point in the history
* Fix materialization when running on Spark cluster.

When running materialization and have Spark offline store configured to use cluster (`spark.master` pointing to actual Spark master node) `self.to_spark_df().write.parquet(temp_dir, mode="overwrite")` will create parquet file in worker node but `return pq.read_table(temp_dir)` is executed on driver node and it can't read from worker. Proposed fix makes materialization work when run on Spark cluster.

Signed-off-by: ckarwicki <104110169+ckarwicki-deloitte@users.noreply.github.com>
Signed-off-by: ckarwicki <71740096+ckarwicki@users.noreply.github.com>

* Fix linter.

Signed-off-by: ckarwicki <jdeveloper98@gmail.com>
Signed-off-by: ckarwicki <104110169+ckarwicki-deloitte@users.noreply.github.com>
Signed-off-by: ckarwicki <71740096+ckarwicki@users.noreply.github.com>

* Fix linter.

Signed-off-by: ckarwicki <jdeveloper98@gmail.com>
Signed-off-by: ckarwicki <104110169+ckarwicki-deloitte@users.noreply.github.com>
Signed-off-by: ckarwicki <71740096+ckarwicki@users.noreply.github.com>

Signed-off-by: ckarwicki <104110169+ckarwicki-deloitte@users.noreply.github.com>
Signed-off-by: ckarwicki <71740096+ckarwicki@users.noreply.github.com>
Signed-off-by: ckarwicki <jdeveloper98@gmail.com>
  • Loading branch information
ckarwicki authored Sep 9, 2022
1 parent 782c759 commit 175fd25
Showing 1 changed file with 1 addition and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,7 @@ def _to_df_internal(self) -> pd.DataFrame:

def _to_arrow_internal(self) -> pyarrow.Table:
"""Return dataset as pyarrow Table synchronously"""

# write to temp parquet and then load it as pyarrow table from disk
with tempfile.TemporaryDirectory() as temp_dir:
self.to_spark_df().write.parquet(temp_dir, mode="overwrite")
return pq.read_table(temp_dir)
return pyarrow.Table.from_pandas(self._to_df_internal())

def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
"""
Expand Down

0 comments on commit 175fd25

Please sign in to comment.