Skip to content

Commit

Permalink
feat(meta): mutable checkpoint frequency (#8010)
Browse files Browse the repository at this point in the history
As the title says. Use `LocalNotification` to asynchronously notify other components on the meta node of the latest params.

Approved-By: BugenZhao

Co-Authored-By: Gun9niR <gun9nir.guo@gmail.com>
Co-Authored-By: Zhidong Guo <52783948+Gun9niR@users.noreply.github.com>
  • Loading branch information
Gun9niR and Gun9niR authored Feb 22, 2023
1 parent 3a598fb commit 7cadc39
Show file tree
Hide file tree
Showing 15 changed files with 151 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod reader;

use std::collections::HashSet;
use std::fmt::Debug;
use std::ops::RangeBounds;
Expand Down
82 changes: 82 additions & 0 deletions src/common/src/system_param/reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::meta::SystemParams as ProstSystemParams;
use tracing::warn;

use super::system_params_to_kv;

/// A read-only wrapper around [`risingwave_pb::meta::SystemParams`].
///
/// It exists for two purposes:
/// - Avoid misuse of deprecated fields by hiding their getters.
/// - Abstract fallback logic for fields that might not be provided by the meta service due to
///   backward compatibility.
#[derive(Clone, Debug)]
pub struct SystemParamsReader {
    // The wrapped protobuf message; every getter on the reader reads from it.
    prost: ProstSystemParams,
}

impl From<ProstSystemParams> for SystemParamsReader {
fn from(prost: ProstSystemParams) -> Self {
Self { prost }
}
}

impl SystemParamsReader {
    /// Returns the `barrier_interval_ms` system parameter.
    ///
    /// # Panics
    /// Panics if the wrapped message carries no value for this field.
    pub fn barrier_interval_ms(&self) -> u32 {
        let interval = self.prost.barrier_interval_ms;
        interval.unwrap()
    }

    /// Returns the `checkpoint_frequency` system parameter.
    ///
    /// # Panics
    /// Panics if the wrapped message carries no value for this field.
    pub fn checkpoint_frequency(&self) -> u64 {
        let frequency = self.prost.checkpoint_frequency;
        frequency.unwrap()
    }

    /// Returns the `sstable_size_mb` system parameter.
    ///
    /// # Panics
    /// Panics if the wrapped message carries no value for this field.
    pub fn sstable_size_mb(&self) -> u32 {
        let size = self.prost.sstable_size_mb;
        size.unwrap()
    }

    /// Returns the `block_size_kb` system parameter.
    ///
    /// # Panics
    /// Panics if the wrapped message carries no value for this field.
    pub fn block_size_kb(&self) -> u32 {
        let size = self.prost.block_size_kb;
        size.unwrap()
    }

    /// Returns the `bloom_false_positive` system parameter.
    ///
    /// # Panics
    /// Panics if the wrapped message carries no value for this field.
    pub fn bloom_false_positive(&self) -> f64 {
        let rate = self.prost.bloom_false_positive;
        rate.unwrap()
    }

    /// Returns the state store setting, preferring the value held in the system params.
    ///
    /// When the stored value is an empty string, a warning is logged and `from_local` is
    /// returned instead.
    ///
    /// # Panics
    /// Panics if the wrapped message carries no value for `state_store`.
    // TODO(zhidong): Only read from system params in v0.1.18.
    pub fn state_store(&self, from_local: String) -> String {
        let from_prost = self.prost.state_store.as_deref().unwrap();
        if !from_prost.is_empty() {
            return from_prost.to_owned();
        }
        warn!("--state-store is not specified on meta node, reading from CLI instead");
        from_local
    }

    /// Returns the `data_directory` system parameter as a borrowed string.
    ///
    /// # Panics
    /// Panics if the wrapped message carries no value for this field.
    pub fn data_directory(&self) -> &str {
        self.prost.data_directory.as_deref().unwrap()
    }

    /// Returns the `backup_storage_url` system parameter as a borrowed string.
    ///
    /// # Panics
    /// Panics if the wrapped message carries no value for this field.
    pub fn backup_storage_url(&self) -> &str {
        self.prost.backup_storage_url.as_deref().unwrap()
    }

    /// Returns the `backup_storage_directory` system parameter as a borrowed string.
    ///
    /// # Panics
    /// Panics if the wrapped message carries no value for this field.
    pub fn backup_storage_directory(&self) -> &str {
        self.prost.backup_storage_directory.as_deref().unwrap()
    }

    /// Converts the wrapped params into `(key, value)` string pairs via `system_params_to_kv`.
    ///
    /// # Panics
    /// Panics if `system_params_to_kv` does not yield a value for the wrapped message.
    pub fn to_kv(&self) -> Vec<(String, String)> {
        let pairs = system_params_to_kv(&self.prost);
        pairs.unwrap()
    }
}
3 changes: 2 additions & 1 deletion src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@

use std::collections::HashMap;

use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockSnapshot;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::CreatingJobInfo;
use risingwave_rpc_client::error::Result;
use risingwave_rpc_client::{HummockMetaClient, MetaClient, SystemParamsReader};
use risingwave_rpc_client::{HummockMetaClient, MetaClient};

/// A wrapper around the `MetaClient` that only provides a minor set of meta rpc.
/// Most of the rpc to meta are delegated by other separate structs like `CatalogWriter`,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use risingwave_common::catalog::{
DEFAULT_SUPER_USER_ID, NON_RESERVED_USER_ID, PG_CATALOG_SCHEMA_NAME,
};
use risingwave_common::error::Result;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{
Expand All @@ -42,7 +43,6 @@ use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
use risingwave_rpc_client::error::Result as RpcResult;
use risingwave_rpc_client::SystemParamsReader;
use tempfile::{Builder, NamedTempFile};

use crate::catalog::catalog_service::CatalogWriter;
Expand Down
21 changes: 20 additions & 1 deletion src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ use crate::barrier::snapshot::SnapshotManager;
use crate::barrier::BarrierEpochState::{Completed, InFlight};
use crate::hummock::HummockManagerRef;
use crate::manager::{
CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, MetaSrvEnv, WorkerId,
CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, LocalNotification, MetaSrvEnv,
WorkerId,
};
use crate::model::{ActorId, BarrierManagerState};
use crate::rpc::metrics::MetaMetrics;
Expand Down Expand Up @@ -564,6 +565,12 @@ where
let mut barrier_timer: Option<HistogramTimer> = None;
let (barrier_complete_tx, mut barrier_complete_rx) = tokio::sync::mpsc::unbounded_channel();
let mut checkpoint_control = CheckpointControl::new(self.metrics.clone());
let (local_notification_tx, mut local_notification_rx) =
tokio::sync::mpsc::unbounded_channel();
self.env
.notification_manager()
.insert_local_sender(local_notification_tx)
.await;
loop {
tokio::select! {
biased;
Expand All @@ -572,6 +579,10 @@ where
tracing::info!("Barrier manager is stopped");
return;
}
// Checkpoint frequency change
notification = local_notification_rx.recv() => {
self.handle_local_notification(notification.unwrap());
}
result = barrier_complete_rx.recv() => {
checkpoint_control.update_barrier_nums_metrics();

Expand Down Expand Up @@ -982,6 +993,14 @@ where
pub async fn get_ddl_progress(&self) -> Vec<DdlProgress> {
self.tracker.lock().await.gen_ddl_progress()
}

/// React to a local notification; only `SystemParamsChange` has any effect.
///
/// On a params change, the new checkpoint frequency is forwarded to `scheduled_barriers`.
/// Every other notification variant is ignored.
fn handle_local_notification(&self, notification: LocalNotification) {
    if let LocalNotification::SystemParamsChange(params) = notification {
        let frequency = params.checkpoint_frequency() as usize;
        self.scheduled_barriers.set_checkpoint_frequency(frequency);
    }
}
}

pub type BarrierManagerRef<S> = Arc<GlobalBarrierManager<S>>;
Expand Down
13 changes: 10 additions & 3 deletions src/meta/src/barrier/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ struct Inner {
/// Force checkpoint in next barrier.
force_checkpoint: AtomicBool,

checkpoint_frequency: usize,
checkpoint_frequency: AtomicUsize,
}

/// The sender side of the barrier scheduling queue.
Expand Down Expand Up @@ -73,7 +73,7 @@ impl<S: MetaStore> BarrierScheduler<S> {
queue: RwLock::new(VecDeque::new()),
changed_tx: watch::channel(()).0,
num_uncheckpointed_barrier: AtomicUsize::new(0),
checkpoint_frequency,
checkpoint_frequency: AtomicUsize::new(checkpoint_frequency),
force_checkpoint: AtomicBool::new(false),
});

Expand Down Expand Up @@ -281,7 +281,7 @@ impl ScheduledBarriers {
self.inner
.num_uncheckpointed_barrier
.load(Ordering::Relaxed)
>= self.inner.checkpoint_frequency
>= self.inner.checkpoint_frequency.load(Ordering::Relaxed)
|| self.inner.force_checkpoint.load(Ordering::Relaxed)
}

Expand All @@ -290,6 +290,13 @@ impl ScheduledBarriers {
self.inner.force_checkpoint.store(true, Ordering::Relaxed)
}

/// Replace the stored `checkpoint_frequency` with `frequency`.
///
/// The new value is written with a relaxed atomic store.
pub fn set_checkpoint_frequency(&self, frequency: usize) {
    let checkpoint_frequency = &self.inner.checkpoint_frequency;
    checkpoint_frequency.store(frequency, Ordering::Relaxed);
}

/// Update the `num_uncheckpointed_barrier`
fn update_num_uncheckpointed_barrier(&self, checkpoint: bool) {
if checkpoint {
Expand Down
17 changes: 8 additions & 9 deletions src/meta/src/hummock/compaction_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,16 +735,15 @@ mod tests {
fail::remove(fp_cancel_compact_task);
assert_eq!(hummock_manager.list_all_tasks_ids().await.len(), 1);
// Notified to retry cancellation.
let mut task_to_cancel = match rx.recv().await.unwrap() {
LocalNotification::WorkerNodeIsDeleted(_) => {
panic!()
}
LocalNotification::CompactionTaskNeedCancel(task_to_cancel) => task_to_cancel,
if let LocalNotification::CompactionTaskNeedCancel(mut task_to_cancel) =
rx.recv().await.unwrap()
{
hummock_manager
.cancel_compact_task(&mut task_to_cancel, TaskStatus::ManualCanceled)
.await
.unwrap();
};
hummock_manager
.cancel_compact_task(&mut task_to_cancel, TaskStatus::ManualCanceled)
.await
.unwrap();

assert!(hummock_manager.list_all_tasks_ids().await.is_empty());

// Succeeded.
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/manager/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ where
tracing::info!("Cancelled compaction task {}", task_id);
sync_point!("AFTER_CANCEL_COMPACTION_TASK_ASYNC");
}
LocalNotification::SystemParamsChange(_) => {}
}
}
}
2 changes: 1 addition & 1 deletion src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub struct MetaOpts {
pub max_idle_ms: u64,
/// Whether run in compaction detection test mode
pub compaction_deterministic_test: bool,

// TODO: this will be read from system param channel and should be removed to avoid misuse
pub checkpoint_frequency: usize,

/// Interval of GC metadata in meta store and stale SSTs in object store.
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/manager/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;

use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::hummock::CompactTask;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
Expand All @@ -37,6 +38,7 @@ pub type NotificationVersion = u64;
pub enum LocalNotification {
    /// Carries the affected worker node.
    /// NOTE(review): presumably sent when a worker node is removed — confirm against the sender.
    WorkerNodeIsDeleted(WorkerNode),
    /// Carries a compaction task whose cancellation the receiver should attempt.
    CompactionTaskNeedCancel(CompactTask),
    /// Carries the latest system params, sent to local subscribers after a params update commits.
    SystemParamsChange(SystemParamsReader),
}

#[derive(Debug)]
Expand Down
8 changes: 8 additions & 0 deletions src/meta/src/manager/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ impl<S: MetaStore> SystemParamManager<S> {

mem_txn.commit();

// Sync params to other managers on the meta node only once, since it's infallible.
self.env
.notification_manager()
.notify_local_subscribers(super::LocalNotification::SystemParamsChange(
params.clone().into(),
))
.await;

Ok(())
}

Expand Down
15 changes: 10 additions & 5 deletions src/meta/src/rpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use either::Either;
use etcd_client::ConnectOptions;
use risingwave_backup::storage::ObjectStoreMetaSnapshotStorage;
use risingwave_common::monitor::process_linux::monitor_process;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common_service::metrics_manager::MetricsManager;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::parse_remote_object_store;
Expand Down Expand Up @@ -314,6 +315,10 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
let registry = meta_metrics.registry();
monitor_process(registry).unwrap();

let system_params_manager =
Arc::new(SystemParamManager::new(env.clone(), init_system_params).await?);
let system_params_reader: SystemParamsReader = system_params_manager.get_params().await.into();

let cluster_manager = Arc::new(
ClusterManager::new(env.clone(), max_heartbeat_interval)
.await
Expand Down Expand Up @@ -360,8 +365,10 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
tokio::spawn(dashboard_service.serve(address_info.ui_path));
}

let (barrier_scheduler, scheduled_barriers) =
BarrierScheduler::new_pair(hummock_manager.clone(), env.opts.checkpoint_frequency);
let (barrier_scheduler, scheduled_barriers) = BarrierScheduler::new_pair(
hummock_manager.clone(),
system_params_reader.checkpoint_frequency() as usize,
);

let source_manager = Arc::new(
SourceManager::new(
Expand Down Expand Up @@ -442,8 +449,6 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
backup_manager.clone(),
compactor_manager.clone(),
));
let system_params_manager =
Arc::new(SystemParamManager::new(env.clone(), init_system_params).await?);

let ddl_srv = DdlServiceImpl::<S>::new(
env.clone(),
Expand Down Expand Up @@ -490,7 +495,7 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
);
let health_srv = HealthServiceImpl::new();
let backup_srv = BackupServiceImpl::new(backup_manager);
let system_params_srv = SystemParamsServiceImpl::new(system_params_manager);
let system_params_srv = SystemParamsServiceImpl::new(system_params_manager.clone());

if let Some(prometheus_addr) = address_info.prometheus_addr {
MetricsManager::boot_metrics_service(
Expand Down
2 changes: 1 addition & 1 deletion src/rpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ mod stream_client;
pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef};
pub use connector_client::ConnectorClient;
pub use hummock_meta_client::{CompactTaskItem, HummockMetaClient};
pub use meta_client::{MetaClient, SystemParamsReader};
pub use meta_client::MetaClient;
pub use stream_client::{StreamClient, StreamClientPool, StreamClientPoolRef};

#[async_trait]
Expand Down
Loading

0 comments on commit 7cadc39

Please sign in to comment.