
Commit

use public name and fix deprecation warning
fjetter committed Feb 13, 2024
1 parent 3cc7678 commit b2d042a
Showing 2 changed files with 9 additions and 4 deletions.
dask_deltatable/core.py: 3 changes (2 additions & 1 deletion)
@@ -94,7 +94,7 @@ def _read_from_filesystem(
         table_uri=path, version=version, storage_options=delta_storage_options
     )
     if datetime is not None:
-        dt.load_with_datetime(datetime)
+        dt.load_as_version(datetime)

     schema = dt.schema().to_pyarrow()
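
As an aside, a minimal sketch of the public API this hunk switches to, assuming the deltalake Python package and a placeholder table path: DeltaTable.load_as_version replaces the deprecated load_with_datetime and accepts a version number, an ISO 8601 string, or a datetime.

    # Sketch only: "/tmp/delta_table" is a placeholder path, not taken from this commit.
    from datetime import datetime, timezone

    from deltalake import DeltaTable

    dt = DeltaTable("/tmp/delta_table")
    # Public time-travel API; replaces the deprecated load_with_datetime()
    dt.load_as_version(datetime(2024, 2, 1, tzinfo=timezone.utc))
    print(dt.version())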

@@ -128,6 +128,7 @@ def _read_from_catalog(

     session = Session()
     credentials = session.get_credentials()
+    assert credentials is not None
     current_credentials = credentials.get_frozen_credentials()
     os.environ["AWS_ACCESS_KEY_ID"] = current_credentials.access_key
     os.environ["AWS_SECRET_ACCESS_KEY"] = current_credentials.secret_key
dask_deltatable/write.py: 10 changes (7 additions & 3 deletions)
@@ -26,8 +26,8 @@
     get_file_stats_from_metadata,
     get_partitions_from_path,
     try_get_table_and_table_uri,
+    write_deltalake_pyarrow,
 )
-from deltalake._internal import write_new_deltalake
 from toolz.itertoolz import pluck

 from ._schema import pyarrow_to_deltalake, validate_compatible
@@ -54,6 +54,7 @@ def to_deltalake(
     storage_options: dict[str, str] | None = None,
     partition_filters: list[tuple[str, str, Any]] | None = None,
     compute: bool = True,
+    custom_metadata: dict[str, str] | None = None,
 ):
     """Write a given dask.DataFrame to a delta table. The returned value is a Dask Scalar,
     and the writing operation is only triggered when calling ``.compute()``
@@ -216,9 +217,10 @@
             configuration,
             storage_options,
             partition_filters,
+            custom_metadata,
         )
     }
-    graph = HighLevelGraph.from_collections(final_name, dsk, dependencies=(written,))
+    graph = HighLevelGraph.from_collections(final_name, dsk, dependencies=(written,))  # type: ignore
     result = Scalar(graph, final_name, "")
     if compute:
         result = result.compute()
@@ -237,6 +239,7 @@ def _commit(
     configuration,
     storage_options,
     partition_filters,
+    custom_metadata,
 ):
     schemas = list(flatten(pluck(0, schemas_add_actions_nested)))
     add_actions = list(flatten(pluck(1, schemas_add_actions_nested)))
@@ -250,7 +253,7 @@
     schema = validate_compatible(schemas)
     assert schema
     if table is None:
-        _write_new_deltalake(
+        write_deltalake_pyarrow(
             table_uri,
             schema,
             add_actions,
@@ -260,6 +263,7 @@
             description,
             configuration,
             storage_options,
+            custom_metadata,
         )
     else:
         table._table.create_write_transaction(
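
A minimal usage sketch of the new custom_metadata keyword, assuming this version of dask-deltatable is installed and a call signature of to_deltalake(uri, df, ...); the path and metadata values are placeholders.

    # Sketch only: placeholder path and metadata values.
    import dask.dataframe as dd
    import pandas as pd

    import dask_deltatable as ddt

    df = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=2)
    # custom_metadata is passed through _commit to the Delta writer (per the diff above)
    ddt.to_deltalake(
        "/tmp/example_delta_table",
        df,
        custom_metadata={"job": "nightly-load"},
    )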
