Skip to content

Commit

Permalink
fix(hash join): avoid emitting chunks that violate UpdateDelete ass… (
Browse files Browse the repository at this point in the history
  • Loading branch information
soundOfDestiny authored Mar 16, 2023
1 parent dc76ad7 commit 88550e4
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,19 +922,19 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
}
if degree == 0 {
if let Some(chunk) =
hashjoin_chunk_builder.forward_if_not_matched(op, row)
hashjoin_chunk_builder.forward_if_not_matched(Op::Insert, row)
{
yield chunk;
}
} else if let Some(chunk) =
hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
hashjoin_chunk_builder.forward_exactly_once_if_matched(Op::Insert, row)
{
yield chunk;
}
// Insert back the state taken from ht.
side_match.ht.update_state(key, matched_rows);
} else if let Some(chunk) =
hashjoin_chunk_builder.forward_if_not_matched(op, row)
hashjoin_chunk_builder.forward_if_not_matched(Op::Insert, row)
{
yield chunk;
}
Expand Down Expand Up @@ -990,19 +990,19 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
}
if degree == 0 {
if let Some(chunk) =
hashjoin_chunk_builder.forward_if_not_matched(op, row)
hashjoin_chunk_builder.forward_if_not_matched(Op::Delete, row)
{
yield chunk;
}
} else if let Some(chunk) =
hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
hashjoin_chunk_builder.forward_exactly_once_if_matched(Op::Delete, row)
{
yield chunk;
}
// Insert back the state taken from ht.
side_match.ht.update_state(key, matched_rows);
} else if let Some(chunk) =
hashjoin_chunk_builder.forward_if_not_matched(op, row)
hashjoin_chunk_builder.forward_if_not_matched(Op::Delete, row)
{
yield chunk;
}
Expand Down

0 comments on commit 88550e4

Please sign in to comment.