Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(barrier): add control request to explicitly create partial graph #19383

Merged
merged 3 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,19 @@
}

message StreamingControlStreamRequest {
message InitRequest {
message InitialPartialGraph {
uint64 partial_graph_id = 1;
repeated stream_plan.SubscriptionUpstreamInfo subscriptions = 2;
}

message InitRequest {

Check failure on line 71 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "2" with name "subscriptions" on message "InitRequest" was deleted without reserving the name "subscriptions".

Check failure on line 71 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "2" with name "subscriptions" on message "InitRequest" was deleted without reserving the number "2".
repeated InitialPartialGraph graphs = 1;
}

message CreatePartialGraphRequest {
uint64 partial_graph_id = 1;
}

message RemovePartialGraphRequest {
repeated uint64 partial_graph_ids = 1;
}
Expand All @@ -75,6 +84,7 @@
InitRequest init = 1;
InjectBarrierRequest inject_barrier = 2;
RemovePartialGraphRequest remove_partial_graph = 3;
CreatePartialGraphRequest create_partial_graph = 4;
}
}

Expand Down
18 changes: 11 additions & 7 deletions src/meta/src/barrier/checkpoint/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::PbSubscriptionUpstreamInfo;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::{debug, warn};

Expand All @@ -40,7 +39,7 @@ use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::collect_creating_job_commit_epoch_info;
use crate::barrier::{
BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
SnapshotBackfillInfo, TracedEpoch,
InflightSubscriptionInfo, SnapshotBackfillInfo, TracedEpoch,
};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::rpc::metrics::GLOBAL_META_METRICS;
Expand Down Expand Up @@ -147,6 +146,7 @@ impl CheckpointControl {
} else {
new_database.state.in_flight_prev_epoch().clone()
};
control_stream_manager.add_partial_graph(database_id, None)?;
(entry.insert(new_database), max_prev_epoch)
}
Command::Flush
Expand Down Expand Up @@ -276,10 +276,12 @@ impl CheckpointControl {
.for_each(|database| database.create_mview_tracker.abort_all());
}

pub(crate) fn subscriptions(&self) -> impl Iterator<Item = PbSubscriptionUpstreamInfo> + '_ {
self.databases
.values()
.flat_map(|database| &database.state.inflight_subscription_info)
pub(crate) fn subscriptions(
&self,
) -> impl Iterator<Item = (DatabaseId, &InflightSubscriptionInfo)> + '_ {
self.databases.iter().map(|(database_id, database)| {
(*database_id, &database.state.inflight_subscription_info)
})
}
}

Expand Down Expand Up @@ -822,8 +824,10 @@ impl DatabaseCheckpointControl {
.expect("checked Some")
.to_mutation(None)
.expect("should have some mutation in `CreateStreamingJob` command");
let job_id = info.table_fragments.table_id();
control_stream_manager.add_partial_graph(self.database_id, Some(job_id))?;
self.creating_streaming_job_controls.insert(
info.table_fragments.table_id(),
job_id,
CreatingStreamingJobControl::new(
info.clone(),
snapshot_backfill_info.clone(),
Expand Down
6 changes: 3 additions & 3 deletions src/meta/src/barrier/context/context_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_common::catalog::TableId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_pb::stream_service::WaitEpochCommitRequest;
use risingwave_rpc_client::StreamingControlHandle;

Expand Down Expand Up @@ -71,9 +71,9 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
async fn new_control_stream(
&self,
node: &WorkerNode,
subscriptions: impl Iterator<Item = SubscriptionUpstreamInfo>,
init_request: &PbInitRequest,
) -> MetaResult<StreamingControlHandle> {
self.new_control_stream_impl(node, subscriptions).await
self.new_control_stream_impl(node, init_request).await
}

async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/barrier/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::Arc;
use arc_swap::ArcSwap;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;

use crate::barrier::command::CommandContext;
Expand Down Expand Up @@ -60,7 +60,7 @@ pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static {
async fn new_control_stream(
&self,
node: &WorkerNode,
subscriptions: impl Iterator<Item = SubscriptionUpstreamInfo>,
init_request: &PbInitRequest,
) -> MetaResult<StreamingControlHandle>;

async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot>;
Expand Down
61 changes: 46 additions & 15 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use risingwave_pb::common::{ActorInfo, WorkerNode};
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{Barrier, BarrierMutation, StreamActor, SubscriptionUpstreamInfo};
use risingwave_pb::stream_service::streaming_control_stream_request::RemovePartialGraphRequest;
use risingwave_pb::stream_service::streaming_control_stream_request::{
CreatePartialGraphRequest, PbInitRequest, PbInitialPartialGraph, RemovePartialGraphRequest,
};
use risingwave_pb::stream_service::{
streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteResponse,
InjectBarrierRequest, StreamingControlStreamRequest,
Expand Down Expand Up @@ -94,7 +96,7 @@ impl ControlStreamManager {
pub(super) async fn add_worker(
&mut self,
node: WorkerNode,
subscriptions: impl Iterator<Item = SubscriptionUpstreamInfo>,
initial_subscriptions: impl Iterator<Item = (DatabaseId, &InflightSubscriptionInfo)>,
context: &impl GlobalBarrierWorkerContext,
) {
let node_id = node.id as WorkerId;
Expand All @@ -106,13 +108,10 @@ impl ControlStreamManager {
let mut backoff = ExponentialBackoff::from_millis(100)
.max_delay(Duration::from_secs(3))
.factor(5);
let subscriptions = subscriptions.collect_vec();
let init_request = Self::collect_init_request(initial_subscriptions);
const MAX_RETRY: usize = 5;
for i in 1..=MAX_RETRY {
match context
.new_control_stream(&node, subscriptions.iter().cloned())
.await
{
match context.new_control_stream(&node, &init_request).await {
Ok(handle) => {
assert!(self
.nodes
Expand Down Expand Up @@ -141,16 +140,14 @@ impl ControlStreamManager {

pub(super) async fn reset(
&mut self,
subscriptions: impl Iterator<Item = &InflightSubscriptionInfo>,
initial_subscriptions: impl Iterator<Item = (DatabaseId, &InflightSubscriptionInfo)>,
nodes: &HashMap<WorkerId, WorkerNode>,
context: &impl GlobalBarrierWorkerContext,
) -> MetaResult<()> {
let subscriptions = subscriptions.cloned().collect_vec();
let subscriptions = &subscriptions;
let init_request = Self::collect_init_request(initial_subscriptions);
let init_request = &init_request;
let nodes = try_join_all(nodes.iter().map(|(worker_id, node)| async move {
let handle = context
.new_control_stream(node, subscriptions.iter().flatten())
.await?;
let handle = context.new_control_stream(node, init_request).await?;
Result::<_, MetaError>::Ok((
*worker_id,
ControlStreamNode {
Expand Down Expand Up @@ -270,6 +267,19 @@ impl ControlStreamManager {
tracing::debug!(?errors, "collected stream errors");
errors
}

fn collect_init_request(
initial_subscriptions: impl Iterator<Item = (DatabaseId, &InflightSubscriptionInfo)>,
) -> PbInitRequest {
PbInitRequest {
graphs: initial_subscriptions
.map(|(database_id, info)| PbInitialPartialGraph {
partial_graph_id: to_partial_graph_id(database_id, None),
subscriptions: info.into_iter().collect_vec(),
})
.collect(),
}
}
}

impl ControlStreamManager {
Expand Down Expand Up @@ -436,6 +446,27 @@ impl ControlStreamManager {
Ok(node_need_collect)
}

pub(super) fn add_partial_graph(
&mut self,
database_id: DatabaseId,
creating_job_id: Option<TableId>,
) -> MetaResult<()> {
let partial_graph_id = to_partial_graph_id(database_id, creating_job_id);
self.nodes.iter().try_for_each(|(_, node)| {
node.handle
.request_sender
.send(StreamingControlStreamRequest {
request: Some(
streaming_control_stream_request::Request::CreatePartialGraph(
CreatePartialGraphRequest { partial_graph_id },
),
),
})
.map_err(|_| anyhow!("failed to add partial graph"))
})?;
Ok(())
}

pub(super) fn remove_partial_graph(
&mut self,
database_id: DatabaseId,
Expand Down Expand Up @@ -472,14 +503,14 @@ impl GlobalBarrierWorkerContextImpl {
pub(super) async fn new_control_stream_impl(
&self,
node: &WorkerNode,
subscriptions: impl Iterator<Item = SubscriptionUpstreamInfo>,
init_request: &PbInitRequest,
) -> MetaResult<StreamingControlHandle> {
let handle = self
.env
.stream_client_pool()
.get(node)
.await?
.start_streaming_control(subscriptions)
.start_streaming_control(init_request.clone())
.await?;
Ok(handle)
}
Expand Down
7 changes: 4 additions & 3 deletions src/meta/src/barrier/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::HashMap;
use std::mem::replace;
use std::sync::Arc;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use arc_swap::ArcSwap;
Expand Down Expand Up @@ -46,7 +46,7 @@ use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager};
use crate::barrier::schedule::PeriodicBarriers;
use crate::barrier::{
schedule, BarrierKind, BarrierManagerRequest, BarrierManagerStatus,
BarrierWorkerRuntimeInfoSnapshot, RecoveryReason, TracedEpoch,
BarrierWorkerRuntimeInfoSnapshot, InflightSubscriptionInfo, RecoveryReason, TracedEpoch,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
Expand Down Expand Up @@ -558,9 +558,10 @@ impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {

let mut control_stream_manager = ControlStreamManager::new(self.env.clone());
let reset_start_time = Instant::now();
let empty_subscriptions = LazyLock::new(InflightSubscriptionInfo::default);
control_stream_manager
.reset(
subscription_infos.values(),
database_fragment_infos.keys().map(|database_id| (*database_id, subscription_infos.get(database_id).unwrap_or_else(|| &*empty_subscriptions))),
active_streaming_nodes.current(),
&*self.context,
)
Expand Down
9 changes: 3 additions & 6 deletions src/rpc_client/src/stream_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ use futures::TryStreamExt;
use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE;
use risingwave_common::monitor::{EndpointExt, TcpConfig};
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
use risingwave_pb::stream_service::stream_service_client::StreamServiceClient;
use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_pb::stream_service::streaming_control_stream_response::InitResponse;
use risingwave_pb::stream_service::*;
use tokio_stream::wrappers::UnboundedReceiverStream;
Expand Down Expand Up @@ -86,13 +85,11 @@ pub type StreamingControlHandle =
impl StreamClient {
pub async fn start_streaming_control(
&self,
subscriptions: impl Iterator<Item = SubscriptionUpstreamInfo>,
init_request: PbInitRequest,
) -> Result<StreamingControlHandle> {
let first_request = StreamingControlStreamRequest {
request: Some(streaming_control_stream_request::Request::Init(
InitRequest {
subscriptions: subscriptions.collect(),
},
init_request,
)),
};
let mut client = self.0.to_owned();
Expand Down
Loading
Loading