From b2d042a653e1df1c1af501eb5e5adc8ccfdb1ff8 Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 13 Feb 2024 13:00:05 +0100 Subject: [PATCH] use public name and fix deprecation warning --- dask_deltatable/core.py | 3 ++- dask_deltatable/write.py | 10 +++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/dask_deltatable/core.py b/dask_deltatable/core.py index cc41106..3efcc66 100644 --- a/dask_deltatable/core.py +++ b/dask_deltatable/core.py @@ -94,7 +94,7 @@ def _read_from_filesystem( table_uri=path, version=version, storage_options=delta_storage_options ) if datetime is not None: - dt.load_with_datetime(datetime) + dt.load_as_version(datetime) schema = dt.schema().to_pyarrow() @@ -128,6 +128,7 @@ def _read_from_catalog( session = Session() credentials = session.get_credentials() + assert credentials is not None current_credentials = credentials.get_frozen_credentials() os.environ["AWS_ACCESS_KEY_ID"] = current_credentials.access_key os.environ["AWS_SECRET_ACCESS_KEY"] = current_credentials.secret_key diff --git a/dask_deltatable/write.py b/dask_deltatable/write.py index 6909183..75eca45 100644 --- a/dask_deltatable/write.py +++ b/dask_deltatable/write.py @@ -26,8 +26,8 @@ get_file_stats_from_metadata, get_partitions_from_path, try_get_table_and_table_uri, + write_deltalake_pyarrow, ) -from deltalake._internal import write_new_deltalake from toolz.itertoolz import pluck from ._schema import pyarrow_to_deltalake, validate_compatible @@ -54,6 +54,7 @@ def to_deltalake( storage_options: dict[str, str] | None = None, partition_filters: list[tuple[str, str, Any]] | None = None, compute: bool = True, + custom_metadata: dict[str, str] | None = None, ): """Write a given dask.DataFrame to a delta table. The returned value is a Dask Scalar, and the writing operation is only triggered when calling ``.compute()`` @@ -216,9 +217,10 @@ def to_deltalake( configuration, storage_options, partition_filters, + custom_metadata, ) } - graph = HighLevelGraph.from_collections(final_name, dsk, dependencies=(written,)) + graph = HighLevelGraph.from_collections(final_name, dsk, dependencies=(written,)) # type: ignore result = Scalar(graph, final_name, "") if compute: result = result.compute() @@ -237,6 +239,7 @@ def _commit( configuration, storage_options, partition_filters, + custom_metadata, ): schemas = list(flatten(pluck(0, schemas_add_actions_nested))) add_actions = list(flatten(pluck(1, schemas_add_actions_nested))) @@ -250,7 +253,7 @@ def _commit( schema = validate_compatible(schemas) assert schema if table is None: - _write_new_deltalake( + write_deltalake_pyarrow( table_uri, schema, add_actions, @@ -260,6 +263,7 @@ def _commit( description, configuration, storage_options, + custom_metadata, ) else: table._table.create_write_transaction(