[distributed storage] support re-syncing counters after a peer is partitioned. #343

Merged · 2 commits · May 24, 2024
18 changes: 10 additions & 8 deletions limitador/proto/distributed.proto
@@ -5,18 +5,20 @@ package limitador.service.distributed.v1;
// A packet defines all the types of messages that can be sent between replication peers.
message Packet {
oneof message {
// the Hello message is used to introduce a peer to another peer. It is the first message sent by a peer.
// the hello message is used to introduce a peer to another peer. It is the first message sent by a peer.
Hello hello = 1;
// the MembershipUpdate message is used to gossip about the other peers in the cluster:
// the membership_update message is used to gossip about the other peers in the cluster:
// 1) sent after the first Hello message
// 2) sent when the membership state changes
MembershipUpdate membership_update = 2;
// the Ping message is used to request a pong from the other peer.
Ping ping = 3;
// the Pong message is used to respond to a ping.
// the ping message is used to request a pong from the other peer.
Empty ping = 3;
// the pong message is used to respond to a ping.
Pong pong = 4;
// the CounterUpdate message is used to send counter updates.
// the counter_update message is used to send counter updates.
CounterUpdate counter_update = 5;
// the re_sync_end message is used to signal that the re-sync process has ended.
Empty re_sync_end = 6;
}
}

@@ -30,8 +32,8 @@ message Hello {
optional string receiver_url = 3;
}

// A request to a peer to respond with a Pong message.
message Ping {}
// A packet message that does not have any additional data.
message Empty {}

// Pong is the response to a Ping and Hello message.
message Pong {
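
The net effect of the proto change is that ping and the new re_sync_end variant both carry the generic Empty payload, while counter replays reuse the existing CounterUpdate message. Below is a minimal dispatch sketch (not part of the diff) over the prost-generated oneof, assuming the grpc::v1 types used elsewhere in this PR; the match arm bodies are placeholders, not the PR's actual handlers.

use crate::storage::distributed::grpc::v1::packet::Message;
use crate::storage::distributed::grpc::v1::{CounterUpdate, Packet};

// Sketch only: dispatch on the oneof variants a replication peer can receive.
fn dispatch(packet: Packet, merge: impl Fn(CounterUpdate)) {
    match packet.message {
        // `ping` now carries the generic Empty payload instead of a dedicated Ping message
        Some(Message::Ping(_)) => { /* answer with a Pong */ }
        // a counter replayed while re-syncing after a partition
        Some(Message::CounterUpdate(update)) => merge(update),
        // the peer has finished replaying its local counters
        Some(Message::ReSyncEnd(_)) => { /* mark the peer as fully re-synced */ }
        // Hello, MembershipUpdate and Pong are handled by the session handshake
        _ => {}
    }
}
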
8 changes: 8 additions & 0 deletions limitador/src/storage/distributed/cr_counter_value.rs
@@ -133,6 +133,14 @@ impl<A: Ord> CrCounterValue<A> {
(expiry.into_inner(), map)
}

pub fn local_values(&self) -> (SystemTime, &A, u64) {
(
self.expiry.clone().into_inner(),
&self.ourselves,
self.value.load(Ordering::Relaxed),
)
}

fn reset(&self, expiry: SystemTime) {
let mut guard = self.others.write().unwrap();
self.expiry.update(expiry);
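
The new local_values() accessor exposes only this replica's own contribution to a counter: the expiry, its own peer id, and the locally accumulated value. That is what gets replayed to a reconnecting peer; contributions from other replicas are not forwarded, presumably because each replica replays its own share. A small sketch (not part of the diff) of the intended use, assuming a CrCounterValue<String> taken from the limits map:

use std::time::SystemTime;

// Sketch only: decide whether this replica's share of a counter is worth
// replaying to a peer that just reconnected.
fn worth_replaying(counter: &CrCounterValue<String>) -> bool {
    let (expiry, _ourself, value) = counter.local_values();
    // empty or already-expired counters carry no information for the peer
    value > 0 && expiry > SystemTime::now()
}
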
66 changes: 54 additions & 12 deletions limitador/src/storage/distributed/grpc/mod.rs
@@ -5,10 +5,10 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{error::Error, io::ErrorKind, pin::Pin};

use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::Sender;
use tokio::sync::{broadcast, mpsc, RwLock};
use tokio::time::sleep;

use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Code, Request, Response, Status, Streaming};
use tracing::debug;
@@ -17,7 +17,7 @@ use crate::storage::distributed::grpc::v1::packet::Message;
use crate::storage::distributed::grpc::v1::replication_client::ReplicationClient;
use crate::storage::distributed::grpc::v1::replication_server::{Replication, ReplicationServer};
use crate::storage::distributed::grpc::v1::{
CounterUpdate, Hello, MembershipUpdate, Packet, Peer, Pong,
CounterUpdate, Empty, Hello, MembershipUpdate, Packet, Peer, Pong,
};

// clippy will barf on protobuff generated code for enum variants in
@@ -84,8 +84,8 @@ struct Session {

impl Session {
async fn close(&mut self) {
let mut state = self.replication_state.write().await;
if let Some(peer) = state.peer_trackers.get_mut(&self.peer_id) {
let mut replication_state = self.replication_state.write().await;
if let Some(peer) = replication_state.peer_trackers.get_mut(&self.peer_id) {
peer.session = None;
}
}
@@ -105,11 +105,50 @@ impl Session {
}))
.await?;

let mut udpates_to_send = self.broker_state.publisher.subscribe();

// start the re-sync process with the peer by sending it all of the local counter values
let (tx, mut rx) = mpsc::channel::<Option<CounterUpdate>>(1);
let peer_id = self.peer_id.clone();
let out_stream = self.out_stream.clone();
tokio::spawn(async move {
let mut counter = 0u64;
while let Some(rsync_message) = rx.recv().await {
match rsync_message {
Some(update) => {
counter += 1;
if let Err(err) = out_stream
.clone()
.send(Ok(Message::CounterUpdate(update)))
.await
{
debug!("peer: '{}': ReSyncRequest: send error: {:?}", peer_id, err);
return;
}
}
None => {
debug!(
"peer: '{}': rysnc completed, sent %d updates: {:?}",
peer_id, counter
);
_ = out_stream
.clone()
.send(Ok(Message::ReSyncEnd(Empty::default())))
.await;
}
}
}
});
self.broker_state
.on_re_sync
.try_send(tx)
.map_err(|err| match err {
TrySendError::Full(_) => Status::resource_exhausted("re-sync channel full"),
TrySendError::Closed(_) => Status::unavailable("re-sync channel closed"),
})?;

let mut updates = self.broker_state.publisher.subscribe();
loop {
tokio::select! {
update = udpates_to_send.recv() => {
update = updates.recv() => {
let update = update.map_err(|_| Status::unknown("broadcast error"))?;
self.send(Message::CounterUpdate(update)).await?;
}
@@ -123,11 +162,11 @@ impl Session {
self.process_packet(packet).await?;
},
Some(Err(err)) => {
if is_disconnect(&err) {
return if is_disconnect(&err) {
debug!("peer: '{}': disconnected: {:?}", self.peer_id, err);
return Ok(());
Ok(())
} else {
return Err(err);
Err(err)
}
},
}
@@ -289,13 +328,13 @@ fn is_disconnect(err: &Status) -> bool {

// MessageSender is used to abstract the difference between the server and client sender streams...
#[derive(Clone)]
enum MessageSender {
pub enum MessageSender {
Server(Sender<Result<Packet, Status>>),
Client(Sender<Packet>),
}

impl MessageSender {
async fn send(self, message: Result<Message, Status>) -> Result<(), Status> {
pub async fn send(self, message: Result<Message, Status>) -> Result<(), Status> {
match self {
MessageSender::Server(sender) => {
let value = message.map(|x| Packet { message: Some(x) });
@@ -324,6 +363,7 @@ struct BrokerState {
id: String,
publisher: broadcast::Sender<CounterUpdate>,
on_counter_update: Arc<CounterUpdateFn>,
on_re_sync: Arc<Sender<Sender<Option<CounterUpdate>>>>,
}

#[derive(Clone)]
@@ -340,6 +380,7 @@ impl Broker {
listen_address: SocketAddr,
peer_urls: Vec<String>,
on_counter_update: CounterUpdateFn,
on_re_sync: Sender<Sender<Option<CounterUpdate>>>,
) -> Broker {
let (tx, _) = broadcast::channel(16);
let publisher: broadcast::Sender<CounterUpdate> = tx;
@@ -351,6 +392,7 @@
id,
publisher,
on_counter_update: Arc::new(on_counter_update),
on_re_sync: Arc::new(on_re_sync),
},
replication_state: Arc::new(RwLock::new(ReplicationState {
discovered_urls: HashSet::new(),
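
The Broker now also takes on_re_sync: Sender<Sender<Option<CounterUpdate>>>, a channel of channels: each new replication session pushes a fresh per-peer Sender onto it, and whoever owns the receiving end streams back Some(update) for every live counter, followed by a final None, which the session translates into a ReSyncEnd packet. A hedged sketch (not part of the diff) of the consuming side, mirroring what CrInMemoryStorage wires up in the next file; the channel capacity is arbitrary here:

use tokio::sync::mpsc::{self, Sender};

// Sketch only: own the receiving end of the channel-of-channels and hand the
// sending end to Broker::new as its new on_re_sync argument.
fn spawn_re_sync_consumer() -> Sender<Sender<Option<CounterUpdate>>> {
    let (on_re_sync_tx, mut on_re_sync_rx) = mpsc::channel::<Sender<Option<CounterUpdate>>>(100);
    tokio::spawn(async move {
        // each received value is a dedicated channel to one newly connected peer
        while let Some(per_peer_tx) = on_re_sync_rx.recv().await {
            // ...replay this replica's live counters as Some(update) here...
            // then signal the end of the replay; the session maps this to ReSyncEnd
            let _ = per_peer_tx.send(None).await;
        }
    });
    on_re_sync_tx
}
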
59 changes: 59 additions & 0 deletions limitador/src/storage/distributed/mod.rs
@@ -5,6 +5,9 @@ use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tracing::debug;

use crate::counter::Counter;
use crate::limit::{Limit, Namespace};
@@ -217,6 +220,8 @@ impl CrInMemoryStorage {
let limits = Arc::new(RwLock::new(LimitsMap::new()));

let limits_clone = limits.clone();

let (re_sync_queue_tx, mut re_sync_queue_rx) = mpsc::channel(100);
let broker = grpc::Broker::new(
identifier.clone(),
listen_address,
@@ -232,6 +237,7 @@
let value = limits.get(&update.key).unwrap();
value.merge((UNIX_EPOCH + Duration::from_secs(update.expires_at), values).into());
}),
re_sync_queue_tx,
);

{
Expand All @@ -241,6 +247,16 @@ impl CrInMemoryStorage {
});
}

// process the re-sync requests...
{
let limits = limits.clone();
tokio::spawn(async move {
while let Some(sender) = re_sync_queue_rx.recv().await {
process_re_sync(&limits, sender).await;
}
});
}

Self {
identifier,
limits,
@@ -279,6 +295,49 @@
}
}

async fn process_re_sync(
limits: &Arc<RwLock<HashMap<Vec<u8>, CrCounterValue<String>>>>,
sender: Sender<Option<CounterUpdate>>,
) {
// sending all the counters to the peer might take a while, so we don't want to lock
// the limits map for too long; first, collect the list of keys that need to be sent.
let keys: Vec<_> = {
let limits = limits.read().unwrap();
limits.keys().cloned().collect()
};

for key in keys {
let update = {
let limits = limits.read().unwrap();
limits.get(&key).and_then(|store_value| {
let (expiry, ourself, value) = store_value.local_values();
if value == 0 || expiry <= SystemTime::now() {
None // no point in sending a counter that is empty
} else {
let values = HashMap::from([(ourself.clone(), value)]);
Some(CounterUpdate {
key: key.clone(),
values,
expires_at: expiry.duration_since(UNIX_EPOCH).unwrap().as_secs(),
})
}
})
};
// skip None: the counter was removed, empty, or already expired, so there is nothing to send.
if let Some(update) = update {
match sender.send(Some(update)).await {
Ok(_) => {}
Err(err) => {
debug!("Failed to send re-sync counter update to peer: {:?}", err);
break;
}
}
}
}
// signal the end of the re-sync
_ = sender.send(None).await;
}

#[derive(Clone, Debug, Serialize, Deserialize)]
struct CounterKey {
namespace: Namespace,
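
process_re_sync therefore gives its caller a simple contract: zero or more Some(CounterUpdate) messages, one for each counter this replica still holds a non-zero, unexpired local value for, always terminated by a single None. A hedged sketch (not part of the diff) of a caller draining that stream, with `limits` assumed to be the storage's Arc<RwLock<HashMap<Vec<u8>, CrCounterValue<String>>>>:

use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tokio::sync::mpsc;

// Sketch only: drive process_re_sync and count how many updates it replays.
async fn drain_re_sync(limits: Arc<RwLock<HashMap<Vec<u8>, CrCounterValue<String>>>>) -> usize {
    let (tx, mut rx) = mpsc::channel(1);
    tokio::spawn(async move { process_re_sync(&limits, tx).await });

    let mut replayed = 0;
    // the stream ends with the None sentinel, which the gRPC session would
    // translate into a ReSyncEnd packet
    while let Some(Some(_update)) = rx.recv().await {
        replayed += 1;
    }
    replayed
}
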
2 changes: 0 additions & 2 deletions limitador/tests/integration_tests.rs
@@ -1184,8 +1184,6 @@ mod test {
Fut: Future<Output = Vec<TestsLimiter>>,
{
let rate_limiters = create_distributed_limiters(2).await;
tokio::time::sleep(Duration::from_secs(1)).await;

let namespace = "test_namespace";
let max_hits = 3;
let limit = Limit::new(