From b17dc02fb7574ed610ed516e4bfbb17539da76e3 Mon Sep 17 00:00:00 2001 From: harveyyue Date: Sun, 29 Sep 2024 13:32:14 +0800 Subject: [PATCH] Support filtering out stripes by provided range --- src/arrow_reader.rs | 17 +++++++++-------- src/async_arrow_reader.rs | 6 +++--- tests/basic/main.rs | 4 ++-- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/arrow_reader.rs b/src/arrow_reader.rs index db6827a5..fd0eb53d 100644 --- a/src/arrow_reader.rs +++ b/src/arrow_reader.rs @@ -39,7 +39,7 @@ pub struct ArrowReaderBuilder { pub(crate) batch_size: usize, pub(crate) projection: ProjectionMask, pub(crate) schema_ref: Option, - pub(crate) range: Option>, + pub(crate) file_byte_range: Option>, } impl ArrowReaderBuilder { @@ -50,7 +50,7 @@ impl ArrowReaderBuilder { batch_size: DEFAULT_BATCH_SIZE, projection: ProjectionMask::all(), schema_ref: None, - range: None, + file_byte_range: None, } } @@ -73,8 +73,9 @@ impl ArrowReaderBuilder { self } - pub fn with_range(mut self, range: Range) -> Self { - self.range = Some(range); + /// Specifies a range of file bytes; only stripes whose offset falls within this half-open range are read + pub fn with_file_byte_range(mut self, range: Range) -> Self { + self.file_byte_range = Some(range); self } @@ -116,7 +117,7 @@ impl ArrowReaderBuilder { file_metadata: self.file_metadata, projected_data_type, stripe_index: 0, - range: self.range, + file_byte_range: self.file_byte_range, }; ArrowReader { cursor, @@ -185,18 +186,18 @@ pub(crate) struct Cursor { pub file_metadata: Arc, pub projected_data_type: RootDataType, pub stripe_index: usize, - pub range: Option>, + pub file_byte_range: Option>, } impl Cursor { fn get_stripe_metadatas(&self) -> Vec { - if let Some(range) = self.range.clone() { + if let Some(range) = self.file_byte_range.clone() { self.file_metadata .stripe_metadatas() .iter() .filter(|info| { let offset = info.offset() as usize; - !(offset < range.start || offset >= range.end) + range.contains(&offset) }) .map(|info| info.to_owned()) 
.collect::>() diff --git a/src/async_arrow_reader.rs b/src/async_arrow_reader.rs index eda2e35e..94a0565d 100644 --- a/src/async_arrow_reader.rs +++ b/src/async_arrow_reader.rs @@ -104,9 +104,9 @@ impl StripeFactory { .cloned(); if let Some(info) = info { - if let Some(range) = self.inner.range.clone() { + if let Some(range) = self.inner.file_byte_range.clone() { let offset = info.offset() as usize; - if offset < range.start || offset >= range.end { + if !range.contains(&offset) { self.inner.stripe_index += 1; return Ok((self, None)); } @@ -221,7 +221,7 @@ impl ArrowReaderBuilder { file_metadata: self.file_metadata, projected_data_type, stripe_index: 0, - range: self.range, + file_byte_range: self.file_byte_range, }; ArrowStreamReader::new(cursor, self.batch_size, schema_ref) } diff --git a/tests/basic/main.rs b/tests/basic/main.rs index df315ba6..84d62d82 100644 --- a/tests/basic/main.rs +++ b/tests/basic/main.rs @@ -58,7 +58,7 @@ async fn new_arrow_stream_reader_range( ArrowReaderBuilder::try_new_async(f) .await .unwrap() - .with_range(range) + .with_file_byte_range(range) .build_async() } @@ -71,7 +71,7 @@ fn new_arrow_reader_range(path: &str, range: Range) -> ArrowReader let f = File::open(path).expect("no file found"); ArrowReaderBuilder::try_new(f) .unwrap() - .with_range(range) + .with_file_byte_range(range) .build() }