Skip to content

Commit

Permalink
refactor: remove some useless methods
Browse files Browse the repository at this point in the history
  • Loading branch information
Phoenix500526 committed Jun 2, 2023
1 parent 23a84d2 commit 4e11068
Showing 1 changed file with 2 additions and 30 deletions.
32 changes: 2 additions & 30 deletions curp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{cmp::Ordering, collections::HashMap, fmt::Debug, iter, marker::Phantom
use event_listener::Event;
use futures::{pin_mut, stream::FuturesUnordered, StreamExt};
use parking_lot::RwLock;
use tokio::{sync::broadcast, time::timeout};
use tokio::time::timeout;
use tracing::{debug, instrument, warn};
use utils::{config::ClientTimeout, parking_lot_lock::RwLockMap};

Expand Down Expand Up @@ -48,30 +48,23 @@ struct State {
term: u64,
/// When a new leader is set, notify
leader_notify: Arc<Event>,
/// Send leader changes
leader_tx: broadcast::Sender<ServerId>,
}

impl State {
/// Create the initial client state
fn new() -> Self {
let (leader_tx, _) = broadcast::channel(1);
Self {
leader: None,
term: 0,
leader_notify: Arc::new(Event::new()),
leader_tx,
}
}

/// Set the leader and notify a waiter
fn set_leader(&mut self, id: ServerId) {
debug!("client update its leader to {id}");
if self.leader.as_ref().map_or(true, |prev_id| prev_id != &id) {
let _ignored = self.leader_tx.send(id.clone()).ok(); // it's ok to have no receiver
}
self.leader = Some(id);
self.leader_notify.notify(1);
self.leader_notify.notify(usize::MAX);
}

/// Update to the newest term and reset local cache
Expand Down Expand Up @@ -512,12 +505,6 @@ where
pub fn leader(&self) -> Option<ServerId> {
self.state.read().leader.clone()
}

/// Get the receiver for leader changes
#[inline]
pub fn leader_rx(&self) -> broadcast::Receiver<ServerId> {
self.state.read().leader_tx.subscribe()
}
}

/// Get the superquorum for curp protocol
Expand All @@ -536,21 +523,6 @@ fn superquorum(nodes: usize) -> usize {
mod tests {
use super::*;

#[allow(clippy::unwrap_used)]
#[tokio::test]
async fn will_get_notify_on_leader_changes() {
let mut state = State::new();
let mut rx = state.leader_tx.subscribe();

state.set_leader("S1".to_owned());
assert_eq!(rx.recv().await.unwrap().as_str(), "S1");

state.set_leader("S2".to_owned());
state.set_leader("S3".to_owned());
assert!(rx.recv().await.is_err());
assert_eq!(rx.recv().await.unwrap().as_str(), "S3");
}

#[test]
fn superquorum_should_work() {
assert_eq!(superquorum(11), 9);
Expand Down

0 comments on commit 4e11068

Please sign in to comment.