From e6f2140e716fa4abeb3a556a8371ceb1dc4a0fca Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Thu, 4 Apr 2024 21:01:49 +0100 Subject: [PATCH] feat: implement manifest filtering in TableScan --- crates/iceberg/src/scan.rs | 80 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 77 insertions(+), 3 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index b96c4701c..b2a9e2200 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -18,14 +18,20 @@ //! Table scan api. use crate::arrow::ArrowReaderBuilder; +use crate::expr::visitors::manifest_evaluator::ManifestEvaluator; +use crate::expr::{Bind, Predicate}; use crate::io::FileIO; -use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadataRef}; +use crate::spec::{ + DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadata, TableMetadataRef, +}; use crate::table::Table; use crate::{Error, ErrorKind}; use arrow_array::RecordBatch; use async_stream::try_stream; use futures::stream::{iter, BoxStream}; use futures::StreamExt; +use std::collections::HashMap; +use std::sync::Arc; /// Builder to create table scan. 
pub struct TableScanBuilder<'a> { @@ -34,6 +40,8 @@ pub struct TableScanBuilder<'a> { column_names: Vec, snapshot_id: Option, batch_size: Option, + case_sensitive: bool, + filter: Option, } impl<'a> TableScanBuilder<'a> { @@ -43,6 +51,8 @@ impl<'a> TableScanBuilder<'a> { column_names: vec![], snapshot_id: None, batch_size: None, + case_sensitive: true, + filter: None, } } @@ -53,6 +63,20 @@ impl<'a> TableScanBuilder<'a> { self } + /// Sets the case-sensitivity flag that is passed along when the filter predicate is bound to the schema (defaults to `true`) + pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self { + self.case_sensitive = case_sensitive; + self + } + + /// Specifies a predicate to use as a filter; it is stored after `rewrite_not()` has been applied + pub fn with_filter(mut self, predicate: Predicate) -> Self { + // rewrite_not() is called here to remove Not nodes, which must be absent + // when applying the manifest evaluator + self.filter = Some(predicate.rewrite_not()); + self + } + /// Select all columns. pub fn select_all(mut self) -> Self { self.column_names.clear(); @@ -125,6 +149,8 @@ impl<'a> TableScanBuilder<'a> { column_names: self.column_names, schema, batch_size: self.batch_size, + case_sensitive: self.case_sensitive, + filter: self.filter.map(Arc::new), }) } } @@ -139,6 +165,8 @@ pub struct TableScan { column_names: Vec, schema: SchemaRef, batch_size: Option, + case_sensitive: bool, + filter: Option>, } /// A stream of [`FileScanTask`]. @@ -146,10 +174,20 @@ pub type FileScanTaskStream = BoxStream<'static, crate::Result>; impl TableScan { /// Returns a stream of file scan tasks.
+ pub async fn plan_files(&self) -> crate::Result { + // Cache of `ManifestEvaluator`s created as part of this scan, keyed by partition spec ID + let mut manifest_evaluator_factory_cache: HashMap = HashMap::new(); + + // Clone these fields into locals so that no reference to self has to be + // passed into `try_stream`, as it expects references + // passed in to outlive 'static + let schema = self.schema.clone(); let snapshot = self.snapshot.clone(); let table_metadata = self.table_metadata.clone(); let file_io = self.file_io.clone(); + let case_sensitive = self.case_sensitive; + let filter = self.filter.clone(); Ok(try_stream! { let manifest_list = snapshot @@ -158,8 +196,23 @@ impl TableScan { .await?; // Generate data file stream - let mut entries = iter(manifest_list.entries()); - while let Some(entry) = entries.next().await { + for entry in manifest_list.entries() { + // If this scan has a filter, check the manifest evaluator cache for an existing + // ManifestEvaluator that matches this manifest's partition spec ID. + // Use one from the cache if there is one. If not, create one, put it in + // the cache, and take a reference to it. + if let Some(filter) = filter.as_ref() { + let manifest_eval_factory = manifest_evaluator_factory_cache + .entry(entry.partition_spec_id()) + .or_insert_with_key(|key| Self::create_manifest_eval_factory(key, schema.clone(), table_metadata.clone(), case_sensitive, filter)); + + + // Skip any manifest file for which the evaluator returns false for this filter. + if !manifest_eval_factory.eval(entry)?
{ + continue; + } + } + let manifest = entry.load_manifest(&file_io).await?; let mut manifest_entries = iter(manifest.entries().iter().filter(|e| e.is_alive())); @@ -186,6 +239,27 @@ impl TableScan { .boxed()) } + fn create_manifest_eval_factory( + // NOTE(review): the three `unwrap()` calls in this function panic on failure instead of returning an error; consider propagating a `crate::Result` to the caller. + id: &i32, + schema: SchemaRef, + table_metadata: Arc, + case_sensitive: bool, + filter: &Predicate, + ) -> ManifestEvaluator { + let bound_predicate = filter.bind(schema.clone(), case_sensitive).unwrap(); + + let partition_spec = table_metadata.partition_spec_by_id(*id).unwrap(); + + ManifestEvaluator::new( + partition_spec.clone(), + schema.clone(), + bound_predicate, + case_sensitive, + ) + .unwrap() + } + pub async fn to_arrow(&self) -> crate::Result { let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone());