feat: add standalone partition assignment operation (#2556)
This also fixes up the tqdm progress bar for partition assignment so
that it has a definite end, and makes one small tweak to the indices
builder, moving the column argument into the builder constructor,
since that argument is shared by all methods in the class.
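A minimal sketch of the workflow this commit enables (the dataset path and the
"vector" column name are assumptions, and CUDA is assumed to be available since
precomputing partition assignments only pays off with an accelerator):

import lance
from lance.indices import IndicesBuilder

# Open a dataset that has a fixed-size-list vector column (hypothetical path).
ds = lance.dataset("my_dataset.lance")

# The vector column is now passed to the builder constructor instead of to each method.
builder = IndicesBuilder(ds, "vector")

# Train the IVF model on the accelerator, then assign every row to its closest partition.
ivf = builder.train_ivf(num_partitions=100, accelerator="cuda")
partitions_uri = builder.assign_ivf_partitions(ivf, accelerator="cuda")

# The result is a Lance dataset with columns row_id (uint64) and partition (uint32).
print(partitions_uri)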
westonpace authored Jul 12, 2024
1 parent 7a2f828 commit 5a8fb8c
Showing 4 changed files with 145 additions and 42 deletions.
22 changes: 20 additions & 2 deletions python/python/benchmarks/test_index.py
@@ -101,11 +101,29 @@ def test_optimize_index(
@pytest.mark.benchmark(group="optimize_index")
@pytest.mark.parametrize("num_partitions", [100, 300])
def test_train_ivf(test_large_dataset, benchmark, num_partitions):
builder = IndicesBuilder(test_large_dataset)
builder = IndicesBuilder(test_large_dataset, "vector")
benchmark.pedantic(
builder.train_ivf,
args=["vector"],
kwargs={"num_partitions": num_partitions},
iterations=1,
rounds=1,
)


# Pre-computing partition assignment only makes sense on CUDA and so this benchmark runs
# only on CUDA.
@pytest.mark.benchmark(group="assign_partitions")
@pytest.mark.parametrize("num_partitions", [100, 300])
def test_partition_assignment(test_large_dataset, benchmark, num_partitions):
from lance.dependencies import torch

try:
if not torch.cuda.is_available():
return
except: # noqa: E722
return
builder = IndicesBuilder(test_large_dataset, "vector")
ivf = builder.train_ivf(num_partitions=num_partitions)
benchmark.pedantic(
        builder.assign_ivf_partitions, args=[ivf, "cuda"], iterations=1, rounds=1
)
95 changes: 78 additions & 17 deletions python/python/lance/indices.py
Expand Up @@ -4,6 +4,7 @@
import warnings
from typing import TYPE_CHECKING, Optional, Union

import numpy as np
import pyarrow as pa

from lance.file import LanceFileReader, LanceFileWriter
@@ -153,12 +154,25 @@ class IndicesBuilder:
use the `create_index` method on the dataset object.
"""

def __init__(self, dataset):
def __init__(self, dataset, column: str):
"""
Create an index builder for the given vector column
Parameters
----------
dataset: LanceDataset
the dataset containing the data
column: str
The vector column to index, must be a fixed size list of floats
or 1-dimensional fixed-shape tensor column.
"""
self.dataset = dataset
self.column = self._normalize_column(column)
self.dimension = self.dataset.schema.field(self.column[0]).type.list_size

def train_ivf(
self,
column,
num_partitions=None,
*,
distance_type="l2",
@@ -209,19 +223,17 @@ def train_ivf(
possible minima. In these cases we must terminate or run forever. The
max_iters parameter defines a cutoff at which we terminate training.
"""
column = self._normalize_column(column)
num_rows = self.dataset.count_rows()
num_partitions = self._determine_num_partitions(num_partitions, num_rows)
self._verify_ivf_sample_rate(sample_rate, num_partitions, num_rows)
distance_type = self._normalize_distance_type(distance_type)
self._verify_ivf_params(num_partitions)

if accelerator is None:
dimension = self.dataset.schema.field(column[0]).type.list_size
ivf_centroids = indices.train_ivf_model(
self.dataset._ds,
column[0],
dimension,
self.column[0],
self.dimension,
num_partitions,
distance_type,
sample_rate,
@@ -234,7 +246,7 @@ def train_ivf(

ivf_centroids, _ = train_ivf_centroids_on_accelerator(
self.dataset,
column[0],
self.column[0],
num_partitions,
distance_type,
accelerator,
@@ -251,7 +263,6 @@

def train_pq(
self,
column,
ivf_model: IvfModel,
num_subvectors=None,
*,
@@ -270,9 +281,6 @@
Parameters
----------
column: str
The vector column to quantize, must be a fixed size list of floats
or 1-dimensional fixed-shape tensor column.
ivf_model: IvfModel
The IVF model to use to partition the vectors into clusters. This is
needed because PQ is trained on residuals from the IVF model.
@@ -291,17 +299,15 @@ def train_pq(
max_iters: int
This parameter is used in the same way as in the IVF model.
"""
column = self._normalize_column(column)
num_rows = self.dataset.count_rows()
dimension = self.dataset.schema.field(column[0]).type.list_size
self.dataset.schema.field(column[0]).type.list_size
num_subvectors = self._normalize_pq_params(num_subvectors, dimension)
self.dataset.schema.field(self.column[0]).type.list_size
num_subvectors = self._normalize_pq_params(num_subvectors, self.dimension)
self._verify_pq_sample_rate(num_rows, sample_rate)
distance_type = ivf_model.distance_type
pq_codebook = indices.train_pq_model(
self.dataset._ds,
column[0],
dimension,
self.column[0],
self.dimension,
num_subvectors,
distance_type,
sample_rate,
@@ -310,6 +316,61 @@
)
return PqModel(num_subvectors, pq_codebook)

def assign_ivf_partitions(
self,
ivf_model: IvfModel,
accelerator: Union[str, "torch.Device"],
*,
output_uri: Optional[str] = None,
) -> str:
"""
Calculates which IVF partition each vector belongs to. This searches the
IVF centroids and assigns the closest centroid to the vector. The result is
stored in a Lance dataset located at output_uri. The schema of the
partition assignment dataset is:
row_id: uint64
partition: uint32
Note: There is no advantage to separately computing the partition assignment
without an accelerator. If you are not using an accelerator then you should
skip this method and proceed without precomputed partition assignments.
Parameters
----------
ivf_model: IvfModel
An IvfModel, previously created by ``train_ivf`` which the data will be
assigned to.
accelerator: Union[str, torch.Device]
An optional accelerator to use to offload computation to specialized
hardware. Currently supported values are the same as those in ``train_ivf``
output_uri: Optional[str], default None
Destination Lance dataset where the partition assignments will be written
Can be None in which case a random directory will be used.
Returns
-------
str
The path of the partition assignment dataset (will be equal to
output_uri unless the value is None)
"""
from .dependencies import torch
from .torch.kmeans import KMeans
from .vector import compute_partitions

centroids = torch.from_numpy(
np.stack(ivf_model.centroids.to_numpy(zero_copy_only=False))
).to(accelerator)
kmeans = KMeans(
ivf_model.num_partitions,
metric=ivf_model.distance_type,
device=accelerator,
centroids=centroids,
)
return compute_partitions(
self.dataset, self.column[0], kmeans, dst_dataset_uri=output_uri
)

def _determine_num_partitions(self, num_partitions: Optional[int], num_rows: int):
if num_partitions is None:
return round(math.sqrt(num_rows))
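The partition assignment dataset written by assign_ivf_partitions can be read back
like any other Lance dataset. A minimal sketch, assuming partitions_uri is the path
returned by assign_ivf_partitions in the earlier example:

import lance

partitions = lance.dataset(partitions_uri)

# Schema: row_id (uint64), partition (uint32)
counts = {}
for batch in partitions.to_batches():
    for part_id in batch["partition"]:
        part_id = part_id.as_py()
        counts[part_id] = counts.get(part_id, 0) + 1
print(counts)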
34 changes: 19 additions & 15 deletions python/python/lance/vector.py
@@ -8,7 +8,6 @@
import logging
import re
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Iterable, Literal, Optional, Union

import pyarrow as pa
@@ -19,6 +18,8 @@
from .dependencies import numpy as np

if TYPE_CHECKING:
from pathlib import Path

from . import LanceDataset


@@ -191,7 +192,7 @@ def compute_partitions(
column: str,
kmeans: Any, # KMeans
batch_size: int = 10240,
spill_dir: Union[str, Path] = None,
dst_dataset_uri: Union[str, Path] = None,
) -> str:
"""Compute partitions for each row using GPU kmeans and spill to disk.
@@ -205,8 +206,9 @@
KMeans model to use to compute partitions.
batch_size: int, default 10240
The batch size used to read the dataset.
spill_dir: Path
The path to store the partitions.
dst_dataset_uri: Union[str, Path], optional
    The path to store the partitions. If not specified, a random
    directory is used instead.
Returns
-------
@@ -215,6 +217,8 @@
"""
from lance.torch.data import LanceDataset as PytorchLanceDataset

num_rows = dataset.count_rows()

torch_ds = PytorchLanceDataset(
dataset,
batch_size=batch_size,
@@ -228,6 +232,8 @@
]
)

progress = tqdm(total=num_rows)

def _partition_assignment() -> Iterable[pa.RecordBatch]:
with torch.no_grad():
for batch in torch_ds:
@@ -254,27 +260,25 @@ def _partition_assignment() -> Iterable[pa.RecordBatch]:
len(part_batch) - len(ids),
)

progress.update(part_batch.num_rows)
yield part_batch

rbr = pa.RecordBatchReader.from_batches(
output_schema, tqdm(_partition_assignment())
)

if spill_dir is None:
spill_dir = tempfile.mkdtemp()

spill_uri = Path(spill_dir) / "precomputed_partitions.lance"
rbr = pa.RecordBatchReader.from_batches(output_schema, _partition_assignment())

if dst_dataset_uri is None:
dst_dataset_uri = tempfile.mkdtemp()
ds = write_dataset(
rbr,
spill_uri,
dst_dataset_uri,
schema=output_schema,
max_rows_per_file=dataset.count_rows(),
use_legacy_format=True,
)
assert len(ds.get_fragments()) == 1
files = ds.get_fragments()[0].data_files()
assert len(files) == 1

logging.info("Saved recomputed partitions to %s", spill_uri.absolute())
progress.close()

return str(spill_uri)
logging.info("Saved precomputed partitions to %s", dst_dataset_uri)
return str(dst_dataset_uri)
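For reference, a sketch of calling compute_partitions directly with the renamed
dst_dataset_uri argument, mirroring what assign_ivf_partitions does internally
(ds and ivf are assumed to come from the earlier sketch, and the output path is
an assumption):

import numpy as np
import torch

from lance.torch.kmeans import KMeans
from lance.vector import compute_partitions

# Rebuild a KMeans object from the trained IVF model, as assign_ivf_partitions does.
centroids = torch.from_numpy(
    np.stack(ivf.centroids.to_numpy(zero_copy_only=False))
).to("cuda")
kmeans = KMeans(
    ivf.num_partitions,
    metric=ivf.distance_type,
    device="cuda",
    centroids=centroids,
)

# Write the assignments to an explicit destination instead of a temporary directory.
uri = compute_partitions(ds, "vector", kmeans, dst_dataset_uri="partitions.lance")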
36 changes: 28 additions & 8 deletions python/python/tests/test_indices.py
@@ -20,7 +20,7 @@ def gen_dataset(tmpdir, datatype=np.float32):
def test_ivf_centroids(tmpdir):
ds = gen_dataset(tmpdir)

ivf = IndicesBuilder(ds).train_ivf("vectors", sample_rate=16)
ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16)

assert ivf.distance_type == "l2"
assert len(ivf.centroids) == 100
@@ -34,7 +34,7 @@
@pytest.mark.cuda
def test_ivf_centroids_cuda(tmpdir):
ds = gen_dataset(tmpdir)
ivf = IndicesBuilder(ds).train_ivf("vectors", sample_rate=16, accelerator="cuda")
ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16, accelerator="cuda")

assert ivf.distance_type == "l2"
assert len(ivf.centroids) == 100
@@ -43,7 +43,7 @@ def test_ivf_centroids_cuda(tmpdir):
def test_ivf_centroids_column_type(tmpdir):
def check(column_type, typename):
ds = gen_dataset(tmpdir / typename, column_type)
ivf = IndicesBuilder(ds).train_ivf("vectors", sample_rate=16)
ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16)
assert len(ivf.centroids) == 100
ivf.save(str(tmpdir / f"ivf_{typename}"))
reloaded = IvfModel.load(str(tmpdir / f"ivf_{typename}"))
@@ -58,8 +58,8 @@ def test_ivf_centroids_distance_type(tmpdir):
ds = gen_dataset(tmpdir)

def check(distance_type):
ivf = IndicesBuilder(ds).train_ivf(
"vectors", sample_rate=16, distance_type=distance_type
ivf = IndicesBuilder(ds, "vectors").train_ivf(
sample_rate=16, distance_type=distance_type
)
assert ivf.distance_type == distance_type
ivf.save(str(tmpdir / "ivf"))
@@ -74,25 +74,45 @@ def check(distance_type):
def test_num_partitions(tmpdir):
ds = gen_dataset(tmpdir)

ivf = IndicesBuilder(ds).train_ivf("vectors", sample_rate=16, num_partitions=10)
ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16, num_partitions=10)
assert ivf.num_partitions == 10


@pytest.fixture
def ds_with_ivf(tmpdir):
ds = gen_dataset(tmpdir)
ivf = IndicesBuilder(ds).train_ivf("vectors", sample_rate=16)
ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16)
return ds, ivf


def test_gen_pq(tmpdir, ds_with_ivf):
ds, ivf = ds_with_ivf

pq = IndicesBuilder(ds).train_pq("vectors", ivf, sample_rate=16)
pq = IndicesBuilder(ds, "vectors").train_pq(ivf, sample_rate=16)
assert pq.dimension == 128
assert pq.num_subvectors == 8

pq.save(str(tmpdir / "pq"))
reloaded = PqModel.load(str(tmpdir / "pq"))
assert pq.dimension == reloaded.dimension
assert pq.codebook == reloaded.codebook


@pytest.mark.cuda
def test_assign_partitions(tmpdir):
ds = gen_dataset(tmpdir)
builder = IndicesBuilder(ds, "vectors")

ivf = builder.train_ivf(sample_rate=16, num_partitions=20)
partitions_uri = builder.assign_ivf_partitions(ivf, accelerator="cuda")

partitions = lance.dataset(partitions_uri)
found_row_ids = set()
for batch in partitions.to_batches():
row_ids = batch["row_id"]
for row_id in row_ids:
found_row_ids.add(row_id)
part_ids = batch["partition"]
for part_id in part_ids:
assert part_id.as_py() < 20
assert len(found_row_ids) == ds.count_rows()
