diff --git a/crates/service/src/algorithms/clustering/elkan_k_means.rs b/crates/service/src/algorithms/clustering/elkan_k_means.rs index e746568b7..9dc7fd97b 100644 --- a/crates/service/src/algorithms/clustering/elkan_k_means.rs +++ b/crates/service/src/algorithms/clustering/elkan_k_means.rs @@ -3,6 +3,8 @@ use crate::utils::vec2::Vec2; use base::scalar::FloatCast; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; +use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator}; +use rayon::slice::ParallelSliceMut; use std::ops::{Index, IndexMut}; pub struct ElkanKMeans { @@ -32,13 +34,16 @@ impl ElkanKMeans { centroids[0].copy_from_slice(&samples[rand.gen_range(0..n)]); let mut weight = vec![F32::infinity(); n]; + let mut dis = vec![F32::zero(); n]; for i in 0..c { let mut sum = F32::zero(); + dis.par_iter_mut().enumerate().for_each(|(j, x)| { + *x = S::elkan_k_means_distance(&samples[j], ¢roids[i]); + }); for j in 0..n { - let dis = S::elkan_k_means_distance(&samples[j], ¢roids[i]); - lowerbound[(j, i)] = dis; - if dis * dis < weight[j] { - weight[j] = dis * dis; + lowerbound[(j, i)] = dis[j]; + if dis[j] * dis[j] < weight[j] { + weight[j] = dis[j] * dis[j]; } sum += weight[j]; } @@ -132,11 +137,16 @@ impl ElkanKMeans { // Step 1 let mut dist0 = Square::new(c, c); let mut sp = vec![F32::zero(); c]; - for i in 0..c { - for j in i + 1..c { - let dis = S::elkan_k_means_distance(¢roids[i], ¢roids[j]) * 0.5; - dist0[(i, j)] = dis; - dist0[(j, i)] = dis; + dist0.v.par_iter_mut().enumerate().for_each(|(ii, v)| { + let i = ii / c; + let j = ii % c; + if i <= j { + *v = S::elkan_k_means_distance(¢roids[i], ¢roids[j]) * 0.5; + } + }); + for i in 1..c { + for j in 0..i - 1 { + dist0[(i, j)] = dist0[(j, i)]; } } for i in 0..c { @@ -153,12 +163,18 @@ impl ElkanKMeans { sp[i] = minimal; } + let mut dis = vec![F32::zero(); n]; + dis.par_iter_mut().enumerate().for_each(|(i, x)| { + if upperbound[i] > sp[assign[i]] { + *x = S::elkan_k_means_distance(&samples[i], ¢roids[assign[i]]); + } + }); for i in 0..n { // Step 2 if upperbound[i] <= sp[assign[i]] { continue; } - let mut minimal = S::elkan_k_means_distance(&samples[i], ¢roids[assign[i]]); + let mut minimal = dis[i]; lowerbound[(i, assign[i])] = minimal; upperbound[i] = minimal; // Step 3 @@ -191,9 +207,9 @@ impl ElkanKMeans { centroids.fill(S::Scalar::zero()); for i in 0..n { for j in 0..dims as usize { - centroids[assign[i]][j] += samples[i][j]; + centroids[self.assign[i]][j] += samples[i][j]; } - count[assign[i]] += 1.0; + count[self.assign[i]] += 1.0; } for i in 0..c { if count[i] == F32::zero() { @@ -229,22 +245,23 @@ impl ElkanKMeans { count[i] = count[o] / 2.0; count[o] = count[o] - count[i]; } - for i in 0..c { - S::elkan_k_means_normalize(&mut centroids[i]); - } + centroids.par_chunks_mut(dims as usize).for_each(|v| { + S::elkan_k_means_normalize(v); + }); // Step 5, 6 let mut dist1 = vec![F32::zero(); c]; - for i in 0..c { - dist1[i] = S::elkan_k_means_distance(&old[i], ¢roids[i]); - } + dist1.par_iter_mut().enumerate().for_each(|(i, v)| { + *v = S::elkan_k_means_distance(&old[i], ¢roids[i]); + }); for i in 0..n { for j in 0..c { - lowerbound[(i, j)] = std::cmp::max(lowerbound[(i, j)] - dist1[j], F32::zero()); + self.lowerbound[(i, j)] = + std::cmp::max(self.lowerbound[(i, j)] - dist1[j], F32::zero()); } } for i in 0..n { - upperbound[i] += dist1[assign[i]]; + self.upperbound[i] += dist1[self.assign[i]]; } change == 0 diff --git a/crates/service/src/algorithms/flat.rs b/crates/service/src/algorithms/flat.rs index 73d6c8d78..7ee1f1a5b 100644 --- a/crates/service/src/algorithms/flat.rs +++ b/crates/service/src/algorithms/flat.rs @@ -99,6 +99,7 @@ pub fn make( options.clone(), idx_opts.quantization, &raw, + (0..raw.len()).collect::>(), ); FlatRam { raw, quantization } } diff --git a/crates/service/src/algorithms/hnsw.rs b/crates/service/src/algorithms/hnsw.rs index 82f233b16..cf2ced090 100644 --- a/crates/service/src/algorithms/hnsw.rs +++ b/crates/service/src/algorithms/hnsw.rs @@ -150,6 +150,7 @@ pub fn make( options.clone(), quantization_opts, &raw, + (0..raw.len()).collect::>(), ); let n = raw.len(); let graph = HnswRamGraph { diff --git a/crates/service/src/algorithms/ivf/ivf_naive.rs b/crates/service/src/algorithms/ivf/ivf_naive.rs index 85c50d3f3..278570236 100644 --- a/crates/service/src/algorithms/ivf/ivf_naive.rs +++ b/crates/service/src/algorithms/ivf/ivf_naive.rs @@ -8,20 +8,18 @@ use crate::index::IndexOptions; use crate::index::SearchOptions; use crate::index::VectorOptions; use crate::prelude::*; -use crate::utils::cells::SyncUnsafeCell; use crate::utils::dir_ops::sync_dir; use crate::utils::element_heap::ElementHeap; use crate::utils::mmap_array::MmapArray; use crate::utils::vec2::Vec2; use rand::seq::index::sample; use rand::thread_rng; -use rayon::prelude::{IntoParallelIterator, ParallelIterator}; +use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator}; +use rayon::prelude::ParallelIterator; use std::cmp::Reverse; use std::collections::BinaryHeap; use std::fs::create_dir; use std::path::Path; -use std::sync::atomic::AtomicU32; -use std::sync::atomic::Ordering::{Acquire, Relaxed, Release}; use std::sync::Arc; pub struct IvfNaive { @@ -90,8 +88,8 @@ pub struct IvfRam { nlist: u32, // ---------------------- centroids: Vec2, - heads: Vec, - nexts: Vec>, + ptr: Vec, + payloads: Vec, } unsafe impl Send for IvfRam {} @@ -106,8 +104,8 @@ pub struct IvfMmap { nlist: u32, // ---------------------- centroids: MmapArray, - heads: MmapArray, - nexts: MmapArray, + ptr: MmapArray, + payloads: MmapArray, } unsafe impl Send for IvfMmap {} @@ -141,12 +139,6 @@ pub fn make( sealed, growing, )); - let quantization = Quantization::open( - &path.join("quantization"), - options.clone(), - quantization_opts, - &raw, - ); let n = raw.len(); let m = std::cmp::min(nsample, n); let f = sample(&mut thread_rng(), n as usize, m as usize).into_vec(); @@ -165,70 +157,68 @@ pub fn make( } } let centroids = k_means.finish(); - let heads = { - let mut heads = Vec::with_capacity(nlist as usize); - heads.resize_with(nlist as usize, || AtomicU32::new(u32::MAX)); - heads - }; - let nexts = { - let mut nexts = Vec::with_capacity(nlist as usize); - nexts.resize_with(n as usize, || SyncUnsafeCell::new(u32::MAX)); - nexts - }; - (0..n).into_par_iter().for_each(|i| { - let mut vector = S::ref_to_owned(raw.vector(i)); + let mut idx = vec![0usize; n as usize]; + idx.par_iter_mut().enumerate().for_each(|(i, x)| { + let mut vector = S::ref_to_owned(raw.vector(i as u32)); S::elkan_k_means_normalize2(&mut vector); let mut result = (F32::infinity(), 0); - for i in 0..nlist { - let dis = S::elkan_k_means_distance2(S::owned_to_ref(&vector), ¢roids[i as usize]); + for i in 0..nlist as usize { + let dis = S::elkan_k_means_distance2(S::owned_to_ref(&vector), ¢roids[i]); result = std::cmp::min(result, (dis, i)); } - let centroid_id = result.1; - loop { - let next = heads[centroid_id as usize].load(Acquire); - unsafe { - nexts[i as usize].get().write(next); - } - let o = &heads[centroid_id as usize]; - if o.compare_exchange(next, i, Release, Relaxed).is_ok() { - break; - } - } + *x = result.1; }); + let mut invlists_ids = vec![Vec::new(); nlist as usize]; + let mut invlists_payloads = vec![Vec::new(); nlist as usize]; + for i in 0..n { + invlists_ids[idx[i as usize]].push(i); + invlists_payloads[idx[i as usize]].push(raw.payload(i)); + } + let permutation = Vec::from_iter((0..nlist).flat_map(|i| &invlists_ids[i as usize]).copied()); + let payloads = Vec::from_iter( + (0..nlist) + .flat_map(|i| &invlists_payloads[i as usize]) + .copied(), + ); + let quantization = Quantization::create( + &path.join("quantization"), + options.clone(), + quantization_opts, + &raw, + permutation, + ); + let mut ptr = vec![0usize; nlist as usize + 1]; + for i in 0..nlist { + ptr[i as usize + 1] = ptr[i as usize] + invlists_ids[i as usize].len(); + } IvfRam { raw, quantization, centroids, - heads, - nexts, nlist, dims, + ptr, + payloads, } } -pub fn save(mut ram: IvfRam, path: &Path) -> IvfMmap { +pub fn save(ram: IvfRam, path: &Path) -> IvfMmap { let centroids = MmapArray::create( &path.join("centroids"), (0..ram.nlist) .flat_map(|i| &ram.centroids[i as usize]) .copied(), ); - let heads = MmapArray::create( - &path.join("heads"), - ram.heads.iter_mut().map(|x| *x.get_mut()), - ); - let nexts = MmapArray::create( - &path.join("nexts"), - ram.nexts.iter_mut().map(|x| *x.get_mut()), - ); + let ptr = MmapArray::create(&path.join("ptr"), ram.ptr.iter().copied()); + let payloads = MmapArray::create(&path.join("payload"), ram.payloads.iter().copied()); IvfMmap { raw: ram.raw, quantization: ram.quantization, dims: ram.dims, nlist: ram.nlist, centroids, - heads, - nexts, + ptr, + payloads, } } @@ -241,8 +231,8 @@ pub fn open(path: &Path, options: IndexOptions) -> IvfMmap { &raw, ); let centroids = MmapArray::open(&path.join("centroids")); - let heads = MmapArray::open(&path.join("heads")); - let nexts = MmapArray::open(&path.join("nexts")); + let ptr = MmapArray::open(&path.join("ptr")); + let payloads = MmapArray::open(&path.join("payload")); let IvfIndexingOptions { nlist, .. } = options.indexing.unwrap_ivf(); IvfMmap { raw, @@ -250,8 +240,8 @@ pub fn open(path: &Path, options: IndexOptions) -> IvfMmap { dims: options.vector.dims, nlist, centroids, - heads, - nexts, + ptr, + payloads, } } @@ -277,14 +267,14 @@ pub fn basic( let lists = lists.into_sorted_vec(); let mut result = BinaryHeap::new(); for i in lists.iter().map(|e| e.payload as usize) { - let mut j = mmap.heads[i]; - while u32::MAX != j { - let payload = mmap.raw.payload(j); + let start = mmap.ptr[i]; + let end = mmap.ptr[i + 1]; + for j in start..end { + let payload = mmap.payloads[j]; if filter.check(payload) { - let distance = mmap.quantization.distance(vector, j); + let distance = mmap.quantization.distance(vector, j as u32); result.push(Reverse(Element { distance, payload })); } - j = mmap.nexts[j as usize]; } } result @@ -311,15 +301,15 @@ pub fn vbase<'a, S: G>( } let lists = lists.into_sorted_vec(); let mut result = Vec::new(); - for i in lists.iter().map(|e| e.payload as u32) { - let mut j = mmap.heads[i as usize]; - while u32::MAX != j { - let payload = mmap.raw.payload(j); + for i in lists.iter().map(|e| e.payload as usize) { + let start = mmap.ptr[i]; + let end = mmap.ptr[i + 1]; + for j in start..end { + let payload = mmap.payloads[j]; if filter.check(payload) { - let distance = mmap.quantization.distance(vector, j); + let distance = mmap.quantization.distance(vector, j as u32); result.push(Element { distance, payload }); } - j = mmap.nexts[j as usize]; } } (result, Box::new(std::iter::empty())) diff --git a/crates/service/src/algorithms/ivf/ivf_pq.rs b/crates/service/src/algorithms/ivf/ivf_pq.rs index 8294dac22..e2ca36b17 100644 --- a/crates/service/src/algorithms/ivf/ivf_pq.rs +++ b/crates/service/src/algorithms/ivf/ivf_pq.rs @@ -9,19 +9,19 @@ use crate::index::IndexOptions; use crate::index::SearchOptions; use crate::index::VectorOptions; use crate::prelude::*; -use crate::utils::cells::SyncUnsafeCell; use crate::utils::dir_ops::sync_dir; use crate::utils::element_heap::ElementHeap; use crate::utils::mmap_array::MmapArray; use crate::utils::vec2::Vec2; use rand::seq::index::sample; use rand::thread_rng; +use rayon::iter::IntoParallelRefMutIterator; +use rayon::iter::{IndexedParallelIterator, ParallelIterator}; +use rayon::prelude::ParallelSliceMut; use std::cmp::Reverse; use std::collections::BinaryHeap; use std::fs::create_dir; use std::path::Path; -use std::sync::atomic::AtomicU32; -use std::sync::atomic::Ordering::{Acquire, Relaxed, Release}; use std::sync::Arc; pub struct IvfPq { @@ -90,8 +90,8 @@ pub struct IvfRam { nlist: u32, // ---------------------- centroids: Vec2, - heads: Vec, - nexts: Vec>, + ptr: Vec, + payloads: Vec, } unsafe impl Send for IvfRam {} @@ -106,8 +106,8 @@ pub struct IvfMmap { nlist: u32, // ---------------------- centroids: MmapArray, - heads: MmapArray, - nexts: MmapArray, + ptr: MmapArray, + payloads: MmapArray, } unsafe impl Send for IvfMmap {} @@ -159,93 +159,96 @@ pub fn make( } } let centroids = k_means.finish(); - let heads = { - let mut heads = Vec::with_capacity(nlist as usize); - heads.resize_with(nlist as usize, || AtomicU32::new(u32::MAX)); - heads - }; - let nexts = { - let mut nexts = Vec::with_capacity(nlist as usize); - nexts.resize_with(n as usize, || SyncUnsafeCell::new(u32::MAX)); - nexts + let mut idx = vec![0usize; n as usize]; + idx.par_iter_mut().enumerate().for_each(|(i, x)| { + let mut vector = S::ref_to_owned(raw.vector(i as u32)); + S::elkan_k_means_normalize2(&mut vector); + let mut result = (F32::infinity(), 0); + for i in 0..nlist as usize { + let dis = S::elkan_k_means_distance2(S::owned_to_ref(&vector), ¢roids[i]); + result = std::cmp::min(result, (dis, i)); + } + *x = result.1; + }); + let mut invlists_ids = vec![Vec::new(); nlist as usize]; + let mut invlists_payloads = vec![Vec::new(); nlist as usize]; + for i in 0..n { + invlists_ids[idx[i as usize]].push(i); + invlists_payloads[idx[i as usize]].push(raw.payload(i)); + } + let mut ptr = vec![0usize; nlist as usize + 1]; + for i in 0..nlist { + ptr[i as usize + 1] = ptr[i as usize] + invlists_ids[i as usize].len(); + } + let ids = Vec::from_iter((0..nlist).flat_map(|i| &invlists_ids[i as usize]).copied()); + let payloads = Vec::from_iter( + (0..nlist) + .flat_map(|i| &invlists_payloads[i as usize]) + .copied(), + ); + sync_dir(path); + let residuals = { + let mut residuals = Vec2::new(options.vector.dims, n as usize); + residuals + .par_chunks_mut(dims as usize) + .enumerate() + .for_each(|(i, v)| { + for j in 0..dims { + v[j as usize] = S::to_dense(raw.vector(ids[i])).as_ref()[j as usize] + - centroids[idx[ids[i] as usize]][j as usize]; + } + }); + residuals }; - let quantization = ProductQuantization::with_normalizer( + let mut quantization = ProductQuantization::encode( &path.join("quantization"), options.clone(), quantization_opts, - &raw, - |i, target| { - let mut vector = target.to_vec(); - S::elkan_k_means_normalize(&mut vector); - let mut result = (F32::infinity(), 0); - for i in 0..nlist { - let dis = S::elkan_k_means_distance(&vector, ¢roids[i as usize]); - result = std::cmp::min(result, (dis, i)); - } - let centroid_id = result.1; - loop { - let next = heads[centroid_id as usize].load(Acquire); - unsafe { - nexts[i as usize].get().write(next); - } - let o = &heads[centroid_id as usize]; - if o.compare_exchange(next, i, Release, Relaxed).is_ok() { - break; - } - } - for i in 0..dims { - target[i as usize] -= centroids[centroid_id as usize][i as usize]; - } - }, + &residuals, ); + quantization.precompute_table(&path.join("quantization"), ¢roids); IvfRam { raw, quantization, centroids, - heads, - nexts, nlist, dims, + ptr, + payloads, } } -pub fn save(mut ram: IvfRam, path: &Path) -> IvfMmap { +pub fn save(ram: IvfRam, path: &Path) -> IvfMmap { let centroids = MmapArray::create( &path.join("centroids"), (0..ram.nlist) .flat_map(|i| &ram.centroids[i as usize]) .copied(), ); - let heads = MmapArray::create( - &path.join("heads"), - ram.heads.iter_mut().map(|x| *x.get_mut()), - ); - let nexts = MmapArray::create( - &path.join("nexts"), - ram.nexts.iter_mut().map(|x| *x.get_mut()), - ); + let ptr = MmapArray::create(&path.join("ptr"), ram.ptr.iter().copied()); + let payloads = MmapArray::create(&path.join("payload"), ram.payloads.iter().copied()); IvfMmap { raw: ram.raw, quantization: ram.quantization, dims: ram.dims, nlist: ram.nlist, centroids, - heads, - nexts, + ptr, + payloads, } } pub fn open(path: &Path, options: IndexOptions) -> IvfMmap { let raw = Arc::new(Raw::open(&path.join("raw"), options.clone())); - let quantization = ProductQuantization::open( + let quantization = ProductQuantization::open2( &path.join("quantization"), options.clone(), options.indexing.clone().unwrap_ivf().quantization, &raw, ); let centroids = MmapArray::open(&path.join("centroids")); - let heads = MmapArray::open(&path.join("heads")); - let nexts = MmapArray::open(&path.join("nexts")); + let ptr = MmapArray::open(&path.join("ptr")); + let payloads = MmapArray::open(&path.join("payload")); let IvfIndexingOptions { nlist, .. } = options.indexing.unwrap_ivf(); IvfMmap { raw, @@ -253,8 +256,8 @@ pub fn open(path: &Path, options: IndexOptions) -> IvfMmap { dims: options.vector.dims, nlist, centroids, - heads, - nexts, + ptr, + payloads, } } @@ -264,12 +267,10 @@ pub fn basic( nprobe: u32, mut filter: impl Filter, ) -> BinaryHeap> { - let mut target = S::ref_to_owned(vector); - S::elkan_k_means_normalize2(&mut target); let mut lists = ElementHeap::new(nprobe as usize); for i in 0..mmap.nlist { let centroid = mmap.centroids(i); - let distance = S::elkan_k_means_distance2(S::owned_to_ref(&target), centroid); + let distance = S::distance2(vector, centroid); if lists.check(distance) { lists.push(Element { distance, @@ -277,19 +278,27 @@ pub fn basic( }); } } + let runtime_table = mmap.quantization.init_query(S::to_dense(vector).as_ref()); let lists = lists.into_sorted_vec(); let mut result = BinaryHeap::new(); - for i in lists.iter().map(|e| e.payload as u32) { - let mut j = mmap.heads[i as usize]; - while u32::MAX != j { - let payload = mmap.raw.payload(j); + for i in lists.iter() { + let key = i.payload as usize; + let coarse_dis = i.distance; + let start = mmap.ptr[key]; + let end = mmap.ptr[key + 1]; + for j in start..end { + let payload = mmap.payloads[j]; if filter.check(payload) { - let distance = mmap - .quantization - .distance_with_delta(vector, j, mmap.centroids(i)); + let distance = mmap.quantization.distance_with_codes( + vector, + j as u32, + mmap.centroids(key as u32), + key, + coarse_dis, + &runtime_table, + ); result.push(Reverse(Element { distance, payload })); } - j = mmap.nexts[j as usize]; } } result @@ -301,12 +310,10 @@ pub fn vbase<'a, S: G>( nprobe: u32, mut filter: impl Filter + 'a, ) -> (Vec, Box<(dyn Iterator + 'a)>) { - let mut target = S::ref_to_owned(vector); - S::elkan_k_means_normalize2(&mut target); let mut lists = ElementHeap::new(nprobe as usize); for i in 0..mmap.nlist { let centroid = mmap.centroids(i); - let distance = S::elkan_k_means_distance2(S::owned_to_ref(&target), centroid); + let distance = S::distance2(vector, centroid); if lists.check(distance) { lists.push(Element { distance, @@ -314,19 +321,27 @@ pub fn vbase<'a, S: G>( }); } } + let runtime_table = mmap.quantization.init_query(S::to_dense(vector).as_ref()); let lists = lists.into_sorted_vec(); let mut result = Vec::new(); - for i in lists.iter().map(|e| e.payload as u32) { - let mut j = mmap.heads[i as usize]; - while u32::MAX != j { - let payload = mmap.raw.payload(j); + for i in lists.iter() { + let key = i.payload as usize; + let coarse_dis = i.distance; + let start = mmap.ptr[key]; + let end = mmap.ptr[key + 1]; + for j in start..end { + let payload = mmap.payloads[j]; if filter.check(payload) { - let distance = mmap - .quantization - .distance_with_delta(vector, j, mmap.centroids(i)); + let distance = mmap.quantization.distance_with_codes( + vector, + j as u32, + mmap.centroids(key as u32), + key, + coarse_dis, + &runtime_table, + ); result.push(Element { distance, payload }); } - j = mmap.nexts[j as usize]; } } (result, Box::new(std::iter::empty())) diff --git a/crates/service/src/algorithms/quantization/mod.rs b/crates/service/src/algorithms/quantization/mod.rs index 05dcc27b7..778475016 100644 --- a/crates/service/src/algorithms/quantization/mod.rs +++ b/crates/service/src/algorithms/quantization/mod.rs @@ -63,8 +63,9 @@ pub trait Quan { options: IndexOptions, quantization_options: QuantizationOptions, raw: &Arc>, + permutation: Vec, ) -> Self; - fn open( + fn open2( path: &Path, options: IndexOptions, quantization_options: QuantizationOptions, @@ -86,6 +87,7 @@ impl Quantization { options: IndexOptions, quantization_options: QuantizationOptions, raw: &Arc>, + permutation: Vec, // permutation is the mapping from placements to original ids ) -> Self { match quantization_options { QuantizationOptions::Trivial(_) => Self::Trivial(TrivialQuantization::create( @@ -93,18 +95,21 @@ impl Quantization { options, quantization_options, raw, + permutation, )), QuantizationOptions::Scalar(_) => Self::Scalar(ScalarQuantization::create( path, options, quantization_options, raw, + permutation, )), QuantizationOptions::Product(_) => Self::Product(ProductQuantization::create( path, options, quantization_options, raw, + permutation, )), } } @@ -116,19 +121,19 @@ impl Quantization { raw: &Arc>, ) -> Self { match quantization_options { - QuantizationOptions::Trivial(_) => Self::Trivial(TrivialQuantization::open( + QuantizationOptions::Trivial(_) => Self::Trivial(TrivialQuantization::open2( path, options, quantization_options, raw, )), - QuantizationOptions::Scalar(_) => Self::Scalar(ScalarQuantization::open( + QuantizationOptions::Scalar(_) => Self::Scalar(ScalarQuantization::open2( path, options, quantization_options, raw, )), - QuantizationOptions::Product(_) => Self::Product(ProductQuantization::open( + QuantizationOptions::Product(_) => Self::Product(ProductQuantization::open2( path, options, quantization_options, diff --git a/crates/service/src/algorithms/quantization/product.rs b/crates/service/src/algorithms/quantization/product.rs index 365293415..52f233c0b 100644 --- a/crates/service/src/algorithms/quantization/product.rs +++ b/crates/service/src/algorithms/quantization/product.rs @@ -9,6 +9,9 @@ use crate::utils::mmap_array::MmapArray; use crate::utils::vec2::Vec2; use rand::seq::index::sample; use rand::thread_rng; +use rayon::iter::IndexedParallelIterator; +use rayon::iter::ParallelIterator; +use rayon::slice::ParallelSliceMut; use serde::{Deserialize, Serialize}; use std::fmt::Debug; use std::path::Path; @@ -63,13 +66,14 @@ pub struct ProductQuantization { ratio: u16, centroids: Vec, codes: MmapArray, + precomputed_table: Vec, } unsafe impl Send for ProductQuantization {} unsafe impl Sync for ProductQuantization {} impl ProductQuantization { - fn codes(&self, i: u32) -> &[u8] { + pub fn codes(&self, i: u32) -> &[u8] { let width = self.dims.div_ceil(self.ratio); let s = i as usize * width as usize; let e = (i + 1) as usize * width as usize; @@ -83,11 +87,19 @@ impl Quan for ProductQuantization { options: IndexOptions, quantization_options: QuantizationOptions, raw: &Arc>, + permutation: Vec, // permutation is the mapping from placements to original ids ) -> Self { - Self::with_normalizer(path, options, quantization_options, raw, |_, _| ()) + Self::with_normalizer( + path, + options, + quantization_options, + raw, + |_, _| (), + permutation, + ) } - fn open( + fn open2( path: &Path, options: IndexOptions, quantization_options: QuantizationOptions, @@ -96,11 +108,14 @@ impl Quan for ProductQuantization { let centroids = serde_json::from_slice(&std::fs::read(path.join("centroids")).unwrap()).unwrap(); let codes = MmapArray::open(&path.join("codes")); + let precomputed_table = + serde_json::from_slice(&std::fs::read(path.join("table")).unwrap()).unwrap(); Self { dims: options.vector.dims, ratio: quantization_options.unwrap_product_quantization().ratio as _, centroids, codes, + precomputed_table, } } @@ -127,6 +142,7 @@ impl ProductQuantization { quantization_options: QuantizationOptions, raw: &Raw, normalizer: F, + permutation: Vec, ) -> Self where F: Fn(u32, &mut [S::Scalar]), @@ -173,8 +189,8 @@ impl ProductQuantization { } } let codes_iter = (0..n).flat_map(|i| { - let mut vector = S::to_dense(raw.vector(i)).to_vec(); - normalizer(i, &mut vector); + let mut vector = S::to_dense(raw.vector(permutation[i as usize])).to_vec(); + normalizer(permutation[i as usize], &mut vector); let width = dims.div_ceil(ratio); let mut result = Vec::with_capacity(width as usize); for i in 0..width { @@ -207,7 +223,190 @@ impl ProductQuantization { ratio, centroids, codes, + precomputed_table: Vec::new(), + } + } + + pub fn encode( + path: &Path, + options: IndexOptions, + quantization_options: QuantizationOptions, + raw: &Vec2, + ) -> Self { + std::fs::create_dir(path).unwrap(); + let quantization_options = quantization_options.unwrap_product_quantization(); + let dims = options.vector.dims; + let ratio = quantization_options.ratio as u16; + let n = raw.len(); + let m = std::cmp::min(n, quantization_options.sample as usize); + let samples = { + let f = sample(&mut thread_rng(), n, m).into_vec(); + let mut samples = Vec2::new(options.vector.dims, m); + for i in 0..m { + samples[i].copy_from_slice(&raw[f[i]]); + } + samples + }; + let width = dims.div_ceil(ratio); + // a temp layout (width * 256 * subdims) for par_chunks_mut + let mut tmp_centroids = vec![S::Scalar::zero(); 256 * dims as usize]; + // this par_for parallelizes over sub quantizers + tmp_centroids + .par_chunks_mut(256 * ratio as usize) + .enumerate() + .for_each(|(i, v)| { + // i is the index of subquantizer + let subdims = std::cmp::min(ratio, dims - ratio * i as u16) as usize; + let mut subsamples = Vec2::new(subdims as u16, m); + for j in 0..m { + let src = &samples[j][i * ratio as usize..][..subdims]; + subsamples[j].copy_from_slice(src); + } + let mut k_means = ElkanKMeans::::new(256, subsamples); + for _ in 0..25 { + if k_means.iterate() { + break; + } + } + let centroid = k_means.finish(); + for j in 0usize..=255 { + v[j * subdims..][..subdims].copy_from_slice(¢roid[j]); + } + }); + // transform back to normal layout (256 * width * subdims) + let mut centroids = vec![S::Scalar::zero(); 256 * dims as usize]; + centroids + .par_chunks_mut(dims as usize) + .enumerate() + .for_each(|(i, v)| { + for j in 0..width { + let subdims = std::cmp::min(ratio, dims - ratio * j) as usize; + v[(j * ratio) as usize..][..subdims].copy_from_slice( + &tmp_centroids[(j * ratio) as usize * 256..][i * subdims..][..subdims], + ); + } + }); + let mut codes = vec![0u8; n * width as usize]; + codes + .par_chunks_mut(width as usize) + .enumerate() + .for_each(|(id, v)| { + let vector = raw[id].to_vec(); + let width = dims.div_ceil(ratio); + for i in 0..width { + let subdims = std::cmp::min(ratio, dims - ratio * i); + let mut minimal = F32::infinity(); + let mut target = 0u8; + let left = &vector[(i * ratio) as usize..][..subdims as usize]; + for j in 0u8..=255 { + let right = ¢roids[j as usize * dims as usize..] + [(i * ratio) as usize..][..subdims as usize]; + let dis = S::L2::distance(left, right); + if dis < minimal { + minimal = dis; + target = j; + } + } + v[i as usize] = target; + } + }); + sync_dir(path); + std::fs::write( + path.join("centroids"), + serde_json::to_string(¢roids).unwrap(), + ) + .unwrap(); + let codes = MmapArray::create(&path.join("codes"), codes.into_iter()); + Self { + dims, + ratio, + centroids, + codes, + precomputed_table: Vec::new(), + } + } + + // compute term3 at build time + pub fn precompute_table(&mut self, path: &Path, coarse_centroids: &Vec2) { + let nlist = coarse_centroids.len(); + let dims = self.dims; + let ratio = self.ratio; + let width = dims.div_ceil(ratio); + self.precomputed_table + .resize(nlist * width as usize * 256, F32::zero()); + self.precomputed_table + .par_chunks_mut(width as usize * 256) + .enumerate() + .for_each(|(i, v)| { + let x_c = &coarse_centroids[i]; + for j in 0..width { + let subdims = std::cmp::min(ratio, dims - ratio * j); + let sub_x_c = &x_c[(j * ratio) as usize..][..subdims as usize]; + for k in 0usize..256 { + let sub_x_r = &self.centroids[k * dims as usize..][(j * ratio) as usize..] + [..subdims as usize]; + v[j as usize * 256 + k] = squared_norm::(subdims, sub_x_r) + + F32(2.0) * inner_product::(subdims, sub_x_c, sub_x_r); + } + } + }); + std::fs::write( + path.join("table"), + serde_json::to_string(&self.precomputed_table).unwrap(), + ) + .unwrap(); + } + + // compute term2 at query time + pub fn init_query(&self, query: &[S::Scalar]) -> Vec { + if S::DISTANCE == Distance::Cos { + return Vec::new(); + } + let dims = self.dims; + let ratio = self.ratio; + let width = dims.div_ceil(ratio); + let mut runtime_table = vec![F32::zero(); width as usize * 256]; + for i in 0..256 { + for j in 0..width { + let subdims = std::cmp::min(ratio, dims - ratio * j); + let sub_query = &query[(j * ratio) as usize..][..subdims as usize]; + let centroid = &self.centroids[i * dims as usize..][(j * ratio) as usize..] + [..subdims as usize]; + runtime_table[j as usize * 256 + i] = + F32(-1.0) * inner_product::(subdims, sub_query, centroid); + } + } + runtime_table + } + + // add up all terms given codes + pub fn distance_with_codes( + &self, + lhs: S::VectorRef<'_>, + rhs: u32, + delta: &[S::Scalar], + key: usize, + coarse_dis: F32, + runtime_table: &[F32], + ) -> F32 { + if S::DISTANCE == Distance::Cos { + return self.distance_with_delta(lhs, rhs, delta); + } + let mut result = coarse_dis; + let codes = self.codes(rhs); + let width = self.dims.div_ceil(self.ratio); + let precomputed_table = &self.precomputed_table[key * width as usize * 256..]; + if S::DISTANCE == Distance::L2 { + for i in 0..width { + result += precomputed_table[i as usize * 256 + codes[i as usize] as usize] + + F32(2.0) * runtime_table[i as usize * 256 + codes[i as usize] as usize]; + } + } else if S::DISTANCE == Distance::Dot { + for i in 0..width { + result += runtime_table[i as usize * 256 + codes[i as usize] as usize]; + } } + result } pub fn distance_with_delta(&self, lhs: S::VectorRef<'_>, rhs: u32, delta: &[S::Scalar]) -> F32 { diff --git a/crates/service/src/algorithms/quantization/scalar.rs b/crates/service/src/algorithms/quantization/scalar.rs index 17467b430..d0e4e8c73 100644 --- a/crates/service/src/algorithms/quantization/scalar.rs +++ b/crates/service/src/algorithms/quantization/scalar.rs @@ -32,7 +32,7 @@ unsafe impl Send for ScalarQuantization {} unsafe impl Sync for ScalarQuantization {} impl ScalarQuantization { - fn codes(&self, i: u32) -> &[u8] { + pub fn codes(&self, i: u32) -> &[u8] { let s = i as usize * self.dims as usize; let e = (i + 1) as usize * self.dims as usize; &self.codes[s..e] @@ -45,6 +45,7 @@ impl Quan for ScalarQuantization { options: IndexOptions, _: QuantizationOptions, raw: &Arc>, + permutation: Vec, // permutation is the mapping from placements to original ids ) -> Self { assert!( S::KIND != Kind::SparseF32, @@ -57,7 +58,7 @@ impl Quan for ScalarQuantization { let mut min = vec![S::Scalar::infinity(); dims as usize]; let n = raw.len(); for i in 0..n { - let vector = S::to_dense(raw.vector(i)); + let vector = S::to_dense(raw.vector(permutation[i as usize])); for j in 0..dims as usize { max[j] = std::cmp::max(max[j], vector[j]); min[j] = std::cmp::min(min[j], vector[j]); @@ -66,7 +67,7 @@ impl Quan for ScalarQuantization { std::fs::write(path.join("max"), serde_json::to_string(&max).unwrap()).unwrap(); std::fs::write(path.join("min"), serde_json::to_string(&min).unwrap()).unwrap(); let codes_iter = (0..n).flat_map(|i| { - let vector = S::to_dense(raw.vector(i)); + let vector = S::to_dense(raw.vector(permutation[i as usize])); let mut result = vec![0u8; dims as usize]; for i in 0..dims as usize { let w = (((vector[i] - min[i]) / (max[i] - min[i])).to_f32() * 256.0) as u32; @@ -84,7 +85,7 @@ impl Quan for ScalarQuantization { } } - fn open(path: &Path, options: IndexOptions, _: QuantizationOptions, _: &Arc>) -> Self { + fn open2(path: &Path, options: IndexOptions, _: QuantizationOptions, _: &Arc>) -> Self { let dims = options.vector.dims; let max = serde_json::from_slice(&std::fs::read("max").unwrap()).unwrap(); let min = serde_json::from_slice(&std::fs::read("min").unwrap()).unwrap(); diff --git a/crates/service/src/algorithms/quantization/trivial.rs b/crates/service/src/algorithms/quantization/trivial.rs index 87a6b4234..d2fb75562 100644 --- a/crates/service/src/algorithms/quantization/trivial.rs +++ b/crates/service/src/algorithms/quantization/trivial.rs @@ -3,6 +3,7 @@ use crate::algorithms::quantization::QuantizationOptions; use crate::algorithms::raw::Raw; use crate::index::IndexOptions; use crate::prelude::*; +use crate::utils::dir_ops::sync_dir; use serde::{Deserialize, Serialize}; use std::path::Path; use std::sync::Arc; @@ -20,22 +21,52 @@ impl Default for TrivialQuantizationOptions { pub struct TrivialQuantization { raw: Arc>, + permutation: Vec, +} + +impl TrivialQuantization { + pub fn codes(&self, i: u32) -> S::VectorRef<'_> { + self.raw.vector(self.permutation[i as usize]) + } } impl Quan for TrivialQuantization { - fn create(_: &Path, _: IndexOptions, _: QuantizationOptions, raw: &Arc>) -> Self { - Self { raw: raw.clone() } + // permutation is the mapping from placements to original ids + fn create( + path: &Path, + _: IndexOptions, + _: QuantizationOptions, + raw: &Arc>, + permutation: Vec, + ) -> Self { + // here we cannot modify raw, so we record permutation for translation + std::fs::create_dir(path).unwrap(); + sync_dir(path); + std::fs::write( + path.join("permutation"), + serde_json::to_string(&permutation).unwrap(), + ) + .unwrap(); + Self { + raw: raw.clone(), + permutation, + } } - fn open(_: &Path, _: IndexOptions, _: QuantizationOptions, raw: &Arc>) -> Self { - Self { raw: raw.clone() } + fn open2(path: &Path, _: IndexOptions, _: QuantizationOptions, raw: &Arc>) -> Self { + let permutation = + serde_json::from_slice(&std::fs::read(path.join("permutation")).unwrap()).unwrap(); + Self { + raw: raw.clone(), + permutation, + } } fn distance(&self, lhs: S::VectorRef<'_>, rhs: u32) -> F32 { - S::distance(lhs, self.raw.vector(rhs)) + S::distance(lhs, self.codes(rhs)) } fn distance2(&self, lhs: u32, rhs: u32) -> F32 { - S::distance(self.raw.vector(lhs), self.raw.vector(rhs)) + S::distance(self.codes(lhs), self.codes(rhs)) } } diff --git a/crates/service/src/prelude/global/f16_cos.rs b/crates/service/src/prelude/global/f16_cos.rs index a8e7f3307..64c648d03 100644 --- a/crates/service/src/prelude/global/f16_cos.rs +++ b/crates/service/src/prelude/global/f16_cos.rs @@ -31,6 +31,10 @@ impl G for F16Cos { F32(1.0) - super::f16::cosine(lhs, rhs) } + fn distance2(lhs: &[F16], rhs: &[F16]) -> F32 { + F32(1.0) - super::f16::cosine(lhs, rhs) + } + fn elkan_k_means_normalize(vector: &mut [F16]) { super::f16::l2_normalize(vector) } diff --git a/crates/service/src/prelude/global/f16_dot.rs b/crates/service/src/prelude/global/f16_dot.rs index 353efba86..0d210a421 100644 --- a/crates/service/src/prelude/global/f16_dot.rs +++ b/crates/service/src/prelude/global/f16_dot.rs @@ -31,6 +31,10 @@ impl G for F16Dot { super::f16::dot(lhs, rhs) * (-1.0) } + fn distance2(lhs: &[F16], rhs: &[F16]) -> F32 { + super::f16::dot(lhs, rhs) * (-1.0) + } + fn elkan_k_means_normalize(vector: &mut [F16]) { super::f16::l2_normalize(vector) } diff --git a/crates/service/src/prelude/global/f16_l2.rs b/crates/service/src/prelude/global/f16_l2.rs index 46d95684e..14e7bddbb 100644 --- a/crates/service/src/prelude/global/f16_l2.rs +++ b/crates/service/src/prelude/global/f16_l2.rs @@ -31,6 +31,10 @@ impl G for F16L2 { super::f16::sl2(lhs, rhs) } + fn distance2(lhs: &[F16], rhs: &[F16]) -> F32 { + super::f16::sl2(lhs, rhs) + } + fn elkan_k_means_normalize(_: &mut [F16]) {} fn elkan_k_means_normalize2(_: &mut Vec) {} diff --git a/crates/service/src/prelude/global/f32_cos.rs b/crates/service/src/prelude/global/f32_cos.rs index 9f3539722..c9e75f92a 100644 --- a/crates/service/src/prelude/global/f32_cos.rs +++ b/crates/service/src/prelude/global/f32_cos.rs @@ -30,6 +30,10 @@ impl G for F32Cos { F32(1.0) - super::f32::cosine(lhs, rhs) } + fn distance2(lhs: &[F32], rhs: &[F32]) -> F32 { + F32(1.0) - super::f32::cosine(lhs, rhs) + } + fn elkan_k_means_normalize(vector: &mut [F32]) { super::f32::l2_normalize(vector) } diff --git a/crates/service/src/prelude/global/f32_dot.rs b/crates/service/src/prelude/global/f32_dot.rs index c3574662a..d4f58632b 100644 --- a/crates/service/src/prelude/global/f32_dot.rs +++ b/crates/service/src/prelude/global/f32_dot.rs @@ -30,6 +30,10 @@ impl G for F32Dot { super::f32::dot(lhs, rhs) * (-1.0) } + fn distance2(lhs: &[F32], rhs: &[F32]) -> F32 { + super::f32::dot(lhs, rhs) * (-1.0) + } + fn elkan_k_means_normalize(vector: &mut [F32]) { super::f32::l2_normalize(vector) } diff --git a/crates/service/src/prelude/global/f32_l2.rs b/crates/service/src/prelude/global/f32_l2.rs index 9836f4b03..815d67303 100644 --- a/crates/service/src/prelude/global/f32_l2.rs +++ b/crates/service/src/prelude/global/f32_l2.rs @@ -30,6 +30,10 @@ impl G for F32L2 { super::f32::sl2(lhs, rhs) } + fn distance2(lhs: &[F32], rhs: &[F32]) -> F32 { + super::f32::sl2(lhs, rhs) + } + fn elkan_k_means_normalize(_: &mut [F32]) {} fn elkan_k_means_normalize2(_: &mut Vec) {} diff --git a/crates/service/src/prelude/global/mod.rs b/crates/service/src/prelude/global/mod.rs index 802b09a40..857847433 100644 --- a/crates/service/src/prelude/global/mod.rs +++ b/crates/service/src/prelude/global/mod.rs @@ -22,6 +22,7 @@ pub use sparse_f32_dot::SparseF32Dot; pub use sparse_f32_l2::SparseF32L2; use crate::prelude::*; +use base::scalar::FloatCast; use serde::{Deserialize, Serialize}; use std::{ borrow::Cow, @@ -43,7 +44,7 @@ pub trait G: Copy + Debug + 'static { + Zero + num_traits::NumOps + num_traits::NumAssignOps - + base::scalar::FloatCast; + + FloatCast; type Storage: for<'a> Storage = Self::VectorRef<'a>>; type L2: for<'a> G = &'a [Self::Scalar]>; type VectorOwned: Vector + Clone + Serialize + for<'a> Deserialize<'a>; @@ -58,6 +59,7 @@ pub trait G: Copy + Debug + 'static { fn ref_to_owned(vector: Self::VectorRef<'_>) -> Self::VectorOwned; fn to_dense(vector: Self::VectorRef<'_>) -> Cow<'_, [Self::Scalar]>; fn distance(lhs: Self::VectorRef<'_>, rhs: Self::VectorRef<'_>) -> F32; + fn distance2(lhs: Self::VectorRef<'_>, rhs: &[Self::Scalar]) -> F32; fn elkan_k_means_normalize(vector: &mut [Self::Scalar]); fn elkan_k_means_normalize2(vector: &mut Self::VectorOwned); @@ -143,3 +145,19 @@ pub enum Kind { F16, SparseF32, } + +pub fn squared_norm(dims: u16, vec: &[S::Scalar]) -> F32 { + let mut result = F32::zero(); + for i in 0..dims as usize { + result += F32((vec[i] * vec[i]).to_f32()); + } + result +} + +pub fn inner_product(dims: u16, lhs: &[S::Scalar], rhs: &[S::Scalar]) -> F32 { + let mut result = F32::zero(); + for i in 0..dims as usize { + result += F32((lhs[i] * rhs[i]).to_f32()); + } + result +} diff --git a/crates/service/src/prelude/global/sparse_f32_cos.rs b/crates/service/src/prelude/global/sparse_f32_cos.rs index 2de41328a..372f8d744 100644 --- a/crates/service/src/prelude/global/sparse_f32_cos.rs +++ b/crates/service/src/prelude/global/sparse_f32_cos.rs @@ -31,6 +31,10 @@ impl G for SparseF32Cos { F32(1.0) - super::sparse_f32::cosine(lhs, rhs) } + fn distance2(_lhs: Self::VectorRef<'_>, _rhs: &[Self::Scalar]) -> F32 { + unimplemented!() + } + fn elkan_k_means_normalize(vector: &mut [Self::Scalar]) { super::f32::l2_normalize(vector) } diff --git a/crates/service/src/prelude/global/sparse_f32_dot.rs b/crates/service/src/prelude/global/sparse_f32_dot.rs index 728775759..8c19bc987 100644 --- a/crates/service/src/prelude/global/sparse_f32_dot.rs +++ b/crates/service/src/prelude/global/sparse_f32_dot.rs @@ -31,6 +31,10 @@ impl G for SparseF32Dot { super::sparse_f32::dot(lhs, rhs) * (-1.0) } + fn distance2(_lhs: Self::VectorRef<'_>, _rhs: &[Self::Scalar]) -> F32 { + unimplemented!() + } + fn elkan_k_means_normalize(vector: &mut [Self::Scalar]) { super::f32::l2_normalize(vector) } diff --git a/crates/service/src/prelude/global/sparse_f32_l2.rs b/crates/service/src/prelude/global/sparse_f32_l2.rs index ed3a0330d..b17559e7b 100644 --- a/crates/service/src/prelude/global/sparse_f32_l2.rs +++ b/crates/service/src/prelude/global/sparse_f32_l2.rs @@ -31,6 +31,10 @@ impl G for SparseF32L2 { super::sparse_f32::sl2(lhs, rhs) } + fn distance2(_lhs: Self::VectorRef<'_>, _rhs: &[Self::Scalar]) -> F32 { + unimplemented!() + } + fn elkan_k_means_normalize(_: &mut [Self::Scalar]) {} fn elkan_k_means_normalize2(_: &mut SparseF32) {} diff --git a/crates/service/src/utils/cells.rs b/crates/service/src/utils/cells.rs deleted file mode 100644 index 83a0a7a57..000000000 --- a/crates/service/src/utils/cells.rs +++ /dev/null @@ -1,26 +0,0 @@ -use std::cell::UnsafeCell; - -#[repr(transparent)] -pub struct SyncUnsafeCell { - value: UnsafeCell, -} - -unsafe impl Sync for SyncUnsafeCell {} - -impl SyncUnsafeCell { - pub const fn new(value: T) -> Self { - Self { - value: UnsafeCell::new(value), - } - } -} - -impl SyncUnsafeCell { - pub fn get(&self) -> *mut T { - self.value.get() - } - - pub fn get_mut(&mut self) -> &mut T { - self.value.get_mut() - } -} diff --git a/crates/service/src/utils/mod.rs b/crates/service/src/utils/mod.rs index c07fa2005..1b7c91b7d 100644 --- a/crates/service/src/utils/mod.rs +++ b/crates/service/src/utils/mod.rs @@ -1,4 +1,3 @@ -pub mod cells; pub mod clean; pub mod dir_ops; pub mod element_heap;