Skip to content

Commit

Permalink
feat(stream): temporal join support pk prefix join condition (#10627)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored and kwannoel committed Jul 14, 2023
1 parent a83e1c2 commit 448a566
Show file tree
Hide file tree
Showing 9 changed files with 490 additions and 112 deletions.
File renamed without changes.
84 changes: 84 additions & 0 deletions e2e_test/streaming/temporal_join/temporal_join_with_index.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# End-to-end check of a temporal join whose lookup side is the index `idx`
# (created on `version(a2)`) instead of the base table. The join condition
# combines an equality on the indexed column (`a1 = a2`) with an extra equality
# on a column that is not an index key (`b1 = b2`).

# NOTE(review): presumably makes every DML below visible before the next
# statement runs — confirm against the RW_IMPLICIT_FLUSH documentation.
statement ok
SET RW_IMPLICIT_FLUSH TO true;

# Left (probe) side of the join; declared append-only.
statement ok
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;

# Lookup table, keyed by `id2`.
statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

# The index that the materialized view joins against.
statement ok
create index idx on version (a2);

statement ok
create materialized view v as select id1, a1, id2, a2 from stream left join idx FOR SYSTEM_TIME AS OF PROCTIME() on b1 = b2 and a1 = a2;

# Phase 1: a stream row arrives while `version` is still empty ...
statement ok
insert into stream values(1, 11, 111);

# ... then two version rows with the same (a2, b2) = (11, 111) are added ...
statement ok
insert into version values(1, 11, 111);

statement ok
insert into version values(9, 11, 111);

# ... and the same stream row is inserted again.
statement ok
insert into stream values(1, 11, 111);

# `version` is emptied before querying; the expected output below still lists
# the previously matched rows alongside the NULL-padded one.
statement ok
delete from version;

query IIII rowsort
select * from v;
----
1 11 1 11
1 11 9 11
1 11 NULL NULL

# Phase 2: interleave version and stream inserts for (a, b) = (22, 222). The
# expected output has `2 22 2 22` twice and `2 22 8 22` once.
statement ok
insert into version values(2, 22, 222);

statement ok
insert into stream values(2, 22, 222);

statement ok
insert into version values(8, 22, 222);

statement ok
insert into stream values(2, 22, 222);

query IIII rowsort
select * from v;
----
1 11 1 11
1 11 9 11
1 11 NULL NULL
2 22 2 22
2 22 2 22
2 22 8 22

# Phase 3: change `b2` of the row with id2 = 2 so that it no longer equals the
# `b1` of the next stream row, while `a2` still equals `a1`. The only row added
# to the expected output is `2 22 8 22`.
statement ok
update version set b2 = 333 where id2 = 2;

statement ok
insert into stream values(2, 22, 222);

query IIII rowsort
select * from v;
----
1 11 1 11
1 11 9 11
1 11 NULL NULL
2 22 2 22
2 22 2 22
2 22 8 22
2 22 8 22

# Clean up the objects created by this test.
statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;
32 changes: 32 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/temporal_join.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,35 @@
join version2 FOR SYSTEM_TIME AS OF PROCTIME() on stream.id2 = version2.id2 where a1 < 10;
expected_outputs:
- stream_plan
- name: temporal join with an index (distribution key size = 1)
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
create index idx2 on version (a2, b2) distributed by (a2);
select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2;
expected_outputs:
- stream_plan
- name: temporal join with an index (distribution key size = 2)
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
create index idx2 on version (a2, b2);
select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2;
expected_outputs:
- stream_plan
- name: temporal join with an index (index column size = 1)
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
create index idx2 on version (b2);
select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2;
expected_outputs:
- stream_plan
- name: temporal join with singleton table
sql: |
create table t (a int) append only;
create materialized view v as select count(*) from t;
select * from t left join v FOR SYSTEM_TIME AS OF PROCTIME() on a = count;
expected_outputs:
- stream_plan

52 changes: 52 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/temporal_join.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,55 @@
│ └─StreamTableScan { table: version1, columns: [version1.id1, version1.x1], pk: [version1.id1], dist: UpstreamHashShard(version1.id1) }
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version2.id2) }
└─StreamTableScan { table: version2, columns: [version2.id2, version2.x2], pk: [version2.id2], dist: UpstreamHashShard(version2.id2) }
- name: temporal join with an index (distribution key size = 1)
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
create index idx2 on version (a2, b2) distributed by (a2);
select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2;
stream_plan: |-
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck }
└─StreamTemporalJoin { type: LeftOuter, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] }
├─StreamExchange { dist: HashShard(stream.a1) }
│ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2) }
└─StreamTableScan { table: idx2, columns: [idx2.a2, idx2.b2, idx2.id2], pk: [idx2.id2], dist: UpstreamHashShard(idx2.a2) }
- name: temporal join with an index (distribution key size = 2)
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
create index idx2 on version (a2, b2);
select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2;
stream_plan: |-
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck }
└─StreamTemporalJoin { type: LeftOuter, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] }
├─StreamExchange { dist: HashShard(stream.a1, stream.b1) }
│ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2, idx2.b2) }
└─StreamTableScan { table: idx2, columns: [idx2.a2, idx2.b2, idx2.id2], pk: [idx2.id2], dist: UpstreamHashShard(idx2.a2, idx2.b2) }
- name: temporal join with an index (index column size = 1)
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
create index idx2 on version (b2);
select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2;
stream_plan: |-
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, stream.b1, a1], pk_columns: [stream._row_id, id2, stream.b1, a1], pk_conflict: NoCheck }
└─StreamTemporalJoin { type: LeftOuter, predicate: stream.b1 = idx2.b2 AND (stream.a1 = idx2.a2), output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] }
├─StreamExchange { dist: HashShard(stream.b1) }
│ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.b2) }
└─StreamTableScan { table: idx2, columns: [idx2.b2, idx2.id2, idx2.a2], pk: [idx2.id2], dist: UpstreamHashShard(idx2.b2) }
- name: temporal join with singleton table
sql: |
create table t (a int) append only;
create materialized view v as select count(*) from t;
select * from t left join v FOR SYSTEM_TIME AS OF PROCTIME() on a = count;
stream_plan: |-
StreamMaterialize { columns: [a, count, t._row_id(hidden), $expr1(hidden)], stream_key: [t._row_id, $expr1], pk_columns: [t._row_id, $expr1], pk_conflict: NoCheck }
└─StreamTemporalJoin { type: LeftOuter, predicate: AND ($expr1 = v.count), output: [t.a, v.count, t._row_id, $expr1] }
├─StreamExchange { dist: Single }
│ └─StreamProject { exprs: [t.a, t.a::Int64 as $expr1, t._row_id] }
│ └─StreamTableScan { table: t, columns: [t.a, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange [no_shuffle] { dist: Single }
└─StreamTableScan { table: v, columns: [v.count], pk: [], dist: Single }
38 changes: 38 additions & 0 deletions src/frontend/src/optimizer/plan_node/eq_join_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::fmt;

use itertools::Itertools;
use risingwave_common::catalog::Schema;

use crate::expr::{
Expand Down Expand Up @@ -270,6 +271,43 @@ impl EqJoinPredicate {
)
}

/// Retain the first `prefix_len` entries of `eq_keys` as equi-join keys and move the
/// remaining ones into the other condition.
///
/// Each moved key `(l, r, null_safe)` is rebuilt as a binary comparison —
/// `IsNotDistinctFrom` when `null_safe` is set, `Equal` otherwise — and appended, in order,
/// after the conjunctions already present in `other_cond`.
///
/// # Panics
///
/// Panics if `prefix_len` is greater than the number of eq keys.
pub fn retain_prefix_eq_key(self, prefix_len: usize) -> Self {
    assert!(prefix_len <= self.eq_keys.len());

    // `self` is owned, so split the key list in place instead of cloning both halves.
    let mut retained_eq_keys = self.eq_keys;
    let demoted_eq_keys = retained_eq_keys.split_off(prefix_len);

    // Lazily turn every demoted key back into a comparison expression.
    let demoted_conjunctions = demoted_eq_keys.into_iter().map(|(l, r, null_safe)| {
        let cmp = if null_safe {
            ExprType::IsNotDistinctFrom
        } else {
            ExprType::Equal
        };
        FunctionCall::new(cmp, vec![l.into(), r.into()])
            .expect("rebuilding an eq key as a comparison expression should not fail")
            .into()
    });

    // Existing conjunctions come first, followed by the demoted comparisons.
    let new_other_cond = Condition {
        conjunctions: self
            .other_cond
            .conjunctions
            .into_iter()
            .chain(demoted_conjunctions)
            .collect_vec(),
    };

    Self::new(
        new_other_cond,
        retained_eq_keys,
        self.left_cols_num,
        self.right_cols_num,
    )
}

pub fn rewrite_exprs(&self, rewriter: &mut (impl ExprRewriter + ?Sized)) -> Self {
let mut new = self.clone();
new.other_cond = new.other_cond.rewrite_expr(rewriter);
Expand Down
80 changes: 58 additions & 22 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,19 +366,19 @@ impl LogicalJoin {
.expect("dist_key must in order_key");
dist_key_in_order_key_pos.push(pos);
}
// The at least prefix of order key that contains distribution key.
let at_least_prefix_len = dist_key_in_order_key_pos
// The shortest prefix of order key that contains distribution key.
let shortest_prefix_len = dist_key_in_order_key_pos
.iter()
.max()
.map_or(0, |pos| pos + 1);

// Distributed lookup join can't support lookup table with a singleton distribution.
if at_least_prefix_len == 0 {
if shortest_prefix_len == 0 {
return None;
}

// Reorder the join equal predicate to match the order key.
let mut reorder_idx = Vec::with_capacity(at_least_prefix_len);
let mut reorder_idx = Vec::with_capacity(shortest_prefix_len);
for order_col_id in order_col_ids {
let mut found = false;
for (i, eq_idx) in predicate.right_eq_indexes().into_iter().enumerate() {
Expand All @@ -392,7 +392,7 @@ impl LogicalJoin {
break;
}
}
if reorder_idx.len() < at_least_prefix_len {
if reorder_idx.len() < shortest_prefix_len {
return None;
}
let lookup_prefix_len = reorder_idx.len();
Expand Down Expand Up @@ -966,18 +966,6 @@ impl LogicalJoin {
) -> Result<PlanRef> {
assert!(predicate.has_eq());

let left = self.left().to_stream_with_dist_required(
&RequiredDist::shard_by_key(self.left().schema().len(), &predicate.left_eq_indexes()),
ctx,
)?;

if !left.append_only() {
return Err(RwError::from(ErrorCode::NotSupported(
"Temporal join requires an append-only left input".into(),
"Please ensure your left input is append-only".into(),
)));
}

let right = self.right();
let Some(logical_scan) = right.as_logical_scan() else {
return Err(RwError::from(ErrorCode::NotSupported(
Expand All @@ -994,30 +982,76 @@ impl LogicalJoin {
}

let table_desc = logical_scan.table_desc();
let output_column_ids = logical_scan.output_column_ids();

// Verify that right join key columns are the primary key of the lookup table.
// Verify that the right join key columns are a prefix of the primary key and
// also contain the distribution key.
let order_col_ids = table_desc.order_column_ids();
let order_col_ids_len = order_col_ids.len();
let output_column_ids = logical_scan.output_column_ids();
let order_key = table_desc.order_column_indices();
let dist_key = table_desc.distribution_key.clone();

let mut dist_key_in_order_key_pos = vec![];
for d in dist_key {
let pos = order_key
.iter()
.position(|&x| x == d)
.expect("dist_key must in order_key");
dist_key_in_order_key_pos.push(pos);
}
// The shortest prefix of order key that contains distribution key.
let shortest_prefix_len = dist_key_in_order_key_pos
.iter()
.max()
.map_or(0, |pos| pos + 1);

// Reorder the join equal predicate to match the order key.
let mut reorder_idx = vec![];
let mut reorder_idx = Vec::with_capacity(shortest_prefix_len);
for order_col_id in order_col_ids {
let mut found = false;
for (i, eq_idx) in predicate.right_eq_indexes().into_iter().enumerate() {
if order_col_id == output_column_ids[eq_idx] {
reorder_idx.push(i);
found = true;
break;
}
}
if !found {
break;
}
}
if order_col_ids_len != predicate.eq_keys().len() || reorder_idx.len() < order_col_ids_len {
if reorder_idx.len() < shortest_prefix_len {
// TODO: support index selection for temporal join and refine this error message.
return Err(RwError::from(ErrorCode::NotSupported(
"Temporal join requires the lookup table's primary key contained exactly in the equivalence condition".into(),
"Please add the primary key of the lookup table to the join condition and remove any other conditions".into(),
)));
}
let lookup_prefix_len = reorder_idx.len();
let predicate = predicate.reorder(&reorder_idx);

let left = if dist_key_in_order_key_pos.is_empty() {
self.left()
.to_stream_with_dist_required(&RequiredDist::single(), ctx)?
} else {
let left_eq_indexes = predicate.left_eq_indexes();
let left_dist_key = dist_key_in_order_key_pos
.iter()
.map(|pos| left_eq_indexes[*pos])
.collect_vec();

self.left().to_stream_with_dist_required(
&RequiredDist::shard_by_key(self.left().schema().len(), &left_dist_key),
ctx,
)?
};

if !left.append_only() {
return Err(RwError::from(ErrorCode::NotSupported(
"Temporal join requires an append-only left input".into(),
"Please ensure your left input is append-only".into(),
)));
}

// Extract the predicate from logical scan. Only pure scan is supported.
let (new_scan, scan_predicate, project_expr) = logical_scan.predicate_pull_up();
// Construct output column to require column mapping
Expand Down Expand Up @@ -1090,6 +1124,8 @@ impl LogicalJoin {
new_join_output_indices,
);

let new_predicate = new_predicate.retain_prefix_eq_key(lookup_prefix_len);

Ok(StreamTemporalJoin::new(new_logical_join, new_predicate).into())
}

Expand Down
2 changes: 0 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_temporal_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use risingwave_pb::stream_plan::TemporalJoinNode;
use super::utils::{childless_record, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode};
use crate::expr::{Expr, ExprRewriter};
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::plan_tree_node::PlanTreeNodeUnary;
use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::utils::IndicesDisplay;
Expand All @@ -42,7 +41,6 @@ impl StreamTemporalJoin {
pub fn new(logical: generic::Join<PlanRef>, eq_join_predicate: EqJoinPredicate) -> Self {
assert!(logical.join_type == JoinType::Inner || logical.join_type == JoinType::LeftOuter);
assert!(logical.left.append_only());
assert!(logical.right.logical_pk() == eq_join_predicate.right_eq_indexes());
let right = logical.right.clone();
let exchange: &StreamExchange = right
.as_stream_exchange()
Expand Down
Loading

0 comments on commit 448a566

Please sign in to comment.