diff --git a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs index f947604b3d1d9..894a9190b9a70 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs @@ -44,7 +44,7 @@ impl StreamDynamicFilter { let mut watermark_columns = FixedBitSet::with_capacity(left.schema().len()); if right.watermark_columns()[0] { match comparator { - ExprType::GreaterThan | ExprType::GreaterThanOrEqual => { + ExprType::Equal | ExprType::GreaterThan | ExprType::GreaterThanOrEqual => { watermark_columns.set(left_index, true) } _ => {} diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 90f1c21fd761e..68bf0ee28d718 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -47,6 +47,7 @@ use risingwave_storage::StateStore; use tracing::trace; use super::watermark::{WatermarkBufferByEpoch, WatermarkBufferStrategy}; +use crate::cache::cache_may_stale; use crate::executor::{StreamExecutorError, StreamExecutorResult}; /// This num is arbitrary and we may want to improve this choice in the future. @@ -577,7 +578,7 @@ where /// Update the vnode bitmap of the state table, returns the previous vnode bitmap. #[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"] - pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> Arc { + pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> (Arc, bool) { assert!( !self.is_dirty(), "vnode bitmap should only be updated when state table is clean" @@ -590,9 +591,16 @@ where } assert_eq!(self.vnodes.len(), new_vnodes.len()); - self.cur_watermark = None; + let cache_may_stale = cache_may_stale(&self.vnodes, &new_vnodes); - std::mem::replace(&mut self.vnodes, new_vnodes) + if cache_may_stale { + self.cur_watermark = None; + } + + ( + std::mem::replace(&mut self.vnodes, new_vnodes), + cache_may_stale, + ) } } diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index f4da5696ebf8d..18a7437d70e1b 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -456,7 +456,7 @@ impl DynamicFilterExecutor { // Update the vnode bitmap for the left state table if asked. if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) { - let _previous_vnode_bitmap = + let (_previous_vnode_bitmap, _cache_may_stale) = self.left_table.update_vnode_bitmap(vnode_bitmap); } diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index 0b28d4e0582fb..5fac7026ffc03 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -704,7 +704,11 @@ impl HashJoinExecutor JoinHashMap { } /// Update the vnode bitmap and manipulate the cache if necessary. - pub fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc) { - let previous_vnode_bitmap = self.state.table.update_vnode_bitmap(vnode_bitmap.clone()); - let _ = self - .degree_state - .table - .update_vnode_bitmap(vnode_bitmap.clone()); - - if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) { + pub fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc) -> bool { + let (_previous_vnode_bitmap, cache_may_stale) = + self.state.table.update_vnode_bitmap(vnode_bitmap.clone()); + let _ = self.degree_state.table.update_vnode_bitmap(vnode_bitmap); + + if cache_may_stale { self.inner.clear(); } + + cache_may_stale } pub fn update_watermark(&mut self, watermark: ScalarImpl) { diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs index 40fd5fe5c1976..27a74b44cbcee 100644 --- a/src/stream/src/executor/merge.rs +++ b/src/stream/src/executor/merge.rs @@ -144,6 +144,21 @@ impl MergeExecutor { ); barrier.passed_actors.push(actor_id); + if let Some(Mutation::Update { dispatchers, .. }) = barrier.mutation.as_deref() + { + if select_all + .upstream_actor_ids() + .iter() + .any(|actor_id| dispatchers.contains_key(actor_id)) + { + // `Watermark` of upstream may become stale after downstream scaling. + select_all + .buffered_watermarks + .values_mut() + .for_each(|buffers| buffers.clear()); + } + } + if let Some(update) = barrier.as_update_merge(self.actor_context.id, self.upstream_fragment_id) { diff --git a/src/stream/src/executor/sort.rs b/src/stream/src/executor/sort.rs index cc6b606443671..fb700b6ddabcf 100644 --- a/src/stream/src/executor/sort.rs +++ b/src/stream/src/executor/sort.rs @@ -217,7 +217,7 @@ impl SortExecutor { // Update the vnode bitmap for the state table if asked. Also update the buffer. if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.context.id) { - let prev_vnode_bitmap = + let (prev_vnode_bitmap, _cache_may_stale) = self.state_table.update_vnode_bitmap(vnode_bitmap.clone()); self.fill_buffer(Some(&prev_vnode_bitmap), &vnode_bitmap) .await?; diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs index e2e02756bf928..9cc03f2e66a71 100644 --- a/src/stream/src/executor/top_n/group_top_n.rs +++ b/src/stream/src/executor/top_n/group_top_n.rs @@ -28,7 +28,7 @@ use risingwave_storage::StateStore; use super::top_n_cache::TopNCacheTrait; use super::utils::*; use super::TopNCache; -use crate::cache::{cache_may_stale, new_unbounded, ExecutorCache}; +use crate::cache::{new_unbounded, ExecutorCache}; use crate::common::table::state_table::StateTable; use crate::error::StreamResult; use crate::executor::error::StreamExecutorResult; @@ -222,12 +222,12 @@ where } fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc) { - let previous_vnode_bitmap = self + let (_previous_vnode_bitmap, cache_may_stale) = self .managed_state .state_table - .update_vnode_bitmap(vnode_bitmap.clone()); + .update_vnode_bitmap(vnode_bitmap); - if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) { + if cache_may_stale { self.caches.clear(); } } diff --git a/src/stream/src/executor/top_n/group_top_n_appendonly.rs b/src/stream/src/executor/top_n/group_top_n_appendonly.rs index a1d9c30e72509..c173f44759e8f 100644 --- a/src/stream/src/executor/top_n/group_top_n_appendonly.rs +++ b/src/stream/src/executor/top_n/group_top_n_appendonly.rs @@ -42,7 +42,6 @@ use super::group_top_n::GroupTopNCache; use super::top_n_cache::AppendOnlyTopNCacheTrait; use super::utils::*; use super::TopNCache; -use crate::cache::cache_may_stale; use crate::common::table::state_table::StateTable; use crate::error::StreamResult; use crate::executor::error::StreamExecutorResult; @@ -207,12 +206,12 @@ where } fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc) { - let previous_vnode_bitmap = self + let (_previous_vnode_bitmap, cache_may_stale) = self .managed_state .state_table - .update_vnode_bitmap(vnode_bitmap.clone()); + .update_vnode_bitmap(vnode_bitmap); - if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) { + if cache_may_stale { self.caches.clear(); } } diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index f43b17b75c7c3..90496b14011f6 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -199,7 +199,8 @@ impl WatermarkFilterExecutor { Message::Barrier(barrier) => { // Update the vnode bitmap for state tables of all agg calls if asked. if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(ctx.id) { - let previous_vnode_bitmap = table.update_vnode_bitmap(vnode_bitmap.clone()); + let (previous_vnode_bitmap, _cache_may_stale) = + table.update_vnode_bitmap(vnode_bitmap.clone()); // Take the global max watermark when scaling happens. if previous_vnode_bitmap != vnode_bitmap {