Skip to content

Commit

Permalink
refac: extract tcp listener creation to utils crate
Browse files Browse the repository at this point in the history
  • Loading branch information
lffg committed Jun 20, 2024
1 parent 46d8d11 commit 3501bcd
Show file tree
Hide file tree
Showing 13 changed files with 58 additions and 60 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ tokio = { version = "1.36", features = [
"time",
"sync",
] }
tower = "0.4.13"
tracing = "0.1"
tracing-subscriber = "0.3"
uuid = { version = "1", features = ["serde", "v7"] }
Expand All @@ -48,3 +49,4 @@ cast_precision_loss = "allow"
unused_async = "allow"
enum_glob_use = "allow"
missing_errors_doc = "allow"
missing_panics_doc = "allow"
7 changes: 3 additions & 4 deletions ctl/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use axum::{routing::post, Router};
use tracing::info;
use proto::well_known::{CTL_BALANCER_PORT, CTL_HTTP_PORT};
use utils::server;

use crate::discovery::DiscoveryHandle;

Expand All @@ -23,7 +24,5 @@ pub async fn run_server(state: HttpState) {
)
.with_state(state);

let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
info!("HTTP listening at port 3000");
axum::serve(listener, app).await.unwrap();
server::listen("controller http", app, ("0.0.0.0", CTL_HTTP_PORT)).await;
}
40 changes: 3 additions & 37 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
packages = let
commonPackages = with pkgs; [
# The usual Rust profile
(rust-bin.stable."1.76.0".default.override {
(rust-bin.stable."1.79.0".default.override {
extensions = ["rust-src" "rust-analyzer" "llvm-tools"];
})
# We need a nightly version of rustfmt to format this crate
Expand All @@ -52,6 +52,7 @@
CoreFoundation
CoreServices
SystemConfiguration
IOKit
]);
in (commonPackages ++ darwinPackages);
};
Expand Down
7 changes: 6 additions & 1 deletion proto/src/clients/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
mod ctl;
use std::time::Duration;

pub use ctl::CtlClient;

mod worker;
Expand All @@ -14,7 +16,10 @@ pub struct BaseClient {
impl BaseClient {
#[must_use]
fn new() -> Self {
let client = reqwest::Client::new();
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(20))
.build()
.unwrap();
Self { client }
}

Expand Down
8 changes: 4 additions & 4 deletions proto/src/well_known.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ pub const GRACEFUL_SHUTDOWN_DEADLINE: Duration = Duration::from_secs(20);

pub const PROXY_INSTANCE_HEADER_NAME: &str = "X-Tuc-Inst";

pub const CTL_HTTP_PORT: u16 = 6968;
// TODO: These should be parameterized through args
pub const CTL_HTTP_PORT: u16 = 7070;
pub const CTL_BALANCER_PORT: u16 = 8080;

pub const WORKER_PROXY_PORT: u16 = 8080;
pub const WORKER_HTTP_PORT: u16 = 6969;
pub const WORKER_HTTP_PORT: u16 = 7071;
pub const WORKER_PROXY_PORT: u16 = 8081;
2 changes: 2 additions & 0 deletions utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ edition.workspace = true
axum.workspace = true
eyre.workspace = true
serde_json.workspace = true
tokio.workspace = true
tower.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true

Expand Down
1 change: 1 addition & 0 deletions utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod http;
pub mod server;
pub mod setup;
24 changes: 24 additions & 0 deletions utils/src/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use std::{convert::Infallible, future::IntoFuture, io};

use axum::{
extract::Request,
response::Response,
serve::{IncomingStream, Serve},
};
use tokio::net::{TcpListener, ToSocketAddrs};
use tower::Service;
use tracing::info;

pub async fn listen<A, M, S>(name: &'static str, mk_svc: M, addr: A)
where
A: ToSocketAddrs,
M: for<'a> Service<IncomingStream<'a>, Error = Infallible, Response = S>,
S: Service<Request, Response = Response, Error = Infallible> + Clone + Send + 'static,
S::Future: Send,
Serve<M, S>: IntoFuture<Output = io::Result<()>>,
{
let listener = TcpListener::bind(addr).await.unwrap();
let addr = listener.local_addr().unwrap();
info!("{name} listening at {addr}");
axum::serve(listener, mk_svc).await.unwrap();
}
5 changes: 3 additions & 2 deletions worker/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use axum::{routing::post, Router};
use proto::well_known::WORKER_HTTP_PORT;
use utils::server;

use crate::runner::RunnerHandle;

Expand All @@ -19,6 +21,5 @@ pub async fn run_server(state: HttpState) {
)
.with_state(state);

let listener = tokio::net::TcpListener::bind("0.0.0.0:6969").await.unwrap();
axum::serve(listener, app).await.unwrap();
server::listen("worker http", app, ("0.0.0.0", WORKER_HTTP_PORT)).await;
}
13 changes: 3 additions & 10 deletions worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use axum::handler::Handler;
use bollard::Docker;
use eyre::Result;
use http::HttpState;
use proto::{clients::CtlClient, well_known};
use proto::{clients::CtlClient, well_known::WORKER_PROXY_PORT};
use runner::Runner;
use tracing::info;
use utils::server;

use crate::{args::WorkerArgs, monitor::pusher, proxy::ProxyState};

Expand Down Expand Up @@ -37,15 +38,7 @@ async fn main() -> Result<()> {

let proxy_server = tokio::spawn(async {
let app = proxy::proxy.with_state(proxy_state);
let listener = tokio::net::TcpListener::bind(("0.0.0.0", well_known::WORKER_PROXY_PORT))
.await
.unwrap();

tracing::info!(
"Proxy server listening at port {}",
well_known::WORKER_PROXY_PORT
);
axum::serve(listener, app).await.unwrap();
server::listen("worker proxy", app, ("0.0.0.0", WORKER_PROXY_PORT)).await;
});

let docker = Arc::new(Docker::connect_with_defaults().unwrap());
Expand Down
4 changes: 3 additions & 1 deletion worker/src/monitor/pusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ use std::sync::Arc;
use chrono::Utc;
use proto::clients::CtlClient;
use tokio::time::sleep;
use tracing::error;
use tracing::{debug, error};

use crate::{args::WorkerArgs, monitor::collector::MetricsCollector};

pub async fn start_pusher(args: Arc<WorkerArgs>, ctl_client: CtlClient) {
let mut metrics_report: MetricsCollector = MetricsCollector::new();
debug!("pusher started");
loop {
sleep(args.metrics_report_interval).await;
debug!("sending metrics");
let metrics = metrics_report.get_metrics();
let now = Utc::now();
if let Err(error) = ctl_client.push_metrics(metrics, now).await {
Expand Down

0 comments on commit 3501bcd

Please sign in to comment.