From 0e562d1a2d499c55b05084e634ef20b37fe4e16b Mon Sep 17 00:00:00 2001 From: Dessalines Date: Sat, 26 Mar 2022 14:51:01 -0400 Subject: [PATCH 1/4] Rate limit websocket joins. --- Cargo.lock | 4 ++++ Cargo.toml | 1 + crates/apub/Cargo.toml | 1 + crates/apub/src/objects/mod.rs | 2 +- crates/utils/Cargo.toml | 1 + crates/utils/src/rate_limit/mod.rs | 6 +++--- crates/websocket/Cargo.toml | 1 + crates/websocket/src/routes.rs | 22 +++++++++++++++++++++- src/main.rs | 3 ++- 9 files changed, 35 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 430f18d137..74b66e8898 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1962,6 +1962,7 @@ dependencies = [ "lemmy_utils", "lemmy_websocket", "once_cell", + "parking_lot 0.12.0", "percent-encoding", "rand 0.8.4", "reqwest", @@ -2129,6 +2130,7 @@ dependencies = [ "openssl", "opentelemetry", "opentelemetry-otlp", + "parking_lot 0.12.0", "reqwest", "reqwest-middleware", "reqwest-tracing", @@ -2166,6 +2168,7 @@ dependencies = [ "lettre", "once_cell", "openssl", + "parking_lot 0.12.0", "percent-encoding", "rand 0.8.4", "regex", @@ -2204,6 +2207,7 @@ dependencies = [ "lemmy_db_views_actor", "lemmy_utils", "opentelemetry", + "parking_lot 0.12.0", "rand 0.8.4", "reqwest", "reqwest-middleware", diff --git a/Cargo.toml b/Cargo.toml index 90fe4658ff..e3a294e080 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,3 +75,4 @@ doku = "0.10.2" opentelemetry = { version = "0.16", features = ["rt-tokio"] } opentelemetry-otlp = "0.9" tracing-opentelemetry = "0.16" +parking_lot = "0.12" diff --git a/crates/apub/Cargo.toml b/crates/apub/Cargo.toml index cc640c5121..fce4f406c7 100644 --- a/crates/apub/Cargo.toml +++ b/crates/apub/Cargo.toml @@ -50,6 +50,7 @@ background-jobs = "0.11.0" reqwest = { version = "0.11.7", features = ["json"] } html2md = "0.2.13" once_cell = "1.8.0" +parking_lot = "0.12" [dev-dependencies] serial_test = "0.5.1" diff --git a/crates/apub/src/objects/mod.rs b/crates/apub/src/objects/mod.rs index 9f939aaa86..b387564a78 100644 --- a/crates/apub/src/objects/mod.rs +++ b/crates/apub/src/objects/mod.rs @@ -58,10 +58,10 @@ pub(crate) mod tests { LemmyError, }; use lemmy_websocket::{chat_server::ChatServer, LemmyContext}; + use parking_lot::Mutex; use reqwest::Client; use reqwest_middleware::ClientBuilder; use std::sync::Arc; - use tokio::sync::Mutex; // TODO: would be nice if we didnt have to use a full context for tests. // or at least write a helper function so this code is shared with main.rs diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml index 0a25195176..475d397cc7 100644 --- a/crates/utils/Cargo.toml +++ b/crates/utils/Cargo.toml @@ -48,6 +48,7 @@ uuid = { version = "0.8.2", features = ["serde", "v4"] } encoding = "0.2.33" html2text = "0.2.1" rosetta-i18n = "0.1" +parking_lot = "0.12" [build-dependencies] rosetta-build = "0.1" diff --git a/crates/utils/src/rate_limit/mod.rs b/crates/utils/src/rate_limit/mod.rs index 6027520f0a..f52d81b1c8 100644 --- a/crates/utils/src/rate_limit/mod.rs +++ b/crates/utils/src/rate_limit/mod.rs @@ -4,6 +4,7 @@ use actix_web::{ HttpResponse, }; use futures::future::{ok, Ready}; +use parking_lot::Mutex; use rate_limiter::{RateLimitType, RateLimiter}; use std::{ future::Future, @@ -12,7 +13,6 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use tokio::sync::Mutex; pub mod rate_limiter; @@ -73,8 +73,6 @@ impl RateLimited { // and the operation here locks only long enough to clone let rate_limit = self.rate_limit_config; - let mut limiter = self.rate_limiter.lock().await; - let (kind, interval) = match self.type_ { RateLimitType::Message => (rate_limit.message, rate_limit.message_per_second), RateLimitType::Post => (rate_limit.post, rate_limit.post_per_second), @@ -82,6 +80,8 @@ impl RateLimited { RateLimitType::Image => (rate_limit.image, rate_limit.image_per_second), RateLimitType::Comment => (rate_limit.comment, rate_limit.comment_per_second), }; + let mut limiter = self.rate_limiter.lock(); + limiter.check_rate_limit_full(self.type_, &ip_addr, kind, interval) } } diff --git a/crates/websocket/Cargo.toml b/crates/websocket/Cargo.toml index fd10a74628..0dd30149dc 100644 --- a/crates/websocket/Cargo.toml +++ b/crates/websocket/Cargo.toml @@ -36,3 +36,4 @@ actix-web = { version = "4.0.0", default-features = false, features = ["rustls"] actix-web-actors = { version = "4.1.0", default-features = false } opentelemetry = "0.16" tracing-opentelemetry = "0.16" +parking_lot = "0.12" diff --git a/crates/websocket/src/routes.rs b/crates/websocket/src/routes.rs index 41b1782466..69c5123aa6 100644 --- a/crates/websocket/src/routes.rs +++ b/crates/websocket/src/routes.rs @@ -6,7 +6,7 @@ use crate::{ use actix::prelude::*; use actix_web::{web, Error, HttpRequest, HttpResponse}; use actix_web_actors::ws; -use lemmy_utils::{utils::get_ip, ConnectionId, IpAddr}; +use lemmy_utils::{rate_limit::RateLimit, utils::get_ip, ConnectionId, IpAddr}; use std::time::{Duration, Instant}; use tracing::{debug, error, info}; @@ -20,6 +20,7 @@ pub async fn chat_route( req: HttpRequest, stream: web::Payload, context: web::Data, + rate_limiter: web::Data, ) -> Result { ws::start( WsSession { @@ -27,6 +28,7 @@ pub async fn chat_route( id: 0, hb: Instant::now(), ip: get_ip(&req.connection_info()), + rate_limiter: rate_limiter.as_ref().to_owned(), }, &req, stream, @@ -41,6 +43,8 @@ struct WsSession { /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), /// otherwise we drop connection. hb: Instant, + /// A rate limiter for websocket joins + rate_limiter: RateLimit, } impl Actor for WsSession { @@ -57,6 +61,22 @@ impl Actor for WsSession { // before processing any other events. // across all routes within application let addr = ctx.address(); + + // Rate limit check a joining IP first, before doing any parsing + self + .rate_limiter + .message() + .check(self.ip.to_owned()) + .into_actor(self) + .then(|res, act, ctx| { + if !res { + debug!("Websocket join with IP: {} has been rate limited.", act.ip); + ctx.stop() + } + actix::fut::ready(()) + }) + .wait(ctx); + self .cs_addr .send(Connect { diff --git a/src/main.rs b/src/main.rs index d0b3c04ee4..9297334e58 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,11 +29,11 @@ use lemmy_utils::{ REQWEST_TIMEOUT, }; use lemmy_websocket::{chat_server::ChatServer, LemmyContext}; +use parking_lot::Mutex; use reqwest::Client; use reqwest_middleware::ClientBuilder; use reqwest_tracing::TracingMiddleware; use std::{env, sync::Arc, thread}; -use tokio::sync::Mutex; use tracing_actix_web::TracingLogger; embed_migrations!(); @@ -136,6 +136,7 @@ async fn main() -> Result<(), LemmyError> { .wrap(actix_web::middleware::Logger::default()) .wrap(TracingLogger::::new()) .app_data(Data::new(context)) + .app_data(Data::new(rate_limiter.clone())) // The routes .configure(|cfg| api_routes::config(cfg, &rate_limiter)) .configure(|cfg| lemmy_apub::http::routes::config(cfg, &settings)) From 2566ac495e3183d1616962b37492a00d09e9c20e Mon Sep 17 00:00:00 2001 From: Dessalines Date: Sat, 26 Mar 2022 19:54:36 -0400 Subject: [PATCH 2/4] Removing async on mutex lock fn. --- crates/utils/src/rate_limit/mod.rs | 4 ++-- crates/websocket/src/chat_server.rs | 14 +++++++------- crates/websocket/src/routes.rs | 27 +++++++++++++-------------- 3 files changed, 22 insertions(+), 23 deletions(-) diff --git a/crates/utils/src/rate_limit/mod.rs b/crates/utils/src/rate_limit/mod.rs index f52d81b1c8..69bcbcecda 100644 --- a/crates/utils/src/rate_limit/mod.rs +++ b/crates/utils/src/rate_limit/mod.rs @@ -68,7 +68,7 @@ impl RateLimit { impl RateLimited { /// Returns true if the request passed the rate limit, false if it failed and should be rejected. - pub async fn check(self, ip_addr: IpAddr) -> bool { + pub fn check(self, ip_addr: IpAddr) -> bool { // Does not need to be blocking because the RwLock in settings never held across await points, // and the operation here locks only long enough to clone let rate_limit = self.rate_limit_config; @@ -127,7 +127,7 @@ where let service = self.service.clone(); Box::pin(async move { - if rate_limited.check(ip_addr).await { + if rate_limited.check(ip_addr) { service.call(req).await } else { let (http_req, _) = req.into_parts(); diff --git a/crates/websocket/src/chat_server.rs b/crates/websocket/src/chat_server.rs index 1e95344b0c..d9de90dbef 100644 --- a/crates/websocket/src/chat_server.rs +++ b/crates/websocket/src/chat_server.rs @@ -481,19 +481,19 @@ impl ChatServer { // check if api call passes the rate limit, and generate future for later execution let (passed, fut) = if let Ok(user_operation_crud) = UserOperationCrud::from_str(op) { let passed = match user_operation_crud { - UserOperationCrud::Register => rate_limiter.register().check(ip).await, - UserOperationCrud::CreatePost => rate_limiter.post().check(ip).await, - UserOperationCrud::CreateCommunity => rate_limiter.register().check(ip).await, - UserOperationCrud::CreateComment => rate_limiter.comment().check(ip).await, - _ => rate_limiter.message().check(ip).await, + UserOperationCrud::Register => rate_limiter.register().check(ip), + UserOperationCrud::CreatePost => rate_limiter.post().check(ip), + UserOperationCrud::CreateCommunity => rate_limiter.register().check(ip), + UserOperationCrud::CreateComment => rate_limiter.comment().check(ip), + _ => rate_limiter.message().check(ip), }; let fut = (message_handler_crud)(context, msg.id, user_operation_crud, data); (passed, fut) } else { let user_operation = UserOperation::from_str(op)?; let passed = match user_operation { - UserOperation::GetCaptcha => rate_limiter.post().check(ip).await, - _ => rate_limiter.message().check(ip).await, + UserOperation::GetCaptcha => rate_limiter.post().check(ip), + _ => rate_limiter.message().check(ip), }; let fut = (message_handler)(context, msg.id, user_operation, data); (passed, fut) diff --git a/crates/websocket/src/routes.rs b/crates/websocket/src/routes.rs index 69c5123aa6..c11daf7112 100644 --- a/crates/websocket/src/routes.rs +++ b/crates/websocket/src/routes.rs @@ -62,20 +62,7 @@ impl Actor for WsSession { // across all routes within application let addr = ctx.address(); - // Rate limit check a joining IP first, before doing any parsing - self - .rate_limiter - .message() - .check(self.ip.to_owned()) - .into_actor(self) - .then(|res, act, ctx| { - if !res { - debug!("Websocket join with IP: {} has been rate limited.", act.ip); - ctx.stop() - } - actix::fut::ready(()) - }) - .wait(ctx); + self.rate_limit_check(ctx); self .cs_addr @@ -118,6 +105,8 @@ impl Handler for WsSession { /// WebSocket message handler impl StreamHandler> for WsSession { fn handle(&mut self, result: Result, ctx: &mut Self::Context) { + self.rate_limit_check(ctx); + let message = match result { Ok(m) => m, Err(e) => { @@ -189,4 +178,14 @@ impl WsSession { ctx.ping(b""); }); } + + /// Check the rate limit, and stop the ctx if it fails + fn rate_limit_check(&mut self, ctx: &mut ws::WebsocketContext) { + let ip = self.ip.to_owned(); + + if !self.rate_limiter.message().check(ip) { + debug!("Websocket join with IP: {} has been rate limited.", self.ip); + ctx.stop() + } + } } From cfc5a1bb3ee5c189fedb197c6a4cbccb7146a2c9 Mon Sep 17 00:00:00 2001 From: Dessalines Date: Sat, 26 Mar 2022 20:03:01 -0400 Subject: [PATCH 3/4] Removing redundant ip --- crates/websocket/src/routes.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/websocket/src/routes.rs b/crates/websocket/src/routes.rs index c11daf7112..7ea9c00e30 100644 --- a/crates/websocket/src/routes.rs +++ b/crates/websocket/src/routes.rs @@ -181,9 +181,7 @@ impl WsSession { /// Check the rate limit, and stop the ctx if it fails fn rate_limit_check(&mut self, ctx: &mut ws::WebsocketContext) { - let ip = self.ip.to_owned(); - - if !self.rate_limiter.message().check(ip) { + if !self.rate_limiter.message().check(self.ip.to_owned()) { debug!("Websocket join with IP: {} has been rate limited.", self.ip); ctx.stop() } From 829b4e0ecc6a256afd38b78cea102c8bc19c9edd Mon Sep 17 00:00:00 2001 From: Dessalines Date: Sat, 26 Mar 2022 20:10:45 -0400 Subject: [PATCH 4/4] Return early if check fails. --- crates/websocket/src/routes.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/crates/websocket/src/routes.rs b/crates/websocket/src/routes.rs index 7ea9c00e30..e99e683eb6 100644 --- a/crates/websocket/src/routes.rs +++ b/crates/websocket/src/routes.rs @@ -62,7 +62,9 @@ impl Actor for WsSession { // across all routes within application let addr = ctx.address(); - self.rate_limit_check(ctx); + if !self.rate_limit_check(ctx) { + return; + } self .cs_addr @@ -105,7 +107,9 @@ impl Handler for WsSession { /// WebSocket message handler impl StreamHandler> for WsSession { fn handle(&mut self, result: Result, ctx: &mut Self::Context) { - self.rate_limit_check(ctx); + if !self.rate_limit_check(ctx) { + return; + } let message = match result { Ok(m) => m, @@ -180,10 +184,12 @@ impl WsSession { } /// Check the rate limit, and stop the ctx if it fails - fn rate_limit_check(&mut self, ctx: &mut ws::WebsocketContext) { - if !self.rate_limiter.message().check(self.ip.to_owned()) { + fn rate_limit_check(&mut self, ctx: &mut ws::WebsocketContext) -> bool { + let check = self.rate_limiter.message().check(self.ip.to_owned()); + if !check { debug!("Websocket join with IP: {} has been rate limited.", self.ip); ctx.stop() } + check } }