From 7497a2a2b6e936914ad6ce3c6f05e94bc6c48c0f Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Fri, 20 Sep 2024 15:55:41 +0800 Subject: [PATCH 1/3] fix ray sink error when there are no data to write --- python/python/lance/ray/sink.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/python/lance/ray/sink.py b/python/python/lance/ray/sink.py index debf956ee9..8b39d56273 100644 --- a/python/python/lance/ray/sink.py +++ b/python/python/lance/ray/sink.py @@ -137,6 +137,10 @@ def on_write_complete( fragment = pickle.loads(fragment_str) fragments.append(fragment) schema = pickle.loads(schema_str) + # Check weather writer has fragments or not. + # Skip commit when there are no fragments. + if not schema: + return if self.mode in set(["create", "overwrite"]): op = lance.LanceOperation.Overwrite(schema, fragments) elif self.mode == "append": From 3dda1df33a414a1444ca2267f28201e4b82442d6 Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Thu, 26 Sep 2024 14:21:58 +0800 Subject: [PATCH 2/3] add test case --- python/python/lance/ray/sink.py | 15 +++++++++++++++ python/python/tests/test_ray.py | 14 ++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/python/python/lance/ray/sink.py b/python/python/lance/ray/sink.py index 8b39d56273..944d05edd1 100644 --- a/python/python/lance/ray/sink.py +++ b/python/python/lance/ray/sink.py @@ -372,6 +372,21 @@ def write( """Passthrough the fragments to commit phase""" v = [] for block in blocks: + # Empty sink did not have "fragment" and "schema" in block + if isinstance(block, pa.Table) and block.num_columns == 0: + continue + + from ..dependencies import _PANDAS_AVAILABLE + from ..dependencies import pandas as pd + + # check block as pandas + if ( + _PANDAS_AVAILABLE + and isinstance(block, pd.DataFrame) + and len(block.columns) == 0 + ): + continue + for fragment, schema in zip( block["fragment"].to_pylist(), block["schema"].to_pylist() ): diff --git a/python/python/tests/test_ray.py b/python/python/tests/test_ray.py index babf1a14dc..ff993528b5 100644 --- a/python/python/tests/test_ray.py +++ b/python/python/tests/test_ray.py @@ -101,3 +101,17 @@ def test_ray_write_lance(tmp_path: Path): tbl = ds.to_table() assert sorted(tbl["id"].to_pylist()) == list(range(10)) assert set(tbl["str"].to_pylist()) == set([f"str-{i}" for i in range(10)]) + + +@pytest.mark.filterwarnings("ignore::DeprecationWarning") +def test_ray_empty_write_lance(tmp_path: Path): + schema = pa.schema([pa.field("id", pa.int64()), pa.field("str", pa.string())]) + + ( + ray.data.range(10) + .filter((lambda row: row["id"] > 10)) + .map(lambda x: {"id": x["id"], "str": f"str-{x['id']}"}) + .write_lance(tmp_path, schema=schema) + ) + # empty write would not generate dataset. + # ds = lance.dataset(tmp_path) From a97595cfe78569c2e7763565b6b3d797fbc29c09 Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Fri, 27 Sep 2024 14:12:14 +0800 Subject: [PATCH 3/3] use len for pa.table and data frame --- python/python/lance/ray/sink.py | 15 ++------------- python/python/tests/test_ray.py | 3 ++- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/python/python/lance/ray/sink.py b/python/python/lance/ray/sink.py index 944d05edd1..ea3b92e715 100644 --- a/python/python/lance/ray/sink.py +++ b/python/python/lance/ray/sink.py @@ -372,19 +372,8 @@ def write( """Passthrough the fragments to commit phase""" v = [] for block in blocks: - # Empty sink did not have "fragment" and "schema" in block - if isinstance(block, pa.Table) and block.num_columns == 0: - continue - - from ..dependencies import _PANDAS_AVAILABLE - from ..dependencies import pandas as pd - - # check block as pandas - if ( - _PANDAS_AVAILABLE - and isinstance(block, pd.DataFrame) - and len(block.columns) == 0 - ): + # If block is empty, skip to get "fragment" and "schema" filed + if len(block) == 0: continue for fragment, schema in zip( diff --git a/python/python/tests/test_ray.py b/python/python/tests/test_ray.py index ff993528b5..03f9253ae6 100644 --- a/python/python/tests/test_ray.py +++ b/python/python/tests/test_ray.py @@ -114,4 +114,5 @@ def test_ray_empty_write_lance(tmp_path: Path): .write_lance(tmp_path, schema=schema) ) # empty write would not generate dataset. - # ds = lance.dataset(tmp_path) + with pytest.raises(ValueError): + lance.dataset(tmp_path)