Skip to content

Commit

Permalink
feat(ctl): define initial structure for load balancer (#47)
Browse files Browse the repository at this point in the history
Co-authored-by: Luiz Felipe Gonçalves <git@luizfelipe.dev>
  • Loading branch information
gustavodiasag and lffg authored Jun 21, 2024
1 parent 206b29c commit 364c792
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ axum.workspace = true
chrono.workspace = true
clap.workspace = true
eyre.workspace = true
hyper-util.workspace = true
tokio.workspace = true
tracing.workspace = true
uuid.workspace = true
135 changes: 135 additions & 0 deletions ctl/src/balancer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use std::{
collections::HashMap,
net::{IpAddr, SocketAddr},
str::FromStr as _,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
time::Duration,
};

use axum::{
body::Body,
extract::{ConnectInfo, Request, State},
http::{
uri::{Authority, Scheme},
HeaderValue, StatusCode, Uri,
},
response::IntoResponse,
};
use hyper_util::{
client::legacy::{connect::HttpConnector, Client},
rt::TokioExecutor,
};
use proto::{
common::{instance::InstanceId, service::ServiceId},
well_known::{PROXY_FORWARDED_HEADER_NAME, PROXY_INSTANCE_HEADER_NAME},
};
use utils::http::{self, OptionExt as _, ResultExt as _};

#[derive(Default)]
pub struct InstanceBag {
pub instances: Vec<(InstanceId, IpAddr)>,
pub count: AtomicUsize,
}

#[derive(Clone)]
pub struct BalancerState {
pub addrs: Arc<Mutex<HashMap<ServiceId, InstanceBag>>>,
pub client: Client<HttpConnector, Body>,
}

impl BalancerState {
#[must_use]
pub fn new() -> (Self, BalancerHandle) {
let map = Arc::new(Mutex::new(HashMap::default()));
(
BalancerState {
addrs: map.clone(),
client: {
let mut connector = HttpConnector::new();
connector.set_keepalive(Some(Duration::from_secs(60)));
connector.set_nodelay(true);
Client::builder(TokioExecutor::new()).build::<_, Body>(connector)
},
},
BalancerHandle { addrs: map },
)
}

pub fn next(&self, service: &ServiceId) -> (InstanceId, IpAddr) {
let map = self.addrs.lock().unwrap();
let bag = map.get(service).unwrap();
let count = bag.count.fetch_add(1, Ordering::Relaxed);
bag.instances[count % bag.instances.len()]
}
}

pub struct BalancerHandle {
pub addrs: Arc<Mutex<HashMap<ServiceId, InstanceBag>>>,
}

impl BalancerHandle {
#[allow(dead_code)]
pub fn add_instance(&mut self, id: ServiceId, at: (InstanceId, IpAddr)) {
let mut map = self.addrs.lock().unwrap();
let bag = map.entry(id).or_default();
bag.instances.push(at);
}

#[allow(dead_code)]
pub fn drop_instance(&mut self, id: &ServiceId, at: (InstanceId, IpAddr)) {
let mut map = self.addrs.lock().unwrap();
let Some(bag) = map.get_mut(id) else {
return;
};
bag.instances
.retain(|(inst, addr)| inst == &at.0 && addr == &at.1);
}
}

pub async fn proxy(
ConnectInfo(addr): ConnectInfo<SocketAddr>,
State(balancer): State<BalancerState>,
mut req: Request,
) -> http::Result<impl IntoResponse> {
let service = extract_service_id(&mut req)?;

let (instance, server_addr) = balancer.next(&service);

*req.uri_mut() = {
let uri = req.uri();
let mut parts = uri.clone().into_parts();
parts.authority = Authority::from_str(&server_addr.to_string()).ok();
parts.scheme = Some(Scheme::HTTP);
Uri::from_parts(parts).unwrap()
};

req.headers_mut().insert(
PROXY_INSTANCE_HEADER_NAME,
HeaderValue::from_str(&instance.to_string()).unwrap(),
);
req.headers_mut().insert(
PROXY_FORWARDED_HEADER_NAME,
HeaderValue::from_str(&addr.ip().to_string()).unwrap(),
);

balancer
.client
.request(req)
.await
.http_error(StatusCode::BAD_GATEWAY, "bad gateway")
}

fn extract_service_id(req: &mut Request) -> http::Result<ServiceId> {
let inner = req
.headers()
.get("Host")
.unwrap()
.to_str()
.ok()
.and_then(|s| s.parse().ok())
.or_http_error(StatusCode::BAD_REQUEST, "invalid service name")?;
Ok(ServiceId(inner))
}
15 changes: 13 additions & 2 deletions ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@ use std::{
sync::Arc,
};

use axum::handler::Handler;
use clap::Parser;
use proto::well_known::{CTL_BALANCER_PORT, CTL_HTTP_PORT};
use tokio::task::JoinSet;
use tracing::info;
use utils::server::mk_listener;

use crate::{args::CtlArgs, http::HttpState, worker_mgr::WorkerMgr};
use crate::{args::CtlArgs, balancer::BalancerState, http::HttpState, worker_mgr::WorkerMgr};

mod args;
mod balancer;
mod http;
mod worker_mgr;

Expand All @@ -24,7 +26,7 @@ async fn main() -> eyre::Result<()> {
let args = Arc::new(CtlArgs::parse());
info!(?args, "started ctl");

let _balancer_listener = mk_listener(ANY_IP, CTL_BALANCER_PORT).await?;
let balancer_listener = mk_listener(ANY_IP, CTL_BALANCER_PORT).await?;
let http_listener = mk_listener(ANY_IP, CTL_HTTP_PORT).await?;

let mut bag = JoinSet::new();
Expand All @@ -34,6 +36,15 @@ async fn main() -> eyre::Result<()> {
worker_mgr.run().await;
});

let (balancer, _balancer_handle) = BalancerState::new();
bag.spawn(async move {
let app = balancer::proxy
.with_state(balancer)
.into_make_service_with_connect_info::<SocketAddr>();
info!("balancer http listening at {ANY_IP}:{CTL_BALANCER_PORT}");
axum::serve(balancer_listener, app).await.unwrap();
});

bag.spawn(async move {
let state = HttpState {
worker_mgr: worker_mgr_handle,
Expand Down
1 change: 1 addition & 0 deletions proto/src/well_known.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::time::Duration;

pub const GRACEFUL_SHUTDOWN_DEADLINE: Duration = Duration::from_secs(20);

pub const PROXY_FORWARDED_HEADER_NAME: &str = "X-Tuc-Fwd-For";
pub const PROXY_INSTANCE_HEADER_NAME: &str = "X-Tuc-Inst";

pub const CTL_HTTP_PORT: u16 = 7070;
Expand Down

0 comments on commit 364c792

Please sign in to comment.