fix(streaming): ignore null stream key from full outer join to workaround #8520

Merged · 2 commits · Mar 14, 2023
src/frontend/planner_test/tests/testdata/join.yaml (38 changes: 20 additions & 18 deletions)
@@ -198,19 +198,20 @@
StreamMaterialize { columns: [x, i.t._row_id(hidden), i.t._row_id#1(hidden), i.x(hidden), i.t._row_id#2(hidden), i.t._row_id#3(hidden), i.x#1(hidden)], pk_columns: [i.t._row_id, i.t._row_id#1, i.x, i.t._row_id#2, i.t._row_id#3, i.x#1], pk_conflict: "no check" }
└─StreamExchange { dist: HashShard(i.t._row_id, i.t._row_id, i.x, i.t._row_id, i.t._row_id, i.x) }
└─StreamProject { exprs: [Coalesce(i.x, i.x) as $expr1, i.t._row_id, i.t._row_id, i.x, i.t._row_id, i.t._row_id, i.x] }
└─StreamHashJoin { type: FullOuter, predicate: i.x = i.x, output: [i.x, i.x, i.t._row_id, i.t._row_id, i.t._row_id, i.t._row_id] }
├─StreamShare { id = 5 }
| └─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id] }
| ├─StreamExchange { dist: HashShard(i.x) }
| | └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
| └─StreamExchange { dist: HashShard(i.x) }
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
└─StreamShare { id = 5 }
└─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id] }
├─StreamExchange { dist: HashShard(i.x) }
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
└─StreamExchange { dist: HashShard(i.x) }
└─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
└─StreamFilter { predicate: (((((IsNotNull(i.t._row_id) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.x)) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.x)) }
└─StreamHashJoin { type: FullOuter, predicate: i.x = i.x, output: [i.x, i.x, i.t._row_id, i.t._row_id, i.t._row_id, i.t._row_id] }
├─StreamShare { id = 5 }
| └─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id] }
| ├─StreamExchange { dist: HashShard(i.x) }
| | └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
| └─StreamExchange { dist: HashShard(i.x) }
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
└─StreamShare { id = 5 }
└─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id] }
├─StreamExchange { dist: HashShard(i.x) }
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
└─StreamExchange { dist: HashShard(i.x) }
└─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
- name: Use lookup join
sql: |
create table t1 (v1 int, v2 int);
@@ -505,11 +506,12 @@
└─StreamExchange { dist: HashShard(a._row_id, b._row_id, a.x, b.x) }
└─StreamProject { exprs: [(2:Int32 * Coalesce(a.x, b.x)) as $expr1, (Coalesce(a.x, b.x) + Coalesce(a.x, b.x)) as $expr2, (Coalesce(a.x, b.x) + Coalesce(a.x, b.x)) as $expr3, a._row_id, b._row_id, a.x, b.x] }
└─StreamFilter { predicate: ((2:Int32 * Coalesce(a.x, b.x)) < 10:Int32) }
└─StreamHashJoin { type: FullOuter, predicate: a.x = b.x, output: [a.x, b.x, a._row_id, b._row_id] }
├─StreamExchange { dist: HashShard(a.x) }
| └─StreamTableScan { table: a, columns: [a.x, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) }
└─StreamExchange { dist: HashShard(b.x) }
└─StreamTableScan { table: b, columns: [b.x, b._row_id], pk: [b._row_id], dist: UpstreamHashShard(b._row_id) }
└─StreamFilter { predicate: (IsNotNull(a._row_id) OR IsNotNull(b._row_id)) }
└─StreamHashJoin { type: FullOuter, predicate: a.x = b.x, output: [a.x, b.x, a._row_id, b._row_id] }
├─StreamExchange { dist: HashShard(a.x) }
| └─StreamTableScan { table: a, columns: [a.x, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) }
└─StreamExchange { dist: HashShard(b.x) }
└─StreamTableScan { table: b, columns: [b.x, b._row_id], pk: [b._row_id], dist: UpstreamHashShard(b._row_id) }
- sql: |
CREATE TABLE test (a INTEGER, b INTEGER);
CREATE TABLE test2 (a INTEGER, c INTEGER);
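The regenerated plans above show the shape of the fix: the FullOuter hash join is now wrapped in a StreamFilter whose predicate ORs IsNotNull over the stream-key columns contributed by both join sides, so a row survives as long as at least one of those key columns is non-null. The following is a minimal, hypothetical Rust sketch of that predicate's semantics, using Option-typed values in place of the executor's real datum types:

/// Keep a row unless every stream-key column in it is NULL.
/// This mirrors the `IsNotNull(k1) OR IsNotNull(k2) OR ...` predicate of the
/// added StreamFilter; it is an illustration, not the streaming executor's code.
fn keep_row(stream_key_columns: &[Option<i64>]) -> bool {
    stream_key_columns.iter().any(|v| v.is_some())
}

fn main() {
    // Matched row: both sides contribute their row ids.
    assert!(keep_row(&[Some(1), Some(2)]));
    // Padded row: only the left side is present, still kept.
    assert!(keep_row(&[Some(1), None]));
    // The problematic all-NULL stream key is filtered out.
    assert!(!keep_row(&[None, None]));
}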
src/frontend/src/optimizer/plan_node/logical_filter.rs (28 changes: 26 additions & 2 deletions)
@@ -18,13 +18,14 @@ use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::error::Result;
use risingwave_common::types::DataType;

use super::generic::{self, GenericPlanNode};
use super::{
ColPrunable, CollectInputRef, ExprRewritable, LogicalProject, PlanBase, PlanRef,
PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream,
};
use crate::expr::{assert_input_ref, ExprImpl, ExprRewriter};
use crate::expr::{assert_input_ref, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef};
use crate::optimizer::plan_node::{
BatchFilter, ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext,
StreamFilter, ToStreamContext,
@@ -79,7 +80,30 @@ impl LogicalFilter {
}
}

/// The function will check whether the predicate is a boolean expression.
/// Create a `LogicalFilter` to filter out the rows whose keys are all null.
pub fn filter_if_keys_all_null(input: PlanRef, key: &[usize]) -> PlanRef {
let schema = input.schema();
let cond = key.iter().fold(ExprImpl::literal_bool(false), |expr, i| {
ExprImpl::FunctionCall(
FunctionCall::new_unchecked(
ExprType::Or,
vec![
expr,
FunctionCall::new_unchecked(
ExprType::IsNotNull,
vec![InputRef::new(*i, schema.fields()[*i].data_type.clone()).into()],
DataType::Boolean,
)
.into(),
],
DataType::Boolean,
)
.into(),
)
});
LogicalFilter::create_with_expr(input, cond)
}

pub fn create_with_expr(input: PlanRef, predicate: ExprImpl) -> PlanRef {
let predicate = Condition::with_expr(predicate);
Self::new(input, predicate).into()
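The new `filter_if_keys_all_null` helper above builds its condition by folding the key columns into a chain of ORs seeded with a `false` literal, so for keys at output positions 0 and 3 the raw result is roughly `((false OR IsNotNull($0)) OR IsNotNull($3))`; the plans in join.yaml show this predicate with the leading `false` already simplified out. Below is a standalone sketch of that fold, with plain strings standing in for the planner's `ExprImpl`/`FunctionCall` types (hypothetical, for illustration only):

/// Build a textual version of the predicate assembled by the fold in
/// `filter_if_keys_all_null`: start from `false` and OR in one
/// `IsNotNull(<column>)` term per key column.
fn keys_not_all_null_predicate(key: &[usize]) -> String {
    key.iter().fold("false".to_string(), |expr, i| {
        format!("({expr} OR IsNotNull(${i}))")
    })
}

fn main() {
    // Two stream-key columns at output positions 0 and 3.
    assert_eq!(
        keys_not_all_null_predicate(&[0, 3]),
        "((false OR IsNotNull($0)) OR IsNotNull($3))"
    );
}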
src/frontend/src/optimizer/plan_node/logical_join.rs (31 changes: 30 additions & 1 deletion)
Expand Up @@ -1460,8 +1460,37 @@ impl ToStream for LogicalJoin {
}

let join_with_pk = join.clone_with_output_indices(new_output_indices);

let plan = if join_with_pk.join_type() == JoinType::FullOuter {
// ignore rows whose stream key is all NULL to maintain the stream key's uniqueness; see https://github.com/risingwavelabs/risingwave/issues/8084 for more information

let l2o = join_with_pk
.l2i_col_mapping()
.composite(&join_with_pk.i2o_col_mapping());
let r2o = join_with_pk
.r2i_col_mapping()
.composite(&join_with_pk.i2o_col_mapping());
let left_right_stream_keys = join_with_pk
.left()
.logical_pk()
.iter()
.map(|i| l2o.map(*i))
.chain(
join_with_pk
.right()
.logical_pk()
.iter()
.map(|i| r2o.map(*i)),
)
.collect_vec();
let plan: PlanRef = join_with_pk.into();
LogicalFilter::filter_if_keys_all_null(plan, &left_right_stream_keys)
} else {
join_with_pk.into()
};

// the added columns are at the end, so they will not change the existing column indices
Ok((join_with_pk.into(), out_col_change))
Ok((plan, out_col_change))
}
}

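Finally, `logical_join.rs` collects the stream keys of both join inputs, maps them through the composed column mappings (`l2o`, `r2o`) into output positions, and hands the combined list to `filter_if_keys_all_null`. A toy sketch of that index translation, with plain lookup tables standing in for `ColIndexMapping` and made-up mapping values, is shown below:

/// Map each input-side stream-key index to its position in the join output,
/// mimicking `l2o.map(*i)` / `r2o.map(*i)` with a plain lookup table.
fn map_keys(to_output: &[usize], stream_key: &[usize]) -> Vec<usize> {
    stream_key.iter().map(|i| to_output[*i]).collect()
}

fn main() {
    // Hypothetical left/right-to-output mappings and per-side stream keys.
    let l2o = [0, 2]; // left col 0 -> output 0, left col 1 -> output 2
    let r2o = [1, 3]; // right col 0 -> output 1, right col 1 -> output 3
    let left_pk = [1]; // left input's stream key
    let right_pk = [1]; // right input's stream key

    let mut keys = map_keys(&l2o, &left_pk);
    keys.extend(map_keys(&r2o, &right_pk));
    // Output columns that must not all be NULL at the same time.
    assert_eq!(keys, vec![2, 3]);
}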