From abe0bfe1c91d33ee8c55e978b0bffc9fa001ae15 Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Wed, 8 Mar 2023 21:04:05 +0800 Subject: [PATCH] fix(executor, frontend): `StreamHopWindow` executor should derive window expr in frontend (#8415) Co-authored-by: jon-chuang --- dashboard/proto/gen/stream_plan.ts | 29 ++- e2e_test/streaming/time_window.slt | 39 ++++ proto/stream_plan.proto | 2 + src/batch/src/executor/hop_window.rs | 6 +- .../optimizer/plan_node/logical_hop_window.rs | 4 +- .../src/optimizer/plan_node/stream.rs | 16 ++ .../optimizer/plan_node/stream_hop_window.rs | 57 +++++- src/stream/src/executor/hop_window.rs | 182 ++++-------------- src/stream/src/from_proto/hop_window.rs | 14 ++ 9 files changed, 190 insertions(+), 159 deletions(-) diff --git a/dashboard/proto/gen/stream_plan.ts b/dashboard/proto/gen/stream_plan.ts index 7b4538b6d68f..1e150e5dc03c 100644 --- a/dashboard/proto/gen/stream_plan.ts +++ b/dashboard/proto/gen/stream_plan.ts @@ -641,6 +641,8 @@ export interface HopWindowNode { windowSlide: IntervalUnit | undefined; windowSize: IntervalUnit | undefined; outputIndices: number[]; + windowStartExprs: ExprNode[]; + windowEndExprs: ExprNode[]; } export interface MergeNode { @@ -2867,7 +2869,14 @@ export const DeltaIndexJoinNode = { }; function createBaseHopWindowNode(): HopWindowNode { - return { timeCol: 0, windowSlide: undefined, windowSize: undefined, outputIndices: [] }; + return { + timeCol: 0, + windowSlide: undefined, + windowSize: undefined, + outputIndices: [], + windowStartExprs: [], + windowEndExprs: [], + }; } export const HopWindowNode = { @@ -2877,6 +2886,12 @@ export const HopWindowNode = { windowSlide: isSet(object.windowSlide) ? IntervalUnit.fromJSON(object.windowSlide) : undefined, windowSize: isSet(object.windowSize) ? IntervalUnit.fromJSON(object.windowSize) : undefined, outputIndices: Array.isArray(object?.outputIndices) ? object.outputIndices.map((e: any) => Number(e)) : [], + windowStartExprs: Array.isArray(object?.windowStartExprs) + ? object.windowStartExprs.map((e: any) => ExprNode.fromJSON(e)) + : [], + windowEndExprs: Array.isArray(object?.windowEndExprs) + ? object.windowEndExprs.map((e: any) => ExprNode.fromJSON(e)) + : [], }; }, @@ -2892,6 +2907,16 @@ export const HopWindowNode = { } else { obj.outputIndices = []; } + if (message.windowStartExprs) { + obj.windowStartExprs = message.windowStartExprs.map((e) => e ? ExprNode.toJSON(e) : undefined); + } else { + obj.windowStartExprs = []; + } + if (message.windowEndExprs) { + obj.windowEndExprs = message.windowEndExprs.map((e) => e ? ExprNode.toJSON(e) : undefined); + } else { + obj.windowEndExprs = []; + } return obj; }, @@ -2905,6 +2930,8 @@ export const HopWindowNode = { ? IntervalUnit.fromPartial(object.windowSize) : undefined; message.outputIndices = object.outputIndices?.map((e) => e) || []; + message.windowStartExprs = object.windowStartExprs?.map((e) => ExprNode.fromPartial(e)) || []; + message.windowEndExprs = object.windowEndExprs?.map((e) => ExprNode.fromPartial(e)) || []; return message; }, }; diff --git a/e2e_test/streaming/time_window.slt b/e2e_test/streaming/time_window.slt index a413c952e692..bfe59d38ecfd 100644 --- a/e2e_test/streaming/time_window.slt +++ b/e2e_test/streaming/time_window.slt @@ -128,6 +128,42 @@ select * from mv_hop_agg_2 order by window_start, uid; 3 8 2022-01-01 10:45:00 3 8 2022-01-01 11:00:00 +statement ok +insert into t1 values + (9, 1, 4, '2022-01-02 10:00:00'), + (10, 3, 3, '2022-01-03 10:05:00'), + (11, 2, 2, '2022-01-04 10:14:00'), + (12, 1, 1, '2022-01-05 10:22:00'); + +statement ok +flush; + +statement ok +create materialized view mv_hop_agg_3 as +select uid, sum(v) as sum_v, window_start +from hop(t1, created_at, interval '1' day, interval '2' day) +group by window_start, uid; + + +# Test for interval day +query IIT +select * from mv_hop_agg_3 order by window_start, uid; +---- +1 11 2021-12-31 00:00:00 +2 9 2021-12-31 00:00:00 +3 16 2021-12-31 00:00:00 +1 15 2022-01-01 00:00:00 +2 9 2022-01-01 00:00:00 +3 16 2022-01-01 00:00:00 +1 4 2022-01-02 00:00:00 +3 3 2022-01-02 00:00:00 +2 2 2022-01-03 00:00:00 +3 3 2022-01-03 00:00:00 +1 1 2022-01-04 00:00:00 +2 2 2022-01-04 00:00:00 +1 1 2022-01-05 00:00:00 + + statement ok drop materialized view mv_tumble; @@ -146,6 +182,9 @@ drop materialized view mv_hop_agg_1; statement ok drop materialized view mv_hop_agg_2; +statement ok +drop materialized view mv_hop_agg_3; + statement error create materialized view invalid_hop as select * from hop(t1, created_at, interval '0', interval '1'); diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 23695a61048c..1d084a3ef334 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -328,6 +328,8 @@ message HopWindowNode { data.IntervalUnit window_slide = 2; data.IntervalUnit window_size = 3; repeated uint32 output_indices = 4; + repeated expr.ExprNode window_start_exprs = 5; + repeated expr.ExprNode window_end_exprs = 6; } message MergeNode { diff --git a/src/batch/src/executor/hop_window.rs b/src/batch/src/executor/hop_window.rs index a1268702f7f8..e277dcd5bf43 100644 --- a/src/batch/src/executor/hop_window.rs +++ b/src/batch/src/executor/hop_window.rs @@ -171,20 +171,18 @@ impl HopWindowExecutor { let window_start_col_index = child.schema().len(); let window_end_col_index = child.schema().len() + 1; - let contains_window_start = output_indices.contains(&window_start_col_index); - let contains_window_end = output_indices.contains(&window_end_col_index); #[for_await] for data_chunk in child.execute() { let data_chunk = data_chunk?; assert!(matches!(data_chunk.vis(), Vis::Compact(_))); let len = data_chunk.cardinality(); for i in 0..units { - let window_start_col = if contains_window_start { + let window_start_col = if output_indices.contains(&window_start_col_index) { Some(self.window_start_exprs[i].eval(&data_chunk)?) } else { None }; - let window_end_col = if contains_window_end { + let window_end_col = if output_indices.contains(&window_end_col_index) { Some(self.window_end_exprs[i].eval(&data_chunk)?) } else { None diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index d418f604a3c3..93888267a86d 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -473,7 +473,9 @@ impl ToStream for LogicalHopWindow { fn to_stream(&self, ctx: &mut ToStreamContext) -> Result { let new_input = self.input().to_stream(ctx)?; let new_logical = self.clone_with_input(new_input); - Ok(StreamHopWindow::new(new_logical).into()) + let (window_start_exprs, window_end_exprs) = + new_logical.derive_window_start_and_end_exprs()?; + Ok(StreamHopWindow::new(new_logical, window_start_exprs, window_end_exprs).into()) } fn logical_rewrite_for_stream( diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs index 5a69db437efb..b3b9ec8cb4c4 100644 --- a/src/frontend/src/optimizer/plan_node/stream.rs +++ b/src/frontend/src/optimizer/plan_node/stream.rs @@ -296,6 +296,8 @@ impl HashJoin { #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct HopWindow { pub core: generic::HopWindow, + window_start_exprs: Vec, + window_end_exprs: Vec, } impl_plan_tree_node_v2_for_stream_unary_node_with_core_delegating!(HopWindow, core, input); @@ -625,12 +627,26 @@ pub fn to_stream_prost_body( unreachable!(); } Node::HopWindow(me) => { + let window_start_exprs = me + .window_start_exprs + .clone() + .iter() + .map(|x| x.to_expr_proto()) + .collect(); + let window_end_exprs = me + .window_end_exprs + .clone() + .iter() + .map(|x| x.to_expr_proto()) + .collect(); let me = &me.core; ProstNode::HopWindow(HopWindowNode { time_col: me.time_col.index() as _, window_slide: Some(me.window_slide.into()), window_size: Some(me.window_size.into()), output_indices: me.output_indices.iter().map(|&x| x as u32).collect(), + window_start_exprs, + window_end_exprs, }) } Node::LocalSimpleAgg(me) => { diff --git a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs index 2930746efa4b..133732ff2b7e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs @@ -21,6 +21,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::HopWindowNode; use super::{ExprRewritable, LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use crate::expr::{Expr, ExprImpl, ExprRewriter}; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::ColIndexMappingRewriteExt; @@ -29,10 +30,16 @@ use crate::utils::ColIndexMappingRewriteExt; pub struct StreamHopWindow { pub base: PlanBase, logical: LogicalHopWindow, + window_start_exprs: Vec, + window_end_exprs: Vec, } impl StreamHopWindow { - pub fn new(logical: LogicalHopWindow) -> Self { + pub fn new( + logical: LogicalHopWindow, + window_start_exprs: Vec, + window_end_exprs: Vec, + ) -> Self { let ctx = logical.base.ctx.clone(); let pk_indices = logical.base.logical_pk.to_vec(); let input = logical.input(); @@ -64,7 +71,12 @@ impl StreamHopWindow { logical.input().append_only(), watermark_columns, ); - Self { base, logical } + Self { + base, + logical, + window_start_exprs, + window_end_exprs, + } } } @@ -95,7 +107,11 @@ impl PlanTreeNodeUnary for StreamHopWindow { } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new(self.logical.clone_with_input(input)) + Self::new( + self.logical.clone_with_input(input), + self.window_start_exprs.clone(), + self.window_end_exprs.clone(), + ) } } @@ -114,8 +130,41 @@ impl StreamNode for StreamHopWindow { .iter() .map(|&x| x as u32) .collect(), + window_start_exprs: self + .window_start_exprs + .clone() + .iter() + .map(|x| x.to_expr_proto()) + .collect(), + window_end_exprs: self + .window_end_exprs + .clone() + .iter() + .map(|x| x.to_expr_proto()) + .collect(), }) } } -impl ExprRewritable for StreamHopWindow {} +impl ExprRewritable for StreamHopWindow { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + Self::new( + self.logical.clone(), + self.window_start_exprs + .clone() + .into_iter() + .map(|e| r.rewrite_expr(e)) + .collect(), + self.window_end_exprs + .clone() + .into_iter() + .map(|e| r.rewrite_expr(e)) + .collect(), + ) + .into() + } +} diff --git a/src/stream/src/executor/hop_window.rs b/src/stream/src/executor/hop_window.rs index 4972d9c85810..527c8df1598e 100644 --- a/src/stream/src/executor/hop_window.rs +++ b/src/stream/src/executor/hop_window.rs @@ -16,13 +16,11 @@ use std::num::NonZeroUsize; use futures::StreamExt; use futures_async_stream::try_stream; -use num_traits::CheckedSub; use risingwave_common::array::column::Column; -use risingwave_common::array::{DataChunk, StreamChunk, Vis}; -use risingwave_common::types::{DataType, IntervalUnit, ScalarImpl}; -use risingwave_expr::expr::{new_binary_expr, Expression, InputRefExpression, LiteralExpression}; +use risingwave_common::array::{StreamChunk, Vis}; +use risingwave_common::types::IntervalUnit; +use risingwave_expr::expr::BoxedExpression; use risingwave_expr::ExprError; -use risingwave_pb::expr::expr_node; use super::error::StreamExecutorError; use super::{ActorContextRef, BoxedExecutor, Executor, ExecutorInfo, Message}; @@ -36,10 +34,13 @@ pub struct HopWindowExecutor { pub time_col_idx: usize, pub window_slide: IntervalUnit, pub window_size: IntervalUnit, + window_start_exprs: Vec, + window_end_exprs: Vec, pub output_indices: Vec, } impl HopWindowExecutor { + #[allow(clippy::too_many_arguments)] pub fn new( ctx: ActorContextRef, input: BoxedExecutor, @@ -47,6 +48,8 @@ impl HopWindowExecutor { time_col_idx: usize, window_slide: IntervalUnit, window_size: IntervalUnit, + window_start_exprs: Vec, + window_end_exprs: Vec, output_indices: Vec, ) -> Self { HopWindowExecutor { @@ -56,6 +59,8 @@ impl HopWindowExecutor { time_col_idx, window_slide, window_size, + window_start_exprs, + window_end_exprs, output_indices, } } @@ -85,7 +90,7 @@ impl HopWindowExecutor { let Self { ctx, input, - time_col_idx, + window_slide, window_size, output_indices, @@ -104,92 +109,6 @@ impl HopWindowExecutor { })? .get(); - let time_col_data_type = input.schema().fields()[time_col_idx].data_type(); - let output_type = DataType::window_of(&time_col_data_type).unwrap(); - let time_col_ref = InputRefExpression::new(time_col_data_type, self.time_col_idx).boxed(); - - let window_slide_expr = - LiteralExpression::new(DataType::Interval, Some(ScalarImpl::Interval(window_slide))) - .boxed(); - - // The first window_start of hop window should be: - // tumble_start(`time_col` - (`window_size` - `window_slide`), `window_slide`). - // Let's pre calculate (`window_size` - `window_slide`). - let window_size_sub_slide = - window_size - .checked_sub(&window_slide) - .ok_or_else(|| ExprError::InvalidParam { - name: "window", - reason: format!( - "window_size {} cannot be subtracted by window_slide {}", - window_size, window_slide - ), - })?; - let window_size_sub_slide_expr = LiteralExpression::new( - DataType::Interval, - Some(ScalarImpl::Interval(window_size_sub_slide)), - ) - .boxed(); - - let hop_start = new_binary_expr( - expr_node::Type::TumbleStart, - output_type.clone(), - new_binary_expr( - expr_node::Type::Subtract, - output_type.clone(), - time_col_ref, - window_size_sub_slide_expr, - )?, - window_slide_expr, - )?; - let mut window_start_exprs = Vec::with_capacity(units); - let mut window_end_exprs = Vec::with_capacity(units); - for i in 0..units { - let window_start_offset = - window_slide - .checked_mul_int(i) - .ok_or_else(|| ExprError::InvalidParam { - name: "window", - reason: format!( - "window_slide {} cannot be multiplied by {}", - window_slide, i - ), - })?; - let window_start_offset_expr = LiteralExpression::new( - DataType::Interval, - Some(ScalarImpl::Interval(window_start_offset)), - ) - .boxed(); - let window_end_offset = - window_slide - .checked_mul_int(i + units) - .ok_or_else(|| ExprError::InvalidParam { - name: "window", - reason: format!( - "window_slide {} cannot be multiplied by {}", - window_slide, i - ), - })?; - let window_end_offset_expr = LiteralExpression::new( - DataType::Interval, - Some(ScalarImpl::Interval(window_end_offset)), - ) - .boxed(); - let window_start_expr = new_binary_expr( - expr_node::Type::Add, - output_type.clone(), - InputRefExpression::new(output_type.clone(), 0).boxed(), - window_start_offset_expr, - )?; - window_start_exprs.push(window_start_expr); - let window_end_expr = new_binary_expr( - expr_node::Type::Add, - output_type.clone(), - InputRefExpression::new(output_type.clone(), 0).boxed(), - window_end_offset_expr, - )?; - window_end_exprs.push(window_end_expr); - } let window_start_col_index = input.schema().len(); let window_end_col_index = input.schema().len() + 1; #[for_await] @@ -199,17 +118,13 @@ impl HopWindowExecutor { // TODO: compact may be not necessary here. let chunk = chunk.compact(); let (data_chunk, ops) = chunk.into_parts(); - let hop_start = hop_start - .eval_infallible(&data_chunk, |err| ctx.on_compute_error(err, &info.identity)); - let len = hop_start.len(); - let hop_start_chunk = DataChunk::new(vec![Column::new(hop_start)], len); - let (origin_cols, vis) = data_chunk.into_parts(); // SAFETY: Already compacted. - assert!(matches!(vis, Vis::Compact(_))); + assert!(matches!(data_chunk.vis(), Vis::Compact(_))); + let _len = data_chunk.cardinality(); for i in 0..units { let window_start_col = if output_indices.contains(&window_start_col_index) { Some( - window_start_exprs[i].eval_infallible(&hop_start_chunk, |err| { + self.window_start_exprs[i].eval_infallible(&data_chunk, |err| { ctx.on_compute_error(err, &info.identity) }), ) @@ -218,7 +133,7 @@ impl HopWindowExecutor { }; let window_end_col = if output_indices.contains(&window_end_col_index) { Some( - window_end_exprs[i].eval_infallible(&hop_start_chunk, |err| { + self.window_end_exprs[i].eval_infallible(&data_chunk, |err| { ctx.on_compute_error(err, &info.identity) }), ) @@ -229,7 +144,7 @@ impl HopWindowExecutor { .iter() .filter_map(|&idx| { if idx < window_start_col_index { - Some(origin_cols[idx].clone()) + Some(data_chunk.column_at(idx).clone()) } else if idx == window_start_col_index { Some(Column::new(window_start_col.clone().unwrap())) } else if idx == window_end_col_index { @@ -256,12 +171,12 @@ mod tests { use risingwave_common::array::stream_chunk::StreamChunkTestExt; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::{DataType, IntervalUnit}; + use risingwave_expr::expr::test_utils::make_hop_window_expression; use crate::executor::test_utils::MockSource; use crate::executor::{ActorContext, Executor, ExecutorInfo, StreamChunk}; - #[tokio::test] - async fn test_execute() { + fn create_executor(output_indices: Vec) -> Box { let field1 = Field::unnamed(DataType::Int64); let field2 = Field::unnamed(DataType::Int64); let field3 = Field::with_name(DataType::Timestamp, "created_at"); @@ -280,28 +195,35 @@ mod tests { + 8 3 ^11:02:00" .replace('^', "2022-2-2T"), ); - let input = MockSource::with_chunks(schema.clone(), pk_indices.clone(), vec![chunk]).boxed(); - let window_slide = IntervalUnit::from_minutes(15); let window_size = IntervalUnit::from_minutes(30); - let default_indices: Vec<_> = (0..5).collect(); - let executor = super::HopWindowExecutor::new( + let (window_start_exprs, window_end_exprs) = + make_hop_window_expression(DataType::Timestamp, 2, window_size, window_slide).unwrap(); + + super::HopWindowExecutor::new( ActorContext::create(123), input, ExecutorInfo { // TODO: the schema is incorrect, but it seems useless here. - schema: schema.clone(), + schema, pk_indices, identity: "test".to_string(), }, 2, window_slide, window_size, - default_indices, + window_start_exprs, + window_end_exprs, + output_indices, ) - .boxed(); + .boxed() + } + #[tokio::test] + async fn test_execute() { + let default_indices: Vec<_> = (0..5).collect(); + let executor = create_executor(default_indices); let mut stream = executor.execute(); // TODO: add more test infra to reduce the duplicated codes below. @@ -343,45 +265,7 @@ mod tests { #[tokio::test] async fn test_output_indices() { - let field1 = Field::unnamed(DataType::Int64); - let field2 = Field::unnamed(DataType::Int64); - let field3 = Field::with_name(DataType::Timestamp, "created_at"); - let schema = Schema::new(vec![field1, field2, field3]); - let pk_indices = vec![0]; - - let chunk = StreamChunk::from_pretty( - &"I I TS - + 1 1 ^10:00:00 - + 2 3 ^10:05:00 - - 3 2 ^10:14:00 - + 4 1 ^10:22:00 - - 5 3 ^10:33:00 - + 6 2 ^10:42:00 - - 7 1 ^10:51:00 - + 8 3 ^11:02:00" - .replace('^', "2022-2-2T"), - ); - - let input = - MockSource::with_chunks(schema.clone(), pk_indices.clone(), vec![chunk]).boxed(); - - let window_slide = IntervalUnit::from_minutes(15); - let window_size = IntervalUnit::from_minutes(30); - let executor = super::HopWindowExecutor::new( - ActorContext::create(123), - input, - ExecutorInfo { - // TODO: the schema is incorrect, but it seems useless here. - schema: schema.clone(), - pk_indices, - identity: "test".to_string(), - }, - 2, - window_slide, - window_size, - vec![4, 1, 0, 2], - ) - .boxed(); + let executor = create_executor(vec![4, 1, 0, 2]); let mut stream = executor.execute(); // TODO: add more test infra to reduce the duplicated codes below. diff --git a/src/stream/src/from_proto/hop_window.rs b/src/stream/src/from_proto/hop_window.rs index ef27b16993c3..d0151c795183 100644 --- a/src/stream/src/from_proto/hop_window.rs +++ b/src/stream/src/from_proto/hop_window.rs @@ -14,6 +14,7 @@ use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; +use risingwave_expr::expr::build_from_prost; use risingwave_pb::stream_plan::HopWindowNode; use super::*; @@ -47,6 +48,17 @@ impl ExecutorBuilder for HopWindowExecutorBuilder { .map(|&x| x as usize) .collect_vec(); + let window_start_exprs: Vec<_> = node + .get_window_start_exprs() + .iter() + .map(build_from_prost) + .try_collect()?; + let window_end_exprs: Vec<_> = node + .get_window_end_exprs() + .iter() + .map(build_from_prost) + .try_collect()?; + let time_col = node.get_time_col() as usize; let time_col_data_type = input.schema().fields()[time_col].data_type(); let output_type = DataType::window_of(&time_col_data_type).unwrap(); @@ -79,6 +91,8 @@ impl ExecutorBuilder for HopWindowExecutorBuilder { time_col, window_slide, window_size, + window_start_exprs, + window_end_exprs, output_indices, ) .boxed())