feat!: change the default data storage version to "stable" (e.g. v2.0) (#2829)

Closes #2394 

This PR updates a few remaining tests. Changing the default to v2 also
exposed a few minor inconsistencies with v1, which we fixed:

* When creating a fragment, we reported progress before adding the
filename to the fragment. We now add the filename to the fragment before
reporting progress.
* Nested projection was broken (the existing nested projection tests
passed by luck). Fixing this required a slight change to how we calculate
projections (see the new test_nested_projection test below).

BREAKING CHANGE: new datasets will no longer be readable by versions
older than 0.16
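
For anyone who needs to keep writing data that older readers can open, the v1 format can still be pinned explicitly. A minimal sketch (paths and table contents are illustrative, not from this PR):

```python
import lance
import pyarrow as pa

table = pa.table({"x": range(100)})

# The default now writes the stable (v2) format, which pylance < 0.16
# cannot read:
lance.write_dataset(table, "/tmp/new_default.lance")

# Pin the legacy (v1) format to stay readable by older versions:
lance.write_dataset(table, "/tmp/legacy.lance", data_storage_version="legacy")
```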
westonpace authored Sep 11, 2024
1 parent f4e3300 commit c0e1f15
Showing 35 changed files with 801 additions and 342 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/python.yml
@@ -105,6 +105,17 @@ jobs:
prefix-key: "manylinux2014" # use this to flush the cache
- uses: ./.github/workflows/build_linux_wheel
- uses: ./.github/workflows/run_tests
- name: Generate forward compatibility files
run: python python/tests/forward_compat/datagen.py
- name: Install old wheel
run: |
python -m venv venv
source venv/bin/activate
pip install pytest pylance==0.16.0
- name: Run forward compatibility tests
run: |
source venv/bin/activate
pytest python/tests/forward_compat --run-forward
# Make sure wheels are not included in the Rust cache
- name: Delete wheels
run: sudo rm -rf target/wheels
5 changes: 3 additions & 2 deletions docs/format.rst
@@ -105,14 +105,15 @@ The following values are supported:
- Any
- This is the initial Lance format.
* - 2.0
- 0.15.0
- 0.16.0
- Any
- Rework of the Lance file format that removed row groups and introduced null
support for lists, fixed size lists, and primitives
* - 2.1 (unstable)
- None
- Any
- Adds FSST string compression and bit packing
- Enhances integer and string compression, adds support for nulls in struct fields,
and improves random access performance with nested fields.
* - legacy
- N/A
- N/A
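For reference, the version strings in the table above can be passed directly when writing. A sketch of how the "stable" alias currently resolves, consistent with the test added at the end of this PR (paths are illustrative):

```python
import lance
import pyarrow as pa

ds = lance.write_dataset(pa.table({"x": [0]}), "/tmp/alias.lance")
# "stable" is the new default and currently resolves to the 2.0 format
assert ds.data_storage_version == "2.0"

# A concrete version string can also be pinned directly:
lance.write_dataset(
    pa.table({"x": [0]}), "/tmp/pinned.lance", data_storage_version="2.0"
)
```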
1 change: 1 addition & 0 deletions java/core/lance-jni/Cargo.toml
@@ -14,6 +14,7 @@ crate-type = ["cdylib"]

[dependencies]
lance = { workspace = true, features = ["substrait"] }
lance-encoding = { path = "../../../rust/lance-encoding" }
lance-linalg = { path = "../../../rust/lance-linalg" }
lance-index = { path = "../../../rust/lance-index" }
lance-io.workspace = true
3 changes: 3 additions & 0 deletions java/core/lance-jni/src/utils.rs
@@ -19,6 +19,7 @@ use jni::objects::{JObject, JString};
use jni::JNIEnv;
use lance::dataset::{WriteMode, WriteParams};
use lance::index::vector::{StageParams, VectorIndexParams};
use lance_encoding::version::LanceFileVersion;
use lance_index::vector::hnsw::builder::HnswBuildParams;
use lance_index::vector::ivf::IvfBuildParams;
use lance_index::vector::pq::PQBuildParams;
@@ -52,6 +53,8 @@ pub fn extract_write_params(
if let Some(mode_val) = env.get_string_opt(mode)? {
write_params.mode = WriteMode::try_from(mode_val.as_str())?;
}
// Java code always sets the data storage version to Legacy for now
write_params.data_storage_version = Some(LanceFileVersion::Legacy);
Ok(write_params)
}

2 changes: 1 addition & 1 deletion python/python/lance/dataset.py
@@ -2840,7 +2840,7 @@ def write_dataset(
commit_lock: Optional[CommitLock] = None,
progress: Optional[FragmentWriteProgress] = None,
storage_options: Optional[Dict[str, str]] = None,
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
use_legacy_format: Optional[bool] = None,
enable_v2_manifest_paths: bool = False,
) -> LanceDataset:
4 changes: 2 additions & 2 deletions python/python/lance/fragment.py
@@ -147,7 +147,7 @@ def create(
progress: Optional[FragmentWriteProgress] = None,
mode: str = "append",
*,
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
use_legacy_format: Optional[bool] = None,
storage_options: Optional[Dict[str, str]] = None,
) -> FragmentMetadata:
@@ -528,7 +528,7 @@ def write_fragments(
max_rows_per_group: int = 1024,
max_bytes_per_file: int = DEFAULT_MAX_BYTES_PER_FILE,
progress: Optional[FragmentWriteProgress] = None,
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
use_legacy_format: Optional[bool] = None,
storage_options: Optional[Dict[str, str]] = None,
) -> List[FragmentMetadata]:
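The fragment-level APIs changed here pick up the same default. A sketch under the same assumptions (illustrative path; both calls mirror the new test_default_storage_version test below):

```python
import lance
import pyarrow as pa
from lance.fragment import write_fragments

table = pa.table({"x": range(10)})
ds = lance.write_dataset(table, "/tmp/frags.lance")

# Both entry points now default to data_storage_version="stable";
# pass "legacy" to keep writing v1 fragment files instead.
frag = lance.LanceFragment.create(ds.uri, table)
frags = write_fragments(table, ds.uri)
```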
8 changes: 4 additions & 4 deletions python/python/lance/ray/sink.py
@@ -54,7 +54,7 @@ def _write_fragment(
max_rows_per_file: int = 1024 * 1024,
max_bytes_per_file: Optional[int] = None,
max_rows_per_group: int = 1024, # Only useful for v1 writer.
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
storage_options: Optional[Dict[str, Any]] = None,
) -> Tuple[FragmentMetadata, pa.Schema]:
from ..dependencies import _PANDAS_AVAILABLE
@@ -188,7 +188,7 @@ def __init__(
schema: Optional[pa.Schema] = None,
mode: Literal["create", "append", "overwrite"] = "create",
max_rows_per_file: int = 1024 * 1024,
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
use_legacy_format: Optional[bool] = None,
storage_options: Optional[Dict[str, Any]] = None,
*args,
@@ -295,7 +295,7 @@ def __init__(
max_rows_per_file: int = 1024 * 1024,
max_bytes_per_file: Optional[int] = None,
max_rows_per_group: Optional[int] = None, # Only useful for v1 writer.
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
use_legacy_format: Optional[bool] = False,
storage_options: Optional[Dict[str, Any]] = None,
):
@@ -387,7 +387,7 @@ def write_lance(
max_rows_per_file: int = 1024 * 1024,
max_bytes_per_file: Optional[int] = None,
storage_options: Optional[Dict[str, Any]] = None,
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
) -> None:
"""Write Ray dataset at scale.
21 changes: 21 additions & 0 deletions python/python/tests/conftest.py
@@ -36,13 +36,34 @@ def pytest_addoption(parser):
default=False,
help="Run slow tests",
)
parser.addoption(
"--run-forward",
action="store_true",
default=False,
help="Run forward compatibility tests (requires files to be generated already)",
)


def pytest_configure(config):
config.addinivalue_line(
"markers",
"forward: mark tests that require forward compatibility datagen files",
)
config.addinivalue_line(
"markers", "integration: mark test that requires object storage integration"
)
config.addinivalue_line(
"markers", "slow: mark tests that require large CPU or RAM resources"
)


def pytest_collection_modifyitems(config, items):
if not config.getoption("--run-integration"):
disable_items_with_mark(items, "integration", "--run-integration not specified")
if not config.getoption("--run-slow"):
disable_items_with_mark(items, "slow", "--run-slow not specified")
if not config.getoption("--run-forward"):
disable_items_with_mark(items, "forward", "--run-forward not specified")
try:
import torch

2 changes: 2 additions & 0 deletions python/python/tests/forward_compat/__init__.py
@@ -0,0 +1,2 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors
97 changes: 97 additions & 0 deletions python/python/tests/forward_compat/datagen.py
@@ -0,0 +1,97 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

# This script generates Lance files that are read by test_compat.py

from pathlib import Path

import pyarrow as pa
from lance.file import LanceFileWriter


def get_path(name: str):
dataset_dir = (
Path(__file__).parent.parent.parent.parent.parent
/ "test_data"
/ "forward_compat"
/ name
)
return dataset_dir


def build_basic_types():
schema = pa.schema(
[
pa.field("int", pa.int64()),
pa.field("float", pa.float32()),
pa.field("str", pa.string()),
pa.field("list_int", pa.list_(pa.int64())),
pa.field("list_str", pa.list_(pa.string())),
pa.field("struct", pa.struct([pa.field("a", pa.int64())])),
pa.field("dict", pa.dictionary(pa.int16(), pa.string())),
pa.field("str_as_dict", pa.string()),
]
)

return pa.table(
[
pa.array(range(1000)),
pa.array(range(1000), pa.float32()),
pa.array([str(i) for i in range(1000)]),
pa.array([list(range(i)) for i in range(1000)]),
pa.array([[str(i)] for i in range(1000)]),
pa.array([{"a": i} for i in range(1000)]),
pa.array(
[str(i % 10) for i in range(1000)],
pa.dictionary(pa.int16(), pa.string()),
),
pa.array(["a"] * 500 + ["b"] * 500),
],
schema=schema,
)


def write_basic_types():
path = get_path("basic_types.lance")
with LanceFileWriter(str(path)) as writer:
writer.write_batch(build_basic_types())


def build_large():
# ~40MB of vector embedding data (10K 1024-float32)
fsl_data = pa.array(range(1024 * 1000 * 10), pa.float32())
fsls = pa.FixedSizeListArray.from_arrays(fsl_data, 1024)
# ~40 MiB of binary data (10k 4KiB chunks)
bindata = pa.allocate_buffer(1024 * 1000 * 40)
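# Build the int32 offsets buffer for the BinaryArray by hand: 10001
# offsets, one every 4 KiB; buffers()[1] borrows the values buffer of a
# temporary int32 array to serve as the offsets buffer.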
offsets = pa.array(
range(0, (1024 * 1000 * 40) + 4 * 1024, 4 * 1024), pa.int32()
).buffers()[1]
bins = pa.BinaryArray.from_buffers(pa.binary(), 10000, [None, offsets, bindata])

schema = pa.schema(
[
pa.field("int", pa.int32()),
pa.field("fsl", pa.list_(pa.float32())),
pa.field("bin", pa.binary()),
]
)

return pa.table(
[
pa.array(range(10000), pa.int32()),
fsls,
bins,
],
schema=schema,
)


def write_large():
path = get_path("large.lance")
with LanceFileWriter(str(path)) as writer:
writer.write_batch(build_large())


if __name__ == "__main__":
write_basic_types()
write_large()
20 changes: 20 additions & 0 deletions python/python/tests/forward_compat/test_compat.py
@@ -0,0 +1,20 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

import pytest
from lance.file import LanceFileReader

from .datagen import build_basic_types, build_large, get_path


@pytest.mark.forward
def test_scans():
expected_basic_types = build_basic_types()
actual_basic_types = (
LanceFileReader(str(get_path("basic_types.lance"))).read_all().to_table()
)
assert actual_basic_types.equals(expected_basic_types)

expected_large = build_large()
actual_large = LanceFileReader(str(get_path("large.lance"))).read_all().to_table()
assert actual_large.equals(expected_large)
49 changes: 48 additions & 1 deletion python/python/tests/test_dataset.py
@@ -82,7 +82,8 @@ def test_roundtrip_types(tmp_path: Path):
}
)

dataset = lance.write_dataset(table, tmp_path)
# TODO: V2 does not currently handle large_dict
dataset = lance.write_dataset(table, tmp_path, data_storage_version="legacy")
assert dataset.schema == table.schema
assert dataset.to_table() == table

@@ -538,6 +539,28 @@ def test_pickle(tmp_path: Path):
assert dataset.to_table() == unpickled.to_table()


def test_nested_projection(tmp_path: Path):
table = pa.Table.from_pydict(
{
"a": range(100),
"b": range(100),
"struct": [{"x": counter, "y": counter % 2 == 0} for counter in range(100)],
}
)
base_dir = tmp_path / "test"
lance.write_dataset(table, base_dir)

dataset = lance.dataset(base_dir)

projected = dataset.to_table(columns=["struct.x"])
assert projected == pa.Table.from_pydict({"struct.x": range(100)})

projected = dataset.to_table(columns=["struct.y"])
assert projected == pa.Table.from_pydict(
{"struct.y": [i % 2 == 0 for i in range(100)]}
)


def test_polar_scan(tmp_path: Path):
some_structs = [{"x": counter, "y": counter} for counter in range(100)]
table = pa.Table.from_pydict(
@@ -2273,3 +2296,27 @@ def test_late_materialization_batch_size(tmp_path: Path):
columns=["values"], filter="filter % 2 == 0", batch_size=32
):
assert batch.num_rows == 32


EXPECTED_DEFAULT_STORAGE_VERSION = "2.0"
EXPECTED_MAJOR_VERSION = 2
EXPECTED_MINOR_VERSION = 0


def test_default_storage_version(tmp_path: Path):
table = pa.table({"x": [0]})
dataset = lance.write_dataset(table, tmp_path)
assert dataset.data_storage_version == EXPECTED_DEFAULT_STORAGE_VERSION

frag = lance.LanceFragment.create(dataset.uri, table)
sample_file = frag.to_json()["files"][0]
assert sample_file["file_major_version"] == EXPECTED_MAJOR_VERSION
assert sample_file["file_minor_version"] == EXPECTED_MINOR_VERSION

from lance.fragment import write_fragments

frags = write_fragments(table, dataset.uri)
frag = frags[0]
sample_file = frag.to_json()["files"][0]
assert sample_file["file_major_version"] == EXPECTED_MAJOR_VERSION
assert sample_file["file_minor_version"] == EXPECTED_MINOR_VERSION