perf: optimize IO path for reading manifest #2396

Merged: 8 commits (merged May 29, 2024)
Changes from 5 commits
4 changes: 2 additions & 2 deletions rust/lance-file/src/page_table.rs
@@ -215,7 +215,7 @@ mod tests {
.unwrap();
writer.shutdown().await.unwrap();

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let actual = PageTable::load(
@@ -284,7 +284,7 @@ mod tests {
let mut writer = tokio::fs::File::create(&path).await.unwrap();
let res = page_table.write(&mut writer, 0).await.unwrap();

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();

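These test call sites pick up the third `known_size` argument added to `open_local_path` in this PR (passing `None`, since the tests do not know the file length up front). Below is a minimal usage sketch of the new signature, not taken from the PR itself; the `open_reader` wrapper, the `file_len` value, and the 4096 block size are illustrative:

```rust
use lance_core::Result;
use lance_io::local::LocalObjectReader;

// `file_len` is a hypothetical value a caller might already have from a
// directory listing; `None` keeps the old behavior of stat-ing the file lazily.
async fn open_reader(path: &std::path::Path, file_len: Option<usize>) -> Result<()> {
    let _reader = LocalObjectReader::open_local_path(path, 4096, file_len).await?;
    Ok(())
}
```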
14 changes: 7 additions & 7 deletions rust/lance-io/src/encodings/binary.rs
@@ -137,7 +137,7 @@ impl<'a, T: ByteArrayType> BinaryDecoder<'a, T> {
/// use lance_io::{local::LocalObjectReader, encodings::binary::BinaryDecoder, traits::Reader};
///
/// async {
/// let reader = LocalObjectReader::open_local_path("/tmp/foo.lance", 2048).await.unwrap();
/// let reader = LocalObjectReader::open_local_path("/tmp/foo.lance", 2048, None).await.unwrap();
/// let string_decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), 100, 1024, true);
/// };
/// ```
@@ -494,7 +494,7 @@ mod tests {

let pos = write_test_data(&path, arrs).await.unwrap();

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let read_len = arrs.iter().map(|a| a.len()).sum();
@@ -562,7 +562,7 @@ mod tests {
let pos = encoder.encode(&[&data]).await.unwrap();
object_writer.shutdown().await.unwrap();

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);
@@ -605,7 +605,7 @@ mod tests {
let path = temp_dir.path().join("foo");

let pos = write_test_data(&path, &[&data]).await.unwrap();
let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);
@@ -627,7 +627,7 @@ mod tests {
let temp_dir = tempfile::tempdir().unwrap();
let path = temp_dir.path().join("foo");
let pos = write_test_data(&path, &[&data]).await.unwrap();
let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);
@@ -658,7 +658,7 @@ mod tests {
let path = temp_dir.path().join("foo");
let pos = write_test_data(&path, &[&data]).await.unwrap();

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);
@@ -738,7 +738,7 @@ mod tests {
pos
};

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let decoder = BinaryDecoder::<BinaryType>::new(reader.as_ref(), pos, data.len(), true);
2 changes: 1 addition & 1 deletion rust/lance-io/src/encodings/dictionary.rs
@@ -246,7 +246,7 @@ mod tests {
object_writer.shutdown().await.unwrap();
}

let reader = LocalObjectReader::open_local_path(&path, 2048)
let reader = LocalObjectReader::open_local_path(&path, 2048, None)
.await
.unwrap();
let decoder = DictionaryDecoder::new(
6 changes: 3 additions & 3 deletions rust/lance-io/src/encodings/plain.rs
@@ -581,7 +581,7 @@ mod tests {
writer.flush().await.unwrap();
}

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
assert!(reader.size().await.unwrap() > 0);
@@ -705,7 +705,7 @@ mod tests {
writer.flush().await.unwrap();
}

let reader = LocalObjectReader::open_local_path(&path, 2048)
let reader = LocalObjectReader::open_local_path(&path, 2048, None)
.await
.unwrap();
assert!(reader.size().await.unwrap() > 0);
@@ -753,7 +753,7 @@ mod tests {
writer.shutdown().await.unwrap();
}

let reader = LocalObjectReader::open_local_path(&path, 2048)
let reader = LocalObjectReader::open_local_path(&path, 2048, None)
.await
.unwrap();
assert!(reader.size().await.unwrap() > 0);
47 changes: 34 additions & 13 deletions rust/lance-io/src/local.rs
@@ -20,6 +20,7 @@ use lance_core::{Error, Result};
use object_store::path::Path;
use snafu::{location, Location};
use tokio::io::AsyncSeekExt;
use tokio::sync::OnceCell;
use tracing::instrument;

use crate::traits::{Reader, Writer};
@@ -55,6 +56,10 @@ pub struct LocalObjectReader {
/// File path.
path: Path,

/// Known size of the file. This is either passed in on construction or
/// cached on the first metadata call.
size: OnceCell<usize>,

/// Block size, in bytes.
block_size: usize,
}
@@ -63,23 +68,20 @@ impl LocalObjectReader {
pub async fn open_local_path(
path: impl AsRef<std::path::Path>,
block_size: usize,
known_size: Option<usize>,
) -> Result<Box<dyn Reader>> {
let path = path.as_ref().to_owned();
let object_store_path = Path::from_filesystem_path(&path)?;
tokio::task::spawn_blocking(move || {
let local_file = File::open(&path)?;
Ok(Box::new(Self {
file: Arc::new(local_file),
path: object_store_path,
block_size,
}) as Box<dyn Reader>)
})
.await?
Self::open(&object_store_path, block_size, known_size).await
}

/// Open a local object reader, with default prefetch size.
#[instrument(level = "debug")]
pub async fn open(path: &Path, block_size: usize) -> Result<Box<dyn Reader>> {
pub async fn open(
path: &Path,
block_size: usize,
known_size: Option<usize>,
) -> Result<Box<dyn Reader>> {
let path = path.clone();
let local_path = to_local_path(&path);
tokio::task::spawn_blocking(move || {
@@ -90,9 +92,11 @@ impl LocalObjectReader {
},
_ => e.into(),
})?;
let size = OnceCell::new_with(known_size);
Ok(Box::new(Self {
file: Arc::new(file),
block_size,
size,
path: path.clone(),
}) as Box<dyn Reader>)
})
@@ -111,13 +115,26 @@ impl Reader for LocalObjectReader {
}

/// Returns the file size.
async fn size(&self) -> Result<usize> {
Ok(self.file.metadata()?.len() as usize)
async fn size(&self) -> object_store::Result<usize> {
Review comment (Contributor): thank you! for object_store::Result
Reply (Contributor, Author): I still hate the error handling overall. Will refactor it later.
let file = self.file.clone();
self.size
.get_or_try_init(|| async move {
let metadata = tokio::task::spawn_blocking(move || {
file.metadata().map_err(|err| object_store::Error::Generic {
store: "LocalFileSystem",
source: err.into(),
})
})
.await??;
Ok(metadata.len() as usize)
})
.await
.cloned()
}

/// Reads a range of data.
#[instrument(level = "debug", skip(self))]
async fn get_range(&self, range: Range<usize>) -> Result<Bytes> {
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
let file = self.file.clone();
tokio::task::spawn_blocking(move || {
let mut buf = BytesMut::with_capacity(range.len());
@@ -132,6 +149,10 @@ impl Reader for LocalObjectReader {
Ok(buf.freeze())
})
.await?
.map_err(|err: std::io::Error| object_store::Error::Generic {
store: "LocalFileSystem",
source: err.into(),
})
}
}

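The new `size()` implementation above caches the result in a tokio `OnceCell`, so the blocking `metadata()` call runs at most once, and not at all when the size was supplied at construction. Here is a standalone sketch of that pattern under the same assumptions; `CachedSize` and `expensive_stat` are illustrative names, not part of the crate:

```rust
use tokio::sync::OnceCell;

struct CachedSize {
    size: OnceCell<usize>,
}

impl CachedSize {
    fn new(known: Option<usize>) -> Self {
        // If the size was passed in, the cell starts out filled and
        // get_or_try_init never runs the closure.
        Self { size: OnceCell::new_with(known) }
    }

    async fn size(&self) -> std::io::Result<usize> {
        self.size
            .get_or_try_init(|| async {
                // Hypothetical stand-in for the spawn_blocking + metadata() call.
                expensive_stat().await
            })
            .await
            .cloned()
    }
}

async fn expensive_stat() -> std::io::Result<usize> {
    Ok(42)
}
```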
32 changes: 23 additions & 9 deletions rust/lance-io/src/object_reader.rs
@@ -9,6 +9,7 @@ use bytes::Bytes;
use futures::future::BoxFuture;
use lance_core::Result;
use object_store::{path::Path, ObjectStore};
use tokio::sync::OnceCell;
use tracing::instrument;

use crate::traits::Reader;
@@ -22,16 +23,24 @@ pub struct CloudObjectReader {
pub object_store: Arc<dyn ObjectStore>,
// File path
pub path: Path,
// File size, if known.
size: OnceCell<usize>,

block_size: usize,
}

impl CloudObjectReader {
/// Create an ObjectReader from URI
pub fn new(object_store: Arc<dyn ObjectStore>, path: Path, block_size: usize) -> Result<Self> {
pub fn new(
object_store: Arc<dyn ObjectStore>,
path: Path,
block_size: usize,
known_size: Option<usize>,
) -> Result<Self> {
Ok(Self {
object_store,
path,
size: OnceCell::new_with(known_size),
block_size,
})
}
@@ -42,14 +51,14 @@ impl CloudObjectReader {
async fn do_with_retry<'a, O>(
&self,
f: impl Fn() -> BoxFuture<'a, std::result::Result<O, object_store::Error>>,
) -> Result<O> {
) -> object_store::Result<O> {
let mut retries = 3;
loop {
match f().await {
Ok(val) => return Ok(val),
Err(err) => {
if retries == 0 {
return Err(err.into());
return Err(err);
}
retries -= 1;
}
@@ -69,15 +78,20 @@ impl Reader for CloudObjectReader {
}

/// Object/File Size.
async fn size(&self) -> Result<usize> {
let meta = self
.do_with_retry(|| self.object_store.head(&self.path))
.await?;
Ok(meta.size)
async fn size(&self) -> object_store::Result<usize> {
self.size
.get_or_try_init(|| async move {
let meta = self
.do_with_retry(|| self.object_store.head(&self.path))
.await?;
Ok(meta.size)
})
.await
.cloned()
}

#[instrument(level = "debug", skip(self))]
async fn get_range(&self, range: Range<usize>) -> Result<Bytes> {
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
self.do_with_retry(|| self.object_store.get_range(&self.path, range.clone()))
.await
}
20 changes: 19 additions & 1 deletion rust/lance-io/src/object_store.rs
@@ -440,11 +440,29 @@ impl ObjectStore {
/// - ``path``: Absolute path to the file.
pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
match self.scheme.as_str() {
"file" => LocalObjectReader::open(path, self.block_size).await,
"file" => LocalObjectReader::open(path, self.block_size, None).await,
_ => Ok(Box::new(CloudObjectReader::new(
self.inner.clone(),
path.clone(),
self.block_size,
None,
)?)),
}
}

/// Open a reader for a file with known size.
///
/// This size may either have been retrieved from a list operation or
/// cached metadata. By passing in the known size, we can skip a HEAD / metadata
/// call.
pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
match self.scheme.as_str() {
"file" => LocalObjectReader::open(path, self.block_size, Some(known_size)).await,
_ => Ok(Box::new(CloudObjectReader::new(
self.inner.clone(),
path.clone(),
self.block_size,
Some(known_size),
)?)),
}
}
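A possible call pattern for the new `open_with_size`, sketched here rather than taken from the PR: `read_manifest_header`, `manifest_size`, and the 4096-byte slice are illustrative. The point is that `size()` resolves from the value cached at construction instead of issuing a HEAD / metadata request:

```rust
use lance_io::object_store::ObjectStore;
use object_store::path::Path;

// `manifest_size` would come from a prior list operation or cached metadata,
// so opening the reader does not need its own HEAD request.
async fn read_manifest_header(
    store: &ObjectStore,
    path: &Path,
    manifest_size: usize,
) -> lance_core::Result<bytes::Bytes> {
    let reader = store.open_with_size(path, manifest_size).await?;
    // Resolves from the cached size rather than hitting the object store again.
    let len = reader.size().await?;
    reader
        .get_range(0..len.min(4096))
        .await
        .map_err(lance_core::Error::from)
}
```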
3 changes: 2 additions & 1 deletion rust/lance-io/src/scheduler.rs
@@ -91,7 +91,8 @@ impl IoTask {
let bytes = self
.reader
.get_range(self.to_read.start as usize..self.to_read.end as usize)
.await;
.await
.map_err(Error::from);
(self.when_done)(bytes);
}
}
4 changes: 2 additions & 2 deletions rust/lance-io/src/traits.rs
@@ -86,10 +86,10 @@ pub trait Reader: std::fmt::Debug + Send + Sync {
fn block_size(&self) -> usize;

/// Object/File Size.
async fn size(&self) -> Result<usize>;
async fn size(&self) -> object_store::Result<usize>;

/// Read a range of bytes from the object.
///
/// TODO: change to read_at()?
async fn get_range(&self, range: Range<usize>) -> Result<Bytes>;
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes>;
}
10 changes: 6 additions & 4 deletions rust/lance-io/src/utils.rs
@@ -77,14 +77,16 @@ pub async fn read_fixed_stride_array(
}

/// Read a protobuf message at file position 'pos'.
// TODO: pub(crate)
///
/// We write protobuf by first writing the length of the message as a u32,
/// followed by the message itself.
pub async fn read_message<M: Message + Default>(reader: &dyn Reader, pos: usize) -> Result<M> {
let file_size = reader.size().await?;
if pos > file_size {
return Err(Error::io("file size is too small".to_string(), location!()));
}

let range = pos..min(pos + 4096, file_size);
let range = pos..min(pos + reader.block_size(), file_size);
let buf = reader.get_range(range.clone()).await?;
let msg_len = LittleEndian::read_u32(&buf) as usize;

@@ -113,7 +115,7 @@ pub async fn read_struct<
T::try_from(msg)
}

pub async fn read_last_block(reader: &dyn Reader) -> Result<Bytes> {
pub async fn read_last_block(reader: &dyn Reader) -> object_store::Result<Bytes> {
let file_size = reader.size().await?;
let block_size = reader.block_size();
let begin = if file_size < block_size {
@@ -225,7 +227,7 @@ mod tests {
assert_eq!(pos, 0);
object_writer.shutdown().await.unwrap();

let object_reader = CloudObjectReader::new(Arc::new(store), path, 1024).unwrap();
let object_reader = CloudObjectReader::new(Arc::new(store), path, 1024, None).unwrap();
let actual: BytesWrapper = read_struct(&object_reader, pos).await.unwrap();
assert_eq!(some_message, actual);
}