Skip to content

Commit

Permalink
fix: disable field pruning in last non null mode (#4740)
Browse files Browse the repository at this point in the history
* fix: don't prune fields in last non null mode

* test: add sqlness test for field pruning

* test: add flush

* refine implementation

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
  • Loading branch information
evenyag and waynexia authored Sep 20, 2024
1 parent f5cf25b commit f02410c
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 1 deletion.
39 changes: 38 additions & 1 deletion src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

//! Scans a region according to the scan request.
use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand All @@ -24,6 +25,7 @@ use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use common_time::Timestamp;
use datafusion::physical_plan::DisplayFormatType;
use datafusion_expr::utils::expr_to_columns;
use smallvec::SmallVec;
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{ScanRequest, TimeSeriesRowSelector};
Expand Down Expand Up @@ -295,6 +297,9 @@ impl ScanRegion {
self.version.options.append_mode,
);

// Remove field filters for LastNonNull mode after logging the request.
self.maybe_remove_field_filters();

let inverted_index_applier = self.build_invereted_index_applier();
let fulltext_index_applier = self.build_fulltext_index_applier();
let predicate = Predicate::new(self.request.filters.clone());
Expand All @@ -321,7 +326,7 @@ impl ScanRegion {
Ok(input)
}

/// Build time range predicate from filters.
/// Build time range predicate from filters, also remove time filters from request.
fn build_time_range_predicate(&mut self) -> TimestampRange {
let time_index = self.version.metadata.time_index_column();
let unit = time_index
Expand All @@ -337,6 +342,38 @@ impl ScanRegion {
)
}

/// Remove field filters if the merge mode is [MergeMode::LastNonNull].
fn maybe_remove_field_filters(&mut self) {
if self.version.options.merge_mode() != MergeMode::LastNonNull {
return;
}

// TODO(yingwen): We can ignore field filters only when there are multiple sources in the same time window.
let field_columns = self
.version
.metadata
.field_columns()
.map(|col| &col.column_schema.name)
.collect::<HashSet<_>>();
// Columns in the expr.
let mut columns = HashSet::new();

self.request.filters.retain(|expr| {
columns.clear();
// `expr_to_columns` won't return error.
if expr_to_columns(expr, &mut columns).is_err() {
return false;
}
for column in &columns {
if field_columns.contains(&column.name) {
// This expr uses the field column.
return false;
}
}
true
});
}

/// Use the latest schema to build the inveretd index applier.
fn build_invereted_index_applier(&self) -> Option<InvertedIndexApplierRef> {
if self.ignore_inverted_index {
Expand Down
69 changes: 69 additions & 0 deletions tests/cases/standalone/common/select/prune_field.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
CREATE TABLE IF NOT EXISTS prune_field (
ts TIMESTAMP TIME INDEX,
tag UInt16,
a UInt8,
b UInt8,
PRIMARY KEY (tag)) ENGINE = mito WITH('merge_mode'='last_non_null');

Affected Rows: 0

insert into prune_field(ts, tag, a, b) values(0, 1, 1, null);

Affected Rows: 1

admin flush_table('prune_field');

+----------------------------------+
| ADMIN flush_table('prune_field') |
+----------------------------------+
| 0 |
+----------------------------------+

insert into prune_field(ts, tag, a, b) values(0, 1, null, 1);

Affected Rows: 1

admin flush_table('prune_field');

+----------------------------------+
| ADMIN flush_table('prune_field') |
+----------------------------------+
| 0 |
+----------------------------------+

select * from prune_field where a = 1;

+---------------------+-----+---+---+
| ts | tag | a | b |
+---------------------+-----+---+---+
| 1970-01-01T00:00:00 | 1 | 1 | 1 |
+---------------------+-----+---+---+

select * from prune_field where b = 1;

+---------------------+-----+---+---+
| ts | tag | a | b |
+---------------------+-----+---+---+
| 1970-01-01T00:00:00 | 1 | 1 | 1 |
+---------------------+-----+---+---+

select * from prune_field;

+---------------------+-----+---+---+
| ts | tag | a | b |
+---------------------+-----+---+---+
| 1970-01-01T00:00:00 | 1 | 1 | 1 |
+---------------------+-----+---+---+

select * from prune_field where a = 1 and b = 1;

+---------------------+-----+---+---+
| ts | tag | a | b |
+---------------------+-----+---+---+
| 1970-01-01T00:00:00 | 1 | 1 | 1 |
+---------------------+-----+---+---+

drop table prune_field;

Affected Rows: 0

24 changes: 24 additions & 0 deletions tests/cases/standalone/common/select/prune_field.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
CREATE TABLE IF NOT EXISTS prune_field (
ts TIMESTAMP TIME INDEX,
tag UInt16,
a UInt8,
b UInt8,
PRIMARY KEY (tag)) ENGINE = mito WITH('merge_mode'='last_non_null');

insert into prune_field(ts, tag, a, b) values(0, 1, 1, null);

admin flush_table('prune_field');

insert into prune_field(ts, tag, a, b) values(0, 1, null, 1);

admin flush_table('prune_field');

select * from prune_field where a = 1;

select * from prune_field where b = 1;

select * from prune_field;

select * from prune_field where a = 1 and b = 1;

drop table prune_field;

0 comments on commit f02410c

Please sign in to comment.