From 8b1ac790bf9bea30a86b773fd1875a036cf92836 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 13 May 2024 11:49:04 -0700 Subject: [PATCH] use the file metadata cache in the index store --- rust/lance-core/src/cache.rs | 2 +- rust/lance-index/src/scalar/lance_format.rs | 25 ++++++++++++++++----- rust/lance/benches/scalar_index.rs | 2 +- rust/lance/src/dataset/index.rs | 16 +++++++++++++ rust/lance/src/dataset/scanner.rs | 2 +- rust/lance/src/index.rs | 4 ++-- rust/lance/src/index/append.rs | 4 ++-- rust/lance/src/index/scalar.rs | 14 +++++------- 8 files changed, 48 insertions(+), 21 deletions(-) diff --git a/rust/lance-core/src/cache.rs b/rust/lance-core/src/cache.rs index 094184fb81..c92c0ea96f 100644 --- a/rust/lance-core/src/cache.rs +++ b/rust/lance-core/src/cache.rs @@ -20,7 +20,7 @@ type ArcAny = Arc; /// Cache for various metadata about files. /// /// The cache is keyed by the file path and the type of metadata. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct FileMetadataCache { cache: Arc>, } diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs index e595f2f79c..fb3d0ebcc5 100644 --- a/rust/lance-index/src/scalar/lance_format.rs +++ b/rust/lance-index/src/scalar/lance_format.rs @@ -14,7 +14,7 @@ use lance_file::{ }; use snafu::{location, Location}; -use lance_core::{Error, Result}; +use lance_core::{cache::FileMetadataCache, Error, Result}; use lance_io::{object_store::ObjectStore, ReadBatchParams}; use lance_table::{format::SelfDescribingFileReader, io::manifest::ManifestDescribing}; use object_store::path::Path; @@ -30,14 +30,20 @@ use super::{IndexReader, IndexStore, IndexWriter}; pub struct LanceIndexStore { object_store: ObjectStore, index_dir: Path, + metadata_cache: Option, } impl LanceIndexStore { /// Create a new index store at the given directory - pub fn new(object_store: ObjectStore, index_dir: Path) -> Self { + pub fn new( + object_store: ObjectStore, + index_dir: Path, + metadata_cache: Option, + ) -> Self { Self { object_store, index_dir, + metadata_cache, } } } @@ -97,9 +103,12 @@ impl IndexStore for LanceIndexStore { async fn open_index_file(&self, name: &str) -> Result> { let path = self.index_dir.child(name); - // TODO: Should probably provide file metadata cache here - let file_reader = - FileReader::try_new_self_described(&self.object_store, &path, None).await?; + let file_reader = FileReader::try_new_self_described( + &self.object_store, + &path, + self.metadata_cache.as_ref(), + ) + .await?; Ok(Arc::new(file_reader)) } @@ -167,7 +176,11 @@ mod tests { let test_path: &Path = tempdir.path(); let (object_store, test_path) = ObjectStore::from_path(test_path.as_os_str().to_str().unwrap()).unwrap(); - Arc::new(LanceIndexStore::new(object_store, test_path.to_owned())) + Arc::new(LanceIndexStore::new( + object_store, + test_path.to_owned(), + None, + )) } struct MockTrainingSource { diff --git a/rust/lance/benches/scalar_index.rs b/rust/lance/benches/scalar_index.rs index 9c951fe6d0..6b587038bb 100644 --- a/rust/lance/benches/scalar_index.rs +++ b/rust/lance/benches/scalar_index.rs @@ -57,7 +57,7 @@ impl BenchmarkFixture { let test_path = tempdir.path(); let (object_store, test_path) = ObjectStore::from_path(test_path.as_os_str().to_str().unwrap()).unwrap(); - Arc::new(LanceIndexStore::new(object_store, test_path)) + Arc::new(LanceIndexStore::new(object_store, test_path, None)) } async fn write_baseline_data(tempdir: &TempDir) -> Arc { diff --git a/rust/lance/src/dataset/index.rs b/rust/lance/src/dataset/index.rs index e67a4a78e8..496b5d79b7 100644 --- a/rust/lance/src/dataset/index.rs +++ b/rust/lance/src/dataset/index.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use async_trait::async_trait; use lance_core::Result; +use lance_index::scalar::lance_format::LanceIndexStore; use lance_index::DatasetIndexExt; use lance_table::format::Index; use serde::{Deserialize, Serialize}; @@ -68,3 +69,18 @@ impl IndexRemapper for DatasetIndexRemapper { Ok(remapped) } } + +pub trait LanceIndexStoreExt { + fn from_dataset(dataset: &Dataset, uuid: &str) -> Self; +} + +impl LanceIndexStoreExt for LanceIndexStore { + fn from_dataset(dataset: &Dataset, uuid: &str) -> Self { + let index_dir = dataset.indices_dir().child(uuid); + LanceIndexStore::new( + dataset.object_store.as_ref().clone(), + index_dir, + Some(dataset.session.file_metadata_cache.clone()), + ) + } +} diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index f0cfd9ea61..fd56447669 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -3542,7 +3542,7 @@ mod test { .await .unwrap(); let second_index_scan_bytes = get_bytes() - start_bytes; - assert!(second_index_scan_bytes < index_scan_bytes); + assert!(second_index_scan_bytes < filtered_scan_bytes); } #[tokio::test] diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index ef2b6b2513..42bfe2c5f1 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -38,6 +38,7 @@ pub(crate) mod prefilter; pub mod scalar; pub mod vector; +use crate::dataset::index::LanceIndexStoreExt; pub use crate::index::prefilter::{FilterLoader, PreFilter}; use crate::dataset::transaction::{Operation, Transaction}; @@ -93,8 +94,7 @@ pub(crate) async fn remap_index( match generic.index_type() { IndexType::Scalar => { - let index_dir = dataset.indices_dir().child(new_id.to_string()); - let new_store = LanceIndexStore::new((*dataset.object_store).clone(), index_dir); + let new_store = LanceIndexStore::from_dataset(dataset, &new_id.to_string()); let scalar_index = dataset .open_scalar_index(&field.name, &index_id.to_string()) diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index 532090bfd4..2e121c0a35 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -14,6 +14,7 @@ use uuid::Uuid; use super::vector::ivf::optimize_vector_indices; use super::DatasetIndexInternalExt; +use crate::dataset::index::LanceIndexStoreExt; use crate::dataset::scanner::ColumnOrdering; use crate::dataset::Dataset; @@ -95,8 +96,7 @@ pub async fn merge_indices<'a>( let new_uuid = Uuid::new_v4(); - let index_dir = dataset.indices_dir().child(new_uuid.to_string()); - let new_store = LanceIndexStore::new((*dataset.object_store).clone(), index_dir); + let new_store = LanceIndexStore::from_dataset(&dataset, &new_uuid.to_string()); index.update(new_data_stream.into(), &new_store).await?; diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index cf20e3d25e..1e9ff03a85 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -23,7 +23,10 @@ use tracing::instrument; use lance_core::{Error, Result}; -use crate::{dataset::scanner::ColumnOrdering, Dataset}; +use crate::{ + dataset::{index::LanceIndexStoreExt, scanner::ColumnOrdering}, + Dataset, +}; use super::IndexParams; @@ -95,17 +98,12 @@ pub async fn build_scalar_index(dataset: &Dataset, column: &str, uuid: &str) -> }); } let flat_index_trainer = FlatIndexMetadata::new(field.data_type()); - let index_dir = dataset.indices_dir().child(uuid); - let index_store = LanceIndexStore::new((*dataset.object_store).clone(), index_dir); + let index_store = LanceIndexStore::from_dataset(dataset, uuid); train_btree_index(training_request, &flat_index_trainer, &index_store).await } pub async fn open_scalar_index(dataset: &Dataset, uuid: &str) -> Result> { - let index_dir = dataset.indices_dir().child(uuid); - let index_store = Arc::new(LanceIndexStore::new( - (*dataset.object_store).clone(), - index_dir, - )); + let index_store = Arc::new(LanceIndexStore::from_dataset(dataset, uuid)); // Currently we assume all scalar indices are btree indices. In the future, if this is not the // case, we may need to store a metadata file in the index directory with scalar index metadata let btree_index = BTreeIndex::load(index_store).await?;