Skip to content

Commit

Permalink
refactor ranker module and add clap arg 'ranker_period'
Browse files Browse the repository at this point in the history
  • Loading branch information
sameh-farouk committed Sep 12, 2023
1 parent db70407 commit 0773121
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 14 deletions.
9 changes: 7 additions & 2 deletions src/bins/rmb-relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ struct Args {
/// limits used by the rate limiter. basically a user will be only permited to send <count> messages with size <size> in a time window (usually a minute).
#[clap(long, num_args=2, value_names=["count", "size"])]
limit: Vec<usize>,

/// period in seconds used by ranker to determine the recent period of time during which failures will be considered.
/// failures that occurred outside this specified period will be disregarded.
#[clap(short, long, default_value_t = 3600)]
ranker_period: u64,
}

fn set_limits() -> Result<()> {
Expand Down Expand Up @@ -157,8 +162,8 @@ async fn app(args: &Args) -> Result<()> {
},
)
};

let r = relay::Relay::new(&args.domain, twins, opt, federation, limiter)
let ranker = relay::ranker::RelayRanker::new(Duration::from_secs(args.ranker_period));
let r = relay::Relay::new(&args.domain, twins, opt, federation, limiter, ranker)
.await
.unwrap();
r.start(&args.listen).await.unwrap();
Expand Down
11 changes: 5 additions & 6 deletions src/relay/federation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::marker::PhantomData;

use self::router::Router;
use super::switch::Sink;
use super::{ranker::RelayRanker, switch::Sink};
use crate::twin::TwinDB;
use anyhow::Result;
use bb8_redis::{
Expand All @@ -12,7 +12,6 @@ use bb8_redis::{
use prometheus::{IntCounterVec, Opts, Registry};
use workers::WorkerPool;

pub mod ranker;
mod router;

lazy_static::lazy_static! {
Expand Down Expand Up @@ -66,8 +65,8 @@ where
self
}

pub(crate) fn build(self, sink: Sink, twins: D) -> Result<Federation<D>> {
Federation::new(self, sink, twins)
pub(crate) fn build(self, sink: Sink, twins: D, ranker: RelayRanker) -> Result<Federation<D>> {
Federation::new(self, sink, twins, ranker)
}
}

Expand Down Expand Up @@ -101,11 +100,11 @@ where
D: TwinDB,
{
/// create a new federation router
fn new(opts: FederationOptions<D>, sink: Sink, twins: D) -> Result<Self> {
fn new(opts: FederationOptions<D>, sink: Sink, twins: D, ranker: RelayRanker) -> Result<Self> {
opts.registry.register(Box::new(MESSAGE_SUCCESS.clone()))?;
opts.registry.register(Box::new(MESSAGE_ERROR.clone()))?;

let runner = Router::new(sink, twins);
let runner = Router::new(sink, twins, ranker);
let workers = WorkerPool::new(runner, opts.workers);

Ok(Self {
Expand Down
6 changes: 3 additions & 3 deletions src/relay/federation/router.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt::Debug;

use crate::{
relay::federation::ranker::{RelayRanker, HOUR},
relay::ranker::RelayRanker,
relay::switch::{Sink, StreamID},
twin::TwinDB,
types::Envelope,
Expand All @@ -24,11 +24,11 @@ impl<D> Router<D>
where
D: TwinDB,
{
pub fn new(sink: Sink, twins: D) -> Self {
pub fn new(sink: Sink, twins: D, ranker: RelayRanker) -> Self {
Self {
sink: Some(sink),
twins: twins,

Check warning on line 30 in src/relay/federation/router.rs

View workflow job for this annotation

GitHub Actions / Test-Clippy-Build

redundant field names in struct initialization
ranker: RelayRanker::new(HOUR),
ranker,
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ mod federation;
pub mod limiter;
mod switch;
use self::limiter::RateLimiter;
use self::ranker::RelayRanker;
use api::RelayHook;
use federation::Federation;
pub use federation::FederationOptions;
use std::sync::Arc;
use switch::Switch;
pub use switch::SwitchOptions;
pub mod ranker;

pub struct Relay<D: TwinDB, R: RateLimiter> {
switch: Arc<Switch<RelayHook>>,
Expand All @@ -37,9 +39,10 @@ where
opt: SwitchOptions,
federation: FederationOptions<D>,
limiter: R,
ranker: RelayRanker,
) -> Result<Self> {
let switch = opt.build().await?;
let federation = federation.build(switch.sink(), twins.clone())?;
let federation = federation.build(switch.sink(), twins.clone(), ranker)?;
Ok(Self {
switch: Arc::new(switch),
twins,
Expand Down
4 changes: 2 additions & 2 deletions src/relay/federation/ranker.rs → src/relay/ranker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl RelayStats {
});
log::trace!("cleaning {:?} entires", count - &self.failure_times.len());

Check warning on line 32 in src/relay/ranker/mod.rs

View workflow job for this annotation

GitHub Actions / Test-Clippy-Build

taken reference of right operand
}

fn add_failure(&mut self, retain: Duration) {
self.failure_times.push(SystemTime::now());
self._clean(retain);
Expand Down Expand Up @@ -91,7 +91,7 @@ impl RelayRanker {
}

/// Sort the domains of relays in ascending order based on their recent failure rate.
///
///
/// The ranking of relays is determined by the number of failures that occur during a specified period of time.
/// This ensures that the affected relay’s rank will improve over time, and messages will be routed to it again if the service recovers.
/// If multiple relays have the same failure rate, their order will be randomized
Expand Down

0 comments on commit 0773121

Please sign in to comment.