Skip to content

Commit

Permalink
feat: heartbeat handler control (#2780)
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun authored Nov 21, 2023
1 parent 5f87b1f commit dc351a6
Show file tree
Hide file tree
Showing 15 changed files with 105 additions and 88 deletions.
26 changes: 18 additions & 8 deletions src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,16 @@ pub trait HeartbeatHandler: Send + Sync {
req: &HeartbeatRequest,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()>;
) -> Result<HandleControl>;
}

/// HandleControl
///
/// Controls process of handling heartbeat request.
#[derive(PartialEq)]
pub enum HandleControl {
Continue,
Done,
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -246,15 +255,16 @@ impl HeartbeatHandlerGroup {
})?;

for NameCachedHandler { name, handler } in handlers.iter() {
if ctx.is_skip_all() {
break;
if !handler.is_acceptable(role) {
continue;
}

if handler.is_acceptable(role) {
let _timer = METRIC_META_HANDLER_EXECUTE
.with_label_values(&[*name])
.start_timer();
handler.handle(&req, &mut ctx, &mut acc).await?;
let _timer = METRIC_META_HANDLER_EXECUTE
.with_label_values(&[*name])
.start_timer();

if handler.handle(&req, &mut ctx, &mut acc).await? == HandleControl::Done {
break;
}
}
let header = std::mem::take(&mut acc.header);
Expand Down
32 changes: 20 additions & 12 deletions src/meta-srv/src/handler/check_leader_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use api::v1::meta::{Error, HeartbeatRequest, Role};
use common_telemetry::warn;

use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;

pub struct CheckLeaderHandler;
Expand All @@ -32,17 +32,25 @@ impl HeartbeatHandler for CheckLeaderHandler {
req: &HeartbeatRequest,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
if let Some(election) = &ctx.election {
if election.is_leader() {
return Ok(());
}
if let Some(header) = &mut acc.header {
header.error = Some(Error::is_not_leader());
ctx.set_skip_all();
warn!("Received a heartbeat {:?}, but the current node is not the leader, so the heartbeat will be ignored.", req.header);
}
) -> Result<HandleControl> {
let Some(election) = &ctx.election else {
return Ok(HandleControl::Continue);
};

if election.is_leader() {
return Ok(HandleControl::Continue);
}
Ok(())

warn!(
"A heartbeat was received {:?}, however, since the current node is not the leader,\
this heartbeat will be disregarded.",
req.header
);

if let Some(header) = &mut acc.header {
header.error = Some(Error::is_not_leader());
}

return Ok(HandleControl::Done);
}
}
8 changes: 4 additions & 4 deletions src/meta-srv/src/handler/collect_stats_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use common_telemetry::warn;

use super::node_stat::Stat;
use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;

pub struct CollectStatsHandler;
Expand All @@ -33,11 +33,11 @@ impl HeartbeatHandler for CollectStatsHandler {
req: &HeartbeatRequest,
_ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
) -> Result<HandleControl> {
if req.mailbox_message.is_some() {
// If the heartbeat is a mailbox message, it may have no other valid information,
// so we don't need to collect stats.
return Ok(());
return Ok(HandleControl::Continue);
}

match Stat::try_from(req.clone()) {
Expand All @@ -49,6 +49,6 @@ impl HeartbeatHandler for CollectStatsHandler {
}
};

Ok(())
Ok(HandleControl::Continue)
}
}
9 changes: 5 additions & 4 deletions src/meta-srv/src/handler/failure_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use store_api::storage::RegionId;
use crate::error::Result;
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::failure_handler::runner::{FailureDetectControl, FailureDetectRunner};
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::{Context, ElectionRef};
use crate::procedure::region_failover::RegionFailoverManager;

Expand Down Expand Up @@ -70,15 +70,15 @@ impl HeartbeatHandler for RegionFailureHandler {
_: &HeartbeatRequest,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
) -> Result<HandleControl> {
if ctx.is_infancy {
self.failure_detect_runner
.send_control(FailureDetectControl::Purge)
.await;
}

let Some(stat) = acc.stat.as_ref() else {
return Ok(());
return Ok(HandleControl::Continue);
};

let heartbeat = DatanodeHeartbeat {
Expand All @@ -101,7 +101,8 @@ impl HeartbeatHandler for RegionFailureHandler {
};

self.failure_detect_runner.send_heartbeat(heartbeat).await;
Ok(())

Ok(HandleControl::Continue)
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/meta-srv/src/handler/filter_inactive_region_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use async_trait::async_trait;
use common_telemetry::warn;

use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;

pub struct FilterInactiveRegionStatsHandler;
Expand All @@ -33,9 +33,9 @@ impl HeartbeatHandler for FilterInactiveRegionStatsHandler {
req: &HeartbeatRequest,
_ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
) -> Result<HandleControl> {
if acc.inactive_region_ids.is_empty() {
return Ok(());
return Ok(HandleControl::Continue);
}

warn!(
Expand All @@ -44,11 +44,11 @@ impl HeartbeatHandler for FilterInactiveRegionStatsHandler {
);

let Some(stat) = acc.stat.as_mut() else {
return Ok(());
return Ok(HandleControl::Continue);
};

stat.retain_active_region_stats(&acc.inactive_region_ids);

Ok(())
Ok(HandleControl::Continue)
}
}
10 changes: 5 additions & 5 deletions src/meta-srv/src/handler/keep_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use common_telemetry::{trace, warn};
use common_time::util as time_util;

use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::keys::{LeaseKey, LeaseValue};
use crate::metasrv::Context;

Expand All @@ -35,13 +35,13 @@ impl HeartbeatHandler for KeepLeaseHandler {
req: &HeartbeatRequest,
ctx: &mut Context,
_acc: &mut HeartbeatAccumulator,
) -> Result<()> {
) -> Result<HandleControl> {
let HeartbeatRequest { header, peer, .. } = req;
let Some(header) = &header else {
return Ok(());
return Ok(HandleControl::Continue);
};
let Some(peer) = &peer else {
return Ok(());
return Ok(HandleControl::Continue);
};

let key = LeaseKey {
Expand Down Expand Up @@ -69,6 +69,6 @@ impl HeartbeatHandler for KeepLeaseHandler {
warn!("Failed to update lease KV, peer: {peer:?}, {err}");
}

Ok(())
Ok(HandleControl::Continue)
}
}
15 changes: 8 additions & 7 deletions src/meta-srv/src/handler/mailbox_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use api::v1::meta::{HeartbeatRequest, Role};

use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;

pub struct MailboxHandler;
Expand All @@ -31,12 +31,13 @@ impl HeartbeatHandler for MailboxHandler {
req: &HeartbeatRequest,
ctx: &mut Context,
_acc: &mut HeartbeatAccumulator,
) -> Result<()> {
if let Some(message) = &req.mailbox_message {
ctx.mailbox.on_recv(message.id, Ok(message.clone())).await?;
ctx.set_skip_all();
}
) -> Result<HandleControl> {
let Some(message) = &req.mailbox_message else {
return Ok(HandleControl::Continue);
};

Ok(())
ctx.mailbox.on_recv(message.id, Ok(message.clone())).await?;

Ok(HandleControl::Done)
}
}
25 changes: 14 additions & 11 deletions src/meta-srv/src/handler/on_leader_start_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use api::v1::meta::{HeartbeatRequest, Role};

use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;

pub struct OnLeaderStartHandler;
Expand All @@ -31,16 +31,19 @@ impl HeartbeatHandler for OnLeaderStartHandler {
_req: &HeartbeatRequest,
ctx: &mut Context,
_acc: &mut HeartbeatAccumulator,
) -> Result<()> {
if let Some(election) = &ctx.election {
if election.in_infancy() {
ctx.is_infancy = true;
// TODO(weny): Unifies the multiple leader state between Context and MetaSrv.
// we can't ensure the in-memory kv has already been reset in the outside loop.
// We still use heartbeat requests to trigger resetting in-memory kv.
ctx.reset_in_memory();
}
) -> Result<HandleControl> {
let Some(election) = &ctx.election else {
return Ok(HandleControl::Continue);
};

if election.in_infancy() {
ctx.is_infancy = true;
// TODO(weny): Unifies the multiple leader state between Context and MetaSrv.
// we can't ensure the in-memory kv has already been reset in the outside loop.
// We still use heartbeat requests to trigger resetting in-memory kv.
ctx.reset_in_memory();
}
Ok(())

Ok(HandleControl::Continue)
}
}
12 changes: 5 additions & 7 deletions src/meta-srv/src/handler/persist_stats_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use snafu::ResultExt;

use crate::error::{self, Result};
use crate::handler::node_stat::Stat;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::keys::{StatKey, StatValue};
use crate::metasrv::Context;

Expand Down Expand Up @@ -82,9 +82,9 @@ impl HeartbeatHandler for PersistStatsHandler {
_req: &HeartbeatRequest,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
) -> Result<HandleControl> {
let Some(current_stat) = acc.stat.take() else {
return Ok(());
return Ok(HandleControl::Continue);
};

let key = current_stat.stat_key();
Expand Down Expand Up @@ -118,7 +118,7 @@ impl HeartbeatHandler for PersistStatsHandler {
epoch_stats.push(current_stat);

if !refresh && epoch_stats.len() < MAX_CACHED_STATS_PER_KEY {
return Ok(());
return Ok(HandleControl::Continue);
}

let value: Vec<u8> = StatValue {
Expand All @@ -137,13 +137,12 @@ impl HeartbeatHandler for PersistStatsHandler {
.await
.context(error::KvBackendSnafu)?;

Ok(())
Ok(HandleControl::Continue)
}
}

#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use common_meta::key::TableMetadataManager;
Expand Down Expand Up @@ -180,7 +179,6 @@ mod tests {
meta_peer_client,
mailbox,
election: None,
skip_all: Arc::new(AtomicBool::new(false)),
is_infancy: false,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
};
Expand Down
6 changes: 3 additions & 3 deletions src/meta-srv/src/handler/publish_heartbeat_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use api::v1::meta::{HeartbeatRequest, Role};
use async_trait::async_trait;

use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::pubsub::{Message, PublishRef};

Expand All @@ -41,10 +41,10 @@ impl HeartbeatHandler for PublishHeartbeatHandler {
req: &HeartbeatRequest,
_: &mut Context,
_: &mut HeartbeatAccumulator,
) -> Result<()> {
) -> Result<HandleControl> {
let msg = Message::Heartbeat(Box::new(req.clone()));
self.publish.send_msg(msg).await;

Ok(())
Ok(HandleControl::Continue)
}
}
8 changes: 4 additions & 4 deletions src/meta-srv/src/handler/region_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use store_api::region_engine::{GrantedRegion, RegionRole};
use store_api::storage::RegionId;

use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::region::lease_keeper::{OpeningRegionKeeperRef, RegionLeaseKeeperRef};
use crate::region::RegionLeaseKeeper;
Expand Down Expand Up @@ -90,9 +90,9 @@ impl HeartbeatHandler for RegionLeaseHandler {
req: &HeartbeatRequest,
_ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
) -> Result<HandleControl> {
let Some(stat) = acc.stat.as_ref() else {
return Ok(());
return Ok(HandleControl::Continue);
};

let regions = stat.regions();
Expand Down Expand Up @@ -152,7 +152,7 @@ impl HeartbeatHandler for RegionLeaseHandler {
lease_seconds: self.region_lease_seconds,
});

Ok(())
Ok(HandleControl::Continue)
}
}

Expand Down
Loading

0 comments on commit dc351a6

Please sign in to comment.