Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(executor, frontend): StreamHopWindow executor should derive window expr in frontend #8415

Merged
merged 17 commits into from
Mar 8, 2023
Merged
29 changes: 28 additions & 1 deletion dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions e2e_test/streaming/time_window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,42 @@ select * from mv_hop_agg_2 order by window_start, uid;
3 8 2022-01-01 10:45:00
3 8 2022-01-01 11:00:00

statement ok
insert into t1 values
(9, 1, 4, '2022-01-02 10:00:00'),
(10, 3, 3, '2022-01-03 10:05:00'),
(11, 2, 2, '2022-01-04 10:14:00'),
(12, 1, 1, '2022-01-05 10:22:00');

statement ok
flush;

statement ok
create materialized view mv_hop_agg_3 as
select uid, sum(v) as sum_v, window_start
from hop(t1, created_at, interval '1' day, interval '2' day)
group by window_start, uid;


# Test for interval day
query IIT
select * from mv_hop_agg_3 order by window_start, uid;
----
1 11 2021-12-31 00:00:00
2 9 2021-12-31 00:00:00
3 16 2021-12-31 00:00:00
1 15 2022-01-01 00:00:00
2 9 2022-01-01 00:00:00
3 16 2022-01-01 00:00:00
1 4 2022-01-02 00:00:00
3 3 2022-01-02 00:00:00
2 2 2022-01-03 00:00:00
3 3 2022-01-03 00:00:00
1 1 2022-01-04 00:00:00
2 2 2022-01-04 00:00:00
1 1 2022-01-05 00:00:00


statement ok
drop materialized view mv_tumble;

Expand All @@ -146,6 +182,9 @@ drop materialized view mv_hop_agg_1;
statement ok
drop materialized view mv_hop_agg_2;

statement ok
drop materialized view mv_hop_agg_3;

statement error
create materialized view invalid_hop as
select * from hop(t1, created_at, interval '0', interval '1');
Expand Down
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ message HopWindowNode {
data.IntervalUnit window_slide = 2;
data.IntervalUnit window_size = 3;
repeated uint32 output_indices = 4;
repeated expr.ExprNode window_start_exprs = 5;
repeated expr.ExprNode window_end_exprs = 6;
}

message MergeNode {
Expand Down
6 changes: 2 additions & 4 deletions src/batch/src/executor/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,20 +171,18 @@ impl HopWindowExecutor {

let window_start_col_index = child.schema().len();
let window_end_col_index = child.schema().len() + 1;
let contains_window_start = output_indices.contains(&window_start_col_index);
let contains_window_end = output_indices.contains(&window_end_col_index);
#[for_await]
for data_chunk in child.execute() {
let data_chunk = data_chunk?;
assert!(matches!(data_chunk.vis(), Vis::Compact(_)));
let len = data_chunk.cardinality();
for i in 0..units {
let window_start_col = if contains_window_start {
let window_start_col = if output_indices.contains(&window_start_col_index) {
Some(self.window_start_exprs[i].eval(&data_chunk)?)
} else {
None
};
let window_end_col = if contains_window_end {
let window_end_col = if output_indices.contains(&window_end_col_index) {
Some(self.window_end_exprs[i].eval(&data_chunk)?)
} else {
None
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/logical_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,9 @@ impl ToStream for LogicalHopWindow {
fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<PlanRef> {
let new_input = self.input().to_stream(ctx)?;
let new_logical = self.clone_with_input(new_input);
Ok(StreamHopWindow::new(new_logical).into())
let (window_start_exprs, window_end_exprs) =
new_logical.derive_window_start_and_end_exprs()?;
Ok(StreamHopWindow::new(new_logical, window_start_exprs, window_end_exprs).into())
}

fn logical_rewrite_for_stream(
Expand Down
16 changes: 16 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ impl HashJoin {
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HopWindow {
pub core: generic::HopWindow<PlanRef>,
window_start_exprs: Vec<ExprImpl>,
window_end_exprs: Vec<ExprImpl>,
}
impl_plan_tree_node_v2_for_stream_unary_node_with_core_delegating!(HopWindow, core, input);

Expand Down Expand Up @@ -625,12 +627,26 @@ pub fn to_stream_prost_body(
unreachable!();
}
Node::HopWindow(me) => {
let window_start_exprs = me
.window_start_exprs
.clone()
.iter()
.map(|x| x.to_expr_proto())
.collect();
let window_end_exprs = me
.window_end_exprs
.clone()
.iter()
.map(|x| x.to_expr_proto())
.collect();
let me = &me.core;
ProstNode::HopWindow(HopWindowNode {
time_col: me.time_col.index() as _,
window_slide: Some(me.window_slide.into()),
window_size: Some(me.window_size.into()),
output_indices: me.output_indices.iter().map(|&x| x as u32).collect(),
window_start_exprs,
window_end_exprs,
})
}
Node::LocalSimpleAgg(me) => {
Expand Down
57 changes: 53 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::HopWindowNode;

use super::{ExprRewritable, LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::ColIndexMappingRewriteExt;

Expand All @@ -29,10 +30,16 @@ use crate::utils::ColIndexMappingRewriteExt;
pub struct StreamHopWindow {
pub base: PlanBase,
logical: LogicalHopWindow,
window_start_exprs: Vec<ExprImpl>,
window_end_exprs: Vec<ExprImpl>,
}

impl StreamHopWindow {
pub fn new(logical: LogicalHopWindow) -> Self {
pub fn new(
logical: LogicalHopWindow,
window_start_exprs: Vec<ExprImpl>,
window_end_exprs: Vec<ExprImpl>,
) -> Self {
let ctx = logical.base.ctx.clone();
let pk_indices = logical.base.logical_pk.to_vec();
let input = logical.input();
Expand Down Expand Up @@ -64,7 +71,12 @@ impl StreamHopWindow {
logical.input().append_only(),
watermark_columns,
);
Self { base, logical }
Self {
base,
logical,
window_start_exprs,
window_end_exprs,
}
}
}

Expand Down Expand Up @@ -95,7 +107,11 @@ impl PlanTreeNodeUnary for StreamHopWindow {
}

fn clone_with_input(&self, input: PlanRef) -> Self {
Self::new(self.logical.clone_with_input(input))
Self::new(
self.logical.clone_with_input(input),
self.window_start_exprs.clone(),
self.window_end_exprs.clone(),
)
}
}

Expand All @@ -114,8 +130,41 @@ impl StreamNode for StreamHopWindow {
.iter()
.map(|&x| x as u32)
.collect(),
window_start_exprs: self
.window_start_exprs
.clone()
.iter()
.map(|x| x.to_expr_proto())
.collect(),
window_end_exprs: self
.window_end_exprs
.clone()
.iter()
.map(|x| x.to_expr_proto())
.collect(),
})
}
}

impl ExprRewritable for StreamHopWindow {}
impl ExprRewritable for StreamHopWindow {
fn has_rewritable_expr(&self) -> bool {
true
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
Self::new(
self.logical.clone(),
self.window_start_exprs
.clone()
.into_iter()
.map(|e| r.rewrite_expr(e))
.collect(),
self.window_end_exprs
.clone()
.into_iter()
.map(|e| r.rewrite_expr(e))
.collect(),
)
.into()
}
}
Loading