feat: implement read_delta and to_delta for some backends
save

cleanup; add read_delta for duckdb

chore(deps): add `deltalake` dependency and extra

test(delta): add round trip deltalake format test

test: skip if `deltalake` missing

ci: add deltalake extra to polars and duckdb jobs

Apply suggestions from code review

Co-authored-by: Phillip Cloud <417981+cpcloud@users.noreply.github.com>

test: hit the top-level API in roundtrip test

docs(backends): fix typo in pip install command

fix(docs): typo in code without selectors

docstrings and try/catch deltalake import

fix lint

black; poetry stuff
lostmygithubaccount authored and cpcloud committed Jun 5, 2023
1 parent e6fb703 commit 2841753
Showing 10 changed files with 248 additions and 2 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ibis-backends.yml
@@ -59,6 +59,7 @@ jobs:
title: DuckDB
extras:
- duckdb
- deltalake
- name: pandas
title: Pandas
extras:
@@ -75,6 +76,7 @@
title: Polars
extras:
- polars
- deltalake
- name: mysql
title: MySQL
services:
39 changes: 39 additions & 0 deletions ibis/backends/base/__init__.py
@@ -444,6 +444,45 @@ def to_csv(
for batch in batch_reader:
writer.write_batch(batch)

@util.experimental
def to_delta(
self,
expr: ir.Table,
path: str | Path,
*,
params: Mapping[ir.Scalar, Any] | None = None,
**kwargs: Any,
) -> None:
"""Write the results of executing the given expression to a Delta Lake table.
This method is eager and will execute the associated expression
immediately.
Parameters
----------
expr
The ibis expression to execute and persist to Delta Lake table.
path
The data source. A string or Path to the Delta Lake table.
params
Mapping of scalar parameter expressions to value.
kwargs
Additional keyword arguments passed to deltalake.writer.write_deltalake method
"""
try:
from deltalake.writer import write_deltalake
except ImportError:
raise ImportError(
"The deltalake extra is required to use the "
"to_delta method. You can install it using pip:\n\n"
"pip install ibis-framework[deltalake]\n"
)

batch_reader = expr.to_pyarrow_batches(params=params)

write_deltalake(path, batch_reader, **kwargs)


class BaseBackend(abc.ABC, _FileIOHandler):
"""Base backend class.
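Below is a minimal usage sketch of the new backend-level `to_delta` (illustrative, not taken from the diff): it assumes the `deltalake` extra is installed, and the path and data are made up. `mode="overwrite"` is one of `write_deltalake`'s supported keyword arguments.

```python
import ibis

con = ibis.duckdb.connect()  # in-memory DuckDB connection
t = ibis.memtable({"a": [1, 2, 3]})

# Eagerly execute the expression and write the result as a Delta Lake
# table; extra kwargs such as `mode` are forwarded to write_deltalake.
con.to_delta(t.mutate(b=t.a + 1), "/tmp/example.delta", mode="overwrite")
```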
44 changes: 44 additions & 0 deletions ibis/backends/duckdb/__init__.py
@@ -473,6 +473,50 @@ def read_in_memory(

return self.table(table_name)

def read_delta(
self,
source_table: str,
table_name: str | None = None,
**kwargs: Any,
) -> ir.Table:
"""Register a Delta Lake table as a table in the current database.
Parameters
----------
source_table
The data source. Must be a directory
containing a Delta Lake table.
table_name
An optional name to use for the created table. This defaults to
a sequentially generated name.
kwargs
Additional keyword arguments passed to DuckDB loading function.
Additional keyword arguments passed to deltalake.DeltaTable.
Returns
-------
ir.Table
The just-registered table.
"""
source_table = normalize_filenames(source_table)[0]

table_name = table_name or util.gen_name("read_delta")

try:
from deltalake import DeltaTable
except ImportError:
raise ImportError(
"The deltalake extra is required to use the "
"read_delta method. You can install it using pip:\n\n"
"pip install ibis-framework[deltalake]\n"
)

delta_table = DeltaTable(source_table, **kwargs)

return self.read_in_memory(
delta_table.to_pyarrow_dataset(), table_name=table_name
)

def list_tables(self, like=None, database=None):
tables = self.inspector.get_table_names(schema=database)
views = self.inspector.get_view_names(schema=database)
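A sketch of reading the table back through DuckDB's new `read_delta`; the path is illustrative, and `version` is a `deltalake.DeltaTable` keyword argument, so time travel works through the forwarded kwargs:

```python
import ibis

con = ibis.duckdb.connect()
t = con.read_delta("/tmp/example.delta")  # registered under a generated name
print(t.count().execute())

# Time travel: kwargs are forwarded to deltalake.DeltaTable.
t0 = con.read_delta("/tmp/example.delta", version=0)
```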
35 changes: 35 additions & 0 deletions ibis/backends/polars/__init__.py
@@ -224,6 +224,41 @@ def read_parquet(
self._add_table(table_name, pl.scan_parquet(path, **kwargs))
return self.table(table_name)

def read_delta(
self, path: str | Path, table_name: str | None = None, **kwargs: Any
) -> ir.Table:
"""Register a Delta Lake as a table in the current database.
Parameters
----------
path
The data source(s). Path to a Delta Lake table directory.
table_name
An optional name to use for the created table. This defaults to
a sequentially generated name.
**kwargs
Additional keyword arguments passed to Polars loading function.
See https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.scan_delta.html
for more information.
Returns
-------
ir.Table
The just-registered table
"""
try:
import deltalake # noqa: F401
except ImportError:
raise ImportError(
"The deltalake extra is required to use the "
"read_delta method. You can install it using pip:\n\n"
"pip install ibis-framework[polars,deltalake]\n"
)
path = normalize_filename(path)
table_name = table_name or gen_name("read_delta")
self._add_table(table_name, pl.scan_delta(path, **kwargs))
return self.table(table_name)

def database(self, name=None):
return Database(name, self)

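Because the Polars implementation delegates to `pl.scan_delta`, that function's keyword arguments (for example `version`) apply here too; a sketch with an illustrative path:

```python
import ibis

con = ibis.polars.connect()
# kwargs are forwarded to polars.scan_delta; `version` selects a table version
t = con.read_delta("/tmp/example.delta", version=0)
df = t.execute()  # materialize the scan as a pandas DataFrame
```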
43 changes: 43 additions & 0 deletions ibis/backends/tests/test_export.py
@@ -1,11 +1,13 @@
import pandas as pd
import pandas.testing as tm
import pytest
import sqlalchemy as sa
from packaging.version import parse as vparse
from pytest import param

import ibis
import ibis.expr.datatypes as dt
from ibis import selectors as s
from ibis import util

pa = pytest.importorskip("pyarrow")
@@ -313,3 +315,44 @@ def test_to_pyarrow_decimal(backend, dtype, pyarrow_dtype):
)
assert len(result) == 1
assert isinstance(result.type, pyarrow_dtype)


@pytest.mark.notyet(
[
"bigquery",
"clickhouse",
"dask",
"datafusion",
"impala",
"mssql",
"mysql",
"oracle",
"pandas",
"postgres",
"pyspark",
"snowflake",
"sqlite",
"trino",
],
raises=AttributeError,
reason="read_delta not yet implemented",
)
@pytest.mark.notyet(
["druid"],
raises=pa.lib.ArrowTypeError,
reason="arrow type conversion fails in `to_delta` call",
)
def test_roundtrip_delta(con, alltypes, tmp_path, monkeypatch):
pytest.importorskip("deltalake")

# delta can't handle nanosecond timestamp columns it seems
t = alltypes.drop(s.c("__time", "timestamp_col")).head()
expected = t.execute()
path = tmp_path / "test.delta"
t.to_delta(path)

monkeypatch.setattr(ibis.options, "default_backend", con)
dt = ibis.read_delta(path)
result = dt.to_pandas()

tm.assert_frame_equal(result, expected)
27 changes: 27 additions & 0 deletions ibis/expr/api.py
@@ -136,6 +136,7 @@
'random',
'range_window',
'read_csv',
'read_delta',
'read_json',
'read_parquet',
'row_number',
@@ -936,6 +937,32 @@ def read_parquet(sources: str | Path | Sequence[str | Path], **kwargs: Any) -> i
return con.read_parquet(sources, **kwargs)


def read_delta(source: str | Path, **kwargs: Any) -> ir.Table:
"""Lazily load a Delta Lake table.
Parameters
----------
source
A filesystem path or URL.
kwargs
Backend-specific keyword arguments for the file type.
Returns
-------
ir.Table
Table expression representing a file
Examples
--------
>>> import ibis
>>> t = ibis.read_delta("path/to/delta") # doctest: +SKIP
"""
from ibis.config import _default_backend

con = _default_backend()
return con.read_delta(source, **kwargs)


def set_backend(backend: str | BaseBackend) -> None:
"""Set the default Ibis backend.
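The top-level function dispatches to the configured default backend; a short sketch (illustrative path) showing it combined with `set_backend`:

```python
import ibis

ibis.set_backend("polars")  # route ibis.read_delta through Polars
t = ibis.read_delta("/tmp/example.delta")
```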
26 changes: 26 additions & 0 deletions ibis/expr/types/core.py
@@ -467,6 +467,32 @@ def to_csv(
"""
self._find_backend(use_default=True).to_csv(self, path, **kwargs)

@experimental
def to_delta(
self,
path: str | Path,
*,
params: Mapping[ir.Scalar, Any] | None = None,
**kwargs: Any,
) -> None:
"""Write the results of executing the given expression to a Delta Lake table
This method is eager and will execute the associated expression
immediately.
Parameters
----------
path
The data source. A string or Path to the Delta Lake table directory.
params
Mapping of scalar parameter expressions to value.
**kwargs
Additional keyword arguments passed to pyarrow.csv.CSVWriter
https://arrow.apache.org/docs/python/generated/pyarrow.csv.CSVWriter.html
"""
self._find_backend(use_default=True).to_delta(self, path, params=params, **kwargs)

def unbind(self) -> ir.Table:
"""Return an expression built on `UnboundTable` instead of backend-specific objects."""
from ibis.expr.analysis import substitute_unbound
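A sketch of the expression-level method including the `params` argument; the path is illustrative, and DuckDB is assumed to be the default backend with the `deltalake` extra installed:

```python
import ibis

t = ibis.memtable({"a": [1, 2, 3]})
p = ibis.param("int64")  # scalar parameter bound at execution time

# `params` supplies the parameter value; other kwargs go to write_deltalake.
t.filter(t.a > p).to_delta("/tmp/filtered.delta", params={p: 1}, mode="overwrite")
```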
30 changes: 28 additions & 2 deletions poetry.lock

(generated file; diff not rendered)

3 changes: 3 additions & 0 deletions pyproject.toml
@@ -57,6 +57,7 @@ dask = { version = ">=2022.9.1", optional = true, extras = [
] }
datafusion = { version = ">=0.6,<26", optional = true }
db-dtypes = { version = ">=0.3,<2", optional = true }
deltalake = { version = ">=0.9.0,<0.10.0", optional = true }
duckdb = { version = ">=0.3.3,<1", optional = true }
duckdb-engine = { version = ">=0.1.8,<1", optional = true }
fsspec = { version = ">=2022.1.0", optional = true }
@@ -143,6 +144,7 @@ all = [
"db-dtypes",
"duckdb",
"duckdb-engine",
"deltalake",
"fsspec",
"geoalchemy2",
"geopandas",
@@ -204,6 +206,7 @@ trino = ["trino", "sqlalchemy", "sqlalchemy-views"]
# non-backend extras
visualization = ["graphviz"]
decompiler = ["black"]
deltalake = ["deltalake"]

[tool.poetry.plugins."ibis.backends"]
bigquery = "ibis.backends.bigquery"
1 change: 1 addition & 0 deletions requirements.txt

(generated file; diff not rendered)
