From 35d4f851c60631f726e46da03526850cc4c02b8e Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 24 May 2024 12:26:18 -0700 Subject: [PATCH 1/8] wip: better local listing --- docs/format.rst | 24 ++++++ rust/lance-table/src/io/commit.rs | 81 ++++++++++++++++++- .../src/io/commit/external_manifest.rs | 22 ++++- 3 files changed, 123 insertions(+), 4 deletions(-) diff --git a/docs/format.rst b/docs/format.rst index b6e30777da..b13be67b48 100644 --- a/docs/format.rst +++ b/docs/format.rst @@ -331,6 +331,30 @@ The reader load process is as follows: 3. ``PUT_EXTERNAL_STORE base_uri, version, mydataset.lance/_versions/{version}.manifest`` update the external store to point to the final manifest 4. ``RETURN mydataset.lance/_versions/{version}.manifest`` always return the finalized path, return error if synchronization fails +Latest manifest file +-------------------- + +To get fast access to the latest version, there is a pointer file called +``latest.json``. This file contains information about what the latest version +of the table is. It is a JSON file with the fields: + +.. code-block:: + + { + "version": 24, + "manifestPath": "_versions/24.manifest", + "manifestSize": 2342501, + "readerFlags": 4, + "writerFlags": 5 + } + +The ``version`` field is the version of the table. The ``manifestPath`` field is +the path to the manifest file for that version. ``manifestSize`` is the size of +the manifest file in bytes, which can be used to bypass the need for a head +request to find the end of the manifest file. The ``readerFlags`` and +``writerFlags`` fields are the feature flags for the reader and writer, +respectively. + Statistics ---------- diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index f371e1b3b4..5a60bc98f8 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -22,7 +22,7 @@ //! terms of a lock. The trait [CommitLock] can be implemented as a simpler //! alternative to [CommitHandler]. -use std::fmt::Debug; +use std::{fmt::Debug, fs::DirEntry}; use std::sync::atomic::AtomicBool; use std::sync::Arc; @@ -31,7 +31,9 @@ use futures::{ stream::BoxStream, StreamExt, TryStreamExt, }; +use object_store::local::LocalFileSystem; use object_store::{path::Path, Error as ObjectStoreError, ObjectStore}; +use serde::{Deserialize, Serialize}; use snafu::{location, Location}; use url::Url; @@ -40,9 +42,11 @@ pub mod dynamodb; pub mod external_manifest; use lance_core::{Error, Result}; -use lance_io::object_store::ObjectStoreExt; +use lance_io::{object_store::ObjectStoreExt, testing::__mock_MockObjectStore_OSObjectStore}; use lance_io::object_store::ObjectStoreParams; +use self::external_manifest::ManifestLocation; + #[cfg(feature = "dynamodb")] use { self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore}, @@ -62,6 +66,15 @@ const LATEST_MANIFEST_NAME: &str = "_latest.manifest"; const VERSIONS_DIR: &str = "_versions"; const MANIFEST_EXTENSION: &str = "manifest"; +#[derive(Debug, Serialize, Deserialize)] +struct LatestManifestPointer { + version: u64, + location: String, + size: u64, + reader_flags: u64, + writer_flags: u64, +} + /// Function that writes the manifest to the object store. pub type ManifestWriter = for<'a> fn( object_store: &'a dyn ObjectStore, @@ -81,10 +94,16 @@ pub fn latest_manifest_path(base: &Path) -> Path { } /// Get the latest manifest path -async fn current_manifest_path(object_store: &dyn ObjectStore, base: &Path) -> Result { +async fn current_manifest_path(object_store: &dyn ObjectStore, base: &Path) -> Result { // TODO: list gives us the size, so we could also return the size of the manifest. // That avoids a HEAD request later. + if let Some(store) = object_store.as_any().downcast_ref::() { + if let Ok(Some(location)) = current_manifest_local(base, store) { + return Ok(location); + } + } + // We use `list_with_delimiter` to avoid listing the contents of child directories. let manifest_files = object_store .list_with_delimiter(Some(&base.child(VERSIONS_DIR))) @@ -118,6 +137,54 @@ async fn current_manifest_path(object_store: &dyn ObjectStore, base: &Path) -> R } } +// This is an optimized function that searches for the latest manifest. In +// object_store, list operations lookup metadata for each file listed. This +// method only gets the metadata for the found latest manifest. +fn current_manifest_local( + base: &Path, + store: &LocalFileSystem, +) -> std::io::Result> { + let path = store.path_to_filesystem(&base.child(VERSIONS_DIR))?; + let entries = std::fs::read_dir(path)?; + + let mut latest_entry: Option<(u64, DirEntry)> = None; + + for entry in entries { + let entry = entry?; + let filename = entry.file_name().to_string_lossy(); + if !filename.ends_with(MANIFEST_EXTENSION) { + // Need to ignore temporary files, such as + // .tmp_7.manifest_9c100374-3298-4537-afc6-f5ee7913666d + continue; + } + let version = if let Some(version) = filename + .split_once('.') + .and_then(|(version_str, _)| version_str.parse::().ok()) { + version + } else { + continue; + }; + + if let Some((latest_version, _)) = &latest_entry { + if version > *latest_version { + latest_entry = Some((version, entry)); + } + } else { + latest_entry = Some((version, entry)); + } + } + + if let Some((version, entry)) = latest_entry { + Ok(Some(ManifestLocation { + version, + uri: entry.path().to_string_lossy().to_string(), + size: Some(entry.metadata()?.len()), + })) + } else { + Ok(None) + } +} + async fn list_manifests<'a>( base_path: &Path, object_store: &'a dyn ObjectStore, @@ -204,6 +271,14 @@ pub trait CommitHandler: Debug + Send + Sync { parse_version_from_path(&path) } + async fn resolve_latest_manifest_location( + &self, + base_path: &Path, + object_store: &dyn ObjectStore, + ) -> Result { + + } + /// Get the path to a specific versioned manifest of a dataset at the base_path async fn resolve_version( &self, diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index 4fa523c2b0..c43f37ec24 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -24,7 +24,7 @@ use crate::io::commit::{parse_version_from_path, CommitError, CommitHandler, Man /// External manifest store /// /// This trait abstracts an external storage for source of truth for manifests. -/// The storge is expected to remember (uri, version) -> manifest_path +/// The storage is expected to remember (uri, version) -> manifest_path /// and able to run transactions on the manifest_path. /// /// This trait is called an **External** manifest store because the store is @@ -43,6 +43,17 @@ pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync { /// the version and the store should not customize it. async fn get_latest_version(&self, base_uri: &str) -> Result>; + /// Get the latest manifest location for a given base_uri. + async fn get_latest_manifest_location(&self, base_uri: &str) -> Result> { + self.get_latest_version(base_uri) + .await + .map(|res| res.map(|(version, uri)| ManifestLocation { + version, + uri, + size: None, + })) + } + /// Put the manifest path for a given base_uri and version, should fail if the version already exists async fn put_if_not_exists(&self, base_uri: &str, version: u64, path: &str) -> Result<()>; @@ -50,6 +61,15 @@ pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync { async fn put_if_exists(&self, base_uri: &str, version: u64, path: &str) -> Result<()>; } +pub struct ManifestLocation { + /// The version the manifest corresponds to. + pub version: u64, + /// URI of the manifest file + pub uri: String, + /// Size, in bytes, of the manifest file. If it is not known, this field should be `None`. + pub size: Option, +} + /// External manifest commit handler /// This handler is used to commit a manifest to an external store /// for detailed design, see https://github.com/lancedb/lance/issues/1183 From 5c564bf5425826de4ebd63a5384a266e20a84cda Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 26 May 2024 12:18:28 -0700 Subject: [PATCH 2/8] clean up code paths --- rust/lance-io/src/local.rs | 17 ++- rust/lance-io/src/object_reader.rs | 8 +- rust/lance-io/src/scheduler.rs | 3 +- rust/lance-io/src/traits.rs | 4 +- rust/lance-io/src/utils.rs | 11 +- rust/lance-table/src/io/commit.rs | 134 +++++++++-------- .../src/io/commit/external_manifest.rs | 63 +++++--- rust/lance/src/dataset.rs | 142 +++++++----------- rust/lance/src/dataset/builder.rs | 19 ++- rust/lance/src/index.rs | 4 +- 10 files changed, 213 insertions(+), 192 deletions(-) diff --git a/rust/lance-io/src/local.rs b/rust/lance-io/src/local.rs index 268fa37afa..407a0c5a0b 100644 --- a/rust/lance-io/src/local.rs +++ b/rust/lance-io/src/local.rs @@ -111,13 +111,20 @@ impl Reader for LocalObjectReader { } /// Returns the file size. - async fn size(&self) -> Result { - Ok(self.file.metadata()?.len() as usize) + async fn size(&self) -> object_store::Result { + let metadata = self + .file + .metadata() + .map_err(|err| object_store::Error::Generic { + store: "LocalFileSystem", + source: err.into(), + })?; + Ok(metadata.len() as usize) } /// Reads a range of data. #[instrument(level = "debug", skip(self))] - async fn get_range(&self, range: Range) -> Result { + async fn get_range(&self, range: Range) -> object_store::Result { let file = self.file.clone(); tokio::task::spawn_blocking(move || { let mut buf = BytesMut::with_capacity(range.len()); @@ -132,6 +139,10 @@ impl Reader for LocalObjectReader { Ok(buf.freeze()) }) .await? + .map_err(|err: std::io::Error| object_store::Error::Generic { + store: "LocalFileSystem", + source: err.into(), + }) } } diff --git a/rust/lance-io/src/object_reader.rs b/rust/lance-io/src/object_reader.rs index fc9b495dc9..1ec74dd2b3 100644 --- a/rust/lance-io/src/object_reader.rs +++ b/rust/lance-io/src/object_reader.rs @@ -42,14 +42,14 @@ impl CloudObjectReader { async fn do_with_retry<'a, O>( &self, f: impl Fn() -> BoxFuture<'a, std::result::Result>, - ) -> Result { + ) -> object_store::Result { let mut retries = 3; loop { match f().await { Ok(val) => return Ok(val), Err(err) => { if retries == 0 { - return Err(err.into()); + return Err(err); } retries -= 1; } @@ -69,7 +69,7 @@ impl Reader for CloudObjectReader { } /// Object/File Size. - async fn size(&self) -> Result { + async fn size(&self) -> object_store::Result { let meta = self .do_with_retry(|| self.object_store.head(&self.path)) .await?; @@ -77,7 +77,7 @@ impl Reader for CloudObjectReader { } #[instrument(level = "debug", skip(self))] - async fn get_range(&self, range: Range) -> Result { + async fn get_range(&self, range: Range) -> object_store::Result { self.do_with_retry(|| self.object_store.get_range(&self.path, range.clone())) .await } diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs index 8a12f8a8d6..ad26f354b9 100644 --- a/rust/lance-io/src/scheduler.rs +++ b/rust/lance-io/src/scheduler.rs @@ -91,7 +91,8 @@ impl IoTask { let bytes = self .reader .get_range(self.to_read.start as usize..self.to_read.end as usize) - .await; + .await + .map_err(Error::from); (self.when_done)(bytes); } } diff --git a/rust/lance-io/src/traits.rs b/rust/lance-io/src/traits.rs index 89ad65b396..66b8b55ccf 100644 --- a/rust/lance-io/src/traits.rs +++ b/rust/lance-io/src/traits.rs @@ -86,10 +86,10 @@ pub trait Reader: std::fmt::Debug + Send + Sync { fn block_size(&self) -> usize; /// Object/File Size. - async fn size(&self) -> Result; + async fn size(&self) -> object_store::Result; /// Read a range of bytes from the object. /// /// TODO: change to read_at()? - async fn get_range(&self, range: Range) -> Result; + async fn get_range(&self, range: Range) -> object_store::Result; } diff --git a/rust/lance-io/src/utils.rs b/rust/lance-io/src/utils.rs index f34a1568ea..bb5c095bb4 100644 --- a/rust/lance-io/src/utils.rs +++ b/rust/lance-io/src/utils.rs @@ -113,8 +113,15 @@ pub async fn read_struct< T::try_from(msg) } -pub async fn read_last_block(reader: &dyn Reader) -> Result { - let file_size = reader.size().await?; +pub async fn read_last_block( + reader: &dyn Reader, + size: Option, +) -> object_store::Result { + let file_size = if let Some(size) = size { + size as usize + } else { + reader.size().await? + }; let block_size = reader.block_size(); let begin = if file_size < block_size { 0 diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 5a60bc98f8..aa00950a8b 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -22,17 +22,16 @@ //! terms of a lock. The trait [CommitLock] can be implemented as a simpler //! alternative to [CommitHandler]. -use std::{fmt::Debug, fs::DirEntry}; use std::sync::atomic::AtomicBool; use std::sync::Arc; +use std::{fmt::Debug, fs::DirEntry}; use futures::{ future::{self, BoxFuture}, stream::BoxStream, StreamExt, TryStreamExt, }; -use object_store::local::LocalFileSystem; -use object_store::{path::Path, Error as ObjectStoreError, ObjectStore}; +use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore}; use serde::{Deserialize, Serialize}; use snafu::{location, Location}; use url::Url; @@ -42,10 +41,7 @@ pub mod dynamodb; pub mod external_manifest; use lance_core::{Error, Result}; -use lance_io::{object_store::ObjectStoreExt, testing::__mock_MockObjectStore_OSObjectStore}; -use lance_io::object_store::ObjectStoreParams; - -use self::external_manifest::ManifestLocation; +use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams}; #[cfg(feature = "dynamodb")] use { @@ -77,7 +73,7 @@ struct LatestManifestPointer { /// Function that writes the manifest to the object store. pub type ManifestWriter = for<'a> fn( - object_store: &'a dyn ObjectStore, + object_store: &'a dyn OSObjectStore, manifest: &'a mut Manifest, indices: Option>, path: &'a Path, @@ -93,42 +89,61 @@ pub fn latest_manifest_path(base: &Path) -> Path { base.child(LATEST_MANIFEST_NAME) } -/// Get the latest manifest path -async fn current_manifest_path(object_store: &dyn ObjectStore, base: &Path) -> Result { - // TODO: list gives us the size, so we could also return the size of the manifest. - // That avoids a HEAD request later. +#[derive(Debug)] +pub struct ManifestLocation { + /// The version the manifest corresponds to. + pub version: u64, + /// Path of the manifest file, relative to the table root. + pub path: Path, + /// Size, in bytes, of the manifest file. If it is not known, this field should be `None`. + pub size: Option, +} - if let Some(store) = object_store.as_any().downcast_ref::() { - if let Ok(Some(location)) = current_manifest_local(base, store) { +/// Get the latest manifest path +async fn current_manifest_path( + object_store: &ObjectStore, + base: &Path, +) -> Result { + if object_store.is_local() { + if let Ok(Some(location)) = current_manifest_local(base) { return Ok(location); } } // We use `list_with_delimiter` to avoid listing the contents of child directories. let manifest_files = object_store + .inner .list_with_delimiter(Some(&base.child(VERSIONS_DIR))) .await?; let current = manifest_files .objects .into_iter() - .map(|meta| meta.location) - .filter(|path| { - path.filename().is_some() && path.filename().unwrap().ends_with(MANIFEST_EXTENSION) + .filter(|meta| { + meta.location.filename().is_some() + && meta + .location + .filename() + .unwrap() + .ends_with(MANIFEST_EXTENSION) }) - .filter_map(|path| { - let version = path + .filter_map(|meta| { + let version = meta + .location .filename() .unwrap() .split_once('.') .and_then(|(version_str, _)| version_str.parse::().ok())?; - Some((version, path)) + Some((version, meta)) }) - .max_by_key(|(version, _)| *version) - .map(|(_, path)| path); + .max_by_key(|(version, _)| *version); - if let Some(path) = current { - Ok(path) + if let Some((version, meta)) = current { + Ok(ManifestLocation { + version, + path: meta.location, + size: Some(meta.size as u64), + }) } else { Err(Error::NotFound { uri: manifest_path(base, 1).to_string(), @@ -138,30 +153,27 @@ async fn current_manifest_path(object_store: &dyn ObjectStore, base: &Path) -> R } // This is an optimized function that searches for the latest manifest. In -// object_store, list operations lookup metadata for each file listed. This +// object_store, list operations lookup metadata for each file listed. This // method only gets the metadata for the found latest manifest. -fn current_manifest_local( - base: &Path, - store: &LocalFileSystem, -) -> std::io::Result> { - let path = store.path_to_filesystem(&base.child(VERSIONS_DIR))?; +fn current_manifest_local(base: &Path) -> std::io::Result> { + let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR)); let entries = std::fs::read_dir(path)?; let mut latest_entry: Option<(u64, DirEntry)> = None; for entry in entries { let entry = entry?; - let filename = entry.file_name().to_string_lossy(); + let filename_raw = entry.file_name(); + let filename = filename_raw.to_string_lossy(); if !filename.ends_with(MANIFEST_EXTENSION) { // Need to ignore temporary files, such as // .tmp_7.manifest_9c100374-3298-4537-afc6-f5ee7913666d continue; } - let version = if let Some(version) = filename + let Some(version) = filename .split_once('.') - .and_then(|(version_str, _)| version_str.parse::().ok()) { - version - } else { + .and_then(|(version_str, _)| version_str.parse::().ok()) + else { continue; }; @@ -177,7 +189,7 @@ fn current_manifest_local( if let Some((version, entry)) = latest_entry { Ok(Some(ManifestLocation { version, - uri: entry.path().to_string_lossy().to_string(), + path: entry.path().to_string_lossy().as_ref().into(), size: Some(entry.metadata()?.len()), })) } else { @@ -187,14 +199,14 @@ fn current_manifest_local( async fn list_manifests<'a>( base_path: &Path, - object_store: &'a dyn ObjectStore, + object_store: &'a dyn OSObjectStore, ) -> Result>> { let base_path = base_path.clone(); Ok(object_store .read_dir_all(&base_path.child(VERSIONS_DIR), None) .await? .try_filter_map(|obj_meta| { - if obj_meta.location.extension() == Some("manifest") { + if obj_meta.location.extension() == Some(MANIFEST_EXTENSION) { future::ready(Ok(Some(obj_meta.location))) } else { future::ready(Ok(None)) @@ -206,7 +218,7 @@ async fn list_manifests<'a>( pub fn parse_version_from_path(path: &Path) -> Result { path.filename() .and_then(|name| name.split_once('.')) - .filter(|(_, extension)| *extension == "manifest") + .filter(|(_, extension)| *extension == MANIFEST_EXTENSION) .and_then(|(version, _)| version.parse::().ok()) .ok_or(Error::Internal { message: format!("Expected manifest file, but found {}", path), @@ -225,7 +237,7 @@ fn make_staging_manifest_path(base: &Path) -> Result { async fn write_latest_manifest( from_path: &Path, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &dyn OSObjectStore, ) -> Result<()> { let latest_path = latest_manifest_path(base_path); let staging_path = make_staging_manifest_path(from_path)?; @@ -250,33 +262,33 @@ const DDB_URL_QUERY_KEY: &str = "ddbTableName"; // TODO: pub(crate) #[async_trait::async_trait] pub trait CommitHandler: Debug + Send + Sync { + async fn resolve_latest_location( + &self, + base_path: &Path, + object_store: &ObjectStore, + ) -> Result { + Ok(current_manifest_path(object_store, base_path).await?) + } + /// Get the path to the latest version manifest of a dataset at the base_path async fn resolve_latest_version( &self, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &ObjectStore, ) -> std::result::Result { // TODO: we need to pade 0's to the version number on the manifest file path - Ok(current_manifest_path(object_store, base_path).await?) + Ok(current_manifest_path(object_store, base_path).await?.path) } // for default implementation, parse the version from the path async fn resolve_latest_version_id( &self, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &ObjectStore, ) -> Result { - let path = self.resolve_latest_version(base_path, object_store).await?; - - parse_version_from_path(&path) - } - - async fn resolve_latest_manifest_location( - &self, - base_path: &Path, - object_store: &dyn ObjectStore, - ) -> Result { - + Ok(current_manifest_path(object_store, base_path) + .await? + .version) } /// Get the path to a specific versioned manifest of a dataset at the base_path @@ -284,7 +296,7 @@ pub trait CommitHandler: Debug + Send + Sync { &self, base_path: &Path, version: u64, - _object_store: &dyn ObjectStore, + _object_store: &dyn OSObjectStore, ) -> std::result::Result { Ok(manifest_path(base_path, version)) } @@ -293,7 +305,7 @@ pub trait CommitHandler: Debug + Send + Sync { async fn list_manifests<'a>( &self, base_path: &Path, - object_store: &'a dyn ObjectStore, + object_store: &'a dyn OSObjectStore, ) -> Result>> { list_manifests(base_path, object_store).await } @@ -307,7 +319,7 @@ pub trait CommitHandler: Debug + Send + Sync { manifest: &mut Manifest, indices: Option>, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &dyn OSObjectStore, manifest_writer: ManifestWriter, ) -> std::result::Result<(), CommitError>; } @@ -518,7 +530,7 @@ impl CommitHandler for UnsafeCommitHandler { manifest: &mut Manifest, indices: Option>, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &dyn OSObjectStore, manifest_writer: ManifestWriter, ) -> std::result::Result<(), CommitError> { // Log a one-time warning @@ -581,7 +593,7 @@ impl CommitHandler for T { manifest: &mut Manifest, indices: Option>, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &dyn OSObjectStore, manifest_writer: ManifestWriter, ) -> std::result::Result<(), CommitError> { let path = self @@ -627,7 +639,7 @@ impl CommitHandler for Arc { manifest: &mut Manifest, indices: Option>, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &dyn OSObjectStore, manifest_writer: ManifestWriter, ) -> std::result::Result<(), CommitError> { self.as_ref() @@ -648,7 +660,7 @@ impl CommitHandler for RenameCommitHandler { manifest: &mut Manifest, indices: Option>, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &dyn OSObjectStore, manifest_writer: ManifestWriter, ) -> std::result::Result<(), CommitError> { // Create a temporary object, then use `rename_if_not_exists` to commit. diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index c43f37ec24..f42c0f535c 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -9,17 +9,17 @@ use std::sync::Arc; use async_trait::async_trait; use lance_core::{Error, Result}; -use lance_io::object_store::ObjectStoreExt; +use lance_io::object_store::{ObjectStore, ObjectStoreExt}; use log::warn; -use object_store::{path::Path, ObjectStore}; +use object_store::{path::Path, ObjectStore as OSObjectStore}; use snafu::{location, Location}; use super::{ current_manifest_path, make_staging_manifest_path, manifest_path, write_latest_manifest, - MANIFEST_EXTENSION, + ManifestLocation, MANIFEST_EXTENSION, }; use crate::format::{Index, Manifest}; -use crate::io::commit::{parse_version_from_path, CommitError, CommitHandler, ManifestWriter}; +use crate::io::commit::{CommitError, CommitHandler, ManifestWriter}; /// External manifest store /// @@ -44,14 +44,17 @@ pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync { async fn get_latest_version(&self, base_uri: &str) -> Result>; /// Get the latest manifest location for a given base_uri. - async fn get_latest_manifest_location(&self, base_uri: &str) -> Result> { - self.get_latest_version(base_uri) - .await - .map(|res| res.map(|(version, uri)| ManifestLocation { + async fn get_latest_manifest_location( + &self, + base_uri: &str, + ) -> Result> { + self.get_latest_version(base_uri).await.map(|res| { + res.map(|(version, uri)| ManifestLocation { version, - uri, + path: Path::from(uri), size: None, - })) + }) + }) } /// Put the manifest path for a given base_uri and version, should fail if the version already exists @@ -61,15 +64,6 @@ pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync { async fn put_if_exists(&self, base_uri: &str, version: u64, path: &str) -> Result<()>; } -pub struct ManifestLocation { - /// The version the manifest corresponds to. - pub version: u64, - /// URI of the manifest file - pub uri: String, - /// Size, in bytes, of the manifest file. If it is not known, this field should be `None`. - pub size: Option, -} - /// External manifest commit handler /// This handler is used to commit a manifest to an external store /// for detailed design, see https://github.com/lancedb/lance/issues/1183 @@ -80,11 +74,26 @@ pub struct ExternalManifestCommitHandler { #[async_trait] impl CommitHandler for ExternalManifestCommitHandler { + async fn resolve_latest_location( + &self, + base_path: &Path, + object_store: &ObjectStore, + ) -> std::result::Result { + let path = self.resolve_latest_version(base_path, object_store).await?; + Ok(ManifestLocation { + version: self + .resolve_latest_version_id(base_path, object_store) + .await?, + path, + size: None, + }) + } + /// Get the latest version of a dataset at the path async fn resolve_latest_version( &self, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &ObjectStore, ) -> std::result::Result { let version = self .external_manifest_store @@ -107,11 +116,13 @@ impl CommitHandler for ExternalManifestCommitHandler { // TODO: remove copy-rename once we upgrade object_store crate object_store.copy(&manifest_path, &staging).await?; object_store + .inner .rename(&staging, &object_store_manifest_path) .await?; // step 2: write _latest.manifest - write_latest_manifest(&manifest_path, base_path, object_store).await?; + write_latest_manifest(&manifest_path, base_path, object_store.inner.as_ref()) + .await?; // step 3: update external store to finalize path self.external_manifest_store @@ -126,14 +137,14 @@ impl CommitHandler for ExternalManifestCommitHandler { } // Dataset not found in the external store, this could be because the dataset did not // use external store for commit before. In this case, we search for the latest manifest - None => current_manifest_path(object_store, base_path).await, + None => Ok(current_manifest_path(object_store, base_path).await?.path), } } async fn resolve_latest_version_id( &self, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &ObjectStore, ) -> std::result::Result { let version = self .external_manifest_store @@ -142,7 +153,9 @@ impl CommitHandler for ExternalManifestCommitHandler { match version { Some((version, _)) => Ok(version), - None => parse_version_from_path(¤t_manifest_path(object_store, base_path).await?), + None => Ok(current_manifest_path(object_store, base_path) + .await? + .version), } } @@ -150,7 +163,7 @@ impl CommitHandler for ExternalManifestCommitHandler { &self, base_path: &Path, version: u64, - object_store: &dyn ObjectStore, + object_store: &dyn OSObjectStore, ) -> std::result::Result { let path_res = self .external_manifest_store diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index d293957c82..710f71b27f 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -15,9 +15,11 @@ use lance_file::datatypes::populate_schema_dictionary; use lance_io::object_store::{ObjectStore, ObjectStoreParams}; use lance_io::object_writer::ObjectWriter; use lance_io::traits::WriteExt; -use lance_io::utils::{read_metadata_offset, read_struct}; +use lance_io::utils::{read_last_block, read_metadata_offset, read_struct}; use lance_table::format::{Fragment, Index, Manifest, MAGIC, MAJOR_VERSION, MINOR_VERSION}; -use lance_table::io::commit::{commit_handler_from_url, CommitError, CommitHandler, CommitLock}; +use lance_table::io::commit::{ + commit_handler_from_url, CommitError, CommitHandler, CommitLock, ManifestLocation, +}; use lance_table::io::manifest::{read_manifest, write_manifest}; use log::warn; use object_store::path::Path; @@ -234,72 +236,33 @@ impl Dataset { #[deprecated(since = "0.8.17", note = "Please use `DatasetBuilder` instead.")] #[instrument(skip(params))] pub async fn open_with_params(uri: &str, params: &ReadParams) -> Result { - let (object_store, base_path, commit_handler) = - Self::params_from_uri(uri, ¶ms.commit_handler, ¶ms.store_options).await?; - - let latest_manifest = commit_handler - .resolve_latest_version(&base_path, &object_store.inner) + DatasetBuilder::from_uri(uri) + .with_read_params(params.clone()) + .load() .await - .map_err(|e| Error::DatasetNotFound { - path: base_path.to_string(), - source: Box::new(e), - location: location!(), - })?; - - let session = if let Some(session) = params.session.as_ref() { - session.clone() - } else { - Arc::new(Session::new( - params.index_cache_size, - params.metadata_cache_size, - )) - }; - - Self::checkout_manifest( - Arc::new(object_store), - base_path.clone(), - &latest_manifest, - session, - commit_handler, - ) - .await } /// Check out a version of the dataset. + #[deprecated(note = "Please use `DatasetBuilder` instead.")] pub async fn checkout(uri: &str, version: u64) -> Result { - let params = ReadParams::default(); - Self::checkout_with_params(uri, version, ¶ms).await + DatasetBuilder::from_uri(uri) + .with_version(version) + .load() + .await } /// Check out a version of the dataset with read params. + #[deprecated(note = "Please use `DatasetBuilder` instead.")] pub async fn checkout_with_params( uri: &str, version: u64, params: &ReadParams, ) -> Result { - let (object_store, base_path, commit_handler) = - Self::params_from_uri(uri, ¶ms.commit_handler, ¶ms.store_options).await?; - - let manifest_file = commit_handler - .resolve_version(&base_path, version, &object_store.inner) - .await?; - - let session = if let Some(session) = params.session.as_ref() { - session.clone() - } else { - Arc::new(Session::new( - params.index_cache_size, - params.metadata_cache_size, - )) - }; - Self::checkout_manifest( - Arc::new(object_store), - base_path, - &manifest_file, - session, - commit_handler, - ) - .await + DatasetBuilder::from_uri(uri) + .with_version(version) + .with_read_params(params.clone()) + .load() + .await } /// Check out the specified version of this dataset @@ -309,10 +272,15 @@ impl Dataset { .commit_handler .resolve_version(&base_path, version, &self.object_store.inner) .await?; + let manifest_location = ManifestLocation { + version, + path: manifest_file, + size: None, + }; Self::checkout_manifest( self.object_store.clone(), base_path, - &manifest_file, + &manifest_location, self.session.clone(), self.commit_handler.clone(), ) @@ -322,36 +290,36 @@ impl Dataset { async fn checkout_manifest( object_store: Arc, base_path: Path, - manifest_path: &Path, + manifest_location: &ManifestLocation, session: Arc, commit_handler: Arc, ) -> Result { - let object_reader = object_store - .open(manifest_path) - .await - .map_err(|e| match &e { - Error::NotFound { uri, .. } => Error::DatasetNotFound { - path: uri.clone(), - source: box_error(e), + let object_reader = + object_store + .open(&manifest_location.path) + .await + .map_err(|e| match &e { + Error::NotFound { uri, .. } => Error::DatasetNotFound { + path: uri.clone(), + source: box_error(e), + location: location!(), + }, + _ => e, + })?; + let last_block = read_last_block(object_reader.as_ref(), manifest_location.size) + .await + .map_err(|err| match err { + object_store::Error::NotFound { path, source } => Error::DatasetNotFound { + path: path.to_string(), + source: source.into(), location: location!(), }, - _ => e, - })?; - // TODO: remove reference to inner. - let get_result = object_store - .inner - .get(manifest_path) - .await - .map_err(|e| match e { - object_store::Error::NotFound { path: _, source } => Error::DatasetNotFound { - path: base_path.to_string(), - source, + _ => Error::IO { + source: err.into(), location: location!(), }, - _ => e.into(), })?; - let bytes = get_result.bytes().await?; - let offset = read_metadata_offset(&bytes)?; + let offset = read_metadata_offset(&last_block)?; let mut manifest: Manifest = read_struct(object_reader.as_ref(), offset).await?; if !can_read_dataset(manifest.reader_feature_flags) { @@ -388,7 +356,7 @@ impl Dataset { // Read expected manifest path for the dataset let dataset_exists = match commit_handler - .resolve_latest_version(&base, &object_store.inner) + .resolve_latest_version(&base, &object_store) .await { Ok(_) => true, @@ -621,7 +589,7 @@ impl Dataset { &self.object_store, &self .commit_handler - .resolve_latest_version(&self.base, &self.object_store.inner) + .resolve_latest_version(&self.base, &self.object_store) .await?, ) .await @@ -749,7 +717,7 @@ impl Dataset { // Test if the dataset exists let dataset_exists = match commit_handler - .resolve_latest_version(&base, &object_store.inner) + .resolve_latest_version(&base, &object_store) .await { Ok(_) => true, @@ -1097,7 +1065,7 @@ impl Dataset { /// we don't return the full version struct. pub async fn latest_version_id(&self) -> Result { self.commit_handler - .resolve_latest_version_id(&self.base, &self.object_store.inner) + .resolve_latest_version_id(&self.base, &self.object_store) .await } @@ -1606,7 +1574,7 @@ mod tests { dataset.object_store(), &dataset .commit_handler - .resolve_latest_version(&dataset.base, &dataset.object_store().inner) + .resolve_latest_version(&dataset.base, dataset.object_store()) .await .unwrap(), ) @@ -1624,7 +1592,7 @@ mod tests { dataset.object_store(), &dataset .commit_handler - .resolve_latest_version(&dataset.base, &dataset.object_store().inner) + .resolve_latest_version(&dataset.base, dataset.object_store()) .await .unwrap(), ) @@ -2041,7 +2009,11 @@ mod tests { assert_eq!(actual_ds.version().version, 2); // But we can still check out the first version - let first_ver = Dataset::checkout(test_uri, 1).await.unwrap(); + let first_ver = DatasetBuilder::from_uri(test_uri) + .with_version(1) + .load() + .await + .unwrap(); assert_eq!(first_ver.version().version, 1); assert_eq!(&ArrowSchema::from(first_ver.schema()), schema.as_ref()); } diff --git a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs index d32ab2132a..a3a7f240ea 100644 --- a/rust/lance/src/dataset/builder.rs +++ b/rust/lance/src/dataset/builder.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use lance_io::object_store::{ObjectStore, ObjectStoreParams}; -use lance_table::io::commit::{commit_handler_from_url, CommitHandler}; +use lance_table::io::commit::{commit_handler_from_url, CommitHandler, ManifestLocation}; use object_store::{aws::AwsCredentialProvider, DynObjectStore}; use snafu::{location, Location}; use tracing::instrument; @@ -213,15 +213,20 @@ impl DatasetBuilder { let version = self.version; let (object_store, commit_handler) = self.build_object_store().await?; - let base_path = object_store.base_path(); + let base_path = object_store.base_path().clone(); let manifest = match version { Some(version) => { - commit_handler - .resolve_version(base_path, version, &object_store.inner) - .await? + let path = commit_handler + .resolve_version(&base_path, version, &object_store.inner) + .await?; + ManifestLocation { + version, + path, + size: None, + } } None => commit_handler - .resolve_latest_version(base_path, &object_store.inner) + .resolve_latest_location(&base_path, &object_store) .await .map_err(|e| Error::DatasetNotFound { path: base_path.to_string(), @@ -231,7 +236,7 @@ impl DatasetBuilder { }; Dataset::checkout_manifest( - Arc::new(object_store.clone()), + Arc::new(object_store), base_path.clone(), &manifest, session, diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 42bfe2c5f1..ee1ab1ea6c 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -130,7 +130,7 @@ impl IndexInformationProvider for ScalarIndexInfo { async fn open_index_proto(reader: &dyn Reader) -> Result { let file_size = reader.size().await?; - let tail_bytes = read_last_block(reader).await?; + let tail_bytes = read_last_block(reader, Some(file_size as u64)).await?; let metadata_pos = read_metadata_offset(&tail_bytes)?; let proto: pb::Index = if metadata_pos < file_size - tail_bytes.len() { // We have not read the metadata bytes yet. @@ -512,7 +512,7 @@ impl DatasetIndexInternalExt for Dataset { let index_file = index_dir.child(INDEX_FILE_NAME); let reader: Arc = self.object_store.open(&index_file).await?.into(); - let tailing_bytes = read_last_block(reader.as_ref()).await?; + let tailing_bytes = read_last_block(reader.as_ref(), None).await?; let (major_version, minor_version) = read_version(&tailing_bytes)?; // the index file is in lance format since version (0,2) From 2ea6742c3782a58e0970ec53529cb0e76f16f339 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 26 May 2024 13:11:07 -0700 Subject: [PATCH 3/8] cleanup --- docs/format.rst | 24 ------------------------ rust/lance-table/src/io/commit.rs | 10 ---------- rust/lance/src/dataset.rs | 4 ++-- 3 files changed, 2 insertions(+), 36 deletions(-) diff --git a/docs/format.rst b/docs/format.rst index b13be67b48..b6e30777da 100644 --- a/docs/format.rst +++ b/docs/format.rst @@ -331,30 +331,6 @@ The reader load process is as follows: 3. ``PUT_EXTERNAL_STORE base_uri, version, mydataset.lance/_versions/{version}.manifest`` update the external store to point to the final manifest 4. ``RETURN mydataset.lance/_versions/{version}.manifest`` always return the finalized path, return error if synchronization fails -Latest manifest file --------------------- - -To get fast access to the latest version, there is a pointer file called -``latest.json``. This file contains information about what the latest version -of the table is. It is a JSON file with the fields: - -.. code-block:: - - { - "version": 24, - "manifestPath": "_versions/24.manifest", - "manifestSize": 2342501, - "readerFlags": 4, - "writerFlags": 5 - } - -The ``version`` field is the version of the table. The ``manifestPath`` field is -the path to the manifest file for that version. ``manifestSize`` is the size of -the manifest file in bytes, which can be used to bypass the need for a head -request to find the end of the manifest file. The ``readerFlags`` and -``writerFlags`` fields are the feature flags for the reader and writer, -respectively. - Statistics ---------- diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index aa00950a8b..8eec37a0ed 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -32,7 +32,6 @@ use futures::{ StreamExt, TryStreamExt, }; use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore}; -use serde::{Deserialize, Serialize}; use snafu::{location, Location}; use url::Url; @@ -62,15 +61,6 @@ const LATEST_MANIFEST_NAME: &str = "_latest.manifest"; const VERSIONS_DIR: &str = "_versions"; const MANIFEST_EXTENSION: &str = "manifest"; -#[derive(Debug, Serialize, Deserialize)] -struct LatestManifestPointer { - version: u64, - location: String, - size: u64, - reader_flags: u64, - writer_flags: u64, -} - /// Function that writes the manifest to the object store. pub type ManifestWriter = for<'a> fn( object_store: &'a dyn OSObjectStore, diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 710f71b27f..4f7903fc23 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -310,8 +310,8 @@ impl Dataset { .await .map_err(|err| match err { object_store::Error::NotFound { path, source } => Error::DatasetNotFound { - path: path.to_string(), - source: source.into(), + path, + source, location: location!(), }, _ => Error::IO { From 450b9f4b4773a823e7b6b5d2ee7932b46281f494 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 26 May 2024 14:37:47 -0700 Subject: [PATCH 4/8] Test number of IOPS --- rust/lance-file/src/reader.rs | 2 +- rust/lance-index/src/vector/ivf/storage.rs | 1 + rust/lance-index/src/vector/pq/storage.rs | 8 ++- rust/lance-io/src/utils.rs | 23 ++++++-- rust/lance-table/src/format/manifest.rs | 2 +- rust/lance-table/src/io/manifest.rs | 2 +- rust/lance/src/dataset.rs | 69 +++++++++++++++++++++- rust/lance/src/index.rs | 2 +- rust/lance/src/utils/test.rs | 3 + 9 files changed, 98 insertions(+), 14 deletions(-) diff --git a/rust/lance-file/src/reader.rs b/rust/lance-file/src/reader.rs index 41f8c7f4a5..c21eceb40c 100644 --- a/rust/lance-file/src/reader.rs +++ b/rust/lance-file/src/reader.rs @@ -189,7 +189,7 @@ impl FileReader { let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() { // We have not read the metadata bytes yet. - read_struct(object_reader, metadata_pos).await? + read_struct(object_reader, metadata_pos, Some(file_size)).await? } else { let offset = tail_bytes.len() - (file_size - metadata_pos); read_struct_from_buf(&tail_bytes.slice(offset..))? diff --git a/rust/lance-index/src/vector/ivf/storage.rs b/rust/lance-index/src/vector/ivf/storage.rs index c0a97ecc98..9456276095 100644 --- a/rust/lance-index/src/vector/ivf/storage.rs +++ b/rust/lance-index/src/vector/ivf/storage.rs @@ -70,6 +70,7 @@ impl IvfData { let pb: PbIvf = read_message( reader.object_reader.as_ref(), ivf_metadata.pb_position as usize, + None, ) .await?; Self::try_from(pb) diff --git a/rust/lance-index/src/vector/pq/storage.rs b/rust/lance-index/src/vector/pq/storage.rs index d68362f148..16567f6267 100644 --- a/rust/lance-index/src/vector/pq/storage.rs +++ b/rust/lance-index/src/vector/pq/storage.rs @@ -74,8 +74,12 @@ impl QuantizerMetadata for ProductQuantizationMetadata { location: location!(), })?; - let codebook_tensor: pb::Tensor = - read_message(reader.object_reader.as_ref(), metadata.codebook_position).await?; + let codebook_tensor: pb::Tensor = read_message( + reader.object_reader.as_ref(), + metadata.codebook_position, + None, + ) + .await?; metadata.codebook = Some(FixedSizeListArray::try_from(&codebook_tensor)?); Ok(metadata) } diff --git a/rust/lance-io/src/utils.rs b/rust/lance-io/src/utils.rs index bb5c095bb4..1a38639b3f 100644 --- a/rust/lance-io/src/utils.rs +++ b/rust/lance-io/src/utils.rs @@ -77,14 +77,24 @@ pub async fn read_fixed_stride_array( } /// Read a protobuf message at file position 'pos'. -// TODO: pub(crate) -pub async fn read_message(reader: &dyn Reader, pos: usize) -> Result { - let file_size = reader.size().await?; +/// +/// We write protobuf by first writing the length of the message as a u32, +/// followed by the message itself. +pub async fn read_message( + reader: &dyn Reader, + pos: usize, + file_size: Option, +) -> Result { + let file_size = if let Some(file_size) = file_size { + file_size + } else { + reader.size().await? + }; if pos > file_size { return Err(Error::io("file size is too small".to_string(), location!())); } - let range = pos..min(pos + 4096, file_size); + let range = pos..min(pos + reader.block_size(), file_size); let buf = reader.get_range(range.clone()).await?; let msg_len = LittleEndian::read_u32(&buf) as usize; @@ -108,8 +118,9 @@ pub async fn read_struct< >( reader: &dyn Reader, pos: usize, + file_size: Option, ) -> Result { - let msg = read_message::(reader, pos).await?; + let msg = read_message::(reader, pos, file_size).await?; T::try_from(msg) } @@ -233,7 +244,7 @@ mod tests { object_writer.shutdown().await.unwrap(); let object_reader = CloudObjectReader::new(Arc::new(store), path, 1024).unwrap(); - let actual: BytesWrapper = read_struct(&object_reader, pos).await.unwrap(); + let actual: BytesWrapper = read_struct(&object_reader, pos, None).await.unwrap(); assert_eq!(some_message, actual); } } diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index d6f9aa57f8..de39cf51d3 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -456,7 +456,7 @@ impl SelfDescribingFileReader for FileReader { ), location: location!(), })?; - let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?; + let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position, None).await?; populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?; let schema = manifest.schema; let max_field_id = schema.max_field_id().unwrap_or_default(); diff --git a/rust/lance-table/src/io/manifest.rs b/rust/lance-table/src/io/manifest.rs index 332b94f881..921c8af5a6 100644 --- a/rust/lance-table/src/io/manifest.rs +++ b/rust/lance-table/src/io/manifest.rs @@ -101,7 +101,7 @@ pub async fn read_manifest_indexes( ) -> Result> { if let Some(pos) = manifest.index_section.as_ref() { let reader = object_store.open(path).await?; - let section: pb::IndexSection = read_message(reader.as_ref(), *pos).await?; + let section: pb::IndexSection = read_message(reader.as_ref(), *pos, None).await?; Ok(section .indices diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 4f7903fc23..8abe88f705 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -5,6 +5,7 @@ //! use arrow_array::{RecordBatch, RecordBatchReader}; +use byteorder::{ByteOrder, LittleEndian}; use chrono::{prelude::*, Duration}; use futures::future::BoxFuture; use futures::stream::{self, StreamExt, TryStreamExt}; @@ -306,7 +307,12 @@ impl Dataset { }, _ => e, })?; - let last_block = read_last_block(object_reader.as_ref(), manifest_location.size) + let manifest_size = if let Some(size) = manifest_location.size { + size + } else { + object_reader.size().await? as u64 + }; + let last_block = read_last_block(object_reader.as_ref(), Some(manifest_size)) .await .map_err(|err| match err { object_store::Error::NotFound { path, source } => Error::DatasetNotFound { @@ -320,7 +326,15 @@ impl Dataset { }, })?; let offset = read_metadata_offset(&last_block)?; - let mut manifest: Manifest = read_struct(object_reader.as_ref(), offset).await?; + + // If manifest is in the last block, we can decode directly from memory. + let mut manifest = if manifest_size - (offset as u64) <= (last_block.len() as u64) { + let message_len = LittleEndian::read_u32(&last_block[offset..offset + 4]) as usize; + let message_data = &last_block[offset + 4..offset + 4 + message_len]; + Manifest::try_from(lance_table::format::pb::Manifest::decode(message_data)?)? + } else { + read_struct(object_reader.as_ref(), offset, Some(manifest_size as usize)).await? + }; if !can_read_dataset(manifest.reader_feature_flags) { let message = format!( @@ -1270,11 +1284,13 @@ mod tests { use lance_index::{vector::DIST_COL, DatasetIndexExt, IndexType}; use lance_linalg::distance::MetricType; use lance_table::format::WriterVersion; + use lance_table::io::commit::RenameCommitHandler; use lance_table::io::deletion::read_deletion_file; use lance_testing::datagen::generate_random_array; use pretty_assertions::assert_eq; use rstest::rstest; use tempfile::{tempdir, TempDir}; + use url::Url; // Used to validate that futures returned are Send. fn require_send(t: T) -> T { @@ -1492,6 +1508,55 @@ mod tests { assert_eq!(result.manifest.max_fragment_id(), None); } + #[tokio::test] + async fn test_load_manifest_iops() { + // Need to use in-memory for accurate IOPS tracking. + use crate::utils::test::IoTrackingStore; + + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::Int32, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..10_i32))], + ) + .unwrap(); + let batches = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let dataset = Dataset::write(batches, "memory://test", None) + .await + .unwrap(); + + // Then open with wrapping store. + let memory_store = dataset.object_store.inner.clone(); + let (io_stats_wrapper, io_stats) = IoTrackingStore::new_wrapper(); + let _dataset = DatasetBuilder::from_uri("memory://test") + .with_read_params(ReadParams { + store_options: Some(ObjectStoreParams { + object_store_wrapper: Some(io_stats_wrapper), + ..Default::default() + }), + ..Default::default() + }) + .with_object_store( + memory_store, + Url::parse("memory://test").unwrap(), + Arc::new(RenameCommitHandler), + ) + .load() + .await + .unwrap(); + + let get_iops = || io_stats.lock().unwrap().read_iops; + + // There should be only two IOPS: + // 1. List _versions directory to get the latest manifest location + // 2. Read the manifest file. (The manifest is small enough to be read in one go. + // Larger manifests would result in more IOPS.) + assert_eq!(get_iops(), 2); + } + #[rstest] #[tokio::test] async fn test_write_params(#[values(false, true)] use_experimental_writer: bool) { diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index ee1ab1ea6c..9beeb0f121 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -134,7 +134,7 @@ async fn open_index_proto(reader: &dyn Reader) -> Result { let metadata_pos = read_metadata_offset(&tail_bytes)?; let proto: pb::Index = if metadata_pos < file_size - tail_bytes.len() { // We have not read the metadata bytes yet. - read_message(reader, metadata_pos).await? + read_message(reader, metadata_pos, Some(file_size)).await? } else { let offset = tail_bytes.len() - (file_size - metadata_pos); read_message_from_buf(&tail_bytes.slice(offset..))? diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index 07362876e5..4de5f71bbb 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -353,6 +353,7 @@ impl ObjectStore for IoTrackingStore { } async fn head(&self, location: &Path) -> OSResult { + self.record_read(0); self.target.head(location).await } @@ -368,10 +369,12 @@ impl ObjectStore for IoTrackingStore { } fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, OSResult> { + self.record_read(0); self.target.list(prefix) } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult { + self.record_read(0); self.target.list_with_delimiter(prefix).await } From 3ddde4b87ea366f28f1655882b0e2702632c8eb1 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 28 May 2024 11:19:42 -0700 Subject: [PATCH 5/8] refactor as cache --- rust/lance-file/src/page_table.rs | 4 +- rust/lance-file/src/reader.rs | 2 +- rust/lance-index/src/vector/ivf/storage.rs | 1 - rust/lance-index/src/vector/pq/storage.rs | 8 +-- rust/lance-io/src/encodings/binary.rs | 14 +++--- rust/lance-io/src/encodings/dictionary.rs | 2 +- rust/lance-io/src/encodings/plain.rs | 6 +-- rust/lance-io/src/local.rs | 46 ++++++++++------- rust/lance-io/src/object_reader.rs | 24 +++++++-- rust/lance-io/src/object_store.rs | 20 +++++++- rust/lance-io/src/utils.rs | 30 +++-------- rust/lance-table/src/format/manifest.rs | 2 +- rust/lance-table/src/io/manifest.rs | 2 +- rust/lance/src/dataset.rs | 55 +++++++++++---------- rust/lance/src/index.rs | 6 +-- rust/lance/src/index/vector/fixture_test.rs | 2 +- 16 files changed, 123 insertions(+), 101 deletions(-) diff --git a/rust/lance-file/src/page_table.rs b/rust/lance-file/src/page_table.rs index 18007bba3c..82e304d535 100644 --- a/rust/lance-file/src/page_table.rs +++ b/rust/lance-file/src/page_table.rs @@ -215,7 +215,7 @@ mod tests { .unwrap(); writer.shutdown().await.unwrap(); - let reader = LocalObjectReader::open_local_path(&path, 1024) + let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await .unwrap(); let actual = PageTable::load( @@ -284,7 +284,7 @@ mod tests { let mut writer = tokio::fs::File::create(&path).await.unwrap(); let res = page_table.write(&mut writer, 0).await.unwrap(); - let reader = LocalObjectReader::open_local_path(&path, 1024) + let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await .unwrap(); diff --git a/rust/lance-file/src/reader.rs b/rust/lance-file/src/reader.rs index c21eceb40c..41f8c7f4a5 100644 --- a/rust/lance-file/src/reader.rs +++ b/rust/lance-file/src/reader.rs @@ -189,7 +189,7 @@ impl FileReader { let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() { // We have not read the metadata bytes yet. - read_struct(object_reader, metadata_pos, Some(file_size)).await? + read_struct(object_reader, metadata_pos).await? } else { let offset = tail_bytes.len() - (file_size - metadata_pos); read_struct_from_buf(&tail_bytes.slice(offset..))? diff --git a/rust/lance-index/src/vector/ivf/storage.rs b/rust/lance-index/src/vector/ivf/storage.rs index 9456276095..c0a97ecc98 100644 --- a/rust/lance-index/src/vector/ivf/storage.rs +++ b/rust/lance-index/src/vector/ivf/storage.rs @@ -70,7 +70,6 @@ impl IvfData { let pb: PbIvf = read_message( reader.object_reader.as_ref(), ivf_metadata.pb_position as usize, - None, ) .await?; Self::try_from(pb) diff --git a/rust/lance-index/src/vector/pq/storage.rs b/rust/lance-index/src/vector/pq/storage.rs index 16567f6267..d68362f148 100644 --- a/rust/lance-index/src/vector/pq/storage.rs +++ b/rust/lance-index/src/vector/pq/storage.rs @@ -74,12 +74,8 @@ impl QuantizerMetadata for ProductQuantizationMetadata { location: location!(), })?; - let codebook_tensor: pb::Tensor = read_message( - reader.object_reader.as_ref(), - metadata.codebook_position, - None, - ) - .await?; + let codebook_tensor: pb::Tensor = + read_message(reader.object_reader.as_ref(), metadata.codebook_position).await?; metadata.codebook = Some(FixedSizeListArray::try_from(&codebook_tensor)?); Ok(metadata) } diff --git a/rust/lance-io/src/encodings/binary.rs b/rust/lance-io/src/encodings/binary.rs index 8c67c1df43..af6ec57507 100644 --- a/rust/lance-io/src/encodings/binary.rs +++ b/rust/lance-io/src/encodings/binary.rs @@ -137,7 +137,7 @@ impl<'a, T: ByteArrayType> BinaryDecoder<'a, T> { /// use lance_io::{local::LocalObjectReader, encodings::binary::BinaryDecoder, traits::Reader}; /// /// async { - /// let reader = LocalObjectReader::open_local_path("/tmp/foo.lance", 2048).await.unwrap(); + /// let reader = LocalObjectReader::open_local_path("/tmp/foo.lance", 2048, None).await.unwrap(); /// let string_decoder = BinaryDecoder::::new(reader.as_ref(), 100, 1024, true); /// }; /// ``` @@ -494,7 +494,7 @@ mod tests { let pos = write_test_data(&path, arrs).await.unwrap(); - let reader = LocalObjectReader::open_local_path(&path, 1024) + let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await .unwrap(); let read_len = arrs.iter().map(|a| a.len()).sum(); @@ -562,7 +562,7 @@ mod tests { let pos = encoder.encode(&[&data]).await.unwrap(); object_writer.shutdown().await.unwrap(); - let reader = LocalObjectReader::open_local_path(&path, 1024) + let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await .unwrap(); let decoder = BinaryDecoder::::new(reader.as_ref(), pos, data.len(), false); @@ -605,7 +605,7 @@ mod tests { let path = temp_dir.path().join("foo"); let pos = write_test_data(&path, &[&data]).await.unwrap(); - let reader = LocalObjectReader::open_local_path(&path, 1024) + let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await .unwrap(); let decoder = BinaryDecoder::::new(reader.as_ref(), pos, data.len(), false); @@ -627,7 +627,7 @@ mod tests { let temp_dir = tempfile::tempdir().unwrap(); let path = temp_dir.path().join("foo"); let pos = write_test_data(&path, &[&data]).await.unwrap(); - let reader = LocalObjectReader::open_local_path(&path, 1024) + let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await .unwrap(); let decoder = BinaryDecoder::::new(reader.as_ref(), pos, data.len(), false); @@ -658,7 +658,7 @@ mod tests { let path = temp_dir.path().join("foo"); let pos = write_test_data(&path, &[&data]).await.unwrap(); - let reader = LocalObjectReader::open_local_path(&path, 1024) + let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await .unwrap(); let decoder = BinaryDecoder::::new(reader.as_ref(), pos, data.len(), false); @@ -738,7 +738,7 @@ mod tests { pos }; - let reader = LocalObjectReader::open_local_path(&path, 1024) + let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await .unwrap(); let decoder = BinaryDecoder::::new(reader.as_ref(), pos, data.len(), true); diff --git a/rust/lance-io/src/encodings/dictionary.rs b/rust/lance-io/src/encodings/dictionary.rs index de74783dd5..72b150e023 100644 --- a/rust/lance-io/src/encodings/dictionary.rs +++ b/rust/lance-io/src/encodings/dictionary.rs @@ -246,7 +246,7 @@ mod tests { object_writer.shutdown().await.unwrap(); } - let reader = LocalObjectReader::open_local_path(&path, 2048) + let reader = LocalObjectReader::open_local_path(&path, 2048, None) .await .unwrap(); let decoder = DictionaryDecoder::new( diff --git a/rust/lance-io/src/encodings/plain.rs b/rust/lance-io/src/encodings/plain.rs index 1dd3ee3660..1911cd13c9 100644 --- a/rust/lance-io/src/encodings/plain.rs +++ b/rust/lance-io/src/encodings/plain.rs @@ -581,7 +581,7 @@ mod tests { writer.flush().await.unwrap(); } - let reader = LocalObjectReader::open_local_path(&path, 1024) + let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await .unwrap(); assert!(reader.size().await.unwrap() > 0); @@ -705,7 +705,7 @@ mod tests { writer.flush().await.unwrap(); } - let reader = LocalObjectReader::open_local_path(&path, 2048) + let reader = LocalObjectReader::open_local_path(&path, 2048, None) .await .unwrap(); assert!(reader.size().await.unwrap() > 0); @@ -753,7 +753,7 @@ mod tests { writer.shutdown().await.unwrap(); } - let reader = LocalObjectReader::open_local_path(&path, 2048) + let reader = LocalObjectReader::open_local_path(&path, 2048, None) .await .unwrap(); assert!(reader.size().await.unwrap() > 0); diff --git a/rust/lance-io/src/local.rs b/rust/lance-io/src/local.rs index 407a0c5a0b..bc39622787 100644 --- a/rust/lance-io/src/local.rs +++ b/rust/lance-io/src/local.rs @@ -20,6 +20,7 @@ use lance_core::{Error, Result}; use object_store::path::Path; use snafu::{location, Location}; use tokio::io::AsyncSeekExt; +use tokio::sync::OnceCell; use tracing::instrument; use crate::traits::{Reader, Writer}; @@ -55,6 +56,10 @@ pub struct LocalObjectReader { /// Fie path. path: Path, + /// Known size of the file. This is either passed in on construction or + /// cached on the first metadata call. + size: OnceCell, + /// Block size, in bytes. block_size: usize, } @@ -63,23 +68,20 @@ impl LocalObjectReader { pub async fn open_local_path( path: impl AsRef, block_size: usize, + known_size: Option, ) -> Result> { let path = path.as_ref().to_owned(); let object_store_path = Path::from_filesystem_path(&path)?; - tokio::task::spawn_blocking(move || { - let local_file = File::open(&path)?; - Ok(Box::new(Self { - file: Arc::new(local_file), - path: object_store_path, - block_size, - }) as Box) - }) - .await? + Self::open(&object_store_path, block_size, known_size).await } /// Open a local object reader, with default prefetch size. #[instrument(level = "debug")] - pub async fn open(path: &Path, block_size: usize) -> Result> { + pub async fn open( + path: &Path, + block_size: usize, + known_size: Option, + ) -> Result> { let path = path.clone(); let local_path = to_local_path(&path); tokio::task::spawn_blocking(move || { @@ -90,9 +92,11 @@ impl LocalObjectReader { }, _ => e.into(), })?; + let size = OnceCell::new_with(known_size); Ok(Box::new(Self { file: Arc::new(file), block_size, + size, path: path.clone(), }) as Box) }) @@ -112,14 +116,20 @@ impl Reader for LocalObjectReader { /// Returns the file size. async fn size(&self) -> object_store::Result { - let metadata = self - .file - .metadata() - .map_err(|err| object_store::Error::Generic { - store: "LocalFileSystem", - source: err.into(), - })?; - Ok(metadata.len() as usize) + let file = self.file.clone(); + self.size + .get_or_try_init(|| async move { + let metadata = tokio::task::spawn_blocking(move || { + file.metadata().map_err(|err| object_store::Error::Generic { + store: "LocalFileSystem", + source: err.into(), + }) + }) + .await??; + Ok(metadata.len() as usize) + }) + .await + .cloned() } /// Reads a range of data. diff --git a/rust/lance-io/src/object_reader.rs b/rust/lance-io/src/object_reader.rs index 1ec74dd2b3..c856f32460 100644 --- a/rust/lance-io/src/object_reader.rs +++ b/rust/lance-io/src/object_reader.rs @@ -9,6 +9,7 @@ use bytes::Bytes; use futures::future::BoxFuture; use lance_core::Result; use object_store::{path::Path, ObjectStore}; +use tokio::sync::OnceCell; use tracing::instrument; use crate::traits::Reader; @@ -22,16 +23,24 @@ pub struct CloudObjectReader { pub object_store: Arc, // File path pub path: Path, + // File size, if known. + size: OnceCell, block_size: usize, } impl CloudObjectReader { /// Create an ObjectReader from URI - pub fn new(object_store: Arc, path: Path, block_size: usize) -> Result { + pub fn new( + object_store: Arc, + path: Path, + block_size: usize, + known_size: Option, + ) -> Result { Ok(Self { object_store, path, + size: OnceCell::new_with(known_size), block_size, }) } @@ -70,10 +79,15 @@ impl Reader for CloudObjectReader { /// Object/File Size. async fn size(&self) -> object_store::Result { - let meta = self - .do_with_retry(|| self.object_store.head(&self.path)) - .await?; - Ok(meta.size) + self.size + .get_or_try_init(|| async move { + let meta = self + .do_with_retry(|| self.object_store.head(&self.path)) + .await?; + Ok(meta.size) + }) + .await + .cloned() } #[instrument(level = "debug", skip(self))] diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index de8b4726da..e44b10f501 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -440,11 +440,29 @@ impl ObjectStore { /// - ``path``: Absolute path to the file. pub async fn open(&self, path: &Path) -> Result> { match self.scheme.as_str() { - "file" => LocalObjectReader::open(path, self.block_size).await, + "file" => LocalObjectReader::open(path, self.block_size, None).await, _ => Ok(Box::new(CloudObjectReader::new( self.inner.clone(), path.clone(), self.block_size, + None, + )?)), + } + } + + /// Open a reader for a file with known size. + /// + /// This size may either have been retrieved from a list operation or + /// cached metadata. By passing in the known size, we can skip a HEAD / metadata + /// call. + pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result> { + match self.scheme.as_str() { + "file" => LocalObjectReader::open(path, self.block_size, Some(known_size)).await, + _ => Ok(Box::new(CloudObjectReader::new( + self.inner.clone(), + path.clone(), + self.block_size, + Some(known_size), )?)), } } diff --git a/rust/lance-io/src/utils.rs b/rust/lance-io/src/utils.rs index 1a38639b3f..4e9309b933 100644 --- a/rust/lance-io/src/utils.rs +++ b/rust/lance-io/src/utils.rs @@ -80,16 +80,8 @@ pub async fn read_fixed_stride_array( /// /// We write protobuf by first writing the length of the message as a u32, /// followed by the message itself. -pub async fn read_message( - reader: &dyn Reader, - pos: usize, - file_size: Option, -) -> Result { - let file_size = if let Some(file_size) = file_size { - file_size - } else { - reader.size().await? - }; +pub async fn read_message(reader: &dyn Reader, pos: usize) -> Result { + let file_size = reader.size().await?; if pos > file_size { return Err(Error::io("file size is too small".to_string(), location!())); } @@ -118,21 +110,13 @@ pub async fn read_struct< >( reader: &dyn Reader, pos: usize, - file_size: Option, ) -> Result { - let msg = read_message::(reader, pos, file_size).await?; + let msg = read_message::(reader, pos).await?; T::try_from(msg) } -pub async fn read_last_block( - reader: &dyn Reader, - size: Option, -) -> object_store::Result { - let file_size = if let Some(size) = size { - size as usize - } else { - reader.size().await? - }; +pub async fn read_last_block(reader: &dyn Reader) -> object_store::Result { + let file_size = reader.size().await?; let block_size = reader.block_size(); let begin = if file_size < block_size { 0 @@ -243,8 +227,8 @@ mod tests { assert_eq!(pos, 0); object_writer.shutdown().await.unwrap(); - let object_reader = CloudObjectReader::new(Arc::new(store), path, 1024).unwrap(); - let actual: BytesWrapper = read_struct(&object_reader, pos, None).await.unwrap(); + let object_reader = CloudObjectReader::new(Arc::new(store), path, 1024, None).unwrap(); + let actual: BytesWrapper = read_struct(&object_reader, pos).await.unwrap(); assert_eq!(some_message, actual); } } diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index de39cf51d3..d6f9aa57f8 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -456,7 +456,7 @@ impl SelfDescribingFileReader for FileReader { ), location: location!(), })?; - let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position, None).await?; + let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?; populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?; let schema = manifest.schema; let max_field_id = schema.max_field_id().unwrap_or_default(); diff --git a/rust/lance-table/src/io/manifest.rs b/rust/lance-table/src/io/manifest.rs index 921c8af5a6..332b94f881 100644 --- a/rust/lance-table/src/io/manifest.rs +++ b/rust/lance-table/src/io/manifest.rs @@ -101,7 +101,7 @@ pub async fn read_manifest_indexes( ) -> Result> { if let Some(pos) = manifest.index_section.as_ref() { let reader = object_store.open(path).await?; - let section: pb::IndexSection = read_message(reader.as_ref(), *pos, None).await?; + let section: pb::IndexSection = read_message(reader.as_ref(), *pos).await?; Ok(section .indices diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 8abe88f705..01ee8fead8 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -295,45 +295,46 @@ impl Dataset { session: Arc, commit_handler: Arc, ) -> Result { - let object_reader = + let object_reader = if let Some(size) = manifest_location.size { object_store - .open(&manifest_location.path) + .open_with_size(&manifest_location.path, size as usize) .await - .map_err(|e| match &e { - Error::NotFound { uri, .. } => Error::DatasetNotFound { - path: uri.clone(), - source: box_error(e), + } else { + object_store.open(&manifest_location.path).await + }; + let object_reader = object_reader.map_err(|e| match &e { + Error::NotFound { uri, .. } => Error::DatasetNotFound { + path: uri.clone(), + source: box_error(e), + location: location!(), + }, + _ => e, + })?; + + let last_block = + read_last_block(object_reader.as_ref()) + .await + .map_err(|err| match err { + object_store::Error::NotFound { path, source } => Error::DatasetNotFound { + path, + source, + location: location!(), + }, + _ => Error::IO { + source: err.into(), location: location!(), }, - _ => e, })?; - let manifest_size = if let Some(size) = manifest_location.size { - size - } else { - object_reader.size().await? as u64 - }; - let last_block = read_last_block(object_reader.as_ref(), Some(manifest_size)) - .await - .map_err(|err| match err { - object_store::Error::NotFound { path, source } => Error::DatasetNotFound { - path, - source, - location: location!(), - }, - _ => Error::IO { - source: err.into(), - location: location!(), - }, - })?; let offset = read_metadata_offset(&last_block)?; // If manifest is in the last block, we can decode directly from memory. - let mut manifest = if manifest_size - (offset as u64) <= (last_block.len() as u64) { + let manifest_size = object_reader.size().await?; + let mut manifest = if manifest_size - offset <= last_block.len() { let message_len = LittleEndian::read_u32(&last_block[offset..offset + 4]) as usize; let message_data = &last_block[offset + 4..offset + 4 + message_len]; Manifest::try_from(lance_table::format::pb::Manifest::decode(message_data)?)? } else { - read_struct(object_reader.as_ref(), offset, Some(manifest_size as usize)).await? + read_struct(object_reader.as_ref(), offset).await? }; if !can_read_dataset(manifest.reader_feature_flags) { diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 9beeb0f121..42bfe2c5f1 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -130,11 +130,11 @@ impl IndexInformationProvider for ScalarIndexInfo { async fn open_index_proto(reader: &dyn Reader) -> Result { let file_size = reader.size().await?; - let tail_bytes = read_last_block(reader, Some(file_size as u64)).await?; + let tail_bytes = read_last_block(reader).await?; let metadata_pos = read_metadata_offset(&tail_bytes)?; let proto: pb::Index = if metadata_pos < file_size - tail_bytes.len() { // We have not read the metadata bytes yet. - read_message(reader, metadata_pos, Some(file_size)).await? + read_message(reader, metadata_pos).await? } else { let offset = tail_bytes.len() - (file_size - metadata_pos); read_message_from_buf(&tail_bytes.slice(offset..))? @@ -512,7 +512,7 @@ impl DatasetIndexInternalExt for Dataset { let index_file = index_dir.child(INDEX_FILE_NAME); let reader: Arc = self.object_store.open(&index_file).await?.into(); - let tailing_bytes = read_last_block(reader.as_ref(), None).await?; + let tailing_bytes = read_last_block(reader.as_ref()).await?; let (major_version, minor_version) = read_version(&tailing_bytes)?; // the index file is in lance format since version (0,2) diff --git a/rust/lance/src/index/vector/fixture_test.rs b/rust/lance/src/index/vector/fixture_test.rs index f047b7eb6b..c8c49dc938 100644 --- a/rust/lance/src/index/vector/fixture_test.rs +++ b/rust/lance/src/index/vector/fixture_test.rs @@ -132,7 +132,7 @@ mod test { let make_idx = move |assert_query: Vec, metric: MetricType| async move { let f = tempfile::NamedTempFile::new().unwrap(); - let reader = LocalObjectReader::open_local_path(f.path(), 64) + let reader = LocalObjectReader::open_local_path(f.path(), 64, None) .await .unwrap(); From 7de96ef97a6d9bc3117f889511285ce45a91648b Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 28 May 2024 11:32:55 -0700 Subject: [PATCH 6/8] remove references to latest.manifest --- docs/format.rst | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/docs/format.rst b/docs/format.rst index b6e30777da..fa163c8609 100644 --- a/docs/format.rst +++ b/docs/format.rst @@ -10,7 +10,6 @@ A `Lance Dataset` is organized in a directory. /path/to/dataset: data/*.lance -- Data directory - latest.manifest -- The manifest file for the latest version. _versions/*.manifest -- Manifest file for each dataset version. _indices/{UUID-*}/index.idx -- Secondary index, each index per directory. _deletions/*.{arrow,bin} -- Deletion files, which contain ids of rows @@ -249,8 +248,7 @@ Committing Datasets ------------------- A new version of a dataset is committed by writing a new manifest file to the -``_versions`` directory. Only after successfully committing this file should -the ``_latest.manifest`` file be updated. +``_versions`` directory. To prevent concurrent writers from overwriting each other, the commit process must be atomic and consistent for all writers. If two writers try to commit @@ -287,7 +285,6 @@ The commit process is as follows: conflicts, abort the commit. Otherwise, continue. 4. Build a manifest and attempt to commit it to the next version. If the commit fails because another writer has already committed, go back to step 3. - 5. If the commit succeeds, update the ``_latest.manifest`` file. When checking whether two transactions conflict, be conservative. If the transaction file is missing, assume it conflicts. If the transaction file From 066fdabc3f2e03a11859d0ceae2b0bd3bbdacb00 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 28 May 2024 13:02:23 -0700 Subject: [PATCH 7/8] hopefully fix windows --- rust/lance-table/src/io/commit.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 8eec37a0ed..3605bc5fe7 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -22,6 +22,7 @@ //! terms of a lock. The trait [CommitLock] can be implemented as a simpler //! alternative to [CommitHandler]. +use std::io; use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::{fmt::Debug, fs::DirEntry}; @@ -177,9 +178,11 @@ fn current_manifest_local(base: &Path) -> std::io::Result Date: Wed, 29 May 2024 09:41:25 -0700 Subject: [PATCH 8/8] Update rust/lance-table/src/io/commit/external_manifest.rs Co-authored-by: Weston Pace --- rust/lance-table/src/io/commit/external_manifest.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index f42c0f535c..a7620da7b7 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -44,6 +44,10 @@ pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync { async fn get_latest_version(&self, base_uri: &str) -> Result>; /// Get the latest manifest location for a given base_uri. + /// + /// By default, this calls get_latest_version. Impls should + /// override this method if they store both the location and size + /// of the latest manifest. async fn get_latest_manifest_location( &self, base_uri: &str,