Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

perf: use the file metadata cache in scalar indices #2330

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/lance-core/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type ArcAny = Arc<dyn Any + Send + Sync>;
/// Cache for various metadata about files.
///
/// The cache is keyed by the file path and the type of metadata.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct FileMetadataCache {
cache: Arc<Cache<(Path, TypeId), ArcAny>>,
}
Expand Down
25 changes: 19 additions & 6 deletions rust/lance-index/src/scalar/lance_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use lance_file::{
};
use snafu::{location, Location};

use lance_core::{Error, Result};
use lance_core::{cache::FileMetadataCache, Error, Result};
use lance_io::{object_store::ObjectStore, ReadBatchParams};
use lance_table::{format::SelfDescribingFileReader, io::manifest::ManifestDescribing};
use object_store::path::Path;
Expand All @@ -30,14 +30,20 @@ use super::{IndexReader, IndexStore, IndexWriter};
/// Index store that reads and writes scalar-index files under a single
/// directory of an object store.
pub struct LanceIndexStore {
// Object store used for all file I/O.
object_store: ObjectStore,
// Directory that holds this index's files; file paths are children of it.
index_dir: Path,
// Optional file-metadata cache, forwarded to readers when opening index
// files (see `open_index_file`); `None` presumably disables caching.
metadata_cache: Option<FileMetadataCache>,
}

impl LanceIndexStore {
/// Create a new index store at the given directory
pub fn new(object_store: ObjectStore, index_dir: Path) -> Self {
pub fn new(
object_store: ObjectStore,
index_dir: Path,
metadata_cache: Option<FileMetadataCache>,
) -> Self {
Self {
object_store,
index_dir,
metadata_cache,
}
}
}
Expand Down Expand Up @@ -97,9 +103,12 @@ impl IndexStore for LanceIndexStore {

/// Open the index file `name` (a child of `index_dir`) for reading.
///
/// Forwards the store's file-metadata cache (if any) to the reader so that
/// repeated opens can reuse cached metadata instead of re-reading it.
async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>> {
    let path = self.index_dir.child(name);
    // Reconstructed post-change form: the scrape retained both the removed
    // call (with its stale "TODO: provide file metadata cache" comment) and
    // the added cache-aware call, binding `file_reader` twice.
    let file_reader = FileReader::try_new_self_described(
        &self.object_store,
        &path,
        self.metadata_cache.as_ref(),
    )
    .await?;
    Ok(Arc::new(file_reader))
}

Expand Down Expand Up @@ -167,7 +176,11 @@ mod tests {
let test_path: &Path = tempdir.path();
let (object_store, test_path) =
ObjectStore::from_path(test_path.as_os_str().to_str().unwrap()).unwrap();
Arc::new(LanceIndexStore::new(object_store, test_path.to_owned()))
Arc::new(LanceIndexStore::new(
object_store,
test_path.to_owned(),
None,
))
}

struct MockTrainingSource {
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/benches/scalar_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl BenchmarkFixture {
let test_path = tempdir.path();
let (object_store, test_path) =
ObjectStore::from_path(test_path.as_os_str().to_str().unwrap()).unwrap();
Arc::new(LanceIndexStore::new(object_store, test_path))
Arc::new(LanceIndexStore::new(object_store, test_path, None))
}

async fn write_baseline_data(tempdir: &TempDir) -> Arc<Dataset> {
Expand Down
16 changes: 16 additions & 0 deletions rust/lance/src/dataset/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use lance_core::Result;
use lance_index::scalar::lance_format::LanceIndexStore;
use lance_index::DatasetIndexExt;
use lance_table::format::Index;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -68,3 +69,18 @@ impl IndexRemapper for DatasetIndexRemapper {
Ok(remapped)
}
}

/// Extension trait for building a [`LanceIndexStore`] directly from a
/// [`Dataset`].
pub trait LanceIndexStoreExt {
/// Build an index store rooted at the dataset's indices directory, under
/// the subdirectory named by `uuid`.
fn from_dataset(dataset: &Dataset, uuid: &str) -> Self;
}

impl LanceIndexStoreExt for LanceIndexStore {
    /// Builds a store rooted at `<indices_dir>/<uuid>`, wired to the
    /// dataset's session-level file metadata cache.
    fn from_dataset(dataset: &Dataset, uuid: &str) -> Self {
        Self::new(
            dataset.object_store.as_ref().clone(),
            dataset.indices_dir().child(uuid),
            Some(dataset.session.file_metadata_cache.clone()),
        )
    }
}
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3542,7 +3542,7 @@ mod test {
.await
.unwrap();
let second_index_scan_bytes = get_bytes() - start_bytes;
assert!(second_index_scan_bytes < index_scan_bytes);
assert!(second_index_scan_bytes < filtered_scan_bytes);
}

#[tokio::test]
Expand Down
4 changes: 2 additions & 2 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub(crate) mod prefilter;
pub mod scalar;
pub mod vector;

use crate::dataset::index::LanceIndexStoreExt;
pub use crate::index::prefilter::{FilterLoader, PreFilter};

use crate::dataset::transaction::{Operation, Transaction};
Expand Down Expand Up @@ -93,8 +94,7 @@ pub(crate) async fn remap_index(

match generic.index_type() {
IndexType::Scalar => {
let index_dir = dataset.indices_dir().child(new_id.to_string());
let new_store = LanceIndexStore::new((*dataset.object_store).clone(), index_dir);
let new_store = LanceIndexStore::from_dataset(dataset, &new_id.to_string());

let scalar_index = dataset
.open_scalar_index(&field.name, &index_id.to_string())
Expand Down
4 changes: 2 additions & 2 deletions rust/lance/src/index/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use uuid::Uuid;

use super::vector::ivf::optimize_vector_indices;
use super::DatasetIndexInternalExt;
use crate::dataset::index::LanceIndexStoreExt;
use crate::dataset::scanner::ColumnOrdering;
use crate::dataset::Dataset;

Expand Down Expand Up @@ -95,8 +96,7 @@ pub async fn merge_indices<'a>(

let new_uuid = Uuid::new_v4();

let index_dir = dataset.indices_dir().child(new_uuid.to_string());
let new_store = LanceIndexStore::new((*dataset.object_store).clone(), index_dir);
let new_store = LanceIndexStore::from_dataset(&dataset, &new_uuid.to_string());

index.update(new_data_stream.into(), &new_store).await?;

Expand Down
14 changes: 6 additions & 8 deletions rust/lance/src/index/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use tracing::instrument;

use lance_core::{Error, Result};

use crate::{dataset::scanner::ColumnOrdering, Dataset};
use crate::{
dataset::{index::LanceIndexStoreExt, scanner::ColumnOrdering},
Dataset,
};

use super::IndexParams;

Expand Down Expand Up @@ -95,17 +98,12 @@ pub async fn build_scalar_index(dataset: &Dataset, column: &str, uuid: &str) ->
});
}
let flat_index_trainer = FlatIndexMetadata::new(field.data_type());
let index_dir = dataset.indices_dir().child(uuid);
let index_store = LanceIndexStore::new((*dataset.object_store).clone(), index_dir);
let index_store = LanceIndexStore::from_dataset(dataset, uuid);
train_btree_index(training_request, &flat_index_trainer, &index_store).await
}

pub async fn open_scalar_index(dataset: &Dataset, uuid: &str) -> Result<Arc<dyn ScalarIndex>> {
let index_dir = dataset.indices_dir().child(uuid);
let index_store = Arc::new(LanceIndexStore::new(
(*dataset.object_store).clone(),
index_dir,
));
let index_store = Arc::new(LanceIndexStore::from_dataset(dataset, uuid));
// Currently we assume all scalar indices are btree indices. In the future, if this is not the
// case, we may need to store a metadata file in the index directory with scalar index metadata
let btree_index = BTreeIndex::load(index_store).await?;
Expand Down
Loading