Skip to content

Commit

Permalink
use the file metadata cache in the index store
Browse files Browse the repository at this point in the history
  • Loading branch information
westonpace committed May 13, 2024
1 parent ace9695 commit 5f3b492
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 21 deletions.
2 changes: 1 addition & 1 deletion rust/lance-core/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type ArcAny = Arc<dyn Any + Send + Sync>;
/// Cache for various metadata about files.
///
/// The cache is keyed by the file path and the type of metadata.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct FileMetadataCache {
cache: Arc<Cache<(Path, TypeId), ArcAny>>,
}
Expand Down
25 changes: 19 additions & 6 deletions rust/lance-index/src/scalar/lance_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use lance_file::{
};
use snafu::{location, Location};

use lance_core::{Error, Result};
use lance_core::{cache::FileMetadataCache, Error, Result};
use lance_io::{object_store::ObjectStore, ReadBatchParams};
use lance_table::{format::SelfDescribingFileReader, io::manifest::ManifestDescribing};
use object_store::path::Path;
Expand All @@ -30,14 +30,20 @@ use super::{IndexReader, IndexStore, IndexWriter};
pub struct LanceIndexStore {
object_store: ObjectStore,
index_dir: Path,
metadata_cache: Option<FileMetadataCache>,
}

impl LanceIndexStore {
/// Create a new index store at the given directory
pub fn new(object_store: ObjectStore, index_dir: Path) -> Self {
pub fn new(
object_store: ObjectStore,
index_dir: Path,
metadata_cache: Option<FileMetadataCache>,
) -> Self {
Self {
object_store,
index_dir,
metadata_cache,
}
}
}
Expand Down Expand Up @@ -97,9 +103,12 @@ impl IndexStore for LanceIndexStore {

async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>> {
let path = self.index_dir.child(name);
// TODO: Should probably provide file metadata cache here
let file_reader =
FileReader::try_new_self_described(&self.object_store, &path, None).await?;
let file_reader = FileReader::try_new_self_described(
&self.object_store,
&path,
self.metadata_cache.as_ref(),
)
.await?;
Ok(Arc::new(file_reader))
}

Expand Down Expand Up @@ -167,7 +176,11 @@ mod tests {
let test_path: &Path = tempdir.path();
let (object_store, test_path) =
ObjectStore::from_path(test_path.as_os_str().to_str().unwrap()).unwrap();
Arc::new(LanceIndexStore::new(object_store, test_path.to_owned()))
Arc::new(LanceIndexStore::new(
object_store,
test_path.to_owned(),
None,
))
}

struct MockTrainingSource {
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/benches/scalar_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl BenchmarkFixture {
let test_path = tempdir.path();
let (object_store, test_path) =
ObjectStore::from_path(test_path.as_os_str().to_str().unwrap()).unwrap();
Arc::new(LanceIndexStore::new(object_store, test_path))
Arc::new(LanceIndexStore::new(object_store, test_path, None))
}

async fn write_baseline_data(tempdir: &TempDir) -> Arc<Dataset> {
Expand Down
16 changes: 16 additions & 0 deletions rust/lance/src/dataset/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use lance_core::Result;
use lance_index::scalar::lance_format::LanceIndexStore;
use lance_index::DatasetIndexExt;
use lance_table::format::Index;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -68,3 +69,18 @@ impl IndexRemapper for DatasetIndexRemapper {
Ok(remapped)
}
}

pub trait LanceIndexStoreExt {
fn from_dataset(dataset: &Dataset, uuid: &str) -> Self;
}

impl LanceIndexStoreExt for LanceIndexStore {
fn from_dataset(dataset: &Dataset, uuid: &str) -> Self {
let index_dir = dataset.indices_dir().child(uuid);
LanceIndexStore::new(
dataset.object_store.as_ref().clone(),
index_dir,
Some(dataset.session.file_metadata_cache.clone()),
)
}
}
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3542,7 +3542,7 @@ mod test {
.await
.unwrap();
let second_index_scan_bytes = get_bytes() - start_bytes;
assert!(second_index_scan_bytes < index_scan_bytes);
assert!(second_index_scan_bytes < filtered_scan_bytes);
}

#[tokio::test]
Expand Down
4 changes: 2 additions & 2 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub(crate) mod prefilter;
pub mod scalar;
pub mod vector;

use crate::dataset::index::LanceIndexStoreExt;
pub use crate::index::prefilter::{FilterLoader, PreFilter};

use crate::dataset::transaction::{Operation, Transaction};
Expand Down Expand Up @@ -93,8 +94,7 @@ pub(crate) async fn remap_index(

match generic.index_type() {
IndexType::Scalar => {
let index_dir = dataset.indices_dir().child(new_id.to_string());
let new_store = LanceIndexStore::new((*dataset.object_store).clone(), index_dir);
let new_store = LanceIndexStore::from_dataset(dataset, &new_id.to_string());

let scalar_index = dataset
.open_scalar_index(&field.name, &index_id.to_string())
Expand Down
4 changes: 2 additions & 2 deletions rust/lance/src/index/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use uuid::Uuid;

use super::vector::ivf::optimize_vector_indices;
use super::DatasetIndexInternalExt;
use crate::dataset::index::LanceIndexStoreExt;
use crate::dataset::scanner::ColumnOrdering;
use crate::dataset::Dataset;

Expand Down Expand Up @@ -95,8 +96,7 @@ pub async fn merge_indices<'a>(

let new_uuid = Uuid::new_v4();

let index_dir = dataset.indices_dir().child(new_uuid.to_string());
let new_store = LanceIndexStore::new((*dataset.object_store).clone(), index_dir);
let new_store = LanceIndexStore::from_dataset(&dataset, &new_uuid.to_string());

index.update(new_data_stream.into(), &new_store).await?;

Expand Down
14 changes: 6 additions & 8 deletions rust/lance/src/index/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use tracing::instrument;

use lance_core::{Error, Result};

use crate::{dataset::scanner::ColumnOrdering, Dataset};
use crate::{
dataset::{index::LanceIndexStoreExt, scanner::ColumnOrdering},
Dataset,
};

use super::IndexParams;

Expand Down Expand Up @@ -95,17 +98,12 @@ pub async fn build_scalar_index(dataset: &Dataset, column: &str, uuid: &str) ->
});
}
let flat_index_trainer = FlatIndexMetadata::new(field.data_type());
let index_dir = dataset.indices_dir().child(uuid);
let index_store = LanceIndexStore::new((*dataset.object_store).clone(), index_dir);
let index_store = LanceIndexStore::from_dataset(dataset, uuid);
train_btree_index(training_request, &flat_index_trainer, &index_store).await
}

pub async fn open_scalar_index(dataset: &Dataset, uuid: &str) -> Result<Arc<dyn ScalarIndex>> {
let index_dir = dataset.indices_dir().child(uuid);
let index_store = Arc::new(LanceIndexStore::new(
(*dataset.object_store).clone(),
index_dir,
));
let index_store = Arc::new(LanceIndexStore::from_dataset(dataset, uuid));
// Currently we assume all scalar indices are btree indices. In the future, if this is not the
// case, we may need to store a metadata file in the index directory with scalar index metadata
let btree_index = BTreeIndex::load(index_store).await?;
Expand Down

0 comments on commit 5f3b492

Please sign in to comment.