Skip to content

Commit

Permalink
refac(balaner): define balancer endpoint and round-robin routing
Browse files Browse the repository at this point in the history
  • Loading branch information
gustavodiasag committed Jun 20, 2024
1 parent eb20c98 commit 5cef969
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 31 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ axum.workspace = true
chrono.workspace = true
clap.workspace = true
eyre.workspace = true
hyper-util.workspace = true
tokio.workspace = true
tracing.workspace = true
uuid.workspace = true
128 changes: 100 additions & 28 deletions ctl/src/balancer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,116 @@
#![allow(dead_code)]

use std::{
collections::HashMap,
net::SocketAddr
net::IpAddr,
str::FromStr as _,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
time::Duration,
};

use axum::extract::Request;
use proto::common::instance::InstanceId;
use proto::common::service::ServiceId;
use axum::{
body::Body,
extract::{Request, State},
http::{
uri::{Authority, Scheme},
HeaderValue, StatusCode, Uri,
},
response::IntoResponse,
};
use hyper_util::{
client::legacy::{connect::HttpConnector, Client},
rt::TokioExecutor,
};
use proto::{
common::{instance::InstanceId, service::ServiceId},
well_known::PROXY_INSTANCE_HEADER_NAME,
};
use utils::http::{self, OptionExt as _, ResultExt as _};

struct Balancer<S> {
strategy: S,
addrs: HashMap<ServiceId, Vec<(InstanceId, SocketAddr)>>
pub struct InstanceBag {
pub instances: Vec<(InstanceId, IpAddr)>,
pub count: AtomicUsize,
}

trait Strategy {
async fn get_server(&self, _req: &Request) -> (InstanceId, SocketAddr);
#[derive(Clone)]
pub struct Balancer {
pub addrs: Arc<Mutex<HashMap<ServiceId, InstanceBag>>>,
}

impl<S> Balancer<S>
where
S: Strategy
{
pub async fn run() {
todo!();
impl Balancer {
pub fn new() -> Self {
Balancer {
addrs: Arc::new(Mutex::new(HashMap::default())),
}
}

async fn next_server(&self, _req: &Request) -> (InstanceId, SocketAddr) {
todo!();
pub fn next(&self, service: &ServiceId) -> (InstanceId, IpAddr) {
let map = self.addrs.lock().unwrap();
let bag = map.get(service).unwrap();
let count = bag.count.fetch_add(1, Ordering::Relaxed);
bag.instances[count % bag.instances.len()]
}
}

pub async fn drop_instance(&self, _id: InstanceId) {
todo!();
}

pub async fn add_instance(&self, _id: InstanceId, _at: SocketAddr) {
todo!();
}
#[derive(Clone)]
pub struct BalancerState {
pub balancer: Balancer,
pub client: Client<HttpConnector, Body>,
}

pub async fn swap_instance(_old_id: InstanceId, _new_id: InstanceId, _new_at: SocketAddr) {
todo!();
impl BalancerState {
#[must_use]
pub fn new() -> Self {
BalancerState {
balancer: Balancer::new(),
client: {
let mut connector = HttpConnector::new();
connector.set_keepalive(Some(Duration::from_secs(60)));
connector.set_nodelay(true);
Client::builder(TokioExecutor::new()).build::<_, Body>(connector)
},
}
}
}

#[axum::debug_handler]
pub async fn proxy(
State(state): State<BalancerState>,
mut req: Request,
) -> http::Result<impl IntoResponse> {
let service = extract_service_id(&mut req)?;

let (instance, server) = state.balancer.next(&service);

*req.uri_mut() = {
let uri = req.uri();
let mut parts = uri.clone().into_parts();
parts.authority = Authority::from_str(&format!("{server}")).ok();
parts.scheme = Some(Scheme::HTTP);
Uri::from_parts(parts).unwrap()
};

req.headers_mut().insert(
PROXY_INSTANCE_HEADER_NAME,
HeaderValue::from_str(&format!("{instance}")).unwrap(),
);

state
.client
.request(req)
.await
.http_error(StatusCode::BAD_GATEWAY, "bad gateway")
}

fn extract_service_id(req: &mut Request) -> http::Result<ServiceId> {
let inner = req
.headers()
.get("host")
.unwrap()
.to_str()
.ok()
.and_then(|s| s.parse().ok())
.or_http_error(StatusCode::BAD_REQUEST, "invalid service name")?;
Ok(ServiceId(inner))
}
14 changes: 11 additions & 3 deletions ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@ use std::{
sync::Arc,
};

use axum::handler::Handler;
use clap::Parser;
use proto::well_known::{CTL_BALANCER_PORT, CTL_HTTP_PORT};
use tokio::task::JoinSet;
use tracing::info;
use utils::server::mk_listener;

use crate::{args::CtlArgs, discovery::Discovery, http::HttpState};
use crate::{args::CtlArgs, balancer::BalancerState, discovery::Discovery, http::HttpState};

mod args;
mod balancer;
mod discovery;
mod http;
mod balancer;

const ANY_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));

Expand All @@ -25,7 +26,7 @@ async fn main() -> eyre::Result<()> {
let args = Arc::new(CtlArgs::parse());
info!(?args, "started ctl");

let _balancer_listener = mk_listener(ANY_IP, CTL_BALANCER_PORT).await?;
let balancer_listener = mk_listener(ANY_IP, CTL_BALANCER_PORT).await?;
let http_listener = mk_listener(ANY_IP, CTL_HTTP_PORT).await?;

let mut bag = JoinSet::new();
Expand All @@ -35,6 +36,13 @@ async fn main() -> eyre::Result<()> {
discovery.run().await;
});

let balancer_state = BalancerState::new();
bag.spawn(async move {
let app = balancer::proxy.with_state(balancer_state);
info!("balancer http listening at {ANY_IP}:{CTL_BALANCER_PORT}");
axum::serve(balancer_listener, app).await.unwrap();
});

bag.spawn(async move {
let state = HttpState {
discovery: discovery_handle.clone(),
Expand Down

0 comments on commit 5cef969

Please sign in to comment.