diff --git a/docs/format.rst b/docs/format.rst index b6e30777da..fa163c8609 100644 --- a/docs/format.rst +++ b/docs/format.rst @@ -10,7 +10,6 @@ A `Lance Dataset` is organized in a directory. /path/to/dataset: data/*.lance -- Data directory - latest.manifest -- The manifest file for the latest version. _versions/*.manifest -- Manifest file for each dataset version. _indices/{UUID-*}/index.idx -- Secondary index, each index per directory. _deletions/*.{arrow,bin} -- Deletion files, which contain ids of rows @@ -249,8 +248,7 @@ Committing Datasets ------------------- A new version of a dataset is committed by writing a new manifest file to the -``_versions`` directory. Only after successfully committing this file should -the ``_latest.manifest`` file be updated. +``_versions`` directory. To prevent concurrent writers from overwriting each other, the commit process must be atomic and consistent for all writers. If two writers try to commit @@ -287,7 +285,6 @@ The commit process is as follows: conflicts, abort the commit. Otherwise, continue. 4. Build a manifest and attempt to commit it to the next version. If the commit fails because another writer has already committed, go back to step 3. - 5. If the commit succeeds, update the ``_latest.manifest`` file. When checking whether two transactions conflict, be conservative. If the transaction file is missing, assume it conflicts. If the transaction file diff --git a/rust/lance-file/src/page_table.rs b/rust/lance-file/src/page_table.rs index 18007bba3c..82e304d535 100644 --- a/rust/lance-file/src/page_table.rs +++ b/rust/lance-file/src/page_table.rs @@ -215,7 +215,7 @@ mod tests { .unwrap(); writer.shutdown().await.unwrap(); - let reader = LocalObjectReader::open_local_path(&path, 1024) + let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await .unwrap(); let actual = PageTable::load( @@ -284,7 +284,7 @@ mod tests { let mut writer = tokio::fs::File::create(&path).await.unwrap(); let res = page_table.write(&mut writer, 0).await.unwrap(); - let reader = LocalObjectReader::open_local_path(&path, 1024) + let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await .unwrap(); diff --git a/rust/lance-io/src/encodings/binary.rs b/rust/lance-io/src/encodings/binary.rs index 8c67c1df43..af6ec57507 100644 --- a/rust/lance-io/src/encodings/binary.rs +++ b/rust/lance-io/src/encodings/binary.rs @@ -137,7 +137,7 @@ impl<'a, T: ByteArrayType> BinaryDecoder<'a, T> { /// use lance_io::{local::LocalObjectReader, encodings::binary::BinaryDecoder, traits::Reader}; /// /// async { - /// let reader = LocalObjectReader::open_local_path("/tmp/foo.lance", 2048).await.unwrap(); + /// let reader = LocalObjectReader::open_local_path("/tmp/foo.lance", 2048, None).await.unwrap(); /// let string_decoder = BinaryDecoder::::new(reader.as_ref(), 100, 1024, true); /// }; /// ``` @@ -494,7 +494,7 @@ mod tests { let pos = write_test_data(&path, arrs).await.unwrap(); - let reader = LocalObjectReader::open_local_path(&path, 1024) + let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await .unwrap(); let read_len = arrs.iter().map(|a| a.len()).sum(); @@ -562,7 +562,7 @@ mod tests { let pos = encoder.encode(&[&data]).await.unwrap(); object_writer.shutdown().await.unwrap(); - let reader = LocalObjectReader::open_local_path(&path, 1024) + let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await .unwrap(); let decoder = BinaryDecoder::::new(reader.as_ref(), pos, data.len(), false); @@ -605,7 +605,7 @@ mod tests { let path = temp_dir.path().join("foo"); let pos = write_test_data(&path, &[&data]).await.unwrap(); - let reader = LocalObjectReader::open_local_path(&path, 1024) + let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await .unwrap(); let decoder = BinaryDecoder::::new(reader.as_ref(), pos, data.len(), false); @@ -627,7 +627,7 @@ mod tests { let temp_dir = tempfile::tempdir().unwrap(); let path = temp_dir.path().join("foo"); let pos = write_test_data(&path, &[&data]).await.unwrap(); - let reader = LocalObjectReader::open_local_path(&path, 1024) + let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await .unwrap(); let decoder = BinaryDecoder::::new(reader.as_ref(), pos, data.len(), false); @@ -658,7 +658,7 @@ mod tests { let path = temp_dir.path().join("foo"); let pos = write_test_data(&path, &[&data]).await.unwrap(); - let reader = LocalObjectReader::open_local_path(&path, 1024) + let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await .unwrap(); let decoder = BinaryDecoder::::new(reader.as_ref(), pos, data.len(), false); @@ -738,7 +738,7 @@ mod tests { pos }; - let reader = LocalObjectReader::open_local_path(&path, 1024) + let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await .unwrap(); let decoder = BinaryDecoder::::new(reader.as_ref(), pos, data.len(), true); diff --git a/rust/lance-io/src/encodings/dictionary.rs b/rust/lance-io/src/encodings/dictionary.rs index de74783dd5..72b150e023 100644 --- a/rust/lance-io/src/encodings/dictionary.rs +++ b/rust/lance-io/src/encodings/dictionary.rs @@ -246,7 +246,7 @@ mod tests { object_writer.shutdown().await.unwrap(); } - let reader = LocalObjectReader::open_local_path(&path, 2048) + let reader = LocalObjectReader::open_local_path(&path, 2048, None) .await .unwrap(); let decoder = DictionaryDecoder::new( diff --git a/rust/lance-io/src/encodings/plain.rs b/rust/lance-io/src/encodings/plain.rs index 1dd3ee3660..1911cd13c9 100644 --- a/rust/lance-io/src/encodings/plain.rs +++ b/rust/lance-io/src/encodings/plain.rs @@ -581,7 +581,7 @@ mod tests { writer.flush().await.unwrap(); } - let reader = LocalObjectReader::open_local_path(&path, 1024) + let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await .unwrap(); assert!(reader.size().await.unwrap() > 0); @@ -705,7 +705,7 @@ mod tests { writer.flush().await.unwrap(); } - let reader = LocalObjectReader::open_local_path(&path, 2048) + let reader = LocalObjectReader::open_local_path(&path, 2048, None) .await .unwrap(); assert!(reader.size().await.unwrap() > 0); @@ -753,7 +753,7 @@ mod tests { writer.shutdown().await.unwrap(); } - let reader = LocalObjectReader::open_local_path(&path, 2048) + let reader = LocalObjectReader::open_local_path(&path, 2048, None) .await .unwrap(); assert!(reader.size().await.unwrap() > 0); diff --git a/rust/lance-io/src/local.rs b/rust/lance-io/src/local.rs index 268fa37afa..bc39622787 100644 --- a/rust/lance-io/src/local.rs +++ b/rust/lance-io/src/local.rs @@ -20,6 +20,7 @@ use lance_core::{Error, Result}; use object_store::path::Path; use snafu::{location, Location}; use tokio::io::AsyncSeekExt; +use tokio::sync::OnceCell; use tracing::instrument; use crate::traits::{Reader, Writer}; @@ -55,6 +56,10 @@ pub struct LocalObjectReader { /// Fie path. path: Path, + /// Known size of the file. This is either passed in on construction or + /// cached on the first metadata call. + size: OnceCell, + /// Block size, in bytes. block_size: usize, } @@ -63,23 +68,20 @@ impl LocalObjectReader { pub async fn open_local_path( path: impl AsRef, block_size: usize, + known_size: Option, ) -> Result> { let path = path.as_ref().to_owned(); let object_store_path = Path::from_filesystem_path(&path)?; - tokio::task::spawn_blocking(move || { - let local_file = File::open(&path)?; - Ok(Box::new(Self { - file: Arc::new(local_file), - path: object_store_path, - block_size, - }) as Box) - }) - .await? + Self::open(&object_store_path, block_size, known_size).await } /// Open a local object reader, with default prefetch size. #[instrument(level = "debug")] - pub async fn open(path: &Path, block_size: usize) -> Result> { + pub async fn open( + path: &Path, + block_size: usize, + known_size: Option, + ) -> Result> { let path = path.clone(); let local_path = to_local_path(&path); tokio::task::spawn_blocking(move || { @@ -90,9 +92,11 @@ impl LocalObjectReader { }, _ => e.into(), })?; + let size = OnceCell::new_with(known_size); Ok(Box::new(Self { file: Arc::new(file), block_size, + size, path: path.clone(), }) as Box) }) @@ -111,13 +115,26 @@ impl Reader for LocalObjectReader { } /// Returns the file size. - async fn size(&self) -> Result { - Ok(self.file.metadata()?.len() as usize) + async fn size(&self) -> object_store::Result { + let file = self.file.clone(); + self.size + .get_or_try_init(|| async move { + let metadata = tokio::task::spawn_blocking(move || { + file.metadata().map_err(|err| object_store::Error::Generic { + store: "LocalFileSystem", + source: err.into(), + }) + }) + .await??; + Ok(metadata.len() as usize) + }) + .await + .cloned() } /// Reads a range of data. #[instrument(level = "debug", skip(self))] - async fn get_range(&self, range: Range) -> Result { + async fn get_range(&self, range: Range) -> object_store::Result { let file = self.file.clone(); tokio::task::spawn_blocking(move || { let mut buf = BytesMut::with_capacity(range.len()); @@ -132,6 +149,10 @@ impl Reader for LocalObjectReader { Ok(buf.freeze()) }) .await? + .map_err(|err: std::io::Error| object_store::Error::Generic { + store: "LocalFileSystem", + source: err.into(), + }) } } diff --git a/rust/lance-io/src/object_reader.rs b/rust/lance-io/src/object_reader.rs index fc9b495dc9..c856f32460 100644 --- a/rust/lance-io/src/object_reader.rs +++ b/rust/lance-io/src/object_reader.rs @@ -9,6 +9,7 @@ use bytes::Bytes; use futures::future::BoxFuture; use lance_core::Result; use object_store::{path::Path, ObjectStore}; +use tokio::sync::OnceCell; use tracing::instrument; use crate::traits::Reader; @@ -22,16 +23,24 @@ pub struct CloudObjectReader { pub object_store: Arc, // File path pub path: Path, + // File size, if known. + size: OnceCell, block_size: usize, } impl CloudObjectReader { /// Create an ObjectReader from URI - pub fn new(object_store: Arc, path: Path, block_size: usize) -> Result { + pub fn new( + object_store: Arc, + path: Path, + block_size: usize, + known_size: Option, + ) -> Result { Ok(Self { object_store, path, + size: OnceCell::new_with(known_size), block_size, }) } @@ -42,14 +51,14 @@ impl CloudObjectReader { async fn do_with_retry<'a, O>( &self, f: impl Fn() -> BoxFuture<'a, std::result::Result>, - ) -> Result { + ) -> object_store::Result { let mut retries = 3; loop { match f().await { Ok(val) => return Ok(val), Err(err) => { if retries == 0 { - return Err(err.into()); + return Err(err); } retries -= 1; } @@ -69,15 +78,20 @@ impl Reader for CloudObjectReader { } /// Object/File Size. - async fn size(&self) -> Result { - let meta = self - .do_with_retry(|| self.object_store.head(&self.path)) - .await?; - Ok(meta.size) + async fn size(&self) -> object_store::Result { + self.size + .get_or_try_init(|| async move { + let meta = self + .do_with_retry(|| self.object_store.head(&self.path)) + .await?; + Ok(meta.size) + }) + .await + .cloned() } #[instrument(level = "debug", skip(self))] - async fn get_range(&self, range: Range) -> Result { + async fn get_range(&self, range: Range) -> object_store::Result { self.do_with_retry(|| self.object_store.get_range(&self.path, range.clone())) .await } diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index de8b4726da..e44b10f501 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -440,11 +440,29 @@ impl ObjectStore { /// - ``path``: Absolute path to the file. pub async fn open(&self, path: &Path) -> Result> { match self.scheme.as_str() { - "file" => LocalObjectReader::open(path, self.block_size).await, + "file" => LocalObjectReader::open(path, self.block_size, None).await, _ => Ok(Box::new(CloudObjectReader::new( self.inner.clone(), path.clone(), self.block_size, + None, + )?)), + } + } + + /// Open a reader for a file with known size. + /// + /// This size may either have been retrieved from a list operation or + /// cached metadata. By passing in the known size, we can skip a HEAD / metadata + /// call. + pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result> { + match self.scheme.as_str() { + "file" => LocalObjectReader::open(path, self.block_size, Some(known_size)).await, + _ => Ok(Box::new(CloudObjectReader::new( + self.inner.clone(), + path.clone(), + self.block_size, + Some(known_size), )?)), } } diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs index 8a12f8a8d6..ad26f354b9 100644 --- a/rust/lance-io/src/scheduler.rs +++ b/rust/lance-io/src/scheduler.rs @@ -91,7 +91,8 @@ impl IoTask { let bytes = self .reader .get_range(self.to_read.start as usize..self.to_read.end as usize) - .await; + .await + .map_err(Error::from); (self.when_done)(bytes); } } diff --git a/rust/lance-io/src/traits.rs b/rust/lance-io/src/traits.rs index 89ad65b396..66b8b55ccf 100644 --- a/rust/lance-io/src/traits.rs +++ b/rust/lance-io/src/traits.rs @@ -86,10 +86,10 @@ pub trait Reader: std::fmt::Debug + Send + Sync { fn block_size(&self) -> usize; /// Object/File Size. - async fn size(&self) -> Result; + async fn size(&self) -> object_store::Result; /// Read a range of bytes from the object. /// /// TODO: change to read_at()? - async fn get_range(&self, range: Range) -> Result; + async fn get_range(&self, range: Range) -> object_store::Result; } diff --git a/rust/lance-io/src/utils.rs b/rust/lance-io/src/utils.rs index f34a1568ea..4e9309b933 100644 --- a/rust/lance-io/src/utils.rs +++ b/rust/lance-io/src/utils.rs @@ -77,14 +77,16 @@ pub async fn read_fixed_stride_array( } /// Read a protobuf message at file position 'pos'. -// TODO: pub(crate) +/// +/// We write protobuf by first writing the length of the message as a u32, +/// followed by the message itself. pub async fn read_message(reader: &dyn Reader, pos: usize) -> Result { let file_size = reader.size().await?; if pos > file_size { return Err(Error::io("file size is too small".to_string(), location!())); } - let range = pos..min(pos + 4096, file_size); + let range = pos..min(pos + reader.block_size(), file_size); let buf = reader.get_range(range.clone()).await?; let msg_len = LittleEndian::read_u32(&buf) as usize; @@ -113,7 +115,7 @@ pub async fn read_struct< T::try_from(msg) } -pub async fn read_last_block(reader: &dyn Reader) -> Result { +pub async fn read_last_block(reader: &dyn Reader) -> object_store::Result { let file_size = reader.size().await?; let block_size = reader.block_size(); let begin = if file_size < block_size { @@ -225,7 +227,7 @@ mod tests { assert_eq!(pos, 0); object_writer.shutdown().await.unwrap(); - let object_reader = CloudObjectReader::new(Arc::new(store), path, 1024).unwrap(); + let object_reader = CloudObjectReader::new(Arc::new(store), path, 1024, None).unwrap(); let actual: BytesWrapper = read_struct(&object_reader, pos).await.unwrap(); assert_eq!(some_message, actual); } diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index f371e1b3b4..3605bc5fe7 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -22,16 +22,17 @@ //! terms of a lock. The trait [CommitLock] can be implemented as a simpler //! alternative to [CommitHandler]. -use std::fmt::Debug; +use std::io; use std::sync::atomic::AtomicBool; use std::sync::Arc; +use std::{fmt::Debug, fs::DirEntry}; use futures::{ future::{self, BoxFuture}, stream::BoxStream, StreamExt, TryStreamExt, }; -use object_store::{path::Path, Error as ObjectStoreError, ObjectStore}; +use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore}; use snafu::{location, Location}; use url::Url; @@ -40,8 +41,7 @@ pub mod dynamodb; pub mod external_manifest; use lance_core::{Error, Result}; -use lance_io::object_store::ObjectStoreExt; -use lance_io::object_store::ObjectStoreParams; +use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams}; #[cfg(feature = "dynamodb")] use { @@ -64,7 +64,7 @@ const MANIFEST_EXTENSION: &str = "manifest"; /// Function that writes the manifest to the object store. pub type ManifestWriter = for<'a> fn( - object_store: &'a dyn ObjectStore, + object_store: &'a dyn OSObjectStore, manifest: &'a mut Manifest, indices: Option>, path: &'a Path, @@ -80,36 +80,61 @@ pub fn latest_manifest_path(base: &Path) -> Path { base.child(LATEST_MANIFEST_NAME) } +#[derive(Debug)] +pub struct ManifestLocation { + /// The version the manifest corresponds to. + pub version: u64, + /// Path of the manifest file, relative to the table root. + pub path: Path, + /// Size, in bytes, of the manifest file. If it is not known, this field should be `None`. + pub size: Option, +} + /// Get the latest manifest path -async fn current_manifest_path(object_store: &dyn ObjectStore, base: &Path) -> Result { - // TODO: list gives us the size, so we could also return the size of the manifest. - // That avoids a HEAD request later. +async fn current_manifest_path( + object_store: &ObjectStore, + base: &Path, +) -> Result { + if object_store.is_local() { + if let Ok(Some(location)) = current_manifest_local(base) { + return Ok(location); + } + } // We use `list_with_delimiter` to avoid listing the contents of child directories. let manifest_files = object_store + .inner .list_with_delimiter(Some(&base.child(VERSIONS_DIR))) .await?; let current = manifest_files .objects .into_iter() - .map(|meta| meta.location) - .filter(|path| { - path.filename().is_some() && path.filename().unwrap().ends_with(MANIFEST_EXTENSION) + .filter(|meta| { + meta.location.filename().is_some() + && meta + .location + .filename() + .unwrap() + .ends_with(MANIFEST_EXTENSION) }) - .filter_map(|path| { - let version = path + .filter_map(|meta| { + let version = meta + .location .filename() .unwrap() .split_once('.') .and_then(|(version_str, _)| version_str.parse::().ok())?; - Some((version, path)) + Some((version, meta)) }) - .max_by_key(|(version, _)| *version) - .map(|(_, path)| path); + .max_by_key(|(version, _)| *version); - if let Some(path) = current { - Ok(path) + if let Some((version, meta)) = current { + Ok(ManifestLocation { + version, + path: meta.location, + size: Some(meta.size as u64), + }) } else { Err(Error::NotFound { uri: manifest_path(base, 1).to_string(), @@ -118,16 +143,63 @@ async fn current_manifest_path(object_store: &dyn ObjectStore, base: &Path) -> R } } +// This is an optimized function that searches for the latest manifest. In +// object_store, list operations lookup metadata for each file listed. This +// method only gets the metadata for the found latest manifest. +fn current_manifest_local(base: &Path) -> std::io::Result> { + let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR)); + let entries = std::fs::read_dir(path)?; + + let mut latest_entry: Option<(u64, DirEntry)> = None; + + for entry in entries { + let entry = entry?; + let filename_raw = entry.file_name(); + let filename = filename_raw.to_string_lossy(); + if !filename.ends_with(MANIFEST_EXTENSION) { + // Need to ignore temporary files, such as + // .tmp_7.manifest_9c100374-3298-4537-afc6-f5ee7913666d + continue; + } + let Some(version) = filename + .split_once('.') + .and_then(|(version_str, _)| version_str.parse::().ok()) + else { + continue; + }; + + if let Some((latest_version, _)) = &latest_entry { + if version > *latest_version { + latest_entry = Some((version, entry)); + } + } else { + latest_entry = Some((version, entry)); + } + } + + if let Some((version, entry)) = latest_entry { + let path = Path::from_filesystem_path(entry.path()) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?; + Ok(Some(ManifestLocation { + version, + path, + size: Some(entry.metadata()?.len()), + })) + } else { + Ok(None) + } +} + async fn list_manifests<'a>( base_path: &Path, - object_store: &'a dyn ObjectStore, + object_store: &'a dyn OSObjectStore, ) -> Result>> { let base_path = base_path.clone(); Ok(object_store .read_dir_all(&base_path.child(VERSIONS_DIR), None) .await? .try_filter_map(|obj_meta| { - if obj_meta.location.extension() == Some("manifest") { + if obj_meta.location.extension() == Some(MANIFEST_EXTENSION) { future::ready(Ok(Some(obj_meta.location))) } else { future::ready(Ok(None)) @@ -139,7 +211,7 @@ async fn list_manifests<'a>( pub fn parse_version_from_path(path: &Path) -> Result { path.filename() .and_then(|name| name.split_once('.')) - .filter(|(_, extension)| *extension == "manifest") + .filter(|(_, extension)| *extension == MANIFEST_EXTENSION) .and_then(|(version, _)| version.parse::().ok()) .ok_or(Error::Internal { message: format!("Expected manifest file, but found {}", path), @@ -158,7 +230,7 @@ fn make_staging_manifest_path(base: &Path) -> Result { async fn write_latest_manifest( from_path: &Path, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &dyn OSObjectStore, ) -> Result<()> { let latest_path = latest_manifest_path(base_path); let staging_path = make_staging_manifest_path(from_path)?; @@ -183,25 +255,33 @@ const DDB_URL_QUERY_KEY: &str = "ddbTableName"; // TODO: pub(crate) #[async_trait::async_trait] pub trait CommitHandler: Debug + Send + Sync { + async fn resolve_latest_location( + &self, + base_path: &Path, + object_store: &ObjectStore, + ) -> Result { + Ok(current_manifest_path(object_store, base_path).await?) + } + /// Get the path to the latest version manifest of a dataset at the base_path async fn resolve_latest_version( &self, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &ObjectStore, ) -> std::result::Result { // TODO: we need to pade 0's to the version number on the manifest file path - Ok(current_manifest_path(object_store, base_path).await?) + Ok(current_manifest_path(object_store, base_path).await?.path) } // for default implementation, parse the version from the path async fn resolve_latest_version_id( &self, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &ObjectStore, ) -> Result { - let path = self.resolve_latest_version(base_path, object_store).await?; - - parse_version_from_path(&path) + Ok(current_manifest_path(object_store, base_path) + .await? + .version) } /// Get the path to a specific versioned manifest of a dataset at the base_path @@ -209,7 +289,7 @@ pub trait CommitHandler: Debug + Send + Sync { &self, base_path: &Path, version: u64, - _object_store: &dyn ObjectStore, + _object_store: &dyn OSObjectStore, ) -> std::result::Result { Ok(manifest_path(base_path, version)) } @@ -218,7 +298,7 @@ pub trait CommitHandler: Debug + Send + Sync { async fn list_manifests<'a>( &self, base_path: &Path, - object_store: &'a dyn ObjectStore, + object_store: &'a dyn OSObjectStore, ) -> Result>> { list_manifests(base_path, object_store).await } @@ -232,7 +312,7 @@ pub trait CommitHandler: Debug + Send + Sync { manifest: &mut Manifest, indices: Option>, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &dyn OSObjectStore, manifest_writer: ManifestWriter, ) -> std::result::Result<(), CommitError>; } @@ -443,7 +523,7 @@ impl CommitHandler for UnsafeCommitHandler { manifest: &mut Manifest, indices: Option>, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &dyn OSObjectStore, manifest_writer: ManifestWriter, ) -> std::result::Result<(), CommitError> { // Log a one-time warning @@ -506,7 +586,7 @@ impl CommitHandler for T { manifest: &mut Manifest, indices: Option>, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &dyn OSObjectStore, manifest_writer: ManifestWriter, ) -> std::result::Result<(), CommitError> { let path = self @@ -552,7 +632,7 @@ impl CommitHandler for Arc { manifest: &mut Manifest, indices: Option>, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &dyn OSObjectStore, manifest_writer: ManifestWriter, ) -> std::result::Result<(), CommitError> { self.as_ref() @@ -573,7 +653,7 @@ impl CommitHandler for RenameCommitHandler { manifest: &mut Manifest, indices: Option>, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &dyn OSObjectStore, manifest_writer: ManifestWriter, ) -> std::result::Result<(), CommitError> { // Create a temporary object, then use `rename_if_not_exists` to commit. diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index 4fa523c2b0..a7620da7b7 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -9,22 +9,22 @@ use std::sync::Arc; use async_trait::async_trait; use lance_core::{Error, Result}; -use lance_io::object_store::ObjectStoreExt; +use lance_io::object_store::{ObjectStore, ObjectStoreExt}; use log::warn; -use object_store::{path::Path, ObjectStore}; +use object_store::{path::Path, ObjectStore as OSObjectStore}; use snafu::{location, Location}; use super::{ current_manifest_path, make_staging_manifest_path, manifest_path, write_latest_manifest, - MANIFEST_EXTENSION, + ManifestLocation, MANIFEST_EXTENSION, }; use crate::format::{Index, Manifest}; -use crate::io::commit::{parse_version_from_path, CommitError, CommitHandler, ManifestWriter}; +use crate::io::commit::{CommitError, CommitHandler, ManifestWriter}; /// External manifest store /// /// This trait abstracts an external storage for source of truth for manifests. -/// The storge is expected to remember (uri, version) -> manifest_path +/// The storage is expected to remember (uri, version) -> manifest_path /// and able to run transactions on the manifest_path. /// /// This trait is called an **External** manifest store because the store is @@ -43,6 +43,24 @@ pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync { /// the version and the store should not customize it. async fn get_latest_version(&self, base_uri: &str) -> Result>; + /// Get the latest manifest location for a given base_uri. + /// + /// By default, this calls get_latest_version. Impls should + /// override this method if they store both the location and size + /// of the latest manifest. + async fn get_latest_manifest_location( + &self, + base_uri: &str, + ) -> Result> { + self.get_latest_version(base_uri).await.map(|res| { + res.map(|(version, uri)| ManifestLocation { + version, + path: Path::from(uri), + size: None, + }) + }) + } + /// Put the manifest path for a given base_uri and version, should fail if the version already exists async fn put_if_not_exists(&self, base_uri: &str, version: u64, path: &str) -> Result<()>; @@ -60,11 +78,26 @@ pub struct ExternalManifestCommitHandler { #[async_trait] impl CommitHandler for ExternalManifestCommitHandler { + async fn resolve_latest_location( + &self, + base_path: &Path, + object_store: &ObjectStore, + ) -> std::result::Result { + let path = self.resolve_latest_version(base_path, object_store).await?; + Ok(ManifestLocation { + version: self + .resolve_latest_version_id(base_path, object_store) + .await?, + path, + size: None, + }) + } + /// Get the latest version of a dataset at the path async fn resolve_latest_version( &self, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &ObjectStore, ) -> std::result::Result { let version = self .external_manifest_store @@ -87,11 +120,13 @@ impl CommitHandler for ExternalManifestCommitHandler { // TODO: remove copy-rename once we upgrade object_store crate object_store.copy(&manifest_path, &staging).await?; object_store + .inner .rename(&staging, &object_store_manifest_path) .await?; // step 2: write _latest.manifest - write_latest_manifest(&manifest_path, base_path, object_store).await?; + write_latest_manifest(&manifest_path, base_path, object_store.inner.as_ref()) + .await?; // step 3: update external store to finalize path self.external_manifest_store @@ -106,14 +141,14 @@ impl CommitHandler for ExternalManifestCommitHandler { } // Dataset not found in the external store, this could be because the dataset did not // use external store for commit before. In this case, we search for the latest manifest - None => current_manifest_path(object_store, base_path).await, + None => Ok(current_manifest_path(object_store, base_path).await?.path), } } async fn resolve_latest_version_id( &self, base_path: &Path, - object_store: &dyn ObjectStore, + object_store: &ObjectStore, ) -> std::result::Result { let version = self .external_manifest_store @@ -122,7 +157,9 @@ impl CommitHandler for ExternalManifestCommitHandler { match version { Some((version, _)) => Ok(version), - None => parse_version_from_path(¤t_manifest_path(object_store, base_path).await?), + None => Ok(current_manifest_path(object_store, base_path) + .await? + .version), } } @@ -130,7 +167,7 @@ impl CommitHandler for ExternalManifestCommitHandler { &self, base_path: &Path, version: u64, - object_store: &dyn ObjectStore, + object_store: &dyn OSObjectStore, ) -> std::result::Result { let path_res = self .external_manifest_store diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index d293957c82..01ee8fead8 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -5,6 +5,7 @@ //! use arrow_array::{RecordBatch, RecordBatchReader}; +use byteorder::{ByteOrder, LittleEndian}; use chrono::{prelude::*, Duration}; use futures::future::BoxFuture; use futures::stream::{self, StreamExt, TryStreamExt}; @@ -15,9 +16,11 @@ use lance_file::datatypes::populate_schema_dictionary; use lance_io::object_store::{ObjectStore, ObjectStoreParams}; use lance_io::object_writer::ObjectWriter; use lance_io::traits::WriteExt; -use lance_io::utils::{read_metadata_offset, read_struct}; +use lance_io::utils::{read_last_block, read_metadata_offset, read_struct}; use lance_table::format::{Fragment, Index, Manifest, MAGIC, MAJOR_VERSION, MINOR_VERSION}; -use lance_table::io::commit::{commit_handler_from_url, CommitError, CommitHandler, CommitLock}; +use lance_table::io::commit::{ + commit_handler_from_url, CommitError, CommitHandler, CommitLock, ManifestLocation, +}; use lance_table::io::manifest::{read_manifest, write_manifest}; use log::warn; use object_store::path::Path; @@ -234,72 +237,33 @@ impl Dataset { #[deprecated(since = "0.8.17", note = "Please use `DatasetBuilder` instead.")] #[instrument(skip(params))] pub async fn open_with_params(uri: &str, params: &ReadParams) -> Result { - let (object_store, base_path, commit_handler) = - Self::params_from_uri(uri, ¶ms.commit_handler, ¶ms.store_options).await?; - - let latest_manifest = commit_handler - .resolve_latest_version(&base_path, &object_store.inner) + DatasetBuilder::from_uri(uri) + .with_read_params(params.clone()) + .load() .await - .map_err(|e| Error::DatasetNotFound { - path: base_path.to_string(), - source: Box::new(e), - location: location!(), - })?; - - let session = if let Some(session) = params.session.as_ref() { - session.clone() - } else { - Arc::new(Session::new( - params.index_cache_size, - params.metadata_cache_size, - )) - }; - - Self::checkout_manifest( - Arc::new(object_store), - base_path.clone(), - &latest_manifest, - session, - commit_handler, - ) - .await } /// Check out a version of the dataset. + #[deprecated(note = "Please use `DatasetBuilder` instead.")] pub async fn checkout(uri: &str, version: u64) -> Result { - let params = ReadParams::default(); - Self::checkout_with_params(uri, version, ¶ms).await + DatasetBuilder::from_uri(uri) + .with_version(version) + .load() + .await } /// Check out a version of the dataset with read params. + #[deprecated(note = "Please use `DatasetBuilder` instead.")] pub async fn checkout_with_params( uri: &str, version: u64, params: &ReadParams, ) -> Result { - let (object_store, base_path, commit_handler) = - Self::params_from_uri(uri, ¶ms.commit_handler, ¶ms.store_options).await?; - - let manifest_file = commit_handler - .resolve_version(&base_path, version, &object_store.inner) - .await?; - - let session = if let Some(session) = params.session.as_ref() { - session.clone() - } else { - Arc::new(Session::new( - params.index_cache_size, - params.metadata_cache_size, - )) - }; - Self::checkout_manifest( - Arc::new(object_store), - base_path, - &manifest_file, - session, - commit_handler, - ) - .await + DatasetBuilder::from_uri(uri) + .with_version(version) + .with_read_params(params.clone()) + .load() + .await } /// Check out the specified version of this dataset @@ -309,10 +273,15 @@ impl Dataset { .commit_handler .resolve_version(&base_path, version, &self.object_store.inner) .await?; + let manifest_location = ManifestLocation { + version, + path: manifest_file, + size: None, + }; Self::checkout_manifest( self.object_store.clone(), base_path, - &manifest_file, + &manifest_location, self.session.clone(), self.commit_handler.clone(), ) @@ -322,37 +291,51 @@ impl Dataset { async fn checkout_manifest( object_store: Arc, base_path: Path, - manifest_path: &Path, + manifest_location: &ManifestLocation, session: Arc, commit_handler: Arc, ) -> Result { - let object_reader = object_store - .open(manifest_path) - .await - .map_err(|e| match &e { - Error::NotFound { uri, .. } => Error::DatasetNotFound { - path: uri.clone(), - source: box_error(e), - location: location!(), - }, - _ => e, - })?; - // TODO: remove reference to inner. - let get_result = object_store - .inner - .get(manifest_path) - .await - .map_err(|e| match e { - object_store::Error::NotFound { path: _, source } => Error::DatasetNotFound { - path: base_path.to_string(), - source, - location: location!(), - }, - _ => e.into(), - })?; - let bytes = get_result.bytes().await?; - let offset = read_metadata_offset(&bytes)?; - let mut manifest: Manifest = read_struct(object_reader.as_ref(), offset).await?; + let object_reader = if let Some(size) = manifest_location.size { + object_store + .open_with_size(&manifest_location.path, size as usize) + .await + } else { + object_store.open(&manifest_location.path).await + }; + let object_reader = object_reader.map_err(|e| match &e { + Error::NotFound { uri, .. } => Error::DatasetNotFound { + path: uri.clone(), + source: box_error(e), + location: location!(), + }, + _ => e, + })?; + + let last_block = + read_last_block(object_reader.as_ref()) + .await + .map_err(|err| match err { + object_store::Error::NotFound { path, source } => Error::DatasetNotFound { + path, + source, + location: location!(), + }, + _ => Error::IO { + source: err.into(), + location: location!(), + }, + })?; + let offset = read_metadata_offset(&last_block)?; + + // If manifest is in the last block, we can decode directly from memory. + let manifest_size = object_reader.size().await?; + let mut manifest = if manifest_size - offset <= last_block.len() { + let message_len = LittleEndian::read_u32(&last_block[offset..offset + 4]) as usize; + let message_data = &last_block[offset + 4..offset + 4 + message_len]; + Manifest::try_from(lance_table::format::pb::Manifest::decode(message_data)?)? + } else { + read_struct(object_reader.as_ref(), offset).await? + }; if !can_read_dataset(manifest.reader_feature_flags) { let message = format!( @@ -388,7 +371,7 @@ impl Dataset { // Read expected manifest path for the dataset let dataset_exists = match commit_handler - .resolve_latest_version(&base, &object_store.inner) + .resolve_latest_version(&base, &object_store) .await { Ok(_) => true, @@ -621,7 +604,7 @@ impl Dataset { &self.object_store, &self .commit_handler - .resolve_latest_version(&self.base, &self.object_store.inner) + .resolve_latest_version(&self.base, &self.object_store) .await?, ) .await @@ -749,7 +732,7 @@ impl Dataset { // Test if the dataset exists let dataset_exists = match commit_handler - .resolve_latest_version(&base, &object_store.inner) + .resolve_latest_version(&base, &object_store) .await { Ok(_) => true, @@ -1097,7 +1080,7 @@ impl Dataset { /// we don't return the full version struct. pub async fn latest_version_id(&self) -> Result { self.commit_handler - .resolve_latest_version_id(&self.base, &self.object_store.inner) + .resolve_latest_version_id(&self.base, &self.object_store) .await } @@ -1302,11 +1285,13 @@ mod tests { use lance_index::{vector::DIST_COL, DatasetIndexExt, IndexType}; use lance_linalg::distance::MetricType; use lance_table::format::WriterVersion; + use lance_table::io::commit::RenameCommitHandler; use lance_table::io::deletion::read_deletion_file; use lance_testing::datagen::generate_random_array; use pretty_assertions::assert_eq; use rstest::rstest; use tempfile::{tempdir, TempDir}; + use url::Url; // Used to validate that futures returned are Send. fn require_send(t: T) -> T { @@ -1524,6 +1509,55 @@ mod tests { assert_eq!(result.manifest.max_fragment_id(), None); } + #[tokio::test] + async fn test_load_manifest_iops() { + // Need to use in-memory for accurate IOPS tracking. + use crate::utils::test::IoTrackingStore; + + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::Int32, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..10_i32))], + ) + .unwrap(); + let batches = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let dataset = Dataset::write(batches, "memory://test", None) + .await + .unwrap(); + + // Then open with wrapping store. + let memory_store = dataset.object_store.inner.clone(); + let (io_stats_wrapper, io_stats) = IoTrackingStore::new_wrapper(); + let _dataset = DatasetBuilder::from_uri("memory://test") + .with_read_params(ReadParams { + store_options: Some(ObjectStoreParams { + object_store_wrapper: Some(io_stats_wrapper), + ..Default::default() + }), + ..Default::default() + }) + .with_object_store( + memory_store, + Url::parse("memory://test").unwrap(), + Arc::new(RenameCommitHandler), + ) + .load() + .await + .unwrap(); + + let get_iops = || io_stats.lock().unwrap().read_iops; + + // There should be only two IOPS: + // 1. List _versions directory to get the latest manifest location + // 2. Read the manifest file. (The manifest is small enough to be read in one go. + // Larger manifests would result in more IOPS.) + assert_eq!(get_iops(), 2); + } + #[rstest] #[tokio::test] async fn test_write_params(#[values(false, true)] use_experimental_writer: bool) { @@ -1606,7 +1640,7 @@ mod tests { dataset.object_store(), &dataset .commit_handler - .resolve_latest_version(&dataset.base, &dataset.object_store().inner) + .resolve_latest_version(&dataset.base, dataset.object_store()) .await .unwrap(), ) @@ -1624,7 +1658,7 @@ mod tests { dataset.object_store(), &dataset .commit_handler - .resolve_latest_version(&dataset.base, &dataset.object_store().inner) + .resolve_latest_version(&dataset.base, dataset.object_store()) .await .unwrap(), ) @@ -2041,7 +2075,11 @@ mod tests { assert_eq!(actual_ds.version().version, 2); // But we can still check out the first version - let first_ver = Dataset::checkout(test_uri, 1).await.unwrap(); + let first_ver = DatasetBuilder::from_uri(test_uri) + .with_version(1) + .load() + .await + .unwrap(); assert_eq!(first_ver.version().version, 1); assert_eq!(&ArrowSchema::from(first_ver.schema()), schema.as_ref()); } diff --git a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs index d32ab2132a..a3a7f240ea 100644 --- a/rust/lance/src/dataset/builder.rs +++ b/rust/lance/src/dataset/builder.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use lance_io::object_store::{ObjectStore, ObjectStoreParams}; -use lance_table::io::commit::{commit_handler_from_url, CommitHandler}; +use lance_table::io::commit::{commit_handler_from_url, CommitHandler, ManifestLocation}; use object_store::{aws::AwsCredentialProvider, DynObjectStore}; use snafu::{location, Location}; use tracing::instrument; @@ -213,15 +213,20 @@ impl DatasetBuilder { let version = self.version; let (object_store, commit_handler) = self.build_object_store().await?; - let base_path = object_store.base_path(); + let base_path = object_store.base_path().clone(); let manifest = match version { Some(version) => { - commit_handler - .resolve_version(base_path, version, &object_store.inner) - .await? + let path = commit_handler + .resolve_version(&base_path, version, &object_store.inner) + .await?; + ManifestLocation { + version, + path, + size: None, + } } None => commit_handler - .resolve_latest_version(base_path, &object_store.inner) + .resolve_latest_location(&base_path, &object_store) .await .map_err(|e| Error::DatasetNotFound { path: base_path.to_string(), @@ -231,7 +236,7 @@ impl DatasetBuilder { }; Dataset::checkout_manifest( - Arc::new(object_store.clone()), + Arc::new(object_store), base_path.clone(), &manifest, session, diff --git a/rust/lance/src/index/vector/fixture_test.rs b/rust/lance/src/index/vector/fixture_test.rs index f047b7eb6b..c8c49dc938 100644 --- a/rust/lance/src/index/vector/fixture_test.rs +++ b/rust/lance/src/index/vector/fixture_test.rs @@ -132,7 +132,7 @@ mod test { let make_idx = move |assert_query: Vec, metric: MetricType| async move { let f = tempfile::NamedTempFile::new().unwrap(); - let reader = LocalObjectReader::open_local_path(f.path(), 64) + let reader = LocalObjectReader::open_local_path(f.path(), 64, None) .await .unwrap(); diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index 07362876e5..4de5f71bbb 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -353,6 +353,7 @@ impl ObjectStore for IoTrackingStore { } async fn head(&self, location: &Path) -> OSResult { + self.record_read(0); self.target.head(location).await } @@ -368,10 +369,12 @@ impl ObjectStore for IoTrackingStore { } fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, OSResult> { + self.record_read(0); self.target.list(prefix) } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult { + self.record_read(0); self.target.list_with_delimiter(prefix).await }