Skip to content

Commit

Permalink
index optimizer (facebookresearch#3154)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookresearch#3154

Using the benchmark to find Pareto optimal indices, in this case on BigANN as an example.

Separately optimize the coarse quantizer and the vector codec and use Pareto optimal configurations to construct IVF indices, which are then retested at various scales. See `optimize()` in `optimize.py` as the main function driving the process.

The results can be interpreted with `bench_fw_notebook.ipynb`, which allows:
* filtering by maximum code size
* maximum time
* minimum accuracy
* space or time Pareto optimal options
* and visualize the results and output them as a table.

This version is intentionally limited to IVF(Flat|HNSW),PQ|SQ indices...

Reviewed By: mdouze

Differential Revision: D51781670

fbshipit-source-id: 2c0f800d374ea845255934f519cc28095c00a51f
  • Loading branch information
algoriddle authored and facebook-github-bot committed Jan 30, 2024
1 parent 75ae0bf commit 1d0e8d4
Show file tree
Hide file tree
Showing 8 changed files with 1,318 additions and 750 deletions.
122 changes: 90 additions & 32 deletions benchs/bench_fw/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,20 @@
from copy import copy
from dataclasses import dataclass
from operator import itemgetter
from statistics import median, mean
from statistics import mean, median
from typing import Any, Dict, List, Optional

from .utils import dict_merge
from .index import Index, IndexFromCodec, IndexFromFactory
from .descriptors import DatasetDescriptor, IndexDescriptor

import faiss # @manual=//faiss/python:pyfaiss_gpu

import numpy as np

from scipy.optimize import curve_fit

from .descriptors import DatasetDescriptor, IndexDescriptor
from .index import Index, IndexFromCodec, IndexFromFactory

from .utils import dict_merge

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -274,8 +275,8 @@ def range_search(
search_parameters: Optional[Dict[str, int]],
radius: Optional[float] = None,
gt_radius: Optional[float] = None,
range_search_metric_function = None,
gt_rsm = None,
range_search_metric_function=None,
gt_rsm=None,
):
logger.info("range_search: begin")
if radius is None:
Expand Down Expand Up @@ -328,7 +329,13 @@ def knn_ground_truth(self):
logger.info("knn_ground_truth: begin")
flat_desc = self.get_index_desc("Flat")
self.build_index_wrapper(flat_desc)
self.gt_knn_D, self.gt_knn_I, _, _, requires = flat_desc.index.knn_search(
(
self.gt_knn_D,
self.gt_knn_I,
_,
_,
requires,
) = flat_desc.index.knn_search(
dry_run=False,
search_parameters=None,
query_vectors=self.query_vectors,
Expand All @@ -338,13 +345,13 @@ def knn_ground_truth(self):
logger.info("knn_ground_truth: end")

def search_benchmark(
self,
self,
name,
search_func,
key_func,
cost_metrics,
perf_metrics,
results: Dict[str, Any],
results: Dict[str, Any],
index: Index,
):
index_name = index.get_index_name()
Expand Down Expand Up @@ -376,11 +383,18 @@ def experiment(parameters, cost_metric, perf_metric):
logger.info(f"{name}_benchmark: end")
return results, requires

def knn_search_benchmark(self, dry_run, results: Dict[str, Any], index: Index):
def knn_search_benchmark(
self, dry_run, results: Dict[str, Any], index: Index
):
return self.search_benchmark(
name="knn_search",
search_func=lambda parameters: index.knn_search(
dry_run, parameters, self.query_vectors, self.k, self.gt_knn_I, self.gt_knn_D,
dry_run,
parameters,
self.query_vectors,
self.k,
self.gt_knn_I,
self.gt_knn_D,
)[3:],
key_func=lambda parameters: index.get_knn_search_name(
search_parameters=parameters,
Expand All @@ -394,11 +408,17 @@ def knn_search_benchmark(self, dry_run, results: Dict[str, Any], index: Index):
index=index,
)

def reconstruct_benchmark(self, dry_run, results: Dict[str, Any], index: Index):
def reconstruct_benchmark(
self, dry_run, results: Dict[str, Any], index: Index
):
return self.search_benchmark(
name="reconstruct",
search_func=lambda parameters: index.reconstruct(
dry_run, parameters, self.query_vectors, self.k, self.gt_knn_I,
dry_run,
parameters,
self.query_vectors,
self.k,
self.gt_knn_I,
),
key_func=lambda parameters: index.get_knn_search_name(
search_parameters=parameters,
Expand Down Expand Up @@ -426,31 +446,33 @@ def range_search_benchmark(
return self.search_benchmark(
name="range_search",
search_func=lambda parameters: self.range_search(
dry_run=dry_run,
index=index,
search_parameters=parameters,
dry_run=dry_run,
index=index,
search_parameters=parameters,
radius=radius,
gt_radius=gt_radius,
range_search_metric_function=range_search_metric_function,
range_search_metric_function=range_search_metric_function,
gt_rsm=gt_rsm,
)[4:],
key_func=lambda parameters: index.get_range_search_name(
search_parameters=parameters,
query_vectors=self.query_vectors,
radius=radius,
) + metric_key,
)
+ metric_key,
cost_metrics=["time"],
perf_metrics=["range_score_max_recall"],
results=results,
index=index,
)

def build_index_wrapper(self, index_desc: IndexDescriptor):
if hasattr(index_desc, 'index'):
if hasattr(index_desc, "index"):
return
if index_desc.factory is not None:
training_vectors = copy(self.training_vectors)
training_vectors.num_vectors = index_desc.training_size
if index_desc.training_size is not None:
training_vectors.num_vectors = index_desc.training_size
index = IndexFromFactory(
num_threads=self.num_threads,
d=self.d,
Expand Down Expand Up @@ -481,15 +503,24 @@ def clone_one(self, index_desc):
training_vectors=self.training_vectors,
database_vectors=self.database_vectors,
query_vectors=self.query_vectors,
index_descs = [self.get_index_desc("Flat"), index_desc],
index_descs=[self.get_index_desc("Flat"), index_desc],
range_ref_index_desc=self.range_ref_index_desc,
k=self.k,
distance_metric=self.distance_metric,
)
benchmark.set_io(self.io)
benchmark.set_io(self.io.clone())
return benchmark

def benchmark_one(self, dry_run, results: Dict[str, Any], index_desc: IndexDescriptor, train, reconstruct, knn, range):
def benchmark_one(
self,
dry_run,
results: Dict[str, Any],
index_desc: IndexDescriptor,
train,
reconstruct,
knn,
range,
):
faiss.omp_set_num_threads(self.num_threads)
if not dry_run:
self.knn_ground_truth()
Expand Down Expand Up @@ -531,9 +562,12 @@ def benchmark_one(self, dry_run, results: Dict[str, Any], index_desc: IndexDescr
)
assert requires is None

if self.range_ref_index_desc is None or not index_desc.index.supports_range_search():
if (
self.range_ref_index_desc is None
or not index_desc.index.supports_range_search()
):
return results, None

ref_index_desc = self.get_index_desc(self.range_ref_index_desc)
if ref_index_desc is None:
raise ValueError(
Expand All @@ -550,7 +584,9 @@ def benchmark_one(self, dry_run, results: Dict[str, Any], index_desc: IndexDescr
coefficients,
coefficients_training_data,
) = self.range_search_reference(
ref_index_desc.index, ref_index_desc.search_params, range_metric
ref_index_desc.index,
ref_index_desc.search_params,
range_metric,
)
gt_rsm = self.range_ground_truth(
gt_radius, range_search_metric_function
Expand Down Expand Up @@ -583,7 +619,15 @@ def benchmark_one(self, dry_run, results: Dict[str, Any], index_desc: IndexDescr

return results, None

def benchmark(self, result_file=None, local=False, train=False, reconstruct=False, knn=False, range=False):
def benchmark(
self,
result_file=None,
local=False,
train=False,
reconstruct=False,
knn=False,
range=False,
):
logger.info("begin evaluate")

faiss.omp_set_num_threads(self.num_threads)
Expand Down Expand Up @@ -656,20 +700,34 @@ def benchmark(self, result_file=None, local=False, train=False, reconstruct=Fals

if current_todo:
results_one = {"indices": {}, "experiments": {}}
params = [(self.clone_one(index_desc), results_one, index_desc, train, reconstruct, knn, range) for index_desc in current_todo]
for result in self.io.launch_jobs(run_benchmark_one, params, local=local):
params = [
(
index_desc,
self.clone_one(index_desc),
results_one,
train,
reconstruct,
knn,
range,
)
for index_desc in current_todo
]
for result in self.io.launch_jobs(
run_benchmark_one, params, local=local
):
dict_merge(results, result)

todo = next_todo
todo = next_todo

if result_file is not None:
self.io.write_json(results, result_file, overwrite=True)
logger.info("end evaluate")
return results


def run_benchmark_one(params):
logger.info(params)
benchmark, results, index_desc, train, reconstruct, knn, range = params
index_desc, benchmark, results, train, reconstruct, knn, range = params
results, requires = benchmark.benchmark_one(
dry_run=False,
results=results,
Expand Down
34 changes: 23 additions & 11 deletions benchs/bench_fw/benchmark_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
import os
import pickle
from dataclasses import dataclass
import submitit
from typing import Any, List, Optional
from zipfile import ZipFile

import faiss # @manual=//faiss/python:pyfaiss_gpu

import numpy as np
import submitit
from faiss.contrib.datasets import ( # @manual=//faiss/contrib:faiss_contrib_gpu
dataset_from_name,
)
Expand Down Expand Up @@ -47,6 +47,9 @@ def merge_rcq_itq(
class BenchmarkIO:
path: str

def clone(self):
return BenchmarkIO(path=self.path)

def __post_init__(self):
self.cached_ds = {}

Expand Down Expand Up @@ -119,18 +122,27 @@ def write_file(

def get_dataset(self, dataset):
if dataset not in self.cached_ds:
if dataset.namespace is not None and dataset.namespace[:4] == "std_":
if (
dataset.namespace is not None
and dataset.namespace[:4] == "std_"
):
if dataset.tablename not in self.cached_ds:
self.cached_ds[dataset.tablename] = dataset_from_name(
dataset.tablename,
)
p = dataset.namespace[4]
if p == "t":
self.cached_ds[dataset] = self.cached_ds[dataset.tablename].get_train(dataset.num_vectors)
self.cached_ds[dataset] = self.cached_ds[
dataset.tablename
].get_train(dataset.num_vectors)
elif p == "d":
self.cached_ds[dataset] = self.cached_ds[dataset.tablename].get_database()
self.cached_ds[dataset] = self.cached_ds[
dataset.tablename
].get_database()
elif p == "q":
self.cached_ds[dataset] = self.cached_ds[dataset.tablename].get_queries()
self.cached_ds[dataset] = self.cached_ds[
dataset.tablename
].get_queries()
else:
raise ValueError
elif dataset.namespace == "syn":
Expand Down Expand Up @@ -233,8 +245,8 @@ def launch_jobs(self, func, params, local=True):
if local:
results = [func(p) for p in params]
return results
print(f'launching {len(params)} jobs')
executor = submitit.AutoExecutor(folder='/checkpoint/gsz/jobs')
logger.info(f"launching {len(params)} jobs")
executor = submitit.AutoExecutor(folder="/checkpoint/gsz/jobs")
executor.update_parameters(
nodes=1,
gpus_per_node=8,
Expand All @@ -248,9 +260,9 @@ def launch_jobs(self, func, params, local=True):
slurm_constraint="bldg1",
)
jobs = executor.map_array(func, params)
print(f'launched {len(jobs)} jobs')
# for job, param in zip(jobs, params):
# print(f"{job.job_id=} {param=}")
logger.info(f"launched {len(jobs)} jobs")
for job, param in zip(jobs, params):
logger.info(f"{job.job_id=} {param[0]=}")
results = [job.result() for job in jobs]
print(f'received {len(results)} results')
print(f"received {len(results)} results")
return results
5 changes: 4 additions & 1 deletion benchs/bench_fw/descriptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import faiss # @manual=//faiss/python:pyfaiss_gpu
from .utils import timer

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -101,7 +102,9 @@ def k_means(self, io, k, dry_run):
tablename=f"{self.get_filename()}kmeans_{k}.npy"
)
meta_filename = kmeans_vectors.tablename + ".json"
if not io.file_exist(kmeans_vectors.tablename) or not io.file_exist(meta_filename):
if not io.file_exist(kmeans_vectors.tablename) or not io.file_exist(
meta_filename
):
if dry_run:
return None, None, kmeans_vectors.tablename
x = io.get_dataset(self)
Expand Down
Loading

0 comments on commit 1d0e8d4

Please sign in to comment.