Skip to content

Commit

Permalink
Support filter out strip by provided range
Browse files Browse the repository at this point in the history
  • Loading branch information
harveyyue committed Sep 29, 2024
1 parent 9e5cfbf commit b17dc02
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 13 deletions.
17 changes: 9 additions & 8 deletions src/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct ArrowReaderBuilder<R> {
pub(crate) batch_size: usize,
pub(crate) projection: ProjectionMask,
pub(crate) schema_ref: Option<SchemaRef>,
pub(crate) range: Option<Range<usize>>,
pub(crate) file_byte_range: Option<Range<usize>>,
}

impl<R> ArrowReaderBuilder<R> {
Expand All @@ -50,7 +50,7 @@ impl<R> ArrowReaderBuilder<R> {
batch_size: DEFAULT_BATCH_SIZE,
projection: ProjectionMask::all(),
schema_ref: None,
range: None,
file_byte_range: None,
}
}

Expand All @@ -73,8 +73,9 @@ impl<R> ArrowReaderBuilder<R> {
self
}

pub fn with_range(mut self, range: Range<usize>) -> Self {
self.range = Some(range);
/// Specifies a range of file bytes that will read the strips offset within this range
pub fn with_file_byte_range(mut self, range: Range<usize>) -> Self {
self.file_byte_range = Some(range);
self
}

Expand Down Expand Up @@ -116,7 +117,7 @@ impl<R: ChunkReader> ArrowReaderBuilder<R> {
file_metadata: self.file_metadata,
projected_data_type,
stripe_index: 0,
range: self.range,
file_byte_range: self.file_byte_range,
};
ArrowReader {
cursor,
Expand Down Expand Up @@ -185,18 +186,18 @@ pub(crate) struct Cursor<R> {
pub file_metadata: Arc<FileMetadata>,
pub projected_data_type: RootDataType,
pub stripe_index: usize,
pub range: Option<Range<usize>>,
pub file_byte_range: Option<Range<usize>>,
}

impl<R: ChunkReader> Cursor<R> {
fn get_stripe_metadatas(&self) -> Vec<StripeMetadata> {
if let Some(range) = self.range.clone() {
if let Some(range) = self.file_byte_range.clone() {
self.file_metadata
.stripe_metadatas()
.iter()
.filter(|info| {
let offset = info.offset() as usize;
!(offset < range.start || offset >= range.end)
range.contains(&offset)
})
.map(|info| info.to_owned())
.collect::<Vec<_>>()
Expand Down
6 changes: 3 additions & 3 deletions src/async_arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ impl<R: AsyncChunkReader + 'static> StripeFactory<R> {
.cloned();

if let Some(info) = info {
if let Some(range) = self.inner.range.clone() {
if let Some(range) = self.inner.file_byte_range.clone() {
let offset = info.offset() as usize;
if offset < range.start || offset >= range.end {
if !range.contains(&offset) {
self.inner.stripe_index += 1;
return Ok((self, None));
}
Expand Down Expand Up @@ -221,7 +221,7 @@ impl<R: AsyncChunkReader + 'static> ArrowReaderBuilder<R> {
file_metadata: self.file_metadata,
projected_data_type,
stripe_index: 0,
range: self.range,
file_byte_range: self.file_byte_range,
};
ArrowStreamReader::new(cursor, self.batch_size, schema_ref)
}
Expand Down
4 changes: 2 additions & 2 deletions tests/basic/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async fn new_arrow_stream_reader_range(
ArrowReaderBuilder::try_new_async(f)
.await
.unwrap()
.with_range(range)
.with_file_byte_range(range)
.build_async()
}

Expand All @@ -71,7 +71,7 @@ fn new_arrow_reader_range(path: &str, range: Range<usize>) -> ArrowReader<File>
let f = File::open(path).expect("no file found");
ArrowReaderBuilder::try_new(f)
.unwrap()
.with_range(range)
.with_file_byte_range(range)
.build()
}

Expand Down

0 comments on commit b17dc02

Please sign in to comment.