diff --git a/src/common/src/system_param.rs b/src/common/src/system_param/mod.rs similarity index 99% rename from src/common/src/system_param.rs rename to src/common/src/system_param/mod.rs index d2d0e57f12bc1..c525a3ae71a5c 100644 --- a/src/common/src/system_param.rs +++ b/src/common/src/system_param/mod.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod reader; + use std::collections::HashSet; use std::fmt::Debug; use std::ops::RangeBounds; diff --git a/src/common/src/system_param/reader.rs b/src/common/src/system_param/reader.rs new file mode 100644 index 0000000000000..fd4b58fc81a2d --- /dev/null +++ b/src/common/src/system_param/reader.rs @@ -0,0 +1,82 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_pb::meta::SystemParams as ProstSystemParams; +use tracing::warn; + +use super::system_params_to_kv; + +/// A wrapper for [`risingwave_pb::meta::SystemParams`] for 2 purposes: +/// - Avoid misuse of deprecated fields by hiding their getters. +/// - Abstract fallback logic for fields that might not be provided by meta service due to backward +/// compatibility. +#[derive(Clone, Debug)] +pub struct SystemParamsReader { + prost: ProstSystemParams, +} + +impl From for SystemParamsReader { + fn from(prost: ProstSystemParams) -> Self { + Self { prost } + } +} + +impl SystemParamsReader { + pub fn barrier_interval_ms(&self) -> u32 { + self.prost.barrier_interval_ms.unwrap() + } + + pub fn checkpoint_frequency(&self) -> u64 { + self.prost.checkpoint_frequency.unwrap() + } + + pub fn sstable_size_mb(&self) -> u32 { + self.prost.sstable_size_mb.unwrap() + } + + pub fn block_size_kb(&self) -> u32 { + self.prost.block_size_kb.unwrap() + } + + pub fn bloom_false_positive(&self) -> f64 { + self.prost.bloom_false_positive.unwrap() + } + + // TODO(zhidong): Only read from system params in v0.1.18. + pub fn state_store(&self, from_local: String) -> String { + let from_prost = self.prost.state_store.as_ref().unwrap(); + if from_prost.is_empty() { + warn!("--state-store is not specified on meta node, reading from CLI instead"); + from_local + } else { + from_prost.clone() + } + } + + pub fn data_directory(&self) -> &str { + self.prost.data_directory.as_ref().unwrap() + } + + pub fn backup_storage_url(&self) -> &str { + self.prost.backup_storage_url.as_ref().unwrap() + } + + pub fn backup_storage_directory(&self) -> &str { + self.prost.backup_storage_directory.as_ref().unwrap() + } + + pub fn to_kv(&self) -> Vec<(String, String)> { + system_params_to_kv(&self.prost).unwrap() + } +} diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index a7ac0de32f44a..9fcb0f34845ac 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -14,13 +14,14 @@ use std::collections::HashMap; +use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_pb::backup_service::MetaSnapshotMetadata; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockSnapshot; use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo; use risingwave_pb::meta::CreatingJobInfo; use risingwave_rpc_client::error::Result; -use risingwave_rpc_client::{HummockMetaClient, MetaClient, SystemParamsReader}; +use risingwave_rpc_client::{HummockMetaClient, MetaClient}; /// A wrapper around the `MetaClient` that only provides a minor set of meta rpc. /// Most of the rpc to meta are delegated by other separate structs like `CatalogWriter`, diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index db227c01b7520..8aa46fb9c8209 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -27,6 +27,7 @@ use risingwave_common::catalog::{ DEFAULT_SUPER_USER_ID, NON_RESERVED_USER_ID, PG_CATALOG_SCHEMA_NAME, }; use risingwave_common::error::Result; +use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_pb::backup_service::MetaSnapshotMetadata; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ @@ -42,7 +43,6 @@ use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_pb::user::update_user_request::UpdateField; use risingwave_pb::user::{GrantPrivilege, UserInfo}; use risingwave_rpc_client::error::Result as RpcResult; -use risingwave_rpc_client::SystemParamsReader; use tempfile::{Builder, NamedTempFile}; use crate::catalog::catalog_service::CatalogWriter; diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index b570fb91f8186..3cd348debd247 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -52,7 +52,8 @@ use crate::barrier::snapshot::SnapshotManager; use crate::barrier::BarrierEpochState::{Completed, InFlight}; use crate::hummock::HummockManagerRef; use crate::manager::{ - CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, MetaSrvEnv, WorkerId, + CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, LocalNotification, MetaSrvEnv, + WorkerId, }; use crate::model::{ActorId, BarrierManagerState}; use crate::rpc::metrics::MetaMetrics; @@ -564,6 +565,12 @@ where let mut barrier_timer: Option = None; let (barrier_complete_tx, mut barrier_complete_rx) = tokio::sync::mpsc::unbounded_channel(); let mut checkpoint_control = CheckpointControl::new(self.metrics.clone()); + let (local_notification_tx, mut local_notification_rx) = + tokio::sync::mpsc::unbounded_channel(); + self.env + .notification_manager() + .insert_local_sender(local_notification_tx) + .await; loop { tokio::select! { biased; @@ -572,6 +579,10 @@ where tracing::info!("Barrier manager is stopped"); return; } + // Checkpoint frequency change + notification = local_notification_rx.recv() => { + self.handle_local_notification(notification.unwrap()); + } result = barrier_complete_rx.recv() => { checkpoint_control.update_barrier_nums_metrics(); @@ -982,6 +993,14 @@ where pub async fn get_ddl_progress(&self) -> Vec { self.tracker.lock().await.gen_ddl_progress() } + + /// Only handle `SystemParamsChange`. + fn handle_local_notification(&self, notification: LocalNotification) { + if let LocalNotification::SystemParamsChange(p) = notification { + self.scheduled_barriers + .set_checkpoint_frequency(p.checkpoint_frequency() as usize) + } + } } pub type BarrierManagerRef = Arc>; diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index 5dc62ed57618a..ce5536251d740 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -45,7 +45,7 @@ struct Inner { /// Force checkpoint in next barrier. force_checkpoint: AtomicBool, - checkpoint_frequency: usize, + checkpoint_frequency: AtomicUsize, } /// The sender side of the barrier scheduling queue. @@ -73,7 +73,7 @@ impl BarrierScheduler { queue: RwLock::new(VecDeque::new()), changed_tx: watch::channel(()).0, num_uncheckpointed_barrier: AtomicUsize::new(0), - checkpoint_frequency, + checkpoint_frequency: AtomicUsize::new(checkpoint_frequency), force_checkpoint: AtomicBool::new(false), }); @@ -281,7 +281,7 @@ impl ScheduledBarriers { self.inner .num_uncheckpointed_barrier .load(Ordering::Relaxed) - >= self.inner.checkpoint_frequency + >= self.inner.checkpoint_frequency.load(Ordering::Relaxed) || self.inner.force_checkpoint.load(Ordering::Relaxed) } @@ -290,6 +290,13 @@ impl ScheduledBarriers { self.inner.force_checkpoint.store(true, Ordering::Relaxed) } + /// Update the `checkpoint_frequency` + pub fn set_checkpoint_frequency(&self, frequency: usize) { + self.inner + .checkpoint_frequency + .store(frequency, Ordering::Relaxed); + } + /// Update the `num_uncheckpointed_barrier` fn update_num_uncheckpointed_barrier(&self, checkpoint: bool) { if checkpoint { diff --git a/src/meta/src/hummock/compaction_scheduler.rs b/src/meta/src/hummock/compaction_scheduler.rs index a02230bf71c65..4b01b64130ed6 100644 --- a/src/meta/src/hummock/compaction_scheduler.rs +++ b/src/meta/src/hummock/compaction_scheduler.rs @@ -735,16 +735,15 @@ mod tests { fail::remove(fp_cancel_compact_task); assert_eq!(hummock_manager.list_all_tasks_ids().await.len(), 1); // Notified to retry cancellation. - let mut task_to_cancel = match rx.recv().await.unwrap() { - LocalNotification::WorkerNodeIsDeleted(_) => { - panic!() - } - LocalNotification::CompactionTaskNeedCancel(task_to_cancel) => task_to_cancel, + if let LocalNotification::CompactionTaskNeedCancel(mut task_to_cancel) = + rx.recv().await.unwrap() + { + hummock_manager + .cancel_compact_task(&mut task_to_cancel, TaskStatus::ManualCanceled) + .await + .unwrap(); }; - hummock_manager - .cancel_compact_task(&mut task_to_cancel, TaskStatus::ManualCanceled) - .await - .unwrap(); + assert!(hummock_manager.list_all_tasks_ids().await.is_empty()); // Succeeded. diff --git a/src/meta/src/hummock/manager/worker.rs b/src/meta/src/hummock/manager/worker.rs index a4d1031320c04..1d8abda94c21a 100644 --- a/src/meta/src/hummock/manager/worker.rs +++ b/src/meta/src/hummock/manager/worker.rs @@ -150,6 +150,7 @@ where tracing::info!("Cancelled compaction task {}", task_id); sync_point!("AFTER_CANCEL_COMPACTION_TASK_ASYNC"); } + LocalNotification::SystemParamsChange(_) => {} } } } diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index c46cbcfc61147..1959c895db7ee 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -68,7 +68,7 @@ pub struct MetaOpts { pub max_idle_ms: u64, /// Whether run in compaction detection test mode pub compaction_deterministic_test: bool, - + // TODO: this will be read from system param channel and should be removed to avoid misuse pub checkpoint_frequency: usize, /// Interval of GC metadata in meta store and stale SSTs in object store. diff --git a/src/meta/src/manager/notification.rs b/src/meta/src/manager/notification.rs index f3d9436b7f86e..35e0dfca6a846 100644 --- a/src/meta/src/manager/notification.rs +++ b/src/meta/src/manager/notification.rs @@ -16,6 +16,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::Arc; +use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_pb::common::{WorkerNode, WorkerType}; use risingwave_pb::hummock::CompactTask; use risingwave_pb::meta::subscribe_response::{Info, Operation}; @@ -37,6 +38,7 @@ pub type NotificationVersion = u64; pub enum LocalNotification { WorkerNodeIsDeleted(WorkerNode), CompactionTaskNeedCancel(CompactTask), + SystemParamsChange(SystemParamsReader), } #[derive(Debug)] diff --git a/src/meta/src/manager/system_param/mod.rs b/src/meta/src/manager/system_param/mod.rs index 119c25d8079e6..c155b27285bfc 100644 --- a/src/meta/src/manager/system_param/mod.rs +++ b/src/meta/src/manager/system_param/mod.rs @@ -70,6 +70,14 @@ impl SystemParamManager { mem_txn.commit(); + // Sync params to other managers on the meta node only once, since it's infallible. + self.env + .notification_manager() + .notify_local_subscribers(super::LocalNotification::SystemParamsChange( + params.clone().into(), + )) + .await; + Ok(()) } diff --git a/src/meta/src/rpc/server.rs b/src/meta/src/rpc/server.rs index 9110d5009e26c..493a0738b7f1f 100644 --- a/src/meta/src/rpc/server.rs +++ b/src/meta/src/rpc/server.rs @@ -20,6 +20,7 @@ use either::Either; use etcd_client::ConnectOptions; use risingwave_backup::storage::ObjectStoreMetaSnapshotStorage; use risingwave_common::monitor::process_linux::monitor_process; +use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common_service::metrics_manager::MetricsManager; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_object_store::object::parse_remote_object_store; @@ -314,6 +315,10 @@ pub async fn start_service_as_election_leader( let registry = meta_metrics.registry(); monitor_process(registry).unwrap(); + let system_params_manager = + Arc::new(SystemParamManager::new(env.clone(), init_system_params).await?); + let system_params_reader: SystemParamsReader = system_params_manager.get_params().await.into(); + let cluster_manager = Arc::new( ClusterManager::new(env.clone(), max_heartbeat_interval) .await @@ -360,8 +365,10 @@ pub async fn start_service_as_election_leader( tokio::spawn(dashboard_service.serve(address_info.ui_path)); } - let (barrier_scheduler, scheduled_barriers) = - BarrierScheduler::new_pair(hummock_manager.clone(), env.opts.checkpoint_frequency); + let (barrier_scheduler, scheduled_barriers) = BarrierScheduler::new_pair( + hummock_manager.clone(), + system_params_reader.checkpoint_frequency() as usize, + ); let source_manager = Arc::new( SourceManager::new( @@ -442,8 +449,6 @@ pub async fn start_service_as_election_leader( backup_manager.clone(), compactor_manager.clone(), )); - let system_params_manager = - Arc::new(SystemParamManager::new(env.clone(), init_system_params).await?); let ddl_srv = DdlServiceImpl::::new( env.clone(), @@ -490,7 +495,7 @@ pub async fn start_service_as_election_leader( ); let health_srv = HealthServiceImpl::new(); let backup_srv = BackupServiceImpl::new(backup_manager); - let system_params_srv = SystemParamsServiceImpl::new(system_params_manager); + let system_params_srv = SystemParamsServiceImpl::new(system_params_manager.clone()); if let Some(prometheus_addr) = address_info.prometheus_addr { MetricsManager::boot_metrics_service( diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index 265cde773c334..5e6821b60dd8f 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -54,7 +54,7 @@ mod stream_client; pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef}; pub use connector_client::ConnectorClient; pub use hummock_meta_client::{CompactTaskItem, HummockMetaClient}; -pub use meta_client::{MetaClient, SystemParamsReader}; +pub use meta_client::MetaClient; pub use stream_client::{StreamClient, StreamClientPool, StreamClientPoolRef}; #[async_trait] diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index bf8b571146d50..e05e47fab5a97 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -25,7 +25,7 @@ use futures::stream::BoxStream; use itertools::Itertools; use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId}; use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE; -use risingwave_common::system_param::system_params_to_kv; +use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::util::addr::HostAddr; use risingwave_hummock_sdk::compact::CompactorRuntimeConfig; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; @@ -57,7 +57,7 @@ use risingwave_pb::meta::reschedule_request::Reschedule as ProstReschedule; use risingwave_pb::meta::scale_service_client::ScaleServiceClient; use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient; use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient; -use risingwave_pb::meta::{SystemParams as ProstSystemParams, *}; +use risingwave_pb::meta::*; use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_pb::user::update_user_request::UpdateField; use risingwave_pb::user::user_service_client::UserServiceClient; @@ -70,7 +70,6 @@ use tokio::time; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tonic::transport::{Channel, Endpoint}; use tonic::{Code, Streaming}; -use tracing::warn; use crate::error::{Result, RpcError}; use crate::hummock_meta_client::{CompactTaskItem, HummockMetaClient}; @@ -953,69 +952,6 @@ impl HummockMetaClient for MetaClient { } } -/// A wrapper for [`risingwave_pb::meta::SystemParams`] for 2 purposes: -/// - Avoid misuse of deprecated fields by hiding their getters. -/// - Abstract fallback logic for fields that might not be provided by meta service due to backward -/// compatibility. -pub struct SystemParamsReader { - prost: ProstSystemParams, -} - -impl From for SystemParamsReader { - fn from(prost: ProstSystemParams) -> Self { - Self { prost } - } -} - -impl SystemParamsReader { - pub fn barrier_interval_ms(&self) -> u32 { - self.prost.barrier_interval_ms.unwrap() - } - - pub fn checkpoint_frequency(&self) -> u64 { - self.prost.checkpoint_frequency.unwrap() - } - - pub fn sstable_size_mb(&self) -> u32 { - self.prost.sstable_size_mb.unwrap() - } - - pub fn block_size_kb(&self) -> u32 { - self.prost.block_size_kb.unwrap() - } - - pub fn bloom_false_positive(&self) -> f64 { - self.prost.bloom_false_positive.unwrap() - } - - // TODO(zhidong): Only read from system params in v0.1.18. - pub fn state_store(&self, from_local: String) -> String { - let from_prost = self.prost.state_store.as_ref().unwrap(); - if from_prost.is_empty() { - warn!("--state-store is not specified on meta node, reading from CLI instead"); - from_local - } else { - from_prost.clone() - } - } - - pub fn data_directory(&self) -> &str { - self.prost.data_directory.as_ref().unwrap() - } - - pub fn backup_storage_url(&self) -> &str { - self.prost.backup_storage_url.as_ref().unwrap() - } - - pub fn backup_storage_directory(&self) -> &str { - self.prost.backup_storage_directory.as_ref().unwrap() - } - - pub fn to_kv(&self) -> Vec<(String, String)> { - system_params_to_kv(&self.prost).unwrap() - } -} - #[derive(Debug, Clone)] struct GrpcMetaClientCore { cluster_client: ClusterServiceClient, diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index 1a3300ab77fec..9ee84f608ed9a 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -13,8 +13,8 @@ // limitations under the License. use risingwave_common::config::RwConfig; +use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_pb::meta::SystemParams; -use risingwave_rpc_client::SystemParamsReader; #[derive(Clone, Debug)] pub struct StorageOpts {