Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jul 11, 2024
1 parent 976a814 commit ec33d8c
Showing 1 changed file with 28 additions and 37 deletions.
65 changes: 28 additions & 37 deletions src/meta-srv/src/election/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::time::Duration;
use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_telemetry::{error, info, warn};
use etcd_client::{Client, GetOptions, LeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions};
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
Expand Down Expand Up @@ -253,20 +253,19 @@ impl Election for EtcdElection {
}
Err(_) => {
error!("Refresh lease timeout");
if self.is_leader.load(Ordering::Relaxed) {
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader.clone())))
{
error!(e; "Failed to send leader change message");
}
}

break;
}
}
}

if self.is_leader.load(Ordering::Relaxed) {
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader.clone())))
{
error!(e; "Failed to send leader change message");
}
}
self.is_leader.store(false, Ordering::Relaxed);
}

Expand Down Expand Up @@ -306,35 +305,27 @@ impl EtcdElection {
) -> Result<()> {
keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
if res.ttl() > 0 {
// Only after a successful `keep_alive` is the leader considered official.
if self
.is_leader
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.infancy.store(true, Ordering::Relaxed);

if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::Elected(Arc::new(leader.clone())))
{
error!(e; "Failed to send leader change message");
}
}
} else {
if self.is_leader.load(Ordering::Relaxed) {
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader.clone())))
{
error!(e; "Failed to send leader change message");
}
}
return error::UnexpectedSnafu {
ensure!(
res.ttl() > 0,
error::UnexpectedSnafu {
violated: "Failed to refresh the lease",
}
.fail();
);

// Only after a successful `keep_alive` is the leader considered official.
if self
.is_leader
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.infancy.store(true, Ordering::Relaxed);

if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::Elected(Arc::new(leader.clone())))
{
error!(e; "Failed to send leader change message");
}
}
}

Expand Down

0 comments on commit ec33d8c

Please sign in to comment.