Skip to content

Commit

Permalink
add methods get_delta_table to return a table from UC as a DeltaTable…
Browse files Browse the repository at this point in the history
…, sync_delta_properties to keep UC table properties in sync with all delta properties of a Delta table
  • Loading branch information
VillePuuska authored Sep 20, 2024
1 parent 562c15b commit cae9a5a
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 1 deletion.
100 changes: 100 additions & 0 deletions tests/test_api_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import tempfile
import pytest
import polars as pl
from typing import Callable
from uchelper import (
UCClient,
Catalog,
Expand Down Expand Up @@ -487,6 +489,104 @@ def test_overwrite_table(client: UCClient):
assert_table_matches(client, new_table)


def test_sync_delta_properties(
    client: UCClient,
    random_df: Callable[[], pl.DataFrame],
):
    """
    Checks that `sync_delta_properties` mirrors the Delta table configuration
    into the Unity Catalog table properties.

    Steps:
    1. Create an external Delta table in a temporary directory; its UC
       properties start out empty.
    2. Add a Delta constraint and sync: exactly one 'delta.'-prefixed property
       for the constraint shows up in UC.
    3. Add a non-Delta property ("asd") through `update_table` and sync again:
       both properties are present.
    4. Drop the constraint and sync: only the non-Delta property remains.
    """
    assert client.health_check()

    default_catalog = "unity"
    default_schema = "default"
    table_name = "test_table"

    def fetch_table():
        # Re-read the table from Unity Catalog so assertions see the stored state.
        return client.get_table(
            catalog=default_catalog,
            schema=default_schema,
            table=table_name,
        )

    def sync():
        client.sync_delta_properties(
            catalog=default_catalog,
            schema=default_schema,
            name=table_name,
        )

    def is_constraint_property(key, value):
        return (
            key.startswith("delta.")
            and key.endswith("id_positive")
            and value == "id > 0"
        )

    with tempfile.TemporaryDirectory() as tmpdir:
        client.create_as_table(
            df=random_df(),
            catalog=default_catalog,
            schema=default_schema,
            name=table_name,
            file_type="delta",
            table_type="external",
            location="file://" + tmpdir,
        )

        # A freshly created table has no properties.
        assert fetch_table().properties == {}

        dt = client.get_delta_table(
            catalog=default_catalog,
            schema=default_schema,
            name=table_name,
        )
        dt.alter.add_constraint(constraints={"id_positive": "id > 0"})
        sync()

        # The constraint is the only property after the first sync.
        table = fetch_table()
        assert table.properties is not None
        assert len(table.properties) == 1
        ((only_key, only_value),) = table.properties.items()
        assert only_key.startswith("delta.")
        assert only_key.endswith("id_positive")
        assert only_value == "id > 0"

        # A property set directly in UC must survive a sync.
        table.properties["asd"] = "foo"
        client.update_table(
            catalog=default_catalog, schema=default_schema, table=table
        )
        sync()

        table = fetch_table()
        assert table.properties is not None
        assert len(table.properties) == 2
        assert any(
            is_constraint_property(key, value)
            for key, value in table.properties.items()
        )

        # Dropping the constraint removes its property from UC on the next sync.
        dt.alter.drop_constraint(name="id_positive")
        sync()

        assert fetch_table().properties == {"asd": "foo"}


def assert_table_matches(client: UCClient, default_table: Table):
table = client.get_table(
catalog=default_table.catalog_name,
Expand Down
28 changes: 28 additions & 0 deletions uchelper/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import polars as pl
import duckdb
from typing import Literal
from deltalake import DeltaTable
from deltalake.table import TableMerger
from .exceptions import UnsupportedOperationError, DuckDBConnectionSetupError
from .models import Catalog, Schema, Table, TableType, FileType
Expand Down Expand Up @@ -33,6 +34,7 @@
overwrite_table,
update_table,
set_table_default_merge_columns,
sync_delta_properties,
)
from .utils import (
literal_to_filetype,
Expand Down Expand Up @@ -317,6 +319,32 @@ def scan_table(self, catalog: str, schema: str, name: str) -> pl.LazyFrame:
table = self.get_table(catalog=catalog, schema=schema, table=name)
return scan_table(table=table)

def get_delta_table(self, catalog: str, schema: str, name: str) -> DeltaTable:
    """
    Returns the specified table from Unity Catalog as a `DeltaTable`.

    The table is looked up with `get_table` and the `DeltaTable` is opened
    at the table's storage location.

    Raises an `UnsupportedOperationError` if the table is not a DELTA table
    or if it has no storage location.
    """
    table = self.get_table(catalog=catalog, schema=schema, table=name)
    if table.file_type != FileType.DELTA:
        raise UnsupportedOperationError(
            "Can't return a DeltaTable since the table is not DELTA."
        )
    # Raise explicitly rather than `assert`: assertions are stripped under
    # `python -O`, which would pass `None` on to `DeltaTable`.
    if table.storage_location is None:
        raise UnsupportedOperationError(
            "Can't return a DeltaTable since the table has no storage location."
        )
    return DeltaTable(table_uri=table.storage_location)

def sync_delta_properties(self, catalog: str, schema: str, name: str) -> Table:
    """
    Syncs the properties of the underlying Delta table with Unity Catalog.
    These are the properties starting with 'delta.'

    Fetches the table with `get_table` and delegates to the module-level
    `sync_delta_properties` function, returning its result.
    """
    uc_table = self.get_table(catalog=catalog, schema=schema, table=name)
    synced_table = sync_delta_properties(
        session=self.session,
        uc_url=self.uc_url,
        catalog=catalog,
        schema=schema,
        table=uc_table,
    )
    return synced_table

def write_table(
self,
df: pl.DataFrame,
Expand Down
25 changes: 24 additions & 1 deletion uchelper/uc_api_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
Functions for handling calling Unity Catalog REST API endpoints and parsing results.
"""

from .exceptions import AlreadyExistsError, DoesNotExistError
from .exceptions import AlreadyExistsError, DoesNotExistError, UnsupportedOperationError
from .models import *
from deltalake import DeltaTable
import requests
import json

Expand Down Expand Up @@ -547,3 +548,25 @@ def set_table_default_merge_columns(
schema=schema,
table=existing_table,
)


def sync_delta_properties(
    session: requests.Session,
    uc_url: str,
    catalog: str,
    schema: str,
    table: Table,
) -> Table:
    """
    Replaces the 'delta.'-prefixed properties of `table` with the configuration
    of the Delta table at `table.storage_location` and stores the result in
    Unity Catalog with `update_table`.

    Existing properties whose key does not start with 'delta.' are kept.
    Note that `table.properties` of the passed-in `table` is reassigned.

    Returns the result of `update_table`.

    Raises an `UnsupportedOperationError` if the table is not a DELTA table
    or if it has no storage location.
    """
    if table.file_type != FileType.DELTA:
        raise UnsupportedOperationError("The table is not DELTA.")
    # Raise explicitly rather than `assert`: assertions are stripped under
    # `python -O`, which would pass `None` on to `DeltaTable`.
    if table.storage_location is None:
        raise UnsupportedOperationError("The table has no storage location.")

    # Open the Delta table before touching `table` so a failure here leaves
    # the caller's object unmodified.
    dt = DeltaTable(table_uri=table.storage_location)

    # Drop the stale 'delta.' entries; everything else is left as it was.
    synced_properties = {
        k: v for k, v in (table.properties or {}).items() if not k.startswith("delta.")
    }
    # NOTE(review): every configuration key is copied, including any without
    # the 'delta.' prefix; such keys would not be removed again by a later
    # sync. Confirm whether they should be filtered out here.
    synced_properties.update(dt.metadata().configuration)
    table.properties = synced_properties

    return update_table(
        session=session, uc_url=uc_url, catalog=catalog, schema=schema, table=table
    )

0 comments on commit cae9a5a

Please sign in to comment.