From 68ae363eb27163959bc7d90c9add533909f6409d Mon Sep 17 00:00:00 2001 From: Eduardo Lemos <103197804+lemosep@users.noreply.github.com> Date: Fri, 14 Jun 2024 13:42:59 -0300 Subject: [PATCH] feat(worker-runner): added runner actor & handle (#43) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Luiz Felipe Gonçalves --- Cargo.lock | 2 + ctl/src/discovery/mod.rs | 5 +- ctl/src/http/deployer/mod.rs | 3 +- proto/src/common/instance.rs | 2 +- proto/src/worker/runner.rs | 18 ++++++ worker/Cargo.toml | 5 +- worker/src/args.rs | 1 + worker/src/http/mod.rs | 19 ++++++ worker/src/http/runner/mod.rs | 12 ++++ worker/src/main.rs | 26 +++++++- worker/src/runner/container_rt.rs | 17 +++++ worker/src/runner/mod.rs | 103 ++++++++++++++++++++++++++++++ 12 files changed, 204 insertions(+), 9 deletions(-) create mode 100644 worker/src/http/mod.rs create mode 100644 worker/src/http/runner/mod.rs create mode 100644 worker/src/runner/container_rt.rs create mode 100644 worker/src/runner/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 03dd7f2..40b7009 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1712,6 +1712,7 @@ dependencies = [ name = "worker" version = "0.1.0" dependencies = [ + "axum", "clap", "eyre", "proto", @@ -1720,4 +1721,5 @@ dependencies = [ "sysinfo", "tokio", "tracing", + "uuid", ] diff --git a/ctl/src/discovery/mod.rs b/ctl/src/discovery/mod.rs index ba4bdd3..a878e2b 100644 --- a/ctl/src/discovery/mod.rs +++ b/ctl/src/discovery/mod.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use std::{collections::HashMap, net::SocketAddr}; use chrono::{DateTime, Utc}; @@ -12,11 +14,8 @@ pub struct Discovery { rx: mpsc::Receiver, // TODO: Add more information on workers workers: HashMap, - #[allow(dead_code)] services: HashMap, - #[allow(dead_code)] instances: HashMap, - #[allow(dead_code)] deployments: HashMap, } diff --git a/ctl/src/http/deployer/mod.rs b/ctl/src/http/deployer/mod.rs index 24fe0c8..1104cf0 100644 --- a/ctl/src/http/deployer/mod.rs +++ b/ctl/src/http/deployer/mod.rs @@ -4,8 +4,9 @@ use proto::ctl::deployer::{DeployReq, DeployRes}; use crate::http::HttpState; pub async fn deploy( - State(_state): State, + State(state): State, Json(_payload): Json, ) -> Json { + _ = state.discovery; todo!(); } diff --git a/proto/src/common/instance.rs b/proto/src/common/instance.rs index ddac39b..2b242f4 100644 --- a/proto/src/common/instance.rs +++ b/proto/src/common/instance.rs @@ -3,7 +3,7 @@ use uuid::Uuid; use crate::common::service::ServiceImage; -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] +#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct InstanceId(Uuid); #[derive(Debug, Serialize, Deserialize)] diff --git a/proto/src/worker/runner.rs b/proto/src/worker/runner.rs index f5e8a17..08e03c0 100644 --- a/proto/src/worker/runner.rs +++ b/proto/src/worker/runner.rs @@ -3,9 +3,27 @@ //! an instance on a given worker node. use serde::{Deserialize, Serialize}; +use uuid::Uuid; use crate::common::instance::{InstanceId, InstanceSpec}; +/// + +/// Starts a new deploy in the system +#[derive(Debug, Serialize, Deserialize)] +pub struct DeployInstanceReq { + pub id: DeployReqId, + pub instance_spec: InstanceSpec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct DeployInstanceRes { + pub id: DeployReqId, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct DeployReqId(Uuid); + /// Starts a new deploy in the system. #[derive(Debug, Serialize, Deserialize)] pub struct DeployReq { diff --git a/worker/Cargo.toml b/worker/Cargo.toml index 266ee19..d2806c7 100644 --- a/worker/Cargo.toml +++ b/worker/Cargo.toml @@ -7,12 +7,15 @@ edition.workspace = true workspace = true [dependencies] +# Internal deps setup.workspace = true proto.workspace = true - +# External deps (keep alphabetically sorted) +axum.workspace = true clap.workspace = true eyre.workspace = true reqwest.workspace = true sysinfo.workspace = true tokio.workspace = true tracing.workspace = true +uuid.workspace = true diff --git a/worker/src/args.rs b/worker/src/args.rs index 038e3b6..c40daa8 100644 --- a/worker/src/args.rs +++ b/worker/src/args.rs @@ -5,6 +5,7 @@ use clap::{value_parser, Parser}; #[derive(Debug)] pub struct WorkerArgs { /// Controller's address. + #[allow(dead_code)] pub controller_addr: SocketAddr, /// Interval at which metrics are pushed to the controller. diff --git a/worker/src/http/mod.rs b/worker/src/http/mod.rs new file mode 100644 index 0000000..27d0603 --- /dev/null +++ b/worker/src/http/mod.rs @@ -0,0 +1,19 @@ +use axum::{routing::post, Router}; + +use crate::runner::RunnerHandle; + +mod runner; + +#[derive(Clone)] +pub struct HttpState { + pub runner: RunnerHandle, +} + +pub async fn run_server(state: HttpState) { + let app = Router::new() + .route("/instance/new", post(runner::new_instance)) + .with_state(state); + + let listener = tokio::net::TcpListener::bind("0.0.0.0:6969").await.unwrap(); + axum::serve(listener, app).await.unwrap(); +} diff --git a/worker/src/http/runner/mod.rs b/worker/src/http/runner/mod.rs new file mode 100644 index 0000000..3ed3a1c --- /dev/null +++ b/worker/src/http/runner/mod.rs @@ -0,0 +1,12 @@ +use axum::{extract::State, Json}; +use proto::worker::runner::{DeployInstanceReq, DeployInstanceRes}; + +use crate::http::HttpState; + +pub async fn new_instance( + State(state): State, + Json(_payload): Json, +) -> Json { + _ = state.runner; + todo!(); +} diff --git a/worker/src/main.rs b/worker/src/main.rs index 7460120..8ff18c7 100644 --- a/worker/src/main.rs +++ b/worker/src/main.rs @@ -1,27 +1,47 @@ use std::sync::Arc; use eyre::Result; +use http::HttpState; +use runner::Runner; use tracing::info; use crate::{args::WorkerArgs, monitor::pusher}; mod args; +mod http; mod monitor; +mod runner; #[tokio::main] async fn main() -> Result<()> { setup::tracing(); - let args = Arc::new(WorkerArgs::parse()); + let args = WorkerArgs::parse(); info!(?args, "started worker"); let pusher_handle = tokio::spawn({ - let args = Arc::clone(&args); + async move { + pusher::start_pusher(Arc::new(args)).await; + } + }); + + let (runner, runner_handle) = Runner::new(); + let runner_actor_handle = tokio::spawn(async move { + runner.run().await; + }); + + let http_handle = tokio::spawn({ + let state = HttpState { + runner: runner_handle.clone(), + }; async { - pusher::start_pusher(args).await; + http::run_server(state).await; } }); + pusher_handle.await.unwrap(); + runner_actor_handle.await.unwrap(); + http_handle.await.unwrap(); Ok(()) } diff --git a/worker/src/runner/container_rt.rs b/worker/src/runner/container_rt.rs new file mode 100644 index 0000000..ac3bb83 --- /dev/null +++ b/worker/src/runner/container_rt.rs @@ -0,0 +1,17 @@ +use proto::common::instance::InstanceSpec; + +use super::RunnerHandle; +#[allow(clippy::unused_async)] +pub async fn spawn_instance(spec: InstanceSpec, port: u16, _handle: RunnerHandle) { + tokio::spawn(async move { + match run_instance(spec, port).await { + Ok(()) => todo!(), + Err(_) => todo!(), + } + }); + todo!() +} +#[allow(clippy::unused_async)] +async fn run_instance(_spec: InstanceSpec, _port: u16) -> eyre::Result<()> { + todo!(); +} diff --git a/worker/src/runner/mod.rs b/worker/src/runner/mod.rs new file mode 100644 index 0000000..faa3855 --- /dev/null +++ b/worker/src/runner/mod.rs @@ -0,0 +1,103 @@ +use std::collections::{HashMap, HashSet}; + +use eyre::{Context as _, Ok, Report}; +use proto::common::instance::{InstanceId, InstanceSpec}; +use tokio::{ + net::TcpListener, + sync::{mpsc, oneshot}, + task, +}; + +mod container_rt; + +pub struct Runner { + rx: mpsc::Receiver, + instances: HashMap, + ports: HashSet, + handle: RunnerHandle, +} + +impl Runner { + #[must_use] + pub fn new() -> (Runner, RunnerHandle) { + let (tx, rx) = mpsc::channel(16); + let handle = RunnerHandle(tx); + let actor = Runner { + rx, + instances: HashMap::default(), + ports: HashSet::default(), + handle: handle.clone(), + }; + (actor, handle) + } + + pub async fn run(mut self) { + while let Some(msg) = self.rx.recv().await { + self.handle_msg(msg).await; + } + } + + async fn handle_msg(&mut self, msg: Msg) { + match msg { + Msg::InstanceDeploy(spec, reply) => { + let res = self.instance_deploy(spec).await; + _ = reply.send(res); + } + Msg::InstanceTerminate(_id, _reply) => todo!(), + Msg::InstanceKill(_id, _report) => todo!(), + } + } + + async fn instance_deploy(&mut self, spec: InstanceSpec) -> eyre::Result<()> { + let port = self.get_port_for_instance(spec.instance_id).await?; + container_rt::spawn_instance(spec, port, self.handle.clone()).await; + Ok(()) + } + + async fn get_port_for_instance(&mut self, id: InstanceId) -> eyre::Result { + let port = loop { + let port = get_port().await?; + if !self.ports.contains(&port) { + break port; + } + }; + self.instances.insert(id, port); + self.ports.insert(port); + Ok(port) + } +} + +#[derive(Clone)] +pub struct RunnerHandle(mpsc::Sender); + +impl RunnerHandle { + #[allow(dead_code)] + async fn send(&self, msg: Msg) { + _ = self.0.send(msg).await; + } + + #[allow(dead_code)] + pub async fn deploy_instance(&self, spec: InstanceSpec) -> Result<(), Report> { + let (tx, rx) = oneshot::channel(); + self.send(Msg::InstanceDeploy(spec, tx)).await; + rx.await.unwrap() + } +} + +#[allow(clippy::enum_variant_names)] // remove this once more variants are added +#[allow(dead_code)] +pub enum Msg { + InstanceDeploy(InstanceSpec, oneshot::Sender>), + InstanceTerminate(InstanceId, oneshot::Sender>), + InstanceKill(InstanceId, oneshot::Sender>), +} + +async fn get_port() -> eyre::Result { + let listener = TcpListener::bind(("0.0.0.0", 0)) + .await + .wrap_err("failed to bind while deciding port")?; + let port = listener.local_addr().expect("must have local_addr").port(); + drop(listener); + task::yield_now().await; + Ok(port) +}