diff --git a/e2e_test/streaming/bug_fixes/issue_8084.slt b/e2e_test/streaming/bug_fixes/issue_8084.slt new file mode 100644 index 000000000000..446620cd57c4 --- /dev/null +++ b/e2e_test/streaming/bug_fixes/issue_8084.slt @@ -0,0 +1,24 @@ +# https://github.com/risingwavelabs/risingwave/issues/8084 + +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table t (a int primary key); + +statement ok +create materialized view mv as select t1.* from t as t1 full join t as t2 on t1.a = t2.a; + +statement ok +insert into t values(null); + +# TODO: https://github.com/risingwavelabs/risingwave/issues/8084 +query I +select * from mv; +---- + +statement ok +drop materialized view mv; + +statement ok +drop table t; diff --git a/src/frontend/planner_test/tests/testdata/join.yaml b/src/frontend/planner_test/tests/testdata/join.yaml index e05ea2fd9f63..ab96f62f5d3c 100644 --- a/src/frontend/planner_test/tests/testdata/join.yaml +++ b/src/frontend/planner_test/tests/testdata/join.yaml @@ -198,19 +198,20 @@ StreamMaterialize { columns: [x, i.t._row_id(hidden), i.t._row_id#1(hidden), i.x(hidden), i.t._row_id#2(hidden), i.t._row_id#3(hidden), i.x#1(hidden)], pk_columns: [i.t._row_id, i.t._row_id#1, i.x, i.t._row_id#2, i.t._row_id#3, i.x#1], pk_conflict: "no check" } └─StreamExchange { dist: HashShard(i.t._row_id, i.t._row_id, i.x, i.t._row_id, i.t._row_id, i.x) } └─StreamProject { exprs: [Coalesce(i.x, i.x) as $expr1, i.t._row_id, i.t._row_id, i.x, i.t._row_id, i.t._row_id, i.x] } - └─StreamHashJoin { type: FullOuter, predicate: i.x = i.x, output: [i.x, i.x, i.t._row_id, i.t._row_id, i.t._row_id, i.t._row_id] } - ├─StreamShare { id = 5 } - | └─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id] } - | ├─StreamExchange { dist: HashShard(i.x) } - | | └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) } - | └─StreamExchange { dist: HashShard(i.x) } - | └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) } - └─StreamShare { id = 5 } - └─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id] } - ├─StreamExchange { dist: HashShard(i.x) } - | └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) } - └─StreamExchange { dist: HashShard(i.x) } - └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) } + └─StreamFilter { predicate: (((((IsNotNull(i.t._row_id) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.x)) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.x)) } + └─StreamHashJoin { type: FullOuter, predicate: i.x = i.x, output: [i.x, i.x, i.t._row_id, i.t._row_id, i.t._row_id, i.t._row_id] } + ├─StreamShare { id = 5 } + | └─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id] } + | ├─StreamExchange { dist: HashShard(i.x) } + | | └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) } + | └─StreamExchange { dist: HashShard(i.x) } + | └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) } + └─StreamShare { id = 5 } + └─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id] } + ├─StreamExchange { dist: HashShard(i.x) } + | └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) } + └─StreamExchange { dist: HashShard(i.x) } + └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) } - name: Use lookup join sql: | create table t1 (v1 int, v2 int); @@ -505,11 +506,12 @@ └─StreamExchange { dist: HashShard(a._row_id, b._row_id, a.x, b.x) } └─StreamProject { exprs: [(2:Int32 * Coalesce(a.x, b.x)) as $expr1, (Coalesce(a.x, b.x) + Coalesce(a.x, b.x)) as $expr2, (Coalesce(a.x, b.x) + Coalesce(a.x, b.x)) as $expr3, a._row_id, b._row_id, a.x, b.x] } └─StreamFilter { predicate: ((2:Int32 * Coalesce(a.x, b.x)) < 10:Int32) } - └─StreamHashJoin { type: FullOuter, predicate: a.x = b.x, output: [a.x, b.x, a._row_id, b._row_id] } - ├─StreamExchange { dist: HashShard(a.x) } - | └─StreamTableScan { table: a, columns: [a.x, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) } - └─StreamExchange { dist: HashShard(b.x) } - └─StreamTableScan { table: b, columns: [b.x, b._row_id], pk: [b._row_id], dist: UpstreamHashShard(b._row_id) } + └─StreamFilter { predicate: (IsNotNull(a._row_id) OR IsNotNull(b._row_id)) } + └─StreamHashJoin { type: FullOuter, predicate: a.x = b.x, output: [a.x, b.x, a._row_id, b._row_id] } + ├─StreamExchange { dist: HashShard(a.x) } + | └─StreamTableScan { table: a, columns: [a.x, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) } + └─StreamExchange { dist: HashShard(b.x) } + └─StreamTableScan { table: b, columns: [b.x, b._row_id], pk: [b._row_id], dist: UpstreamHashShard(b._row_id) } - sql: | CREATE TABLE test (a INTEGER, b INTEGER); CREATE TABLE test2 (a INTEGER, c INTEGER); diff --git a/src/frontend/src/optimizer/plan_node/logical_filter.rs b/src/frontend/src/optimizer/plan_node/logical_filter.rs index e6060dff3431..2d38c6d37a5a 100644 --- a/src/frontend/src/optimizer/plan_node/logical_filter.rs +++ b/src/frontend/src/optimizer/plan_node/logical_filter.rs @@ -18,13 +18,14 @@ use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::error::Result; +use risingwave_common::types::DataType; use super::generic::{self, GenericPlanNode}; use super::{ ColPrunable, CollectInputRef, ExprRewritable, LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, }; -use crate::expr::{assert_input_ref, ExprImpl, ExprRewriter}; +use crate::expr::{assert_input_ref, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef}; use crate::optimizer::plan_node::{ BatchFilter, ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamFilter, ToStreamContext, @@ -79,7 +80,30 @@ impl LogicalFilter { } } - /// the function will check if the predicate is bool expression + /// Create a `LogicalFilter` to filter the rows with all keys are null. + pub fn filter_if_keys_all_null(input: PlanRef, key: &[usize]) -> PlanRef { + let schema = input.schema(); + let cond = key.iter().fold(ExprImpl::literal_bool(false), |expr, i| { + ExprImpl::FunctionCall( + FunctionCall::new_unchecked( + ExprType::Or, + vec![ + expr, + FunctionCall::new_unchecked( + ExprType::IsNotNull, + vec![InputRef::new(*i, schema.fields()[*i].data_type.clone()).into()], + DataType::Boolean, + ) + .into(), + ], + DataType::Boolean, + ) + .into(), + ) + }); + LogicalFilter::create_with_expr(input, cond) + } + pub fn create_with_expr(input: PlanRef, predicate: ExprImpl) -> PlanRef { let predicate = Condition::with_expr(predicate); Self::new(input, predicate).into() diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index ee1c7a255f5b..bf7fc41b4733 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -1460,8 +1460,37 @@ impl ToStream for LogicalJoin { } let join_with_pk = join.clone_with_output_indices(new_output_indices); + + let plan = if join_with_pk.join_type() == JoinType::FullOuter { + // ignore the all NULL to maintain the stream key's uniqueness, see https://github.com/risingwavelabs/risingwave/issues/8084 for more information + + let l2o = join_with_pk + .l2i_col_mapping() + .composite(&join_with_pk.i2o_col_mapping()); + let r2o = join_with_pk + .r2i_col_mapping() + .composite(&join_with_pk.i2o_col_mapping()); + let left_right_stream_keys = join_with_pk + .left() + .logical_pk() + .iter() + .map(|i| l2o.map(*i)) + .chain( + join_with_pk + .right() + .logical_pk() + .iter() + .map(|i| r2o.map(*i)), + ) + .collect_vec(); + let plan: PlanRef = join_with_pk.into(); + LogicalFilter::filter_if_keys_all_null(plan, &left_right_stream_keys) + } else { + join_with_pk.into() + }; + // the added columns is at the end, so it will not change the exists column index - Ok((join_with_pk.into(), out_col_change)) + Ok((plan, out_col_change)) } }