Skip to content

Commit

Permalink
feat: implement manifest filtering in TableScan
Browse files Browse the repository at this point in the history
  • Loading branch information
sdd committed Apr 25, 2024
1 parent aba6209 commit 2bbf352
Showing 1 changed file with 79 additions and 3 deletions.
82 changes: 79 additions & 3 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@
//! Table scan api.
use crate::arrow::ArrowReaderBuilder;
use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
use crate::expr::{Bind, Predicate};
use crate::io::FileIO;
use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadataRef};
use crate::spec::{
DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadata, TableMetadataRef,
};
use crate::table::Table;
use crate::{Error, ErrorKind};
use arrow_array::RecordBatch;
use async_stream::try_stream;
use futures::stream::{iter, BoxStream};
use futures::StreamExt;
use std::collections::HashMap;
use std::sync::Arc;

/// Builder to create table scan.
pub struct TableScanBuilder<'a> {
Expand All @@ -34,6 +40,8 @@ pub struct TableScanBuilder<'a> {
column_names: Vec<String>,
snapshot_id: Option<i64>,
batch_size: Option<usize>,
case_sensitive: bool,
filter: Option<Predicate>,
}

impl<'a> TableScanBuilder<'a> {
Expand All @@ -43,6 +51,8 @@ impl<'a> TableScanBuilder<'a> {
column_names: vec![],
snapshot_id: None,
batch_size: None,
case_sensitive: true,
filter: None,
}
}

Expand All @@ -53,6 +63,20 @@ impl<'a> TableScanBuilder<'a> {
self
}

/// Sets the scan's case sensitivity
pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
self.case_sensitive = case_sensitive;
self
}

/// Specifies a predicate to use as a filter
pub fn with_filter(mut self, predicate: Predicate) -> Self {
// calls rewrite_not to remove Not nodes, which must be absent
// when applying the manifest evaluator
self.filter = Some(predicate.rewrite_not());
self
}

/// Select all columns.
pub fn select_all(mut self) -> Self {
self.column_names.clear();
Expand Down Expand Up @@ -125,6 +149,8 @@ impl<'a> TableScanBuilder<'a> {
column_names: self.column_names,
schema,
batch_size: self.batch_size,
case_sensitive: self.case_sensitive,
filter: self.filter.map(Arc::new),
})
}
}
Expand All @@ -139,17 +165,29 @@ pub struct TableScan {
column_names: Vec<String>,
schema: SchemaRef,
batch_size: Option<usize>,
case_sensitive: bool,
filter: Option<Arc<Predicate>>,
}

/// A stream of [`FileScanTask`].
pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;

impl TableScan {
/// Returns a stream of file scan tasks.
pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
// Cache `ManifestEvaluatorFactory`s created as part of this scan
let mut manifest_evaluator_cache: HashMap<i32, ManifestEvaluator> = HashMap::new();

// these variables needed to ensure that we don't need to pass a
// reference to self into `try_stream`, as it expects references
// passed in to outlive 'static
let schema = self.schema.clone();
let snapshot = self.snapshot.clone();
let table_metadata = self.table_metadata.clone();
let file_io = self.file_io.clone();
let case_sensitive = self.case_sensitive;
let filter = self.filter.clone();

Ok(try_stream! {
let manifest_list = snapshot
Expand All @@ -158,8 +196,24 @@ impl TableScan {
.await?;

// Generate data file stream
let mut entries = iter(manifest_list.entries());
while let Some(entry) = entries.next().await {
for entry in manifest_list.entries() {
// If this scan has a filter, check the partition evaluator cache for an existing
// PartitionEvaluator that matches this manifest's partition spec ID.
// Use one from the cache if there is one. If not, create one, put it in
// the cache, and take a reference to it.
#[allow(clippy::map_entry)]
if let Some(filter) = filter.as_ref() {
if !manifest_evaluator_cache.contains_key(&entry.partition_spec_id) {
manifest_evaluator_cache.insert(entry.partition_spec_id, Self::create_manifest_evaluator(entry.partition_spec_id, schema.clone(), table_metadata.clone(), case_sensitive, filter)?);
}
let manifest_evaluator = &manifest_evaluator_cache[&entry.partition_spec_id];

// reject any manifest files whose partition values don't match the filter.
if !manifest_evaluator.eval(entry)? {
continue;
}
}

let manifest = entry.load_manifest(&file_io).await?;

let mut manifest_entries = iter(manifest.entries().iter().filter(|e| e.is_alive()));
Expand All @@ -186,6 +240,28 @@ impl TableScan {
.boxed())
}

fn create_manifest_evaluator(
id: i32,
schema: SchemaRef,
table_metadata: Arc<TableMetadata>,
case_sensitive: bool,
filter: &Predicate,
) -> crate::Result<ManifestEvaluator> {
let bound_predicate = filter.bind(schema.clone(), case_sensitive)?;

let partition_spec = table_metadata.partition_spec_by_id(id).ok_or(Error::new(
ErrorKind::Unexpected,
format!("Could not find partition spec for id {id}"),
))?;

ManifestEvaluator::new(
partition_spec.clone(),
schema.clone(),
bound_predicate,
case_sensitive,
)
}

pub async fn to_arrow(&self) -> crate::Result<ArrowRecordBatchStream> {
let mut arrow_reader_builder =
ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone());
Expand Down

0 comments on commit 2bbf352

Please sign in to comment.