diff --git a/e2e_test/streaming/mv_on_mv.slt b/e2e_test/streaming/mv_on_mv.slt
index e05cc9faa800d..05ce3ea5532e2 100644
--- a/e2e_test/streaming/mv_on_mv.slt
+++ b/e2e_test/streaming/mv_on_mv.slt
@@ -16,7 +16,7 @@ statement ok
 create materialized view m3 as select sum(m1.v1) as sum_m1_v1, sum(m1.v2) as sum_m1_v2 from m1;
 
 statement ok
-create materialized view m4 as select m1.v1, m1.v2, m2.v1, m2.v2 from m1 join m2 on m1.v1 = m2.v1;
+create materialized view m4 as select m1.v1 as m1v1, m1.v2 as m1v2, m2.v1 as m2v1, m2.v2 as m2v2 from m1 join m2 on m1.v1 = m2.v1;
 
 statement ok
 flush;
diff --git a/e2e_test/v2/streaming/mv_on_mv.slt b/e2e_test/v2/streaming/mv_on_mv.slt
new file mode 100644
index 0000000000000..4ea0ba82fb89e
--- /dev/null
+++ b/e2e_test/v2/streaming/mv_on_mv.slt
@@ -0,0 +1,282 @@
+control sortmode rowsort
+
+statement ok
+create table t1(v1 int not null, v2 int not null);
+
+statement ok
+create table t2(v1 real not null, v2 int not null, v3 real not null);
+
+statement ok
+create materialized view m1 as select v1, v2 from t1 where v1 = 1;
+
+statement ok
+create materialized view m2 as select v1, v2 from t1 where v2 = 1;
+
+statement ok
+create materialized view m3 as select sum(m1.v1) as sum_m1_v1, sum(m1.v2) as sum_m1_v2 from m1;
+
+statement ok
+create materialized view m4 as select m1.v1 as m1v1, m1.v2 as m1v2, m2.v1 as m2v1, m2.v2 as m2v2 from m1 join m2 on m1.v1 = m2.v1;
+
+statement ok
+flush;
+
+statement ok
+insert into t1 values (2,1),(1,2),(1,1);
+
+statement ok
+flush;
+
+query II
+select v1, v2 from t1;
+----
+2 1
+1 2
+1 1
+
+query II
+select v1, v2 from m1;
+----
+1 2
+1 1
+
+query II
+select v1, v2 from m2;
+----
+2 1
+1 1
+
+query II
+select sum_m1_v1, sum_m1_v2 from m3;
+----
+2 3
+
+query IIII
+select m1v1, m1v2, m2v1, m2v2 from m4;
+----
+1 2 1 1
+1 1 1 1
+
+statement ok
+create materialized view m5 as select m1.v1 as m1v1, m1.v2 as m1v2, m2.v1 as m2v1, m2.v2 as m2v2 from m1 join m2 on m1.v1 = m2.v1;
+
+statement ok
+flush;
+
+query IIII
+select m1v1, m1v2, m2v1, m2v2 from m5;
+----
+1 2 1 1
+1 1 1 1
+
+statement ok
+insert into t1 values (3,1),(1,3);
+
+statement ok
+flush;
+
+query IIII
+select m1v1, m1v2, m2v1, m2v2 from m5;
+----
+1 2 1 1
+1 1 1 1
+1 3 1 1
+
+statement ok
+create materialized view m6 as select v2, v3, v1 from t2;
+
+statement ok
+create materialized view m7 as select sum(v1) as sum_v1, avg(v3) as avg_v3 from m6 group by v1;
+
+statement ok
+create materialized view m8 as select sum(v1) as sum_v1 from m6 group by v2;
+
+statement ok
+flush;
+
+statement ok
+insert into t2 values (1,2,3),(1,5,6),(3,2,6);
+
+statement ok
+flush;
+
+query III
+select v1, v2, v3 from t2;
+----
+1 2 3
+1 5 6
+3 2 6
+
+query III
+select v2, v3, v1 from m6;
+----
+2 3 1
+5 6 1
+2 6 3
+
+query II
+select sum_v1, avg_v3 from m7;
+----
+2 4.5
+3 6
+
+query I
+select sum_v1 from m8;
+----
+4
+1
+
+statement ok
+create materialized view m9 as select * from t2;
+
+statement ok
+create materialized view m10 as select sum(v1) as sum_v1, sum(v3) as sum_v3 from m9 group by v2;
+
+statement ok
+flush;
+
+query III
+select v1, v2, v3 from m9;
+----
+1 2 3
+1 5 6
+3 2 6
+
+query II
+select sum_v1, sum_v3 from m10;
+----
+4 9
+1 6
+
+statement ok
+create table t3(v1 int not null, v2 int not null, v3 int not null);
+
+statement ok
+flush;
+
+statement ok
+insert into t3 values (1,2,3),(1,5,6),(3,2,6);
+
+statement ok
+flush;
+
+statement ok
+create materialized view m11 as select v1, v2, v3 from t3;
+
+statement ok
+create materialized view m12 as select avg(v1) as avg_v1, sum(v2) as sum_v2, count(v3) as count_v3 from m11;
+
+statement ok
+flush;
+
+query III
+select v1, v2, v3 from m11;
+----
+1 2 3
+1 5 6
+3 2 6
+
+# TODO: enable avg_v1
+query III
+select sum_v2, count_v3 from m12;
+----
+9 3
+
+statement ok
+create table t4(v1 real not null);
+
+statement ok
+flush;
+
+statement ok
+insert into t4 values(1),(2),(3);
+
+statement ok
+flush;
+
+statement ok
+create materialized view m13 as select * from t4;
+
+statement ok
+create materialized view m14 as select count(v1) as count_v1 from m13;
+
+statement ok
+flush;
+
+query I
+select v1 from m13;
+----
+1
+2
+3
+
+query I
+select * from m14;
+----
+3
+
+statement ok
+flush;
+
+statement ok
+flush;
+
+statement ok
+flush;
+
+statement ok
+drop materialized view m3;
+
+statement ok
+drop materialized view m4;
+
+statement ok
+drop materialized view m5;
+
+statement ok
+drop materialized view m1;
+
+statement ok
+drop materialized view m2;
+
+statement ok
+drop table t1;
+
+
+statement ok
+drop materialized view m7;
+
+statement ok
+drop materialized view m8;
+
+statement ok
+drop materialized view m6;
+
+statement ok
+drop materialized view m10;
+
+statement ok
+drop materialized view m9;
+
+statement ok
+drop table t2;
+
+
+statement ok
+drop materialized view m12;
+
+statement ok
+drop materialized view m11;
+
+statement ok
+drop table t3;
+
+
+statement ok
+drop materialized view m14;
+
+statement ok
+drop materialized view m13;
+
+statement ok
+drop table t4;
diff --git a/rust/frontend/src/binder/bind_context.rs b/rust/frontend/src/binder/bind_context.rs
index ee45d355c0ef1..e453723a747f3 100644
--- a/rust/frontend/src/binder/bind_context.rs
+++ b/rust/frontend/src/binder/bind_context.rs
@@ -24,15 +24,23 @@ pub struct ColumnBinding {
     pub column_name: String,
     pub index: usize,
     pub data_type: DataType,
+    pub is_hidden: bool,
 }
 
 impl ColumnBinding {
-    pub fn new(table_name: String, column_name: String, index: usize, data_type: DataType) -> Self {
+    pub fn new(
+        table_name: String,
+        column_name: String,
+        index: usize,
+        data_type: DataType,
+        is_hidden: bool,
+    ) -> Self {
         ColumnBinding {
             table_name,
             column_name,
             index,
             data_type,
+            is_hidden,
         }
     }
 }
diff --git a/rust/frontend/src/binder/relation.rs b/rust/frontend/src/binder/relation.rs
index bedb60fd8f594..33e9c3af1fdde 100644
--- a/rust/frontend/src/binder/relation.rs
+++ b/rust/frontend/src/binder/relation.rs
@@ -14,6 +14,7 @@
 
 use std::collections::hash_map::Entry;
 
+use itertools::Itertools;
 use risingwave_common::catalog::{ColumnDesc, TableDesc, DEFAULT_SCHEMA_NAME};
 use risingwave_common::error::{ErrorCode, Result};
 use risingwave_common::try_match_expand;
@@ -180,12 +181,14 @@ impl Binder {
         let table_desc = table_catalog.table_desc();
         let columns = table_catalog.columns().to_vec();
 
-        let columns = columns
-            .into_iter()
-            .map(|c| c.column_desc)
-            .collect::<Vec<_>>();
         self.bind_context(
-            columns.iter().cloned().map(|c| (c.name, c.data_type)),
+            columns.iter().map(|c| {
+                (
+                    c.column_desc.name.clone(),
+                    c.column_desc.data_type.clone(),
+                    c.is_hidden,
+                )
+            }),
             table_name.clone(),
         )?;
 
@@ -224,19 +227,20 @@ impl Binder {
     /// Fill the [`BindContext`](super::BindContext) for table.
     fn bind_context(
         &mut self,
-        columns: impl IntoIterator<Item = (String, DataType)>,
+        columns: impl IntoIterator<Item = (String, DataType, bool)>,
         table_name: String,
     ) -> Result<()> {
         let begin = self.context.columns.len();
         columns
             .into_iter()
             .enumerate()
-            .for_each(|(index, (name, data_type))| {
+            .for_each(|(index, (name, data_type, is_hidden))| {
                 self.context.columns.push(ColumnBinding::new(
                     table_name.clone(),
                     name.clone(),
                     begin + index,
                     data_type,
+                    is_hidden,
                 ));
                 self.context
                     .indexs_of
@@ -266,7 +270,11 @@ impl Binder {
         let query = self.bind_query(query)?;
         let sub_query_id = self.next_subquery_id();
         self.bind_context(
-            itertools::zip_eq(query.names().into_iter(), query.data_types().into_iter()),
+            query
+                .names()
+                .into_iter()
+                .zip_eq(query.data_types().into_iter())
+                .map(|(x, y)| (x, y, false)),
             format!("{}_{}", UNNAMED_SUBQUERY, sub_query_id),
         )?;
         Ok(BoundSubquery { query })
diff --git a/rust/frontend/src/binder/select.rs b/rust/frontend/src/binder/select.rs
index fa33fea688b29..ddfa92c110262 100644
--- a/rust/frontend/src/binder/select.rs
+++ b/rust/frontend/src/binder/select.rs
@@ -132,7 +132,7 @@ impl Binder {
                     aliases.extend(names);
                 }
                 SelectItem::Wildcard => {
-                    let (exprs, names) = Self::bind_columns(&self.context.columns[..])?;
+                    let (exprs, names) = Self::bind_visible_columns(&self.context.columns[..])?;
                     select_list.extend(exprs);
                     aliases.extend(names);
                 }
@@ -153,4 +153,23 @@ impl Binder {
             .unzip();
         Ok(bound_columns)
     }
+
+    pub fn bind_visible_columns(
+        columns: &[ColumnBinding],
+    ) -> Result<(Vec<ExprImpl>, Vec<Option<String>>)> {
+        let bound_columns = columns
+            .iter()
+            .filter_map(|column| {
+                if !column.is_hidden {
+                    Some((
+                        InputRef::new(column.index, column.data_type.clone()).into(),
+                        Some(column.column_name.clone()),
+                    ))
+                } else {
+                    None
+                }
+            })
+            .unzip();
+        Ok(bound_columns)
+    }
 }
diff --git a/rust/frontend/src/optimizer/plan_node/stream_hash_join.rs b/rust/frontend/src/optimizer/plan_node/stream_hash_join.rs
index 30ac148d2e01f..4b38444a1e80a 100644
--- a/rust/frontend/src/optimizer/plan_node/stream_hash_join.rs
+++ b/rust/frontend/src/optimizer/plan_node/stream_hash_join.rs
@@ -147,7 +147,11 @@ impl ToStreamProst for StreamHashJoin {
                 .iter()
                 .map(|v| *v as i32)
                 .collect(),
-            condition: Some(self.eq_join_predicate.other_cond().as_expr().to_protobuf()),
+            condition: self
+                .eq_join_predicate
+                .other_cond()
+                .as_expr_unless_true()
+                .map(|x| x.to_protobuf()),
         })
     }
 }
diff --git a/rust/frontend/src/utils/condition.rs b/rust/frontend/src/utils/condition.rs
index 31066bec7096e..a26bf9b3faa67 100644
--- a/rust/frontend/src/utils/condition.rs
+++ b/rust/frontend/src/utils/condition.rs
@@ -97,6 +97,22 @@ impl Condition {
         }
     }
 
+    /// Convert condition to an expression. If always true, return `None`.
+    pub fn as_expr_unless_true(&self) -> Option<ExprImpl> {
+        let mut iter = self.conjunctions.iter();
+        if let Some(e) = iter.next() {
+            let mut ret = e.clone();
+            for expr in iter {
+                ret = FunctionCall::new(ExprType::And, vec![ret, expr.clone()])
+                    .unwrap()
+                    .into();
+            }
+            Some(ret)
+        } else {
+            None
+        }
+    }
+
     #[must_use]
     pub fn and(self, other: Self) -> Self {
         let mut ret = self;
diff --git a/rust/frontend/test_runner/tests/testdata/basic_query_1.yaml b/rust/frontend/test_runner/tests/testdata/basic_query_1.yaml
index 7079f3e443db9..c34f067532c56 100644
--- a/rust/frontend/test_runner/tests/testdata/basic_query_1.yaml
+++ b/rust/frontend/test_runner/tests/testdata/basic_query_1.yaml
@@ -8,14 +8,10 @@
     select * from t;
   batch_plan: |
     BatchExchange { order: [], dist: Single }
-      BatchScan { table: t, columns: [_row_id, v1, v2] }
+      BatchScan { table: t, columns: [v1, v2] }
   stream_plan: |
-    StreamMaterialize { columns: [_row_id, v1, v2], pk_columns: [_row_id] }
-      StreamTableScan { table: t, columns: [_row_id, v1, v2], pk_indices: [0] }
-- sql: select interval '100' day
-  batch_plan: |
-    BatchProject { exprs: [IntervalUnit { months: 0, days: 100, ms: 0 }:Interval], expr_alias: [ ] }
-      BatchValues { rows: [[]] }
+    StreamMaterialize { columns: [v1, v2, _row_id(hidden)], pk_columns: [_row_id] }
+      StreamTableScan { table: t, columns: [v1, v2, _row_id], pk_indices: [2] }
 - sql: |
     create table t (v1 bigint, v2 double precision);
     select t2.* from t;
@@ -26,9 +22,9 @@
   batch_plan: |
     BatchExchange { order: [], dist: Single }
       BatchFilter { predicate: (((((1:Int32 > 2:Int32) AND (1:Int32 = 1:Int32)) AND (3:Int32 < 1:Int32)) AND (4:Int32 <> 1:Int32)) OR (((1:Int32 = 1:Int32) AND (2:Int32 >= 1:Int32)) AND (1:Int32 <= 2:Int32))) }
-        BatchScan { table: t, columns: [_row_id] }
+        BatchScan { table: t, columns: [] }
   stream_plan: |
-    StreamMaterialize { columns: [_row_id], pk_columns: [_row_id] }
+    StreamMaterialize { columns: [_row_id(hidden)], pk_columns: [_row_id] }
       StreamFilter { predicate: (((((1:Int32 > 2:Int32) AND (1:Int32 = 1:Int32)) AND (3:Int32 < 1:Int32)) AND (4:Int32 <> 1:Int32)) OR (((1:Int32 = 1:Int32) AND (2:Int32 >= 1:Int32)) AND (1:Int32 <= 2:Int32))) }
         StreamTableScan { table: t, columns: [_row_id], pk_indices: [0] }
 - sql: |
@@ -36,12 +32,12 @@
     select * from t where v1<1;
   batch_plan: |
     BatchExchange { order: [], dist: Single }
-      BatchFilter { predicate: ($1 < 1:Int32) }
-        BatchScan { table: t, columns: [_row_id, v1] }
+      BatchFilter { predicate: ($0 < 1:Int32) }
+        BatchScan { table: t, columns: [v1] }
   stream_plan: |
-    StreamMaterialize { columns: [_row_id, v1], pk_columns: [_row_id] }
-      StreamFilter { predicate: ($1 < 1:Int32) }
-        StreamTableScan { table: t, columns: [_row_id, v1], pk_indices: [0] }
+    StreamMaterialize { columns: [v1, _row_id(hidden)], pk_columns: [_row_id] }
+      StreamFilter { predicate: ($0 < 1:Int32) }
+        StreamTableScan { table: t, columns: [v1, _row_id], pk_indices: [1] }
 - sql: |
     create table t ();
     select (((((false is not true) is true) is not false) is false) is not null) is null from t;
diff --git a/rust/frontend/test_runner/tests/testdata/basic_query_2.yaml b/rust/frontend/test_runner/tests/testdata/basic_query_2.yaml
index dbfe1f4d25570..657511026bc70 100644
--- a/rust/frontend/test_runner/tests/testdata/basic_query_2.yaml
+++ b/rust/frontend/test_runner/tests/testdata/basic_query_2.yaml
@@ -47,26 +47,26 @@
     select * from t1 join t2 on (t1.v1 = t2.v1) join t3 on (t2.v2 = t3.v2);
   batch_plan: |
     BatchExchange { order: [], dist: Single }
-      BatchHashJoin { type: Inner, predicate: $5 = $8 }
-        BatchExchange { order: [], dist: HashShard([5]) }
-          BatchHashJoin { type: Inner, predicate: $1 = $4 }
-            BatchExchange { order: [], dist: HashShard([1]) }
-              BatchScan { table: t1, columns: [_row_id, v1, v2] }
-            BatchExchange { order: [], dist: HashShard([1]) }
-              BatchScan { table: t2, columns: [_row_id, v1, v2] }
-        BatchExchange { order: [], dist: HashShard([2]) }
-          BatchScan { table: t3, columns: [_row_id, v1, v2] }
+      BatchHashJoin { type: Inner, predicate: $3 = $5 }
+        BatchExchange { order: [], dist: HashShard([3]) }
+          BatchHashJoin { type: Inner, predicate: $0 = $2 }
+            BatchExchange { order: [], dist: HashShard([0]) }
+              BatchScan { table: t1, columns: [v1, v2] }
+            BatchExchange { order: [], dist: HashShard([0]) }
+              BatchScan { table: t2, columns: [v1, v2] }
+        BatchExchange { order: [], dist: HashShard([1]) }
+          BatchScan { table: t3, columns: [v1, v2] }
   stream_plan: |
-    StreamMaterialize { columns: [_row_id, v1, v2, _row_id, v1, v2, _row_id, v1, v2], pk_columns: [_row_id, _row_id, _row_id] }
-      StreamHashJoin { type: Inner, predicate: $5 = $8 }
-        StreamExchange { dist: HashShard([5]) }
-          StreamHashJoin { type: Inner, predicate: $1 = $4 }
-            StreamExchange { dist: HashShard([1]) }
-              StreamTableScan { table: t1, columns: [_row_id, v1, v2], pk_indices: [0] }
-            StreamExchange { dist: HashShard([1]) }
-              StreamTableScan { table: t2, columns: [_row_id, v1, v2], pk_indices: [0] }
-        StreamExchange { dist: HashShard([2]) }
-          StreamTableScan { table: t3, columns: [_row_id, v1, v2], pk_indices: [0] }
+    StreamMaterialize { columns: [v1, v2, _row_id, v1, v2(hidden), _row_id(hidden), v1, v2, _row_id(hidden)], pk_columns: [_row_id, _row_id, _row_id] }
+      StreamHashJoin { type: Inner, predicate: $4 = $7 }
+        StreamExchange { dist: HashShard([4]) }
+          StreamHashJoin { type: Inner, predicate: $0 = $3 }
+            StreamExchange { dist: HashShard([0]) }
+              StreamTableScan { table: t1, columns: [v1, v2, _row_id], pk_indices: [2] }
+            StreamExchange { dist: HashShard([0]) }
+              StreamTableScan { table: t2, columns: [v1, v2, _row_id], pk_indices: [2] }
+        StreamExchange { dist: HashShard([1]) }
+          StreamTableScan { table: t3, columns: [v1, v2, _row_id], pk_indices: [2] }
 - sql: |
     create table t1 (v1 int not null, v2 int not null);
     create table t2 (v1 int not null, v2 int not null);
diff --git a/rust/frontend/test_runner/tests/testdata/limit.yaml b/rust/frontend/test_runner/tests/testdata/limit.yaml
index a29296a0a4bfe..7a1bb0c8b9427 100644
--- a/rust/frontend/test_runner/tests/testdata/limit.yaml
+++ b/rust/frontend/test_runner/tests/testdata/limit.yaml
@@ -3,23 +3,21 @@
     select * from t limit 4;
   logical_plan: |
     LogicalLimit { limit: 4, offset: 0 }
-      LogicalProject { exprs: [$0, $1], expr_alias: [_row_id, v] }
+      LogicalProject { exprs: [$1], expr_alias: [v] }
        LogicalScan { table: t, columns: [_row_id, v] }
-
 - sql: |
     create table t (v int not null);
     select * from t offset 4;
   logical_plan: |
     LogicalLimit { limit: 9223372036854775807, offset: 4 }
-      LogicalProject { exprs: [$0, $1], expr_alias: [_row_id, v] }
+      LogicalProject { exprs: [$1], expr_alias: [v] }
        LogicalScan { table: t, columns: [_row_id, v] }
-
 - sql: |
     create table t (v int not null);
     select * from ( select * from t limit 5 ) limit 4;
   logical_plan: |
     LogicalLimit { limit: 4, offset: 0 }
-      LogicalProject { exprs: [$0, $1], expr_alias: [_row_id, v] }
+      LogicalProject { exprs: [$0], expr_alias: [v] }
        LogicalLimit { limit: 5, offset: 0 }
-          LogicalProject { exprs: [$0, $1], expr_alias: [_row_id, v] }
+          LogicalProject { exprs: [$1], expr_alias: [v] }
            LogicalScan { table: t, columns: [_row_id, v] }
diff --git a/rust/frontend/test_runner/tests/testdata/order_by.yaml b/rust/frontend/test_runner/tests/testdata/order_by.yaml
index 2f248cc4f949b..efd3ba4f5a904 100644
--- a/rust/frontend/test_runner/tests/testdata/order_by.yaml
+++ b/rust/frontend/test_runner/tests/testdata/order_by.yaml
@@ -2,9 +2,9 @@
     create table t (v1 bigint, v2 double precision);
     select * from t order by v1 desc;
   batch_plan: |
-    BatchExchange { order: [$1 DESC], dist: Single }
-      BatchSort { order: [$1 DESC] }
-        BatchScan { table: t, columns: [_row_id, v1, v2] }
+    BatchExchange { order: [$0 DESC], dist: Single }
+      BatchSort { order: [$0 DESC] }
+        BatchScan { table: t, columns: [v1, v2] }
 - sql: |
     create table t (v1 bigint, v2 double precision);
     select t.* from t order by v1;
diff --git a/rust/frontend/test_runner/tests/testdata/predicate_pushdown.yaml b/rust/frontend/test_runner/tests/testdata/predicate_pushdown.yaml
index 326c12f09b019..e4b3e2d0ab2bc 100644
--- a/rust/frontend/test_runner/tests/testdata/predicate_pushdown.yaml
+++ b/rust/frontend/test_runner/tests/testdata/predicate_pushdown.yaml
@@ -3,28 +3,28 @@
     create table t2 (v1 int, v2 int, v3 int);
     select * from t1 join t2 on t1.v1=t2.v2 and t1.v1>1 where t2.v2>2;
   logical_plan: |
-    LogicalProject { exprs: [$0, $1, $2, $3, $4, $5, $6, $7], expr_alias: [_row_id, v1, v2, v3, _row_id, v1, v2, v3] }
+    LogicalProject { exprs: [$1, $2, $3, $5, $6, $7], expr_alias: [v1, v2, v3, v1, v2, v3] }
       LogicalFilter { predicate: ($6 > 2:Int32) }
         LogicalJoin { type: Inner, on: ($1 = $6) AND ($1 > 1:Int32) }
           LogicalScan { table: t1, columns: [_row_id, v1, v2, v3] }
           LogicalScan { table: t2, columns: [_row_id, v1, v2, v3] }
   optimized_logical_plan: |
-    LogicalJoin { type: Inner, on: ($1 = $6) }
-      LogicalFilter { predicate: ($1 > 1:Int32) }
-        LogicalScan { table: t1, columns: [_row_id, v1, v2, v3] }
-      LogicalFilter { predicate: ($2 > 2:Int32) }
-        LogicalScan { table: t2, columns: [_row_id, v1, v2, v3] }
+    LogicalJoin { type: Inner, on: ($0 = $4) }
+      LogicalFilter { predicate: ($0 > 1:Int32) }
+        LogicalScan { table: t1, columns: [v1, v2, v3] }
+      LogicalFilter { predicate: ($1 > 2:Int32) }
+        LogicalScan { table: t2, columns: [v1, v2, v3] }
 - sql: |
     create table t (v1 bigint, v2 double precision);
     select * from (select * from t) where v2 > 1;
   logical_plan: |
-    LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id, v1, v2] }
-      LogicalFilter { predicate: ($2 > 1:Int32) }
-        LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id, v1, v2] }
+    LogicalProject { exprs: [$0, $1], expr_alias: [v1, v2] }
+      LogicalFilter { predicate: ($1 > 1:Int32) }
+        LogicalProject { exprs: [$1, $2], expr_alias: [v1, v2] }
           LogicalScan { table: t, columns: [_row_id, v1, v2] }
   optimized_logical_plan: |
-    LogicalFilter { predicate: ($2 > 1:Int32) }
-      LogicalScan { table: t, columns: [_row_id, v1, v2] }
+    LogicalFilter { predicate: ($1 > 1:Int32) }
+      LogicalScan { table: t, columns: [v1, v2] }
 - sql: |
     create table t (v1 bigint, v2 double precision);
     select v1 from (select v2, v1 from t) where v2 > 1;
diff --git a/rust/frontend/test_runner/tests/testdata/stream_proto.yaml b/rust/frontend/test_runner/tests/testdata/stream_proto.yaml
index 5b38e1a0cb465..29790cc747835 100644
--- a/rust/frontend/test_runner/tests/testdata/stream_proto.yaml
+++ b/rust/frontend/test_runner/tests/testdata/stream_proto.yaml
@@ -9,22 +9,22 @@
       - input:
         - mergeNode: {}
           pkIndices:
-          - 0
+          - 1
          batchPlanNode:
            tableRefId:
              tableId: 1
            columnDescs:
-            - columnType:
-                typeName: INT64
-                isNullable: true
-                name: _row_id
            - columnType:
                typeName: INT32
                isNullable: true
                columnId: 1
                name: v1
+            - columnType:
+                typeName: INT64
+                isNullable: true
+                name: _row_id
            pkIndices:
-            - 0
+            - 1
        chainNode:
          tableRefId:
            tableId: 1
@@ -38,10 +38,10 @@
              isNullable: true
              name: v1
          columnIds:
-          - 0
          - 1
+          - 0
        pkIndices:
-        - 0
+        - 1
        filterNode:
          searchCondition:
            exprType: LESS_THAN
@@ -54,8 +54,7 @@
            returnType:
              typeName: INT32
              isNullable: true
-            inputRef:
-              columnIdx: 1
+            inputRef: {}
          - exprType: CONSTANT_VALUE
            returnType:
              typeName: INT32
@@ -63,11 +62,12 @@
              constant:
                body: AAAAAQ==
        pkIndices:
-        - 0
+        - 1
        materializeNode:
          columnOrders:
          - orderType: ASCENDING
-            inputRef: {}
+            inputRef:
+              columnIdx: 1
            returnType:
              typeName: INT64
              isNullable: true
@@ -80,17 +80,18 @@
        columns:
        - columnDesc:
            columnType:
-              typeName: INT64
+              typeName: INT32
              isNullable: true
-            name: _row_id
+            name: v1
        - columnDesc:
            columnType:
-              typeName: INT32
+              typeName: INT64
              isNullable: true
              columnId: 1
-            name: v1
+            name: _row_id
+            isHidden: true
        pkColumnIds:
-        - 0
+        - 1
        pkOrders:
        - ASCENDING
 - sql: |
@@ -103,22 +104,22 @@
      - input:
        - mergeNode: {}
          pkIndices:
-          - 0
+          - 1
          batchPlanNode:
            tableRefId:
              tableId: 1
            columnDescs:
-            - columnType:
-                typeName: INT64
-                isNullable: true
-                name: _row_id
            - columnType:
                typeName: INT32
                isNullable: true
                columnId: 1
                name: v1
+            - columnType:
+                typeName: INT64
+                isNullable: true
+                name: _row_id
            pkIndices:
-            - 0
+            - 1
        chainNode:
          tableRefId:
            tableId: 1
@@ -132,14 +133,15 @@
              isNullable: true
              name: v1
          columnIds:
-          - 0
          - 1
+          - 0
        pkIndices:
-        - 0
+        - 1
        materializeNode:
          columnOrders:
          - orderType: ASCENDING
-            inputRef: {}
+            inputRef:
+              columnIdx: 1
            returnType:
              typeName: INT64
              isNullable: true
@@ -152,17 +154,18 @@
        columns:
        - columnDesc:
            columnType:
-              typeName: INT64
+              typeName: INT32
              isNullable: true
-            name: _row_id
+            name: v1
        - columnDesc:
            columnType:
-              typeName: INT32
+              typeName: INT64
              isNullable: true
              columnId: 1
-            name: v1
+            name: _row_id
+            isHidden: true
        pkColumnIds:
-        - 0
+        - 1
        pkOrders:
        - ASCENDING
 - sql: |
diff --git a/rust/frontend/test_runner/tests/testdata/subquery.yaml b/rust/frontend/test_runner/tests/testdata/subquery.yaml
index f99d6b2dc3006..a64811ac57be3 100644
--- a/rust/frontend/test_runner/tests/testdata/subquery.yaml
+++ b/rust/frontend/test_runner/tests/testdata/subquery.yaml
@@ -2,9 +2,9 @@
     create table t (v1 bigint, v2 double precision);
     select v1 from (select * from t) where v2 > 1;
   logical_plan: |
-    LogicalProject { exprs: [$1], expr_alias: [v1] }
-      LogicalFilter { predicate: ($2 > 1:Int32) }
-        LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id, v1, v2] }
+    LogicalProject { exprs: [$0], expr_alias: [v1] }
+      LogicalFilter { predicate: ($1 > 1:Int32) }
+        LogicalProject { exprs: [$1, $2], expr_alias: [v1, v2] }
           LogicalScan { table: t, columns: [_row_id, v1, v2] }
 - sql: |
     /* merge and then eliminate */
@@ -43,19 +43,19 @@
     create table t (v1 bigint, v2 double precision);
     select * from (select * from t);
   logical_plan: |
-    LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id, v1, v2] }
-      LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id, v1, v2] }
+    LogicalProject { exprs: [$0, $1], expr_alias: [v1, v2] }
+      LogicalProject { exprs: [$1, $2], expr_alias: [v1, v2] }
        LogicalScan { table: t, columns: [_row_id, v1, v2] }
   optimized_logical_plan: |
-    LogicalScan { table: t, columns: [_row_id, v1, v2] }
+    LogicalScan { table: t, columns: [v1, v2] }
 - sql: |
     /* joins */
     create table t (v1 bigint, v2 double precision);
     select * from (select * from t), t;
   logical_plan: |
-    LogicalProject { exprs: [$0, $1, $2, $3, $4, $5], expr_alias: [_row_id, v1, v2, _row_id, v1, v2] }
+    LogicalProject { exprs: [$0, $1, $3, $4], expr_alias: [v1, v2, v1, v2] }
      LogicalJoin { type: Inner, on: true:Boolean }
-        LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id, v1, v2] }
+        LogicalProject { exprs: [$1, $2], expr_alias: [v1, v2] }
          LogicalScan { table: t, columns: [_row_id, v1, v2] }
        LogicalScan { table: t, columns: [_row_id, v1, v2] }
 - sql: |
diff --git a/rust/frontend/test_runner/tests/testdata/subquery_expr.yaml b/rust/frontend/test_runner/tests/testdata/subquery_expr.yaml
index 990865e26d3a5..b6872ec3515b4 100644
--- a/rust/frontend/test_runner/tests/testdata/subquery_expr.yaml
+++ b/rust/frontend/test_runner/tests/testdata/subquery_expr.yaml
@@ -61,7 +61,7 @@
        LogicalValues { rows: [[]], schema: Schema { fields: [] } }
        LogicalProject { exprs: [($0 >= 1:Int32)], expr_alias: [ ] }
          LogicalAgg { group_keys: [], agg_calls: [count] }
-            LogicalProject { exprs: [$0, $1], expr_alias: [_row_id, x] }
+            LogicalProject { exprs: [$1], expr_alias: [x] }
              LogicalScan { table: t, columns: [_row_id, x] }
 - sql: |
     create table t1(x int);
@@ -86,7 +86,7 @@
          LogicalScan { table: t, columns: [_row_id, x] }
        LogicalProject { exprs: [($0 >= 1:Int32)], expr_alias: [ ] }
          LogicalAgg { group_keys: [], agg_calls: [count] }
-            LogicalProject { exprs: [$0, $1], expr_alias: [_row_id, x] }
+            LogicalProject { exprs: [$1], expr_alias: [x] }
              LogicalScan { table: t, columns: [_row_id, x] }
 - sql: |
     create table t1(x int);
diff --git a/rust/stream/src/executor/barrier_align.rs b/rust/stream/src/executor/barrier_align.rs
index cfd66d6c64d2e..f185c9f195cba 100644
--- a/rust/stream/src/executor/barrier_align.rs
+++ b/rust/stream/src/executor/barrier_align.rs
@@ -27,6 +27,7 @@ enum BarrierWaitState {
     Either,
 }
 
+#[derive(Debug)]
 pub enum AlignedMessage {
     Left(Result),
     Right(Result),
diff --git a/rust/stream/src/executor/managed_state/flush_status.rs b/rust/stream/src/executor/managed_state/flush_status.rs
index fdd03693c4d02..5753a6e162fc3 100644
--- a/rust/stream/src/executor/managed_state/flush_status.rs
+++ b/rust/stream/src/executor/managed_state/flush_status.rs
@@ -32,7 +32,7 @@ macro_rules! impl_flush_status {
             Insert(T),
         }
 
-        impl $struct_name {
+        impl $struct_name {
             pub fn is_delete(&self) -> bool {
                 matches!(self, Self::Delete)
             }
@@ -63,7 +63,7 @@ macro_rules! impl_flush_status {
             }
 
             /// Insert an entry and modify the corresponding flush state
-            pub fn do_insert(entry: $entry_type, value: T) {
+            pub fn do_insert(entry: $entry_type, value: T) {
                 match entry {
                     <$entry_type>::Vacant(e) => {
                         // No-op -> Insert
@@ -74,14 +74,14 @@ macro_rules! impl_flush_status {
                         // Delete -> DeleteInsert
                         e.insert(Self::DeleteInsert(value));
                     } else {
-                        panic!("invalid flush status");
+                        panic!("invalid flush status: double insert {:?} -> {:?}", e.key(), value);
                     }
                 }
             }
 
             /// Delete an entry and modify the corresponding flush state
-            pub fn do_delete(entry: $entry_type) {
+            pub fn do_delete(entry: $entry_type) {
                 match entry {
                     <$entry_type>::Vacant(e) => {
                         // No-op -> Delete
@@ -95,7 +95,7 @@ macro_rules! impl_flush_status {
                         // DeleteInsert -> Delete
                         e.insert(Self::Delete);
                     } else {
-                        panic!("invalid flush status");
+                        panic!("invalid flush status: double delete {:?}", e.key());
                     }
                 }
diff --git a/rust/stream/src/executor/managed_state/join/mod.rs b/rust/stream/src/executor/managed_state/join/mod.rs
index 4f131556072f5..75fb01aab1193 100644
--- a/rust/stream/src/executor/managed_state/join/mod.rs
+++ b/rust/stream/src/executor/managed_state/join/mod.rs
@@ -27,7 +27,7 @@
 use risingwave_storage::{Keyspace, StateStore};
 use serde::{Deserialize, Serialize};
 
 /// This is a row with a match degree
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct JoinRow {
     pub row: Row,
     degree: u64,