Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce LeadershipChangeNotifier and LeadershipChangeListener #4817

Merged
merged 5 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,20 @@ pub enum Error {
source: common_procedure::Error,
},

#[snafu(display("Failed to start procedure manager"))]
StartProcedureManager {
#[snafu(implicit)]
location: Location,
source: common_procedure::Error,
},

#[snafu(display("Failed to stop procedure manager"))]
StopProcedureManager {
#[snafu(implicit)]
location: Location,
source: common_procedure::Error,
},

#[snafu(display(
"Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
))]
Expand Down Expand Up @@ -715,7 +729,9 @@ impl ErrorExt for Error {

SubmitProcedure { source, .. }
| QueryProcedure { source, .. }
| WaitProcedure { source, .. } => source.status_code(),
| WaitProcedure { source, .. }
| StartProcedureManager { source, .. }
| StopProcedureManager { source, .. } => source.status_code(),
RegisterProcedureLoader { source, .. } => source.status_code(),
External { source, .. } => source.status_code(),
OperateDatanode { source, .. } => source.status_code(),
Expand Down
156 changes: 156 additions & 0 deletions src/common/meta/src/leadership_notifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_trait::async_trait;
use common_telemetry::error;

use crate::error::Result;

pub type LeadershipChangeNotifierCustomizerRef = Arc<dyn LeadershipChangeNotifierCustomizer>;

/// A trait for customizing the leadership change notifier.
pub trait LeadershipChangeNotifierCustomizer: Send + Sync {
fn customize(&self, notifier: &mut LeadershipChangeNotifier);
}

/// A trait for handling leadership change events in a distributed system.
#[async_trait]
pub trait LeadershipChangeListener: Send + Sync {
/// Returns the listener name.
fn name(&self) -> &str;

/// Called when the node transitions to the leader role.
async fn on_leader_start(&self) -> Result<()>;

/// Called when the node transitions to the follower role.
async fn on_leader_stop(&self) -> Result<()>;
}

/// A notifier for leadership change events.
#[derive(Default)]
pub struct LeadershipChangeNotifier {
listeners: Vec<Arc<dyn LeadershipChangeListener>>,
}

impl LeadershipChangeNotifier {
/// Adds a listener to the notifier.
pub fn add_listener(&mut self, listener: Arc<dyn LeadershipChangeListener>) {
self.listeners.push(listener);
}

/// Notify all listeners that the node has become a leader.
pub async fn notify_on_leader_start(&self) {
for listener in &self.listeners {
if let Err(err) = listener.on_leader_start().await {
error!(
err;
"Failed to notify listener: {}, event 'on_leader_start'",
listener.name()
);
}
}
}

/// Notify all listeners that the node has become a follower.
pub async fn notify_on_leader_stop(&self) {
for listener in &self.listeners {
if let Err(err) = listener.on_leader_stop().await {
error!(
err;
"Failed to notify listener: {}, event: 'on_follower_start'",
listener.name()
);
}
}
}
}

#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use super::*;

struct MockListener {
name: String,
on_leader_start_fn: Option<Box<dyn Fn() -> Result<()> + Send + Sync>>,
on_follower_start_fn: Option<Box<dyn Fn() -> Result<()> + Send + Sync>>,
}

#[async_trait::async_trait]
impl LeadershipChangeListener for MockListener {
fn name(&self) -> &str {
&self.name
}

async fn on_leader_start(&self) -> Result<()> {
if let Some(f) = &self.on_leader_start_fn {
return f();
}
Ok(())
}

async fn on_leader_stop(&self) -> Result<()> {
if let Some(f) = &self.on_follower_start_fn {
return f();
}
Ok(())
}
}

#[tokio::test]
async fn test_leadership_change_notifier() {
let mut notifier = LeadershipChangeNotifier::default();
let listener1 = Arc::new(MockListener {
name: "listener1".to_string(),
on_leader_start_fn: None,
on_follower_start_fn: None,
});
let called_on_leader_start = Arc::new(AtomicBool::new(false));
let called_on_follower_start = Arc::new(AtomicBool::new(false));
let called_on_leader_start_moved = called_on_leader_start.clone();
let called_on_follower_start_moved = called_on_follower_start.clone();
let listener2 = Arc::new(MockListener {
name: "listener2".to_string(),
on_leader_start_fn: Some(Box::new(move || {
called_on_leader_start_moved.store(true, Ordering::Relaxed);
Ok(())
})),
on_follower_start_fn: Some(Box::new(move || {
called_on_follower_start_moved.store(true, Ordering::Relaxed);
Ok(())
})),
});

notifier.add_listener(listener1);
notifier.add_listener(listener2);

let listener1 = notifier.listeners.first().unwrap();
let listener2 = notifier.listeners.get(1).unwrap();

assert_eq!(listener1.name(), "listener1");
assert_eq!(listener2.name(), "listener2");

notifier.notify_on_leader_start().await;
assert!(!called_on_follower_start.load(Ordering::Relaxed));
assert!(called_on_leader_start.load(Ordering::Relaxed));

notifier.notify_on_leader_stop().await;
assert!(called_on_follower_start.load(Ordering::Relaxed));
assert!(called_on_leader_start.load(Ordering::Relaxed));
}
}
1 change: 1 addition & 0 deletions src/common/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod heartbeat;
pub mod instruction;
pub mod key;
pub mod kv_backend;
pub mod leadership_notifier;
pub mod lock_key;
pub mod metrics;
pub mod node_manager;
Expand Down
17 changes: 17 additions & 0 deletions src/common/meta/src/wal_options_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ pub mod kafka;
use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use common_wal::config::MetasrvWalConfig;
use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY};
use snafu::ResultExt;
use store_api::storage::{RegionId, RegionNumber};

use crate::error::{EncodeWalOptionsSnafu, Result};
use crate::kv_backend::KvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
use crate::wal_options_allocator::kafka::topic_manager::TopicManager as KafkaTopicManager;

/// Allocates wal options in region granularity.
Expand Down Expand Up @@ -94,6 +96,21 @@ impl WalOptionsAllocator {
}
}

#[async_trait]
impl LeadershipChangeListener for WalOptionsAllocator {
fn name(&self) -> &str {
"WalOptionsAllocator"
}

async fn on_leader_start(&self) -> Result<()> {
self.start().await
}

async fn on_leader_stop(&self) -> Result<()> {
Ok(())
}
}

/// Allocates a wal options for each region. The allocated wal options is encoded immediately.
pub fn allocate_region_wal_options(
regions: Vec<RegionNumber>,
Expand Down
1 change: 0 additions & 1 deletion src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ pub mod selector;
pub mod service;
pub mod state;
pub mod table_meta_alloc;

pub use crate::error::Result;

mod greptimedb_telemetry;
Expand Down
62 changes: 30 additions & 32 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::leadership_notifier::{
LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
};
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
Expand Down Expand Up @@ -56,6 +59,7 @@ use crate::handler::HeartbeatHandlerGroupRef;
use crate::lease::lookup_datanode_peer;
use crate::lock::DistLockRef;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
use crate::region::supervisor::RegionSupervisorTickerRef;
use crate::selector::{Selector, SelectorType};
Expand Down Expand Up @@ -291,17 +295,15 @@ pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<
pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;

pub struct MetaStateHandler {
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
subscribe_manager: Option<SubscriptionManagerRef>,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
leadership_change_notifier: LeadershipChangeNotifier,
state: StateRef,
}

impl MetaStateHandler {
pub async fn on_become_leader(&self) {
pub async fn on_leader_start(&self) {
self.state.write().unwrap().next_state(become_leader(false));

if let Err(e) = self.leader_cached_kv_backend.load().await {
Expand All @@ -310,33 +312,19 @@ impl MetaStateHandler {
self.state.write().unwrap().next_state(become_leader(true));
}

if let Some(ticker) = self.region_supervisor_ticker.as_ref() {
ticker.start();
}

if let Err(e) = self.procedure_manager.start().await {
error!(e; "Failed to start procedure manager");
}

if let Err(e) = self.wal_options_allocator.start().await {
error!(e; "Failed to start wal options allocator");
}
self.leadership_change_notifier
.notify_on_leader_start()
.await;

self.greptimedb_telemetry_task.should_report(true);
}

pub async fn on_become_follower(&self) {
pub async fn on_leader_stop(&self) {
self.state.write().unwrap().next_state(become_follower());

// Stops the procedures.
if let Err(e) = self.procedure_manager.stop().await {
error!(e; "Failed to stop procedure manager");
}

if let Some(ticker) = self.region_supervisor_ticker.as_ref() {
// Stops the supervisor ticker.
ticker.stop();
}
self.leadership_change_notifier
.notify_on_leader_stop()
.await;

// Suspends reporting.
self.greptimedb_telemetry_task.should_report(false);
Expand Down Expand Up @@ -410,15 +398,25 @@ impl Metasrv {
greptimedb_telemetry_task
.start()
.context(StartTelemetryTaskSnafu)?;
let region_supervisor_ticker = self.region_supervisor_ticker.clone();

// Builds leadership change notifier.
let mut leadership_change_notifier = LeadershipChangeNotifier::default();
leadership_change_notifier.add_listener(self.wal_options_allocator.clone());
leadership_change_notifier
.add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
}
if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
customizer.customize(&mut leadership_change_notifier);
}

let state_handler = MetaStateHandler {
greptimedb_telemetry_task,
subscribe_manager,
procedure_manager,
wal_options_allocator: self.wal_options_allocator.clone(),
state: self.state.clone(),
leader_cached_kv_backend: leader_cached_kv_backend.clone(),
region_supervisor_ticker,
leadership_change_notifier,
};
let _handle = common_runtime::spawn_global(async move {
loop {
Expand All @@ -429,12 +427,12 @@ impl Metasrv {
info!("Leader's cache has bean cleared on leader change: {msg}");
match msg {
LeaderChangeMessage::Elected(_) => {
state_handler.on_become_leader().await;
state_handler.on_leader_start().await;
}
LeaderChangeMessage::StepDown(leader) => {
error!("Leader :{:?} step down", leader);

state_handler.on_become_follower().await;
state_handler.on_leader_stop().await;
}
}
}
Expand All @@ -448,7 +446,7 @@ impl Metasrv {
}
}

state_handler.on_become_follower().await;
state_handler.on_leader_stop().await;
});

// Register candidate and keep lease in background.
Expand Down
Loading