Skip to content

Commit

Permalink
Merge pull request #324 from Kuadrant/crdt
Browse files Browse the repository at this point in the history
Crdt
  • Loading branch information
alexsnaps authored May 17, 2024
2 parents 0576636 + 907dc58 commit f6f8f18
Show file tree
Hide file tree
Showing 11 changed files with 881 additions and 5 deletions.
3 changes: 3 additions & 0 deletions limitador-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ documentation = "https://kuadrant.io/docs/limitador"
readme = "README.md"
edition = "2021"

[features]
distributed_storage = ["limitador/distributed_storage"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
Expand Down
9 changes: 9 additions & 0 deletions limitador-server/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::process::Command;
fn main() -> Result<(), Box<dyn Error>> {
set_git_hash("LIMITADOR_GIT_HASH");
set_profile("LIMITADOR_PROFILE");
set_features("LIMITADOR_FEATURES");
generate_protobuf()
}

Expand All @@ -31,6 +32,14 @@ fn set_profile(env: &str) {
}
}

fn set_features(env: &str) {
let mut features = vec![];
if cfg!(feature = "distributed_storage") {
features.push("+distributed");
}
println!("cargo:rustc-env={env}={features:?}");
}

fn set_git_hash(env: &str) {
let git_sha = Command::new("/usr/bin/git")
.args(["rev-parse", "HEAD"])
Expand Down
4 changes: 1 addition & 3 deletions limitador-server/examples/limits.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
---
-
namespace: test_namespace
max_value: 10
max_value: 1000000
seconds: 60
conditions:
- "req.method == 'GET'"
variables:
- user_id
-
namespace: test_namespace
max_value: 5
Expand Down
11 changes: 11 additions & 0 deletions limitador-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,24 @@ pub enum StorageConfiguration {
InMemory(InMemoryStorageConfiguration),
Disk(DiskStorageConfiguration),
Redis(RedisStorageConfiguration),
#[cfg(feature = "distributed_storage")]
Distributed(DistributedStorageConfiguration),
}

#[derive(PartialEq, Eq, Debug)]
pub struct InMemoryStorageConfiguration {
pub cache_size: Option<u64>,
}

#[derive(PartialEq, Eq, Debug)]
#[cfg(feature = "distributed_storage")]
pub struct DistributedStorageConfiguration {
pub name: String,
pub cache_size: Option<u64>,
pub local: String,
pub broadcast: String,
}

#[derive(PartialEq, Eq, Debug)]
pub struct DiskStorageConfiguration {
pub path: String,
Expand Down
71 changes: 69 additions & 2 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
extern crate log;
extern crate clap;

#[cfg(feature = "distributed_storage")]
use crate::config::DistributedStorageConfiguration;
use crate::config::{
Configuration, DiskStorageConfiguration, InMemoryStorageConfiguration,
RedisStorageCacheConfiguration, RedisStorageConfiguration, StorageConfiguration,
Expand All @@ -22,6 +24,8 @@ use limitador::storage::redis::{
AsyncRedisStorage, CachedRedisStorage, CachedRedisStorageBuilder, DEFAULT_BATCH_SIZE,
DEFAULT_FLUSHING_PERIOD_SEC, DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_RESPONSE_TIMEOUT_MS,
};
#[cfg(feature = "distributed_storage")]
use limitador::storage::DistributedInMemoryStorage;
use limitador::storage::{AsyncCounterStorage, AsyncStorage, Storage};
use limitador::{
storage, AsyncRateLimiter, AsyncRateLimiterBuilder, RateLimiter, RateLimiterBuilder,
Expand Down Expand Up @@ -56,6 +60,7 @@ pub mod prometheus_metrics;

const LIMITADOR_VERSION: &str = env!("CARGO_PKG_VERSION");
const LIMITADOR_PROFILE: &str = env!("LIMITADOR_PROFILE");
const LIMITADOR_FEATURES: &str = env!("LIMITADOR_FEATURES");
const LIMITADOR_HEADER: &str = "Limitador Server";

#[derive(Error, Debug)]
Expand All @@ -82,6 +87,8 @@ impl Limiter {
let rate_limiter = match config.storage {
StorageConfiguration::Redis(cfg) => Self::redis_limiter(cfg).await,
StorageConfiguration::InMemory(cfg) => Self::in_memory_limiter(cfg),
#[cfg(feature = "distributed_storage")]
StorageConfiguration::Distributed(cfg) => Self::distributed_limiter(cfg),
StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg),
};

Expand Down Expand Up @@ -153,6 +160,20 @@ impl Limiter {
Self::Blocking(rate_limiter_builder.build())
}

#[cfg(feature = "distributed_storage")]
fn distributed_limiter(cfg: DistributedStorageConfiguration) -> Self {
let storage = DistributedInMemoryStorage::new(
cfg.name,
cfg.cache_size.or_else(guess_cache_size).unwrap(),
cfg.local,
cfg.broadcast,
);
let rate_limiter_builder =
RateLimiterBuilder::with_storage(Storage::with_counter_storage(Box::new(storage)));

Self::Blocking(rate_limiter_builder.build())
}

pub async fn load_limits_from_file<P: AsRef<Path>>(
&self,
path: &P,
Expand Down Expand Up @@ -355,12 +376,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

fn create_config() -> (Configuration, &'static str) {
let full_version: &'static str = formatcp!(
"v{} ({}) {}",
"v{} ({}) {} {}",
LIMITADOR_VERSION,
env!("LIMITADOR_GIT_HASH"),
LIMITADOR_FEATURES,
LIMITADOR_PROFILE,
);

// wire args based of defaults
let limit_arg = Arg::new("LIMITS_FILE")
.action(ArgAction::Set)
Expand Down Expand Up @@ -570,6 +591,43 @@ fn create_config() -> (Configuration, &'static str) {
),
);

#[cfg(feature = "distributed_storage")]
let cmdline = cmdline.subcommand(
Command::new("distributed")
.about("Replicates CRDT-based counters across multiple Limitador servers")
.display_order(5)
.arg(
Arg::new("NAME")
.action(ArgAction::Set)
.required(true)
.display_order(2)
.help("Unique name to identify this Limitador instance"),
)
.arg(
Arg::new("LOCAL")
.action(ArgAction::Set)
.required(true)
.display_order(2)
.help("Local IP:PORT to send datagrams from"),
)
.arg(
Arg::new("BROADCAST")
.action(ArgAction::Set)
.required(true)
.display_order(3)
.help("Broadcast IP:PORT to send datagrams to"),
)
.arg(
Arg::new("CACHE_SIZE")
.long("cache")
.short('c')
.action(ArgAction::Set)
.value_parser(value_parser!(u64))
.display_order(4)
.help("Sets the size of the cache for 'qualified counters'"),
),
);

let matches = cmdline.get_matches();

let limits_file = matches.get_one::<String>("LIMITS_FILE").unwrap();
Expand Down Expand Up @@ -635,6 +693,15 @@ fn create_config() -> (Configuration, &'static str) {
Some(("memory", sub)) => StorageConfiguration::InMemory(InMemoryStorageConfiguration {
cache_size: sub.get_one::<u64>("CACHE_SIZE").copied(),
}),
#[cfg(feature = "distributed_storage")]
Some(("distributed", sub)) => {
StorageConfiguration::Distributed(DistributedStorageConfiguration {
name: sub.get_one::<String>("NAME").unwrap().to_owned(),
local: sub.get_one::<String>("LOCAL").unwrap().to_owned(),
broadcast: sub.get_one::<String>("BROADCAST").unwrap().to_owned(),
cache_size: sub.get_one::<u64>("CACHE_SIZE").copied(),
})
}
None => storage_config_from_env(),
_ => unreachable!("Some storage wasn't configured!"),
};
Expand Down
1 change: 1 addition & 0 deletions limitador/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ edition = "2021"
[features]
default = ["disk_storage", "redis_storage"]
disk_storage = ["rocksdb"]
distributed_storage = []
redis_storage = ["redis", "r2d2", "tokio"]
lenient_conditions = []

Expand Down
47 changes: 47 additions & 0 deletions limitador/src/storage/atomic_expiring_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,47 @@ impl AtomicExpiryTime {
}
false
}

#[allow(dead_code)]
pub fn merge(&self, other: Self) {
let mut other = other;
loop {
let now = SystemTime::now();
other = match self.merge_at(other, now) {
Ok(_) => return,
Err(other) => other,
};
}
}

pub fn merge_at(&self, other: Self, when: SystemTime) -> Result<(), Self> {
let other_exp = other.expiry.load(Ordering::SeqCst);
let expiry = self.expiry.load(Ordering::SeqCst);
if other_exp < expiry && other_exp > Self::since_epoch(when) {
// if our expiry changed, some thread observed the time window as elapsed...
// `other` can't be in the future anymore! Safely ignoring the failure scenario
return match self.expiry.compare_exchange(
expiry,
other_exp,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => Ok(()),
Err(_) => Err(other),
};
}
Ok(())
}

#[allow(dead_code)]
pub fn into_inner(self) -> SystemTime {
self.expires_at()
}

#[allow(dead_code)]
pub fn expires_at(&self) -> SystemTime {
SystemTime::UNIX_EPOCH + Duration::from_micros(self.expiry.load(Ordering::SeqCst))
}
}

impl Clone for AtomicExpiryTime {
Expand Down Expand Up @@ -130,6 +171,12 @@ impl Clone for AtomicExpiringValue {
}
}

impl From<SystemTime> for AtomicExpiryTime {
fn from(value: SystemTime) -> Self {
AtomicExpiryTime::new(value)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit f6f8f18

Please sign in to comment.