Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stream): fix inner interval join #9071

Merged
merged 2 commits into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion src/frontend/planner_test/tests/testdata/watermark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
| └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t2.ts) }
└─StreamTableScan { table: t2, columns: [t2.ts, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
- name: band hash join
- name: interval join(left outer join)
sql: |
create table t1 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
create table t2 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
Expand All @@ -95,6 +95,25 @@
└─StreamExchange { dist: HashShard(t2.v1) }
└─StreamProject { exprs: [t2.ts, t2.v1, t2.v2, (AtTimeZone((AtTimeZone(t2.ts, 'UTC':Varchar) + '00:00:00':Interval), 'UTC':Varchar) + '00:00:01':Interval) as $expr2, t2._row_id], output_watermarks: [t2.ts, $expr2] }
└─StreamTableScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
- name: interval join (inner join)
sql: |
create table t1 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
create table t2 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
select t1.ts as t1_ts, t1.v1 as t1_v1, t1.v2 as t1_v2, t2.ts as t2_ts, t2.v1 as t2_v1, t2.v2 as t2_v2 from t1 join t2 on (t1.v1 = t2.v1 and (t1.ts >= t2.ts + INTERVAL '1' SECOND) and (t2.ts >= t1.ts + INTERVAL '1' SECOND));
logical_plan: |
LogicalProject { exprs: [t1.ts, t1.v1, t1.v2, t2.ts, t2.v1, t2.v2] }
└─LogicalJoin { type: Inner, on: (t1.v1 = t2.v1) AND (t1.ts >= (t2.ts + '00:00:01':Interval)) AND (t2.ts >= (t1.ts + '00:00:01':Interval)), output: all }
├─LogicalScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id] }
└─LogicalScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id] }
stream_plan: |
StreamMaterialize { columns: [t1_ts, t1_v1, t1_v2, t2_ts, t2_v1, t2_v2, t1._row_id(hidden), t2._row_id(hidden)], stream_key: [t1._row_id, t2._row_id, t1_v1], pk_columns: [t1._row_id, t2._row_id, t1_v1], pk_conflict: "NoCheck", watermark_columns: [t1_ts, t2_ts] }
└─StreamIntervalJoin { type: Inner, predicate: t1.v1 = t2.v1 AND (t1.ts >= $expr2) AND ($expr1 <= t2.ts), conditions_to_clean_left_state_table: (t1.ts >= $expr2), conditions_to_clean_right_state_table: ($expr1 <= t2.ts), output_watermarks: [t1.ts, t2.ts], output: [t1.ts, t1.v1, t1.v2, t2.ts, t2.v1, t2.v2, t1._row_id, t2._row_id] }
├─StreamExchange { dist: HashShard(t1.v1) }
| └─StreamProject { exprs: [t1.ts, t1.v1, t1.v2, (AtTimeZone((AtTimeZone(t1.ts, 'UTC':Varchar) + '00:00:00':Interval), 'UTC':Varchar) + '00:00:01':Interval) as $expr1, t1._row_id], output_watermarks: [t1.ts, $expr1] }
| └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t2.v1) }
└─StreamProject { exprs: [t2.ts, t2.v1, t2.v2, (AtTimeZone((AtTimeZone(t2.ts, 'UTC':Varchar) + '00:00:00':Interval), 'UTC':Varchar) + '00:00:01':Interval) as $expr2, t2._row_id], output_watermarks: [t2.ts, $expr2] }
└─StreamTableScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
- name: union all
sql: |
create table t1 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ macro_rules! impl_has_variant {
impl_has_variant! {InputRef, Literal, FunctionCall, AggCall, Subquery, TableFunction, WindowFunction}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct InequalityInputPair {
pub struct InequalityInputPair {
/// Input index of greater side of inequality.
pub(crate) key_required_larger: usize,
/// Input index of less side of inequality.
Expand Down
8 changes: 6 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,11 @@ impl LogicalJoin {
// For inner joins, pull non-equal conditions to a filter operator on top of it
// We do so as the filter operator can apply the non-equal condition batch-wise (vectorized)
// as opposed to the HashJoin, which applies the condition row-wise.
let pull_filter = self.join_type() == JoinType::Inner && predicate.has_non_eq();

let stream_hash_join = StreamHashJoin::new(logical_join.core.clone(), predicate.clone());
let pull_filter = self.join_type() == JoinType::Inner
&& stream_hash_join.eq_join_predicate().has_non_eq()
&& stream_hash_join.inequality_pairs().is_empty();
if pull_filter {
let default_indices = (0..self.internal_column_num()).collect::<Vec<_>>();

Expand Down Expand Up @@ -952,7 +956,7 @@ impl LogicalJoin {
Ok(plan)
}
} else {
Ok(StreamHashJoin::new(logical_join.core, predicate).into())
Ok(stream_hash_join.into())
}
}

Expand Down
10 changes: 7 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,16 +275,20 @@ impl StreamHashJoin {
assert_eq!(dk_indices_in_jk.len(), left_dk_indices.len());
dk_indices_in_jk
}

pub fn inequality_pairs(&self) -> &Vec<(bool, InequalityInputPair)> {
&self.inequality_pairs
}
}

impl fmt::Display for StreamHashJoin {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut builder = if self.is_append_only {
f.debug_struct("StreamAppendOnlyHashJoin")
} else if self.clean_left_state_conjunction_idx.is_some()
let mut builder = if self.clean_left_state_conjunction_idx.is_some()
&& self.clean_right_state_conjunction_idx.is_some()
{
f.debug_struct("StreamIntervalJoin")
} else if self.is_append_only {
f.debug_struct("StreamAppendOnlyHashJoin")
} else {
f.debug_struct("StreamHashJoin")
};
Expand Down