refactor: cache partition_schema in fn plan_files() (apache#362)
* refactor: add partition_schema_cache

* refactor: use context as param object

* fix: test setup

* refactor: clone only when cache miss

* chore: move derive stmts

* refactor: remove unused case_sensitive parameter

* refactor: remove partition_schema_cache

* refactor: move partition_filter into wider scope
marvinlanhenke authored and shaeqahmed committed Dec 9, 2024
1 parent bd125d1 commit d143b34
Showing 2 changed files with 54 additions and 81 deletions.
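
Taken together, these commits boil down to one idiom: look a value up by partition spec id and only build (and clone) it on a cache miss, via `std::collections::hash_map::Entry`. A minimal, self-contained sketch of that idiom — toy types, not this crate's `BoundPredicate` or cache structs:

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Toy stand-in for the cached value (the real code caches a `BoundPredicate`).
#[derive(Debug, Clone)]
struct Filter(String);

struct FilterCache(HashMap<i32, Filter>);

impl FilterCache {
    fn new() -> Self {
        Self(HashMap::new())
    }

    /// Return the cached filter for `spec_id`, building it only on a miss.
    fn get(&mut self, spec_id: i32, source: &Filter) -> &Filter {
        match self.0.entry(spec_id) {
            // Hit: hand back the stored value untouched, no clone.
            Entry::Occupied(e) => e.into_mut(),
            // Miss: clone the value once and insert it.
            Entry::Vacant(e) => e.insert(source.clone()),
        }
    }
}

fn main() {
    let mut cache = FilterCache::new();
    let source = Filter("x > 5".to_string());
    cache.get(1, &source); // miss: clones
    cache.get(1, &source); // hit: returns the cached value
    println!("{:?}", cache.get(1, &source));
}
```

Only the `Vacant` arm pays for the clone, which is what the "clone only when cache miss" commit is about.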
14 changes: 5 additions & 9 deletions crates/iceberg/src/expr/visitors/manifest_evaluator.rs
@@ -21,23 +21,19 @@ use crate::spec::{Datum, FieldSummary, ManifestFile};
use crate::Result;
use fnv::FnvHashSet;

#[derive(Debug)]
/// Evaluates a [`ManifestFile`] to see if the partition summaries
/// match a provided [`BoundPredicate`].
///
/// Used by [`TableScan`] to prune the list of [`ManifestFile`]s
/// in which data might be found that matches the TableScan's filter.
#[derive(Debug)]
pub(crate) struct ManifestEvaluator {
partition_filter: BoundPredicate,
case_sensitive: bool,
}

impl ManifestEvaluator {
pub(crate) fn new(partition_filter: BoundPredicate, case_sensitive: bool) -> Self {
Self {
partition_filter,
case_sensitive,
}
pub(crate) fn new(partition_filter: BoundPredicate) -> Self {
Self { partition_filter }
}

/// Evaluate this `ManifestEvaluator`'s filter predicate against the
@@ -310,7 +306,7 @@ mod test {

fn create_partition_schema(
partition_spec: &PartitionSpecRef,
schema: &SchemaRef,
schema: &Schema,
) -> Result<SchemaRef> {
let partition_type = partition_spec.partition_type(schema)?;

@@ -356,7 +352,7 @@
case_sensitive,
)?;

Ok(ManifestEvaluator::new(partition_filter, case_sensitive))
Ok(ManifestEvaluator::new(partition_filter))
}

#[test]
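The `case_sensitive` field could be dropped because case sensitivity only matters while a predicate is being bound to the partition schema; once `ManifestEvaluator` receives a `BoundPredicate`, that decision has already been made. A rough sketch of that ordering with hypothetical stand-in types (`bind`, `ToyEvaluator`, etc. are illustrative, not this crate's API):

```rust
/// Stand-in for an unbound predicate (the crate's `Predicate`).
struct ToyPredicate(String);

/// Stand-in for a predicate already resolved against a schema
/// (the crate's `BoundPredicate`).
struct ToyBoundPredicate(String);

/// Case sensitivity is consumed here, at bind time.
fn bind(pred: &ToyPredicate, case_sensitive: bool) -> ToyBoundPredicate {
    let column = if case_sensitive {
        pred.0.clone()
    } else {
        pred.0.to_lowercase()
    };
    ToyBoundPredicate(column)
}

/// After binding, the evaluator only needs the bound predicate,
/// mirroring the slimmed-down `ManifestEvaluator::new(partition_filter)`.
struct ToyEvaluator {
    partition_filter: ToyBoundPredicate,
}

impl ToyEvaluator {
    fn new(partition_filter: ToyBoundPredicate) -> Self {
        Self { partition_filter }
    }

    fn eval(&self, partition_summary: &str) -> bool {
        partition_summary.contains(&self.partition_filter.0)
    }
}

fn main() {
    let bound = bind(&ToyPredicate("Region".to_string()), false);
    let evaluator = ToyEvaluator::new(bound);
    println!("{}", evaluator.eval("region=eu"));
}
```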
121 changes: 49 additions & 72 deletions crates/iceberg/src/scan.rs
@@ -23,8 +23,8 @@ use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::FileIO;
use crate::spec::{
DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, PartitionSpecRef, Schema,
SchemaRef, SnapshotRef, TableMetadataRef,
DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, Schema, SchemaRef,
SnapshotRef, TableMetadataRef,
};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
@@ -203,31 +203,22 @@ impl TableScan {
continue;
}

if let Some(filter) = context.bound_filter() {
let partition_spec_id = entry.partition_spec_id;
let partition_spec_id = entry.partition_spec_id;

let (partition_spec, partition_schema) =
context.create_partition_spec_and_schema(partition_spec_id)?;

let partition_filter = partition_filter_cache.get(
partition_spec_id,
partition_spec,
partition_schema.clone(),
filter,
context.case_sensitive,
)?;
let partition_filter = partition_filter_cache.get(
partition_spec_id,
&context,
)?;

if let Some(partition_filter) = partition_filter {
let manifest_evaluator = manifest_evaluator_cache.get(
partition_schema.schema_id(),
partition_filter.clone(),
context.case_sensitive,
partition_spec_id,
partition_filter,
);

if !manifest_evaluator.eval(entry)? {
continue;
}

// TODO: Create ExpressionEvaluator
}

let manifest = entry.load_manifest(&context.file_io).await?;
@@ -321,9 +312,9 @@ impl TableScan {
}
}

#[derive(Debug)]
/// Holds the context necessary for file scanning operations
/// in a streaming environment.
#[derive(Debug)]
struct FileScanStreamContext {
schema: SchemaRef,
snapshot: SnapshotRef,
@@ -362,37 +353,11 @@ impl FileScanStreamContext {
fn bound_filter(&self) -> Option<&BoundPredicate> {
self.bound_filter.as_ref()
}

/// Creates a reference-counted [`PartitionSpec`] and a
/// corresponding [`Schema`] based on the specified partition spec id.
fn create_partition_spec_and_schema(
&self,
spec_id: i32,
) -> Result<(PartitionSpecRef, SchemaRef)> {
let partition_spec =
self.table_metadata
.partition_spec_by_id(spec_id)
.ok_or(Error::new(
ErrorKind::Unexpected,
format!("Could not find partition spec for id {}", spec_id),
))?;

let partition_type = partition_spec.partition_type(&self.schema)?;
let partition_fields = partition_type.fields().to_owned();
let partition_schema = Arc::new(
Schema::builder()
.with_schema_id(partition_spec.spec_id)
.with_fields(partition_fields)
.build()?,
);

Ok((partition_spec.clone(), partition_schema))
}
}

#[derive(Debug)]
/// Manages the caching of [`BoundPredicate`] objects
/// for [`PartitionSpec`]s based on partition spec id.
#[derive(Debug)]
struct PartitionFilterCache(HashMap<i32, BoundPredicate>);

impl PartitionFilterCache {
@@ -407,30 +372,47 @@ impl PartitionFilterCache {
fn get(
&mut self,
spec_id: i32,
partition_spec: PartitionSpecRef,
partition_schema: SchemaRef,
filter: &BoundPredicate,
case_sensitive: bool,
) -> Result<&BoundPredicate> {
match self.0.entry(spec_id) {
Entry::Occupied(e) => Ok(e.into_mut()),
Entry::Vacant(e) => {
let mut inclusive_projection = InclusiveProjection::new(partition_spec);

let partition_filter = inclusive_projection
.project(filter)?
.rewrite_not()
.bind(partition_schema, case_sensitive)?;

Ok(e.insert(partition_filter))
}
context: &FileScanStreamContext,
) -> Result<Option<&BoundPredicate>> {
match context.bound_filter() {
None => Ok(None),
Some(filter) => match self.0.entry(spec_id) {
Entry::Occupied(e) => Ok(Some(e.into_mut())),
Entry::Vacant(e) => {
let partition_spec = context
.table_metadata
.partition_spec_by_id(spec_id)
.ok_or(Error::new(
ErrorKind::Unexpected,
format!("Could not find partition spec for id {}", spec_id),
))?;

let partition_type = partition_spec.partition_type(context.schema.as_ref())?;
let partition_fields = partition_type.fields().to_owned();
let partition_schema = Arc::new(
Schema::builder()
.with_schema_id(partition_spec.spec_id)
.with_fields(partition_fields)
.build()?,
);

let mut inclusive_projection = InclusiveProjection::new(partition_spec.clone());

let partition_filter = inclusive_projection
.project(filter)?
.rewrite_not()
.bind(partition_schema.clone(), context.case_sensitive)?;

Ok(Some(e.insert(partition_filter)))
}
},
}
}
}

#[derive(Debug)]
/// Manages the caching of [`ManifestEvaluator`] objects
/// for [`PartitionSpec`]s based on partition spec id.
#[derive(Debug)]
struct ManifestEvaluatorCache(HashMap<i32, ManifestEvaluator>);

impl ManifestEvaluatorCache {
@@ -442,15 +424,10 @@ impl ManifestEvaluatorCache {

/// Retrieves a [`ManifestEvaluator`] from the cache
/// or computes it if not present.
fn get(
&mut self,
spec_id: i32,
partition_filter: BoundPredicate,
case_sensitive: bool,
) -> &mut ManifestEvaluator {
fn get(&mut self, spec_id: i32, partition_filter: &BoundPredicate) -> &mut ManifestEvaluator {
self.0
.entry(spec_id)
.or_insert(ManifestEvaluator::new(partition_filter, case_sensitive))
.or_insert(ManifestEvaluator::new(partition_filter.clone()))
}
}

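The other half of the refactor is the "context as param object" shape: instead of threading `partition_spec`, `partition_schema`, `filter`, and `case_sensitive` through `PartitionFilterCache::get`, the call site hands over the whole `FileScanStreamContext` and the cache pulls out what it needs, returning `Ok(None)` when no filter is set. A simplified sketch of that signature with made-up types (not the real `FileScanStreamContext`, and with error handling omitted):

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Stand-in for the scan context; field and method names are illustrative only.
struct Context {
    bound_filter: Option<String>,
    case_sensitive: bool,
}

impl Context {
    fn bound_filter(&self) -> Option<&String> {
        self.bound_filter.as_ref()
    }
}

struct FilterCache(HashMap<i32, String>);

impl FilterCache {
    fn new() -> Self {
        Self(HashMap::new())
    }

    /// Param-object style: the cache receives the whole context, decides what
    /// it needs, and returns `None` when no filter was configured.
    fn get(&mut self, spec_id: i32, context: &Context) -> Option<&String> {
        match context.bound_filter() {
            None => None,
            Some(filter) => match self.0.entry(spec_id) {
                Entry::Occupied(e) => Some(e.into_mut()),
                Entry::Vacant(e) => {
                    // Derive the per-spec filter only on a miss; the real code
                    // projects and binds the predicate against the partition schema.
                    let projected = if context.case_sensitive {
                        filter.clone()
                    } else {
                        filter.to_lowercase()
                    };
                    Some(e.insert(projected))
                }
            },
        }
    }
}

fn main() {
    let mut cache = FilterCache::new();
    let ctx = Context {
        bound_filter: Some("Region = 'EU'".to_string()),
        case_sensitive: false,
    };
    assert_eq!(cache.get(1, &ctx), Some(&"region = 'eu'".to_string()));

    let no_filter = Context {
        bound_filter: None,
        case_sensitive: true,
    };
    assert!(cache.get(2, &no_filter).is_none());
}
```

Passing the context keeps the cache's signature stable as its inputs change, at the cost of a broader borrow at the call site.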
