diff --git a/dashboard/proto/gen/stream_plan.ts b/dashboard/proto/gen/stream_plan.ts index e6b43c3d7d8e..528c7adaf466 100644 --- a/dashboard/proto/gen/stream_plan.ts +++ b/dashboard/proto/gen/stream_plan.ts @@ -395,6 +395,13 @@ export interface StreamSource_PropertiesEntry { value: string; } +/** + * The executor only for receiving barrier from the meta service. It always resides in the leaves + * of the streaming graph. + */ +export interface BarrierRecvNode { +} + export interface SourceNode { /** * The source node can contain either a stream source or nothing. So here we extract all @@ -867,7 +874,8 @@ export interface StreamNode { | { $case: "rowIdGen"; rowIdGen: RowIdGenNode } | { $case: "now"; now: NowNode } | { $case: "appendOnlyGroupTopN"; appendOnlyGroupTopN: GroupTopNNode } - | { $case: "temporalJoin"; temporalJoin: TemporalJoinNode }; + | { $case: "temporalJoin"; temporalJoin: TemporalJoinNode } + | { $case: "barrierRecv"; barrierRecv: BarrierRecvNode }; /** * The id for the operator. This is local per mview. * TODO: should better be a uint32. @@ -1898,6 +1906,26 @@ export const StreamSource_PropertiesEntry = { }, }; +function createBaseBarrierRecvNode(): BarrierRecvNode { + return {}; +} + +export const BarrierRecvNode = { + fromJSON(_: any): BarrierRecvNode { + return {}; + }, + + toJSON(_: BarrierRecvNode): unknown { + const obj: any = {}; + return obj; + }, + + fromPartial, I>>(_: I): BarrierRecvNode { + const message = createBaseBarrierRecvNode(); + return message; + }, +}; + function createBaseSourceNode(): SourceNode { return { sourceInner: undefined }; } @@ -3733,6 +3761,8 @@ export const StreamNode = { ? { $case: "appendOnlyGroupTopN", appendOnlyGroupTopN: GroupTopNNode.fromJSON(object.appendOnlyGroupTopN) } : isSet(object.temporalJoin) ? { $case: "temporalJoin", temporalJoin: TemporalJoinNode.fromJSON(object.temporalJoin) } + : isSet(object.barrierRecv) + ? { $case: "barrierRecv", barrierRecv: BarrierRecvNode.fromJSON(object.barrierRecv) } : undefined, operatorId: isSet(object.operatorId) ? Number(object.operatorId) : 0, input: Array.isArray(object?.input) @@ -3822,6 +3852,9 @@ export const StreamNode = { message.nodeBody?.$case === "temporalJoin" && (obj.temporalJoin = message.nodeBody?.temporalJoin ? TemporalJoinNode.toJSON(message.nodeBody?.temporalJoin) : undefined); + message.nodeBody?.$case === "barrierRecv" && (obj.barrierRecv = message.nodeBody?.barrierRecv + ? BarrierRecvNode.toJSON(message.nodeBody?.barrierRecv) + : undefined); message.operatorId !== undefined && (obj.operatorId = Math.round(message.operatorId)); if (message.input) { obj.input = message.input.map((e) => @@ -3840,7 +3873,9 @@ export const StreamNode = { message.appendOnly !== undefined && (obj.appendOnly = message.appendOnly); message.identity !== undefined && (obj.identity = message.identity); if (message.fields) { - obj.fields = message.fields.map((e) => e ? Field.toJSON(e) : undefined); + obj.fields = message.fields.map((e) => + e ? Field.toJSON(e) : undefined + ); } else { obj.fields = []; } @@ -4063,6 +4098,16 @@ export const StreamNode = { temporalJoin: TemporalJoinNode.fromPartial(object.nodeBody.temporalJoin), }; } + if ( + object.nodeBody?.$case === "barrierRecv" && + object.nodeBody?.barrierRecv !== undefined && + object.nodeBody?.barrierRecv !== null + ) { + message.nodeBody = { + $case: "barrierRecv", + barrierRecv: BarrierRecvNode.fromPartial(object.nodeBody.barrierRecv), + }; + } message.operatorId = object.operatorId ?? 0; message.input = object.input?.map((e) => StreamNode.fromPartial(e)) || []; message.streamKey = object.streamKey?.map((e) => e) || []; diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index f865ab992e01..9ecaa4cd2510 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -131,6 +131,10 @@ message StreamSource { string source_name = 8; } +// The executor only for receiving barrier from the meta service. It always resides in the leaves +// of the streaming graph. +message BarrierRecvNode {} + message SourceNode { // The source node can contain either a stream source or nothing. So here we extract all // information about stream source to a message, and here it will be an `Option` in Rust. @@ -545,6 +549,7 @@ message StreamNode { NowNode now = 129; GroupTopNNode append_only_group_top_n = 130; TemporalJoinNode temporal_join = 131; + BarrierRecvNode barrier_recv = 132; } // The id for the operator. This is local per mview. // TODO: should better be a uint32. @@ -630,8 +635,9 @@ enum FragmentTypeFlag { SOURCE = 1; MVIEW = 2; SINK = 4; - NOW = 8; + NOW = 8; // TODO: Remove this and insert a `BarrierRecv` instead. CHAIN_NODE = 16; + BARRIER_RECV = 32; } // The environment associated with a stream plan diff --git a/src/common/src/catalog/schema.rs b/src/common/src/catalog/schema.rs index f4c254d890b7..7df02894514d 100644 --- a/src/common/src/catalog/schema.rs +++ b/src/common/src/catalog/schema.rs @@ -118,6 +118,11 @@ pub struct Schema { } impl Schema { + pub fn empty() -> &'static Self { + static EMPTY: Schema = Schema { fields: Vec::new() }; + &EMPTY + } + pub fn len(&self) -> usize { self.fields.len() } diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index 6457969dd51f..b2aa8fee1471 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -231,6 +231,10 @@ fn build_fragment( ) -> Result { // Update current fragment based on the node we're visiting. match stream_node.get_node_body()? { + NodeBody::BarrierRecv(_) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::BarrierRecv as u32 + } + NodeBody::Source(src) => { current_fragment.fragment_type_mask |= FragmentTypeFlag::Source as u32; // Note: For creating table with connector, the source id is left with placeholder and @@ -248,7 +252,6 @@ fn build_fragment( NodeBody::TopN(_) => current_fragment.requires_singleton = true, - // FIXME: workaround for single-fragment mview on singleton upstream mview. NodeBody::Chain(node) => { current_fragment.fragment_type_mask |= FragmentTypeFlag::ChainNode as u32; // memorize table id for later use @@ -259,6 +262,7 @@ fn build_fragment( } NodeBody::Now(_) => { + // TODO: Remove this and insert a `BarrierRecv` instead. current_fragment.fragment_type_mask |= FragmentTypeFlag::Now as u32; current_fragment.requires_singleton = true; } diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 9480a528fe61..05e680a03475 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -259,7 +259,10 @@ impl TableFragments { /// Returns barrier inject actor ids. pub fn barrier_inject_actor_ids(&self) -> Vec { Self::filter_actor_ids(self, |fragment_type_mask| { - (fragment_type_mask & (FragmentTypeFlag::Source as u32 | FragmentTypeFlag::Now as u32)) + (fragment_type_mask + & (FragmentTypeFlag::Source as u32 + | FragmentTypeFlag::Now as u32 + | FragmentTypeFlag::BarrierRecv as u32)) != 0 }) } diff --git a/src/stream/src/executor/barrier_recv.rs b/src/stream/src/executor/barrier_recv.rs new file mode 100644 index 000000000000..5bf01c139d16 --- /dev/null +++ b/src/stream/src/executor/barrier_recv.rs @@ -0,0 +1,107 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use futures::StreamExt; +use risingwave_common::catalog::Schema; +use tokio::sync::mpsc::UnboundedReceiver; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use super::{ + ActorContext, ActorContextRef, Barrier, BoxedMessageStream, Executor, Message, PkIndicesRef, + StreamExecutorError, +}; + +/// The executor only for receiving barrier from the meta service. It always resides in the leaves +/// of the streaming graph. +pub struct BarrierRecvExecutor { + _ctx: ActorContextRef, + identity: String, + + /// The barrier receiver registered in the local barrier manager. + barrier_receiver: UnboundedReceiver, +} + +impl BarrierRecvExecutor { + pub fn new( + ctx: ActorContextRef, + barrier_receiver: UnboundedReceiver, + executor_id: u64, + ) -> Self { + Self { + _ctx: ctx, + identity: format!("BarrierRecvExecutor {:X}", executor_id), + barrier_receiver, + } + } + + pub fn for_test(barrier_receiver: UnboundedReceiver) -> Self { + Self::new(ActorContext::create(0), barrier_receiver, 0) + } +} + +impl Executor for BarrierRecvExecutor { + fn execute(self: Box) -> BoxedMessageStream { + UnboundedReceiverStream::new(self.barrier_receiver) + .map(|barrier| Ok(Message::Barrier(barrier))) + .chain(futures::stream::once(async { + // We do not use the stream termination as the control message, and this line should + // never be reached in normal cases. So we just return an error here. + Err(StreamExecutorError::channel_closed("barrier receiver")) + })) + .boxed() + } + + fn schema(&self) -> &Schema { + Schema::empty() + } + + fn pk_indices(&self) -> PkIndicesRef<'_> { + &[] + } + + fn identity(&self) -> &str { + &self.identity + } +} + +#[cfg(test)] +mod tests { + use futures::pin_mut; + use tokio::sync::mpsc; + + use super::*; + use crate::executor::test_utils::StreamExecutorTestExt; + + #[tokio::test] + async fn test_barrier_recv() { + let (barrier_tx, barrier_rx) = mpsc::unbounded_channel(); + + let barrier_recv = BarrierRecvExecutor::for_test(barrier_rx).boxed(); + let stream = barrier_recv.execute(); + pin_mut!(stream); + + barrier_tx.send(Barrier::new_test_barrier(114)).unwrap(); + barrier_tx.send(Barrier::new_test_barrier(514)).unwrap(); + + let barrier_1 = stream.next_unwrap_ready_barrier().unwrap(); + assert_eq!(barrier_1.epoch.curr, 114); + let barrier_2 = stream.next_unwrap_ready_barrier().unwrap(); + assert_eq!(barrier_2.epoch.curr, 514); + + stream.next_unwrap_pending(); + + drop(barrier_tx); + assert!(stream.next_unwrap_ready().is_err()); + } +} diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 27e8d7583b37..cf2be19024e3 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -57,6 +57,7 @@ pub mod monitor; pub mod agg_common; pub mod aggregation; +mod barrier_recv; mod batch_query; mod chain; mod dispatch; @@ -103,6 +104,7 @@ mod test_utils; pub use actor::{Actor, ActorContext, ActorContextRef}; use anyhow::Context; pub use backfill::*; +pub use barrier_recv::BarrierRecvExecutor; pub use batch_query::BatchQueryExecutor; pub use chain::ChainExecutor; pub use dispatch::{DispatchExecutor, DispatcherImpl}; diff --git a/src/stream/src/from_proto/barrier_recv.rs b/src/stream/src/from_proto/barrier_recv.rs new file mode 100644 index 000000000000..d4e164a38e45 --- /dev/null +++ b/src/stream/src/from_proto/barrier_recv.rs @@ -0,0 +1,49 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_pb::stream_plan::BarrierRecvNode; +use tokio::sync::mpsc::unbounded_channel; + +use super::*; +use crate::executor::BarrierRecvExecutor; + +pub struct BarrierRecvExecutorBuilder; + +#[async_trait::async_trait] +impl ExecutorBuilder for BarrierRecvExecutorBuilder { + type Node = BarrierRecvNode; + + async fn new_boxed_executor( + params: ExecutorParams, + _node: &Self::Node, + _store: impl StateStore, + stream: &mut LocalStreamManagerCore, + ) -> StreamResult { + assert!( + params.input.is_empty(), + "barrier receiver should not have input" + ); + + let (sender, barrier_receiver) = unbounded_channel(); + stream + .context + .lock_barrier_manager() + .register_sender(params.actor_context.id, sender); + + Ok( + BarrierRecvExecutor::new(params.actor_context, barrier_receiver, params.executor_id) + .boxed(), + ) + } +} diff --git a/src/stream/src/from_proto/mod.rs b/src/stream/src/from_proto/mod.rs index 818d25b6e269..8a9fee89f39f 100644 --- a/src/stream/src/from_proto/mod.rs +++ b/src/stream/src/from_proto/mod.rs @@ -15,6 +15,7 @@ //! Build executor from protobuf. mod agg_common; +mod barrier_recv; mod batch_query; mod chain; mod dml; @@ -51,6 +52,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{StreamNode, TemporalJoinNode}; use risingwave_storage::StateStore; +use self::barrier_recv::*; use self::batch_query::*; use self::chain::*; use self::dml::*; @@ -152,5 +154,6 @@ pub async fn create_executor( NodeBody::RowIdGen => RowIdGenExecutorBuilder, NodeBody::Now => NowExecutorBuilder, NodeBody::TemporalJoin => TemporalJoinExecutorBuilder, + NodeBody::BarrierRecv => BarrierRecvExecutorBuilder, } }