diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs index bcb596718..fd2ebddba 100644 --- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs @@ -21,23 +21,19 @@ use crate::spec::{Datum, FieldSummary, ManifestFile}; use crate::Result; use fnv::FnvHashSet; -#[derive(Debug)] /// Evaluates a [`ManifestFile`] to see if the partition summaries /// match a provided [`BoundPredicate`]. /// /// Used by [`TableScan`] to prune the list of [`ManifestFile`]s /// in which data might be found that matches the TableScan's filter. +#[derive(Debug)] pub(crate) struct ManifestEvaluator { partition_filter: BoundPredicate, - case_sensitive: bool, } impl ManifestEvaluator { - pub(crate) fn new(partition_filter: BoundPredicate, case_sensitive: bool) -> Self { - Self { - partition_filter, - case_sensitive, - } + pub(crate) fn new(partition_filter: BoundPredicate) -> Self { + Self { partition_filter } } /// Evaluate this `ManifestEvaluator`'s filter predicate against the @@ -310,7 +306,7 @@ mod test { fn create_partition_schema( partition_spec: &PartitionSpecRef, - schema: &SchemaRef, + schema: &Schema, ) -> Result { let partition_type = partition_spec.partition_type(schema)?; @@ -356,7 +352,7 @@ mod test { case_sensitive, )?; - Ok(ManifestEvaluator::new(partition_filter, case_sensitive)) + Ok(ManifestEvaluator::new(partition_filter)) } #[test] diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index b842522e2..c2a5e1b2d 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -23,8 +23,8 @@ use crate::expr::visitors::manifest_evaluator::ManifestEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; use crate::spec::{ - DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, PartitionSpecRef, Schema, - SchemaRef, SnapshotRef, TableMetadataRef, + DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, Schema, SchemaRef, + SnapshotRef, TableMetadataRef, }; use crate::table::Table; use crate::{Error, ErrorKind, Result}; @@ -203,31 +203,22 @@ impl TableScan { continue; } - if let Some(filter) = context.bound_filter() { - let partition_spec_id = entry.partition_spec_id; + let partition_spec_id = entry.partition_spec_id; - let (partition_spec, partition_schema) = - context.create_partition_spec_and_schema(partition_spec_id)?; - - let partition_filter = partition_filter_cache.get( - partition_spec_id, - partition_spec, - partition_schema.clone(), - filter, - context.case_sensitive, - )?; + let partition_filter = partition_filter_cache.get( + partition_spec_id, + &context, + )?; + if let Some(partition_filter) = partition_filter { let manifest_evaluator = manifest_evaluator_cache.get( - partition_schema.schema_id(), - partition_filter.clone(), - context.case_sensitive, + partition_spec_id, + partition_filter, ); if !manifest_evaluator.eval(entry)? { continue; } - - // TODO: Create ExpressionEvaluator } let manifest = entry.load_manifest(&context.file_io).await?; @@ -321,9 +312,9 @@ impl TableScan { } } -#[derive(Debug)] /// Holds the context necessary for file scanning operations /// in a streaming environment. +#[derive(Debug)] struct FileScanStreamContext { schema: SchemaRef, snapshot: SnapshotRef, @@ -362,37 +353,11 @@ impl FileScanStreamContext { fn bound_filter(&self) -> Option<&BoundPredicate> { self.bound_filter.as_ref() } - - /// Creates a reference-counted [`PartitionSpec`] and a - /// corresponding [`Schema`] based on the specified partition spec id. - fn create_partition_spec_and_schema( - &self, - spec_id: i32, - ) -> Result<(PartitionSpecRef, SchemaRef)> { - let partition_spec = - self.table_metadata - .partition_spec_by_id(spec_id) - .ok_or(Error::new( - ErrorKind::Unexpected, - format!("Could not find partition spec for id {}", spec_id), - ))?; - - let partition_type = partition_spec.partition_type(&self.schema)?; - let partition_fields = partition_type.fields().to_owned(); - let partition_schema = Arc::new( - Schema::builder() - .with_schema_id(partition_spec.spec_id) - .with_fields(partition_fields) - .build()?, - ); - - Ok((partition_spec.clone(), partition_schema)) - } } -#[derive(Debug)] /// Manages the caching of [`BoundPredicate`] objects /// for [`PartitionSpec`]s based on partition spec id. +#[derive(Debug)] struct PartitionFilterCache(HashMap); impl PartitionFilterCache { @@ -407,30 +372,47 @@ impl PartitionFilterCache { fn get( &mut self, spec_id: i32, - partition_spec: PartitionSpecRef, - partition_schema: SchemaRef, - filter: &BoundPredicate, - case_sensitive: bool, - ) -> Result<&BoundPredicate> { - match self.0.entry(spec_id) { - Entry::Occupied(e) => Ok(e.into_mut()), - Entry::Vacant(e) => { - let mut inclusive_projection = InclusiveProjection::new(partition_spec); - - let partition_filter = inclusive_projection - .project(filter)? - .rewrite_not() - .bind(partition_schema, case_sensitive)?; - - Ok(e.insert(partition_filter)) - } + context: &FileScanStreamContext, + ) -> Result> { + match context.bound_filter() { + None => Ok(None), + Some(filter) => match self.0.entry(spec_id) { + Entry::Occupied(e) => Ok(Some(e.into_mut())), + Entry::Vacant(e) => { + let partition_spec = context + .table_metadata + .partition_spec_by_id(spec_id) + .ok_or(Error::new( + ErrorKind::Unexpected, + format!("Could not find partition spec for id {}", spec_id), + ))?; + + let partition_type = partition_spec.partition_type(context.schema.as_ref())?; + let partition_fields = partition_type.fields().to_owned(); + let partition_schema = Arc::new( + Schema::builder() + .with_schema_id(partition_spec.spec_id) + .with_fields(partition_fields) + .build()?, + ); + + let mut inclusive_projection = InclusiveProjection::new(partition_spec.clone()); + + let partition_filter = inclusive_projection + .project(filter)? + .rewrite_not() + .bind(partition_schema.clone(), context.case_sensitive)?; + + Ok(Some(e.insert(partition_filter))) + } + }, } } } -#[derive(Debug)] /// Manages the caching of [`ManifestEvaluator`] objects /// for [`PartitionSpec`]s based on partition spec id. +#[derive(Debug)] struct ManifestEvaluatorCache(HashMap); impl ManifestEvaluatorCache { @@ -442,15 +424,10 @@ impl ManifestEvaluatorCache { /// Retrieves a [`ManifestEvaluator`] from the cache /// or computes it if not present. - fn get( - &mut self, - spec_id: i32, - partition_filter: BoundPredicate, - case_sensitive: bool, - ) -> &mut ManifestEvaluator { + fn get(&mut self, spec_id: i32, partition_filter: &BoundPredicate) -> &mut ManifestEvaluator { self.0 .entry(spec_id) - .or_insert(ManifestEvaluator::new(partition_filter, case_sensitive)) + .or_insert(ManifestEvaluator::new(partition_filter.clone())) } }