Skip to content

Commit

Permalink
fix: fix ray sink error when there are no data to write (#2919)
Browse files Browse the repository at this point in the history
Reproduce python code:

```python
import ray
from lance.ray.sink import LanceDatasink

ray.init()

sink = LanceDatasink("./data.lance")
ray.data.range(10).filter((lambda row: row["id"] > 10)).map(lambda x: {"id": x["id"], "str": f"str-{x['id']}"}).write_datasink(sink)
```

When using the lance ray sink to write lance file, the empty sink which
may be caused by filter operator in ray data will cause these exception.


```
  File "/opt/conda/lib/python3.11/site-packages/ray/data/dataset.py", line 3621, in write_datasink
    datasink.on_write_complete(write_results)
  File "/opt/conda/lib/python3.11/site-packages/lance/ray/sink.py", line 141, in on_write_complete
    op = lance.LanceOperation.Overwrite(schema, fragments)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 5, in __init__
  File "/opt/conda/lib/python3.11/site-packages/lance/dataset.py", line 1962, in __post_init__
    raise TypeError(
TypeError: schema must be pyarrow.Schema, got <class 'NoneType'>
```

The `on_write_complete` function assigns the `schema` by `fragments`. If
there is no `fragments`, the `schema` will be `None`
  • Loading branch information
SaintBacchus authored Sep 27, 2024
1 parent baacc63 commit 75aa2c2
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 0 deletions.
8 changes: 8 additions & 0 deletions python/python/lance/ray/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ def on_write_complete(
fragment = pickle.loads(fragment_str)
fragments.append(fragment)
schema = pickle.loads(schema_str)
# Check weather writer has fragments or not.
# Skip commit when there are no fragments.
if not schema:
return
if self.mode in set(["create", "overwrite"]):
op = lance.LanceOperation.Overwrite(schema, fragments)
elif self.mode == "append":
Expand Down Expand Up @@ -368,6 +372,10 @@ def write(
"""Passthrough the fragments to commit phase"""
v = []
for block in blocks:
# If block is empty, skip to get "fragment" and "schema" filed
if len(block) == 0:
continue

for fragment, schema in zip(
block["fragment"].to_pylist(), block["schema"].to_pylist()
):
Expand Down
15 changes: 15 additions & 0 deletions python/python/tests/test_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,18 @@ def test_ray_write_lance(tmp_path: Path):
tbl = ds.to_table()
assert sorted(tbl["id"].to_pylist()) == list(range(10))
assert set(tbl["str"].to_pylist()) == set([f"str-{i}" for i in range(10)])


@pytest.mark.filterwarnings("ignore::DeprecationWarning")
def test_ray_empty_write_lance(tmp_path: Path):
schema = pa.schema([pa.field("id", pa.int64()), pa.field("str", pa.string())])

(
ray.data.range(10)
.filter((lambda row: row["id"] > 10))
.map(lambda x: {"id": x["id"], "str": f"str-{x['id']}"})
.write_lance(tmp_path, schema=schema)
)
# empty write would not generate dataset.
with pytest.raises(ValueError):
lance.dataset(tmp_path)

0 comments on commit 75aa2c2

Please sign in to comment.