Skip to content

Commit

Permalink
fix(streaming): ignore null stream key from full outer join to workaround stream key uniqueness (#8084)
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page authored Mar 14, 2023
1 parent 53da2e3 commit 97b021d
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 21 deletions.
24 changes: 24 additions & 0 deletions e2e_test/streaming/bug_fixes/issue_8084.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# https://github.com/risingwavelabs/risingwave/issues/8084
# Regression test: a full outer join on a nullable key used to produce rows
# whose stream key columns are all NULL, breaking stream key uniqueness.

statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t (a int primary key);

# Self full-outer-join on `a`; NULL join keys do not match in SQL equality.
statement ok
create materialized view mv as select t1.* from t as t1 full join t as t2 on t1.a = t2.a;

statement ok
insert into t values(null);

# TODO: https://github.com/risingwavelabs/risingwave/issues/8084
# As a workaround, rows with an all-NULL stream key are filtered out, so the
# expected result below is intentionally empty (the blank line after `----`
# denotes zero rows).
query I
select * from mv;
----

statement ok
drop materialized view mv;

statement ok
drop table t;
38 changes: 20 additions & 18 deletions src/frontend/planner_test/tests/testdata/join.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -198,19 +198,20 @@
StreamMaterialize { columns: [x, i.t._row_id(hidden), i.t._row_id#1(hidden), i.x(hidden), i.t._row_id#2(hidden), i.t._row_id#3(hidden), i.x#1(hidden)], pk_columns: [i.t._row_id, i.t._row_id#1, i.x, i.t._row_id#2, i.t._row_id#3, i.x#1], pk_conflict: "no check" }
└─StreamExchange { dist: HashShard(i.t._row_id, i.t._row_id, i.x, i.t._row_id, i.t._row_id, i.x) }
└─StreamProject { exprs: [Coalesce(i.x, i.x) as $expr1, i.t._row_id, i.t._row_id, i.x, i.t._row_id, i.t._row_id, i.x] }
└─StreamHashJoin { type: FullOuter, predicate: i.x = i.x, output: [i.x, i.x, i.t._row_id, i.t._row_id, i.t._row_id, i.t._row_id] }
├─StreamShare { id = 5 }
| └─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id] }
| ├─StreamExchange { dist: HashShard(i.x) }
| | └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
| └─StreamExchange { dist: HashShard(i.x) }
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
└─StreamShare { id = 5 }
└─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id] }
├─StreamExchange { dist: HashShard(i.x) }
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
└─StreamExchange { dist: HashShard(i.x) }
└─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
└─StreamFilter { predicate: (((((IsNotNull(i.t._row_id) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.x)) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.x)) }
└─StreamHashJoin { type: FullOuter, predicate: i.x = i.x, output: [i.x, i.x, i.t._row_id, i.t._row_id, i.t._row_id, i.t._row_id] }
├─StreamShare { id = 5 }
| └─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id] }
| ├─StreamExchange { dist: HashShard(i.x) }
| | └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
| └─StreamExchange { dist: HashShard(i.x) }
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
└─StreamShare { id = 5 }
└─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id] }
├─StreamExchange { dist: HashShard(i.x) }
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
└─StreamExchange { dist: HashShard(i.x) }
└─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
- name: Use lookup join
sql: |
create table t1 (v1 int, v2 int);
Expand Down Expand Up @@ -505,11 +506,12 @@
└─StreamExchange { dist: HashShard(a._row_id, b._row_id, a.x, b.x) }
└─StreamProject { exprs: [(2:Int32 * Coalesce(a.x, b.x)) as $expr1, (Coalesce(a.x, b.x) + Coalesce(a.x, b.x)) as $expr2, (Coalesce(a.x, b.x) + Coalesce(a.x, b.x)) as $expr3, a._row_id, b._row_id, a.x, b.x] }
└─StreamFilter { predicate: ((2:Int32 * Coalesce(a.x, b.x)) < 10:Int32) }
└─StreamHashJoin { type: FullOuter, predicate: a.x = b.x, output: [a.x, b.x, a._row_id, b._row_id] }
├─StreamExchange { dist: HashShard(a.x) }
| └─StreamTableScan { table: a, columns: [a.x, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) }
└─StreamExchange { dist: HashShard(b.x) }
└─StreamTableScan { table: b, columns: [b.x, b._row_id], pk: [b._row_id], dist: UpstreamHashShard(b._row_id) }
└─StreamFilter { predicate: (IsNotNull(a._row_id) OR IsNotNull(b._row_id)) }
└─StreamHashJoin { type: FullOuter, predicate: a.x = b.x, output: [a.x, b.x, a._row_id, b._row_id] }
├─StreamExchange { dist: HashShard(a.x) }
| └─StreamTableScan { table: a, columns: [a.x, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) }
└─StreamExchange { dist: HashShard(b.x) }
└─StreamTableScan { table: b, columns: [b.x, b._row_id], pk: [b._row_id], dist: UpstreamHashShard(b._row_id) }
- sql: |
CREATE TABLE test (a INTEGER, b INTEGER);
CREATE TABLE test2 (a INTEGER, c INTEGER);
Expand Down
28 changes: 26 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::error::Result;
use risingwave_common::types::DataType;

use super::generic::{self, GenericPlanNode};
use super::{
ColPrunable, CollectInputRef, ExprRewritable, LogicalProject, PlanBase, PlanRef,
PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream,
};
use crate::expr::{assert_input_ref, ExprImpl, ExprRewriter};
use crate::expr::{assert_input_ref, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef};
use crate::optimizer::plan_node::{
BatchFilter, ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext,
StreamFilter, ToStreamContext,
Expand Down Expand Up @@ -79,7 +80,30 @@ impl LogicalFilter {
}
}

/// The function will check that the predicate is a boolean expression.
/// Create a `LogicalFilter` to filter the rows with all keys are null.
pub fn filter_if_keys_all_null(input: PlanRef, key: &[usize]) -> PlanRef {
let schema = input.schema();
let cond = key.iter().fold(ExprImpl::literal_bool(false), |expr, i| {
ExprImpl::FunctionCall(
FunctionCall::new_unchecked(
ExprType::Or,
vec![
expr,
FunctionCall::new_unchecked(
ExprType::IsNotNull,
vec![InputRef::new(*i, schema.fields()[*i].data_type.clone()).into()],
DataType::Boolean,
)
.into(),
],
DataType::Boolean,
)
.into(),
)
});
LogicalFilter::create_with_expr(input, cond)
}

pub fn create_with_expr(input: PlanRef, predicate: ExprImpl) -> PlanRef {
let predicate = Condition::with_expr(predicate);
Self::new(input, predicate).into()
Expand Down
31 changes: 30 additions & 1 deletion src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1480,8 +1480,37 @@ impl ToStream for LogicalJoin {
}

let join_with_pk = join.clone_with_output_indices(new_output_indices);

let plan = if join_with_pk.join_type() == JoinType::FullOuter {
// Ignore rows whose stream key columns are all NULL to maintain the stream key's uniqueness, see https://github.com/risingwavelabs/risingwave/issues/8084 for more information

let l2o = join_with_pk
.l2i_col_mapping()
.composite(&join_with_pk.i2o_col_mapping());
let r2o = join_with_pk
.r2i_col_mapping()
.composite(&join_with_pk.i2o_col_mapping());
let left_right_stream_keys = join_with_pk
.left()
.logical_pk()
.iter()
.map(|i| l2o.map(*i))
.chain(
join_with_pk
.right()
.logical_pk()
.iter()
.map(|i| r2o.map(*i)),
)
.collect_vec();
let plan: PlanRef = join_with_pk.into();
LogicalFilter::filter_if_keys_all_null(plan, &left_right_stream_keys)
} else {
join_with_pk.into()
};

// the added columns is at the end, so it will not change the exists column index
Ok((join_with_pk.into(), out_col_change))
Ok((plan, out_col_change))
}
}

Expand Down

0 comments on commit 97b021d

Please sign in to comment.