Skip to content

Commit

Permalink
feat: Add possibility to save dataset as table, when spark config has…
Browse files Browse the repository at this point in the history
… remote warehouse info (#3645)

feat: add possibility to save dataset as table, when spark config has remote warehouse info

Signed-off-by: nsuraeva <nsuraeva@neoflex.ru>
Co-authored-by: nsuraeva <nsuraeva@neoflex.ru>
  • Loading branch information
nadejdaSuraeva and nsuraeva authored Aug 14, 2023
1 parent ff199df commit 22c109b
Showing 1 changed file with 25 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -352,13 +352,36 @@ def persist(
):
"""
Run the retrieval and persist the results in the same offline store used for read.
Please note the persisting is done only within the scope of the spark session.
Please note the persisting is done only within the scope of the spark session for local warehouse directory.
"""
assert isinstance(storage, SavedDatasetSparkStorage)
table_name = storage.spark_options.table
if not table_name:
raise ValueError("Cannot persist, table_name is not defined")
self.to_spark_df().createOrReplaceTempView(table_name)
if self._has_remote_warehouse_in_config():
file_format = storage.spark_options.file_format
if not file_format:
self.to_spark_df().write.saveAsTable(table_name)
else:
self.to_spark_df().write.format(file_format).saveAsTable(table_name)
else:
self.to_spark_df().createOrReplaceTempView(table_name)

def _has_remote_warehouse_in_config(self) -> bool:
"""
Check if Spark Session config has info about hive metastore uri
or warehouse directory is not a local path
"""
self.spark_session.sparkContext.getConf().getAll()
try:
self.spark_session.conf.get("hive.metastore.uris")
return True
except Exception:
warehouse_dir = self.spark_session.conf.get("spark.sql.warehouse.dir")
if warehouse_dir and warehouse_dir.startswith("file:"):
return False
else:
return True

def supports_remote_storage_export(self) -> bool:
return self._config.offline_store.staging_location is not None
Expand Down

0 comments on commit 22c109b

Please sign in to comment.