From 28417538891de21d2473a51cb32d85ba6010eed4 Mon Sep 17 00:00:00 2001
From: Cody
Date: Wed, 31 May 2023 08:49:37 -0700
Subject: [PATCH] feat: implement read_delta and to_delta for some backends

save cleanup; add read_delta for duckdb

chore(deps): add `deltalake` dependency and extra

test(delta): add round trip deltalake format test

test: skip if `deltalake` missing

ci: add deltalake extra to polars and duckdb jobs

Apply suggestions from code review

Co-authored-by: Phillip Cloud <417981+cpcloud@users.noreply.github.com>

test: hit the top-level API in roundtrip test

docs(backends): fix typo in pip install command

fix(docs): typo in code without selectors

docstrings and try/catch deltalake import

fix lint

black; poetry stuff
---
 .github/workflows/ibis-backends.yml |  2 ++
 ibis/backends/base/__init__.py      | 39 +++++++++++++++++++++++++
 ibis/backends/duckdb/__init__.py    | 44 +++++++++++++++++++++++++++++
 ibis/backends/polars/__init__.py    | 35 +++++++++++++++++++++++
 ibis/backends/tests/test_export.py  | 43 ++++++++++++++++++++++++++++
 ibis/expr/api.py                    | 27 ++++++++++++++++++
 ibis/expr/types/core.py             | 26 +++++++++++++++++
 poetry.lock                         | 30 ++++++++++++++++++--
 pyproject.toml                      |  3 ++
 requirements.txt                    |  1 +
 10 files changed, 248 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/ibis-backends.yml b/.github/workflows/ibis-backends.yml
index 6828c276286f..4047ee490b21 100644
--- a/.github/workflows/ibis-backends.yml
+++ b/.github/workflows/ibis-backends.yml
@@ -59,6 +59,7 @@ jobs:
           title: DuckDB
           extras:
             - duckdb
+            - deltalake
         - name: pandas
           title: Pandas
           extras:
@@ -75,6 +76,7 @@ jobs:
           title: Polars
           extras:
             - polars
+            - deltalake
         - name: mysql
           title: MySQL
           services:
diff --git a/ibis/backends/base/__init__.py b/ibis/backends/base/__init__.py
index 696c5c93968d..d468287befa5 100644
--- a/ibis/backends/base/__init__.py
+++ b/ibis/backends/base/__init__.py
@@ -444,6 +444,45 @@ def to_csv(
             for batch in batch_reader:
                 writer.write_batch(batch)
 
+    @util.experimental
+    def to_delta(
+        self,
+        expr: ir.Table,
+        path: str | Path,
+        *,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        **kwargs: Any,
+    ) -> None:
+        """Write the results of executing the given expression to a Delta Lake table.
+
+        This method is eager and will execute the associated expression
+        immediately.
+
+        Parameters
+        ----------
+        expr
+            The ibis expression to execute and persist to a Delta Lake table.
+        path
+            The data source. A string or Path to the Delta Lake table.
+        params
+            Mapping of scalar parameter expressions to values.
+        kwargs
+            Additional keyword arguments passed to the
+            deltalake.writer.write_deltalake method.
+        """
+        try:
+            from deltalake.writer import write_deltalake
+        except ImportError:
+            raise ImportError(
+                "The deltalake extra is required to use the "
+                "to_delta method. You can install it using pip:\n\n"
+                "pip install ibis-framework[deltalake]\n"
+            )
+
+        batch_reader = expr.to_pyarrow_batches(params=params)
+
+        write_deltalake(path, batch_reader, **kwargs)
+
 
 class BaseBackend(abc.ABC, _FileIOHandler):
     """Base backend class.
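The `_FileIOHandler.to_delta` method above is inherited by every backend and is what the expression-level API dispatches to. A minimal sketch of calling it directly, assuming the `deltalake` extra is installed; the data and the output path are illustrative:

    import ibis

    con = ibis.duckdb.connect()
    t = ibis.memtable({"a": [1, 2, 3], "b": ["x", "y", "z"]})
    # Eagerly executes the expression and streams its record batches into
    # deltalake.writer.write_deltalake; extra kwargs such as `mode` are
    # forwarded to the writer.
    con.to_delta(t, "/tmp/t.delta", mode="overwrite")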
diff --git a/ibis/backends/duckdb/__init__.py b/ibis/backends/duckdb/__init__.py
index 7336d9808d28..9959034b2c6c 100644
--- a/ibis/backends/duckdb/__init__.py
+++ b/ibis/backends/duckdb/__init__.py
@@ -473,6 +473,50 @@ def read_in_memory(
 
         return self.table(table_name)
 
+    def read_delta(
+        self,
+        source_table: str,
+        table_name: str | None = None,
+        **kwargs: Any,
+    ) -> ir.Table:
+        """Register a Delta Lake table as a table in the current database.
+
+        Parameters
+        ----------
+        source_table
+            The data source. Must be a directory
+            containing a Delta Lake table.
+        table_name
+            An optional name to use for the created table. This defaults to
+            a sequentially generated name.
+        kwargs
+            Additional keyword arguments passed to deltalake.DeltaTable.
+
+        Returns
+        -------
+        ir.Table
+            The just-registered table.
+        """
+        source_table = normalize_filenames(source_table)[0]
+
+        table_name = table_name or util.gen_name("read_delta")
+
+        try:
+            from deltalake import DeltaTable
+        except ImportError:
+            raise ImportError(
+                "The deltalake extra is required to use the "
+                "read_delta method. You can install it using pip:\n\n"
+                "pip install ibis-framework[deltalake]\n"
+            )
+
+        delta_table = DeltaTable(source_table, **kwargs)
+
+        return self.read_in_memory(
+            delta_table.to_pyarrow_dataset(), table_name=table_name
+        )
+
     def list_tables(self, like=None, database=None):
         tables = self.inspector.get_table_names(schema=database)
         views = self.inspector.get_view_names(schema=database)
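A sketch of the new DuckDB method in use; the path is illustrative and must already contain a Delta Lake table:

    import ibis

    con = ibis.duckdb.connect()
    # kwargs are forwarded to deltalake.DeltaTable, so e.g. version=0 would
    # read an earlier snapshot; the table's pyarrow dataset is then
    # registered with DuckDB under a generated name.
    t = con.read_delta("/tmp/t.delta")
    print(t.head(3).execute())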
diff --git a/ibis/backends/polars/__init__.py b/ibis/backends/polars/__init__.py
index a2d7d6dc67a3..da71e1a1345c 100644
--- a/ibis/backends/polars/__init__.py
+++ b/ibis/backends/polars/__init__.py
@@ -224,6 +224,41 @@ def read_parquet(
         self._add_table(table_name, pl.scan_parquet(path, **kwargs))
         return self.table(table_name)
 
+    def read_delta(
+        self, path: str | Path, table_name: str | None = None, **kwargs: Any
+    ) -> ir.Table:
+        """Register a Delta Lake table as a table in the current database.
+
+        Parameters
+        ----------
+        path
+            The data source. Path to a Delta Lake table directory.
+        table_name
+            An optional name to use for the created table. This defaults to
+            a sequentially generated name.
+        **kwargs
+            Additional keyword arguments passed to the Polars loading function.
+            See https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.scan_delta.html
+            for more information.
+
+        Returns
+        -------
+        ir.Table
+            The just-registered table.
+        """
+        try:
+            import deltalake  # noqa: F401
+        except ImportError:
+            raise ImportError(
+                "The deltalake extra is required to use the "
+                "read_delta method. You can install it using pip:\n\n"
+                "pip install ibis-framework[polars,deltalake]\n"
+            )
+        path = normalize_filename(path)
+        table_name = table_name or gen_name("read_delta")
+        self._add_table(table_name, pl.scan_delta(path, **kwargs))
+        return self.table(table_name)
+
     def database(self, name=None):
         return Database(name, self)
 
diff --git a/ibis/backends/tests/test_export.py b/ibis/backends/tests/test_export.py
index 6227ea15736a..a2545e63a0ff 100644
--- a/ibis/backends/tests/test_export.py
+++ b/ibis/backends/tests/test_export.py
@@ -1,4 +1,5 @@
 import pandas as pd
+import pandas.testing as tm
 import pytest
 import sqlalchemy as sa
 from packaging.version import parse as vparse
@@ -6,6 +7,7 @@
 
 import ibis
 import ibis.expr.datatypes as dt
+from ibis import selectors as s
 from ibis import util
 
 pa = pytest.importorskip("pyarrow")
@@ -313,3 +315,44 @@ def test_to_pyarrow_decimal(backend, dtype, pyarrow_dtype):
     )
     assert len(result) == 1
     assert isinstance(result.type, pyarrow_dtype)
+
+
+@pytest.mark.notyet(
+    [
+        "bigquery",
+        "clickhouse",
+        "dask",
+        "datafusion",
+        "impala",
+        "mssql",
+        "mysql",
+        "oracle",
+        "pandas",
+        "postgres",
+        "pyspark",
+        "snowflake",
+        "sqlite",
+        "trino",
+    ],
+    raises=AttributeError,
+    reason="read_delta not yet implemented",
+)
+@pytest.mark.notyet(
+    ["druid"],
+    raises=pa.lib.ArrowTypeError,
+    reason="arrow type conversion fails in `to_delta` call",
+)
+def test_roundtrip_delta(con, alltypes, tmp_path, monkeypatch):
+    pytest.importorskip("deltalake")
+
+    # delta can't handle nanosecond timestamp columns it seems
+    t = alltypes.drop(s.c("__time", "timestamp_col")).head()
+    expected = t.execute()
+    path = tmp_path / "test.delta"
+    t.to_delta(path)
+
+    monkeypatch.setattr(ibis.options, "default_backend", con)
+    # named `delta_t` to avoid shadowing the module-level `dt` import
+    delta_t = ibis.read_delta(path)
+    result = delta_t.to_pandas()
+
+    tm.assert_frame_equal(result, expected)
diff --git a/ibis/expr/api.py b/ibis/expr/api.py
index b4b1c22d5371..9dca25d4dfb4 100644
--- a/ibis/expr/api.py
+++ b/ibis/expr/api.py
@@ -136,6 +136,7 @@
     'random',
     'range_window',
     'read_csv',
+    'read_delta',
    'read_json',
     'read_parquet',
     'row_number',
@@ -936,6 +937,32 @@ def read_parquet(sources: str | Path | Sequence[str | Path], **kwargs: Any) -> i
     return con.read_parquet(sources, **kwargs)
 
 
+def read_delta(source: str | Path, **kwargs: Any) -> ir.Table:
+    """Lazily load a Delta Lake table.
+
+    Parameters
+    ----------
+    source
+        A filesystem path or URL.
+    kwargs
+        Backend-specific keyword arguments.
+
+    Returns
+    -------
+    ir.Table
+        Table expression representing the contents of the Delta Lake table.
+
+    Examples
+    --------
+    >>> import ibis
+    >>> t = ibis.read_delta("path/to/delta")  # doctest: +SKIP
+    """
+    from ibis.config import _default_backend
+
+    con = _default_backend()
+    return con.read_delta(source, **kwargs)
+
+
 def set_backend(backend: str | BaseBackend) -> None:
     """Set the default Ibis backend.
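`ibis.read_delta` dispatches to whatever `_default_backend()` returns, so the reading backend can be swapped without touching the call site; a sketch, with an illustrative path:

    import ibis

    ibis.options.default_backend = ibis.polars.connect()
    t = ibis.read_delta("/tmp/t.delta")  # lazily scanned via pl.scan_delta
    print(t.count().execute())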
diff --git a/ibis/expr/types/core.py b/ibis/expr/types/core.py
index 5bb2bd5b333b..db99b5a4e1aa 100644
--- a/ibis/expr/types/core.py
+++ b/ibis/expr/types/core.py
@@ -467,6 +467,32 @@ def to_csv(
         """
         self._find_backend(use_default=True).to_csv(self, path, **kwargs)
 
+    @experimental
+    def to_delta(
+        self,
+        path: str | Path,
+        *,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        **kwargs: Any,
+    ) -> None:
+        """Write the results of executing the given expression to a Delta Lake table.
+
+        This method is eager and will execute the associated expression
+        immediately.
+
+        Parameters
+        ----------
+        path
+            The data source. A string or Path to the Delta Lake table directory.
+        params
+            Mapping of scalar parameter expressions to values.
+        **kwargs
+            Additional keyword arguments passed to
+            deltalake.writer.write_deltalake.
+        """
+        self._find_backend(use_default=True).to_delta(
+            self, path, params=params, **kwargs
+        )
+
     def unbind(self) -> ir.Table:
         """Return an expression built on `UnboundTable` instead of backend-specific objects."""
         from ibis.expr.analysis import substitute_unbound
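Because `Table.to_delta` forwards `params` along with the path, a parameterized expression can be materialized with different bindings; a sketch, names and paths illustrative:

    import ibis

    p = ibis.param("int64")
    t = ibis.memtable({"a": [1, 2, 3]})
    expr = t.filter(t.a > p)
    # `params` binds a value for `p` at execution time, just before the write.
    expr.to_delta("/tmp/a_gt_1.delta", params={p: 1})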
"google-cloud-bigquery-storage", "pydata-google-auth"] clickhouse = ["clickhouse-connect", "sqlalchemy"] dask = ["dask", "regex"] datafusion = ["datafusion"] decompiler = ["black"] +deltalake = ["deltalake"] druid = ["pydruid", "sqlalchemy"] duckdb = ["duckdb", "duckdb-engine", "packaging", "sqlalchemy", "sqlalchemy-views"] geospatial = ["GeoAlchemy2", "geopandas", "shapely"] @@ -5297,4 +5323,4 @@ visualization = ["graphviz"] [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "c05f4e761f584737b43a6fd5e1abeeccaaaaa096901aca05b02afced8ed35860" +content-hash = "404920c32e86d7f8291f855b43c4e1ef314049b22cb6a1c1ffd4c447b689dc6c" diff --git a/pyproject.toml b/pyproject.toml index acaa2ebc5a80..fddcd84676af 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,6 +57,7 @@ dask = { version = ">=2022.9.1", optional = true, extras = [ ] } datafusion = { version = ">=0.6,<26", optional = true } db-dtypes = { version = ">=0.3,<2", optional = true } +deltalake = { version = ">=0.9.0,<0.10.0", optional = true } duckdb = { version = ">=0.3.3,<1", optional = true } duckdb-engine = { version = ">=0.1.8,<1", optional = true } fsspec = { version = ">=2022.1.0", optional = true } @@ -143,6 +144,7 @@ all = [ "db-dtypes", "duckdb", "duckdb-engine", + "deltalake", "fsspec", "geoalchemy2", "geopandas", @@ -204,6 +206,7 @@ trino = ["trino", "sqlalchemy", "sqlalchemy-views"] # non-backend extras visualization = ["graphviz"] decompiler = ["black"] +deltalake = ["deltalake"] [tool.poetry.plugins."ibis.backends"] bigquery = "ibis.backends.bigquery" diff --git a/requirements.txt b/requirements.txt index 0e5dc3905c4e..6be793b0214d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -31,6 +31,7 @@ db-dtypes==1.1.1 ; python_version >= "3.8" and python_version < "4.0" debugpy==1.6.7 ; python_version >= "3.8" and python_version < "4.0" decorator==5.1.1 ; python_version >= "3.8" and python_version < "4.0" defusedxml==0.7.1 ; python_version >= "3.8" and python_version < "4.0" +deltalake==0.9.0 ; python_version >= "3.8" and python_version < "4.0" distlib==0.3.6 ; python_version >= "3.8" and python_version < "4.0" duckdb-engine==0.7.3 ; python_version >= "3.8" and python_version < "4.0" duckdb==0.8.0 ; python_version >= "3.8" and python_version < "4.0"