Skip to content

Commit

Permalink
fix(executor, frontend): StreamHopWindow executor should derive win…
Browse files Browse the repository at this point in the history
…dow expr in frontend (#8415)

Co-authored-by: jon-chuang <jon-chuang@users.noreply.github.com>
  • Loading branch information
jon-chuang and jon-chuang authored Mar 8, 2023
1 parent f4817e9 commit abe0bfe
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 159 deletions.
29 changes: 28 additions & 1 deletion dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions e2e_test/streaming/time_window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,42 @@ select * from mv_hop_agg_2 order by window_start, uid;
3 8 2022-01-01 10:45:00
3 8 2022-01-01 11:00:00

statement ok
insert into t1 values
(9, 1, 4, '2022-01-02 10:00:00'),
(10, 3, 3, '2022-01-03 10:05:00'),
(11, 2, 2, '2022-01-04 10:14:00'),
(12, 1, 1, '2022-01-05 10:22:00');

statement ok
flush;

statement ok
create materialized view mv_hop_agg_3 as
select uid, sum(v) as sum_v, window_start
from hop(t1, created_at, interval '1' day, interval '2' day)
group by window_start, uid;


# Test for interval day
query IIT
select * from mv_hop_agg_3 order by window_start, uid;
----
1 11 2021-12-31 00:00:00
2 9 2021-12-31 00:00:00
3 16 2021-12-31 00:00:00
1 15 2022-01-01 00:00:00
2 9 2022-01-01 00:00:00
3 16 2022-01-01 00:00:00
1 4 2022-01-02 00:00:00
3 3 2022-01-02 00:00:00
2 2 2022-01-03 00:00:00
3 3 2022-01-03 00:00:00
1 1 2022-01-04 00:00:00
2 2 2022-01-04 00:00:00
1 1 2022-01-05 00:00:00


statement ok
drop materialized view mv_tumble;

Expand All @@ -146,6 +182,9 @@ drop materialized view mv_hop_agg_1;
statement ok
drop materialized view mv_hop_agg_2;

statement ok
drop materialized view mv_hop_agg_3;

statement error
create materialized view invalid_hop as
select * from hop(t1, created_at, interval '0', interval '1');
Expand Down
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ message HopWindowNode {
data.IntervalUnit window_slide = 2;
data.IntervalUnit window_size = 3;
repeated uint32 output_indices = 4;
repeated expr.ExprNode window_start_exprs = 5;
repeated expr.ExprNode window_end_exprs = 6;
}

message MergeNode {
Expand Down
6 changes: 2 additions & 4 deletions src/batch/src/executor/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,20 +171,18 @@ impl HopWindowExecutor {

let window_start_col_index = child.schema().len();
let window_end_col_index = child.schema().len() + 1;
let contains_window_start = output_indices.contains(&window_start_col_index);
let contains_window_end = output_indices.contains(&window_end_col_index);
#[for_await]
for data_chunk in child.execute() {
let data_chunk = data_chunk?;
assert!(matches!(data_chunk.vis(), Vis::Compact(_)));
let len = data_chunk.cardinality();
for i in 0..units {
let window_start_col = if contains_window_start {
let window_start_col = if output_indices.contains(&window_start_col_index) {
Some(self.window_start_exprs[i].eval(&data_chunk)?)
} else {
None
};
let window_end_col = if contains_window_end {
let window_end_col = if output_indices.contains(&window_end_col_index) {
Some(self.window_end_exprs[i].eval(&data_chunk)?)
} else {
None
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/logical_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,9 @@ impl ToStream for LogicalHopWindow {
fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<PlanRef> {
let new_input = self.input().to_stream(ctx)?;
let new_logical = self.clone_with_input(new_input);
Ok(StreamHopWindow::new(new_logical).into())
let (window_start_exprs, window_end_exprs) =
new_logical.derive_window_start_and_end_exprs()?;
Ok(StreamHopWindow::new(new_logical, window_start_exprs, window_end_exprs).into())
}

fn logical_rewrite_for_stream(
Expand Down
16 changes: 16 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ impl HashJoin {
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HopWindow {
pub core: generic::HopWindow<PlanRef>,
window_start_exprs: Vec<ExprImpl>,
window_end_exprs: Vec<ExprImpl>,
}
impl_plan_tree_node_v2_for_stream_unary_node_with_core_delegating!(HopWindow, core, input);

Expand Down Expand Up @@ -625,12 +627,26 @@ pub fn to_stream_prost_body(
unreachable!();
}
Node::HopWindow(me) => {
let window_start_exprs = me
.window_start_exprs
.clone()
.iter()
.map(|x| x.to_expr_proto())
.collect();
let window_end_exprs = me
.window_end_exprs
.clone()
.iter()
.map(|x| x.to_expr_proto())
.collect();
let me = &me.core;
ProstNode::HopWindow(HopWindowNode {
time_col: me.time_col.index() as _,
window_slide: Some(me.window_slide.into()),
window_size: Some(me.window_size.into()),
output_indices: me.output_indices.iter().map(|&x| x as u32).collect(),
window_start_exprs,
window_end_exprs,
})
}
Node::LocalSimpleAgg(me) => {
Expand Down
57 changes: 53 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::HopWindowNode;

use super::{ExprRewritable, LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::ColIndexMappingRewriteExt;

Expand All @@ -29,10 +30,16 @@ use crate::utils::ColIndexMappingRewriteExt;
pub struct StreamHopWindow {
pub base: PlanBase,
logical: LogicalHopWindow,
window_start_exprs: Vec<ExprImpl>,
window_end_exprs: Vec<ExprImpl>,
}

impl StreamHopWindow {
pub fn new(logical: LogicalHopWindow) -> Self {
pub fn new(
logical: LogicalHopWindow,
window_start_exprs: Vec<ExprImpl>,
window_end_exprs: Vec<ExprImpl>,
) -> Self {
let ctx = logical.base.ctx.clone();
let pk_indices = logical.base.logical_pk.to_vec();
let input = logical.input();
Expand Down Expand Up @@ -64,7 +71,12 @@ impl StreamHopWindow {
logical.input().append_only(),
watermark_columns,
);
Self { base, logical }
Self {
base,
logical,
window_start_exprs,
window_end_exprs,
}
}
}

Expand Down Expand Up @@ -95,7 +107,11 @@ impl PlanTreeNodeUnary for StreamHopWindow {
}

fn clone_with_input(&self, input: PlanRef) -> Self {
Self::new(self.logical.clone_with_input(input))
Self::new(
self.logical.clone_with_input(input),
self.window_start_exprs.clone(),
self.window_end_exprs.clone(),
)
}
}

Expand All @@ -114,8 +130,41 @@ impl StreamNode for StreamHopWindow {
.iter()
.map(|&x| x as u32)
.collect(),
window_start_exprs: self
.window_start_exprs
.clone()
.iter()
.map(|x| x.to_expr_proto())
.collect(),
window_end_exprs: self
.window_end_exprs
.clone()
.iter()
.map(|x| x.to_expr_proto())
.collect(),
})
}
}

impl ExprRewritable for StreamHopWindow {}
impl ExprRewritable for StreamHopWindow {
fn has_rewritable_expr(&self) -> bool {
true
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
Self::new(
self.logical.clone(),
self.window_start_exprs
.clone()
.into_iter()
.map(|e| r.rewrite_expr(e))
.collect(),
self.window_end_exprs
.clone()
.into_iter()
.map(|e| r.rewrite_expr(e))
.collect(),
)
.into()
}
}
Loading

0 comments on commit abe0bfe

Please sign in to comment.