Skip to content

Commit

Permalink
fix(watermark): fix watermark derivation in stream dynamic filter in … (
Browse files Browse the repository at this point in the history
  • Loading branch information
soundOfDestiny authored Mar 23, 2023
1 parent 1b008f4 commit ea72ed7
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl StreamDynamicFilter {
let mut watermark_columns = FixedBitSet::with_capacity(left.schema().len());
if right.watermark_columns()[0] {
match comparator {
ExprType::GreaterThan | ExprType::GreaterThanOrEqual => {
ExprType::Equal | ExprType::GreaterThan | ExprType::GreaterThanOrEqual => {
watermark_columns.set(left_index, true)
}
_ => {}
Expand Down
14 changes: 11 additions & 3 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use risingwave_storage::StateStore;
use tracing::trace;

use super::watermark::{WatermarkBufferByEpoch, WatermarkBufferStrategy};
use crate::cache::cache_may_stale;
use crate::executor::{StreamExecutorError, StreamExecutorResult};

/// This num is arbitrary and we may want to improve this choice in the future.
Expand Down Expand Up @@ -577,7 +578,7 @@ where

/// Update the vnode bitmap of the state table, returns the previous vnode bitmap.
#[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"]
pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> (Arc<Bitmap>, bool) {
assert!(
!self.is_dirty(),
"vnode bitmap should only be updated when state table is clean"
Expand All @@ -590,9 +591,16 @@ where
}
assert_eq!(self.vnodes.len(), new_vnodes.len());

self.cur_watermark = None;
let cache_may_stale = cache_may_stale(&self.vnodes, &new_vnodes);

std::mem::replace(&mut self.vnodes, new_vnodes)
if cache_may_stale {
self.cur_watermark = None;
}

(
std::mem::replace(&mut self.vnodes, new_vnodes),
cache_may_stale,
)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ impl<S: StateStore> DynamicFilterExecutor<S> {

// Update the vnode bitmap for the left state table if asked.
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) {
let _previous_vnode_bitmap =
let (_previous_vnode_bitmap, _cache_may_stale) =
self.left_table.update_vnode_bitmap(vnode_bitmap);
}

Expand Down
6 changes: 5 additions & 1 deletion src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,11 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,

// Update the vnode bitmap for state tables of both sides if asked.
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) {
self.side_l.ht.update_vnode_bitmap(vnode_bitmap.clone());
if self.side_l.ht.update_vnode_bitmap(vnode_bitmap.clone()) {
self.watermark_buffers
.values_mut()
.for_each(|buffers| buffers.clear());
}
self.side_r.ht.update_vnode_bitmap(vnode_bitmap);
}

Expand Down
18 changes: 9 additions & 9 deletions src/stream/src/executor/managed_state/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use risingwave_common::util::sort_util::OrderType;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::StateStore;

use crate::cache::{cache_may_stale, new_with_hasher_in, ExecutorCache};
use crate::cache::{new_with_hasher_in, ExecutorCache};
use crate::common::table::state_table::StateTable;
use crate::executor::error::StreamExecutorResult;
use crate::executor::monitor::StreamingMetrics;
Expand Down Expand Up @@ -311,16 +311,16 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
}

/// Update the vnode bitmap and manipulate the cache if necessary.
pub fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) {
let previous_vnode_bitmap = self.state.table.update_vnode_bitmap(vnode_bitmap.clone());
let _ = self
.degree_state
.table
.update_vnode_bitmap(vnode_bitmap.clone());

if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) {
pub fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) -> bool {
let (_previous_vnode_bitmap, cache_may_stale) =
self.state.table.update_vnode_bitmap(vnode_bitmap.clone());
let _ = self.degree_state.table.update_vnode_bitmap(vnode_bitmap);

if cache_may_stale {
self.inner.clear();
}

cache_may_stale
}

pub fn update_watermark(&mut self, watermark: ScalarImpl) {
Expand Down
15 changes: 15 additions & 0 deletions src/stream/src/executor/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,21 @@ impl MergeExecutor {
);
barrier.passed_actors.push(actor_id);

if let Some(Mutation::Update { dispatchers, .. }) = barrier.mutation.as_deref()
{
if select_all
.upstream_actor_ids()
.iter()
.any(|actor_id| dispatchers.contains_key(actor_id))
{
// `Watermark` of upstream may become stale after downstream scaling.
select_all
.buffered_watermarks
.values_mut()
.for_each(|buffers| buffers.clear());
}
}

if let Some(update) =
barrier.as_update_merge(self.actor_context.id, self.upstream_fragment_id)
{
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl<S: StateStore> SortExecutor<S> {

// Update the vnode bitmap for the state table if asked. Also update the buffer.
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.context.id) {
let prev_vnode_bitmap =
let (prev_vnode_bitmap, _cache_may_stale) =
self.state_table.update_vnode_bitmap(vnode_bitmap.clone());
self.fill_buffer(Some(&prev_vnode_bitmap), &vnode_bitmap)
.await?;
Expand Down
8 changes: 4 additions & 4 deletions src/stream/src/executor/top_n/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use risingwave_storage::StateStore;
use super::top_n_cache::TopNCacheTrait;
use super::utils::*;
use super::TopNCache;
use crate::cache::{cache_may_stale, new_unbounded, ExecutorCache};
use crate::cache::{new_unbounded, ExecutorCache};
use crate::common::table::state_table::StateTable;
use crate::error::StreamResult;
use crate::executor::error::StreamExecutorResult;
Expand Down Expand Up @@ -222,12 +222,12 @@ where
}

fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) {
let previous_vnode_bitmap = self
let (_previous_vnode_bitmap, cache_may_stale) = self
.managed_state
.state_table
.update_vnode_bitmap(vnode_bitmap.clone());
.update_vnode_bitmap(vnode_bitmap);

if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) {
if cache_may_stale {
self.caches.clear();
}
}
Expand Down
7 changes: 3 additions & 4 deletions src/stream/src/executor/top_n/group_top_n_appendonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use super::group_top_n::GroupTopNCache;
use super::top_n_cache::AppendOnlyTopNCacheTrait;
use super::utils::*;
use super::TopNCache;
use crate::cache::cache_may_stale;
use crate::common::table::state_table::StateTable;
use crate::error::StreamResult;
use crate::executor::error::StreamExecutorResult;
Expand Down Expand Up @@ -207,12 +206,12 @@ where
}

fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) {
let previous_vnode_bitmap = self
let (_previous_vnode_bitmap, cache_may_stale) = self
.managed_state
.state_table
.update_vnode_bitmap(vnode_bitmap.clone());
.update_vnode_bitmap(vnode_bitmap);

if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) {
if cache_may_stale {
self.caches.clear();
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/stream/src/executor/watermark_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
Message::Barrier(barrier) => {
// Update the vnode bitmap for state tables of all agg calls if asked.
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(ctx.id) {
let previous_vnode_bitmap = table.update_vnode_bitmap(vnode_bitmap.clone());
let (previous_vnode_bitmap, _cache_may_stale) =
table.update_vnode_bitmap(vnode_bitmap.clone());

// Take the global max watermark when scaling happens.
if previous_vnode_bitmap != vnode_bitmap {
Expand Down

0 comments on commit ea72ed7

Please sign in to comment.