Skip to content

Commit

Permalink
add test case
Browse files Browse the repository at this point in the history
  • Loading branch information
SaintBacchus committed Sep 26, 2024
1 parent 254355e commit 1e1e010
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
15 changes: 15 additions & 0 deletions python/python/lance/ray/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,21 @@ def write(
"""Passthrough the fragments to commit phase"""
v = []
for block in blocks:
# Empty sink did not have "fragment" and "schema" in block
if isinstance(block, pa.Table) and block.num_columns == 0:
continue

# check block as pandas
from ..dependencies import _PANDAS_AVAILABLE
from ..dependencies import pandas as pd

if (
_PANDAS_AVAILABLE
and isinstance(block, pd.DataFrame)
and block.columns == 0
):
continue

for fragment, schema in zip(
block["fragment"].to_pylist(), block["schema"].to_pylist()
):
Expand Down
14 changes: 14 additions & 0 deletions python/python/tests/test_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,17 @@ def test_ray_write_lance(tmp_path: Path):
tbl = ds.to_table()
assert sorted(tbl["id"].to_pylist()) == list(range(10))
assert set(tbl["str"].to_pylist()) == set([f"str-{i}" for i in range(10)])


@pytest.mark.filterwarnings("ignore::DeprecationWarning")
def test_ray_empty_write_lance(tmp_path: Path):
schema = pa.schema([pa.field("id", pa.int64()), pa.field("str", pa.string())])

(
ray.data.range(10)
.filter((lambda row: row["id"] > 10))
.map(lambda x: {"id": x["id"], "str": f"str-{x['id']}"})
.write_lance(tmp_path, schema=schema)
)
# empty write would not generate dataset.
# ds = lance.dataset(tmp_path)

0 comments on commit 1e1e010

Please sign in to comment.