Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add a type flag
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
BugenZhao committed Mar 16, 2023
1 parent 262bf00 commit 2c8e443
Showing 3 changed files with 11 additions and 3 deletions.
3 changes: 2 additions & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
@@ -635,8 +635,9 @@ enum FragmentTypeFlag {
SOURCE = 1;
MVIEW = 2;
SINK = 4;
NOW = 8;
NOW = 8; // TODO: Remove this and insert a `BarrierRecv` instead.
CHAIN_NODE = 16;
BARRIER_RECV = 32;
}

// The environment associated with a stream plan
6 changes: 5 additions & 1 deletion src/frontend/src/stream_fragmenter/mod.rs
Original file line number Diff line number Diff line change
@@ -231,6 +231,10 @@ fn build_fragment(
) -> Result<StreamNode> {
// Update current fragment based on the node we're visiting.
match stream_node.get_node_body()? {
NodeBody::BarrierRecv(_) => {
current_fragment.fragment_type_mask |= FragmentTypeFlag::BarrierRecv as u32
}

NodeBody::Source(src) => {
current_fragment.fragment_type_mask |= FragmentTypeFlag::Source as u32;
// Note: For creating table with connector, the source id is left with placeholder and
@@ -248,7 +252,6 @@ fn build_fragment(

NodeBody::TopN(_) => current_fragment.requires_singleton = true,

// FIXME: workaround for single-fragment mview on singleton upstream mview.
NodeBody::Chain(node) => {
current_fragment.fragment_type_mask |= FragmentTypeFlag::ChainNode as u32;
// memorize table id for later use
@@ -259,6 +262,7 @@ fn build_fragment(
}

NodeBody::Now(_) => {
// TODO: Remove this and insert a `BarrierRecv` instead.
current_fragment.fragment_type_mask |= FragmentTypeFlag::Now as u32;
current_fragment.requires_singleton = true;
}
5 changes: 4 additions & 1 deletion src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
@@ -259,7 +259,10 @@ impl TableFragments {
/// Returns barrier inject actor ids.
pub fn barrier_inject_actor_ids(&self) -> Vec<ActorId> {
Self::filter_actor_ids(self, |fragment_type_mask| {
(fragment_type_mask & (FragmentTypeFlag::Source as u32 | FragmentTypeFlag::Now as u32))
(fragment_type_mask
& (FragmentTypeFlag::Source as u32
| FragmentTypeFlag::Now as u32
| FragmentTypeFlag::BarrierRecv as u32))
!= 0
})
}

0 comments on commit 2c8e443

Please sign in to comment.