Skip to content

Commit

Permalink
feat(worker-runner): added runner actor & handle (#43)
Browse files Browse the repository at this point in the history
Co-authored-by: Luiz Felipe Gonçalves <git@luizfelipe.dev>
  • Loading branch information
lemosep and lffg committed Jun 14, 2024
1 parent 115a599 commit 68ae363
Show file tree
Hide file tree
Showing 12 changed files with 204 additions and 9 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions ctl/src/discovery/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(dead_code)]

use std::{collections::HashMap, net::SocketAddr};

use chrono::{DateTime, Utc};
Expand All @@ -12,11 +14,8 @@ pub struct Discovery {
rx: mpsc::Receiver<Msg>,
// TODO: Add more information on workers
workers: HashMap<SocketAddr, Metrics>,
#[allow(dead_code)]
services: HashMap<ServiceId, ServiceInfo>,
#[allow(dead_code)]
instances: HashMap<InstanceId, InstanceInfo>,
#[allow(dead_code)]
deployments: HashMap<DeploymentId, DeploymentInfo>,
}

Expand Down
3 changes: 2 additions & 1 deletion ctl/src/http/deployer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use proto::ctl::deployer::{DeployReq, DeployRes};
use crate::http::HttpState;

pub async fn deploy(
State(_state): State<HttpState>,
State(state): State<HttpState>,
Json(_payload): Json<DeployReq>,
) -> Json<DeployRes> {
_ = state.discovery;
todo!();
}
2 changes: 1 addition & 1 deletion proto/src/common/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use uuid::Uuid;

use crate::common::service::ServiceImage;

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct InstanceId(Uuid);

#[derive(Debug, Serialize, Deserialize)]
Expand Down
18 changes: 18 additions & 0 deletions proto/src/worker/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,27 @@
//! an instance on a given worker node.

use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::common::instance::{InstanceId, InstanceSpec};

///

/// Starts a new deploy in the system
#[derive(Debug, Serialize, Deserialize)]
pub struct DeployInstanceReq {
pub id: DeployReqId,
pub instance_spec: InstanceSpec,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct DeployInstanceRes {
pub id: DeployReqId,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct DeployReqId(Uuid);

/// Starts a new deploy in the system.
#[derive(Debug, Serialize, Deserialize)]
pub struct DeployReq {
Expand Down
5 changes: 4 additions & 1 deletion worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ edition.workspace = true
workspace = true

[dependencies]
# Internal deps
setup.workspace = true
proto.workspace = true

# External deps (keep alphabetically sorted)
axum.workspace = true
clap.workspace = true
eyre.workspace = true
reqwest.workspace = true
sysinfo.workspace = true
tokio.workspace = true
tracing.workspace = true
uuid.workspace = true
1 change: 1 addition & 0 deletions worker/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use clap::{value_parser, Parser};
#[derive(Debug)]
pub struct WorkerArgs {
/// Controller's address.
#[allow(dead_code)]
pub controller_addr: SocketAddr,

/// Interval at which metrics are pushed to the controller.
Expand Down
19 changes: 19 additions & 0 deletions worker/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use axum::{routing::post, Router};

use crate::runner::RunnerHandle;

mod runner;

#[derive(Clone)]
pub struct HttpState {
pub runner: RunnerHandle,
}

pub async fn run_server(state: HttpState) {
let app = Router::new()
.route("/instance/new", post(runner::new_instance))
.with_state(state);

let listener = tokio::net::TcpListener::bind("0.0.0.0:6969").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
12 changes: 12 additions & 0 deletions worker/src/http/runner/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use axum::{extract::State, Json};
use proto::worker::runner::{DeployInstanceReq, DeployInstanceRes};

use crate::http::HttpState;

pub async fn new_instance(
State(state): State<HttpState>,
Json(_payload): Json<DeployInstanceReq>,
) -> Json<DeployInstanceRes> {
_ = state.runner;
todo!();
}
26 changes: 23 additions & 3 deletions worker/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,47 @@
use std::sync::Arc;

use eyre::Result;
use http::HttpState;
use runner::Runner;
use tracing::info;

use crate::{args::WorkerArgs, monitor::pusher};

mod args;
mod http;
mod monitor;
mod runner;

#[tokio::main]
async fn main() -> Result<()> {
setup::tracing();

let args = Arc::new(WorkerArgs::parse());
let args = WorkerArgs::parse();
info!(?args, "started worker");

let pusher_handle = tokio::spawn({
let args = Arc::clone(&args);
async move {
pusher::start_pusher(Arc::new(args)).await;
}
});

let (runner, runner_handle) = Runner::new();
let runner_actor_handle = tokio::spawn(async move {
runner.run().await;
});

let http_handle = tokio::spawn({
let state = HttpState {
runner: runner_handle.clone(),
};
async {
pusher::start_pusher(args).await;
http::run_server(state).await;
}
});

pusher_handle.await.unwrap();
runner_actor_handle.await.unwrap();
http_handle.await.unwrap();

Ok(())
}
17 changes: 17 additions & 0 deletions worker/src/runner/container_rt.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use proto::common::instance::InstanceSpec;

use super::RunnerHandle;
#[allow(clippy::unused_async)]
pub async fn spawn_instance(spec: InstanceSpec, port: u16, _handle: RunnerHandle) {
tokio::spawn(async move {
match run_instance(spec, port).await {
Ok(()) => todo!(),
Err(_) => todo!(),
}
});
todo!()
}
#[allow(clippy::unused_async)]
async fn run_instance(_spec: InstanceSpec, _port: u16) -> eyre::Result<()> {
todo!();
}
103 changes: 103 additions & 0 deletions worker/src/runner/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use std::collections::{HashMap, HashSet};

use eyre::{Context as _, Ok, Report};
use proto::common::instance::{InstanceId, InstanceSpec};
use tokio::{
net::TcpListener,
sync::{mpsc, oneshot},
task,
};

mod container_rt;

pub struct Runner {
rx: mpsc::Receiver<Msg>,
instances: HashMap<InstanceId, u16>,
ports: HashSet<u16>,
handle: RunnerHandle,
}

impl Runner {
#[must_use]
pub fn new() -> (Runner, RunnerHandle) {
let (tx, rx) = mpsc::channel(16);
let handle = RunnerHandle(tx);
let actor = Runner {
rx,
instances: HashMap::default(),
ports: HashSet::default(),
handle: handle.clone(),
};
(actor, handle)
}

pub async fn run(mut self) {
while let Some(msg) = self.rx.recv().await {
self.handle_msg(msg).await;
}
}

async fn handle_msg(&mut self, msg: Msg) {
match msg {
Msg::InstanceDeploy(spec, reply) => {
let res = self.instance_deploy(spec).await;
_ = reply.send(res);
}
Msg::InstanceTerminate(_id, _reply) => todo!(),
Msg::InstanceKill(_id, _report) => todo!(),
}
}

async fn instance_deploy(&mut self, spec: InstanceSpec) -> eyre::Result<()> {
let port = self.get_port_for_instance(spec.instance_id).await?;
container_rt::spawn_instance(spec, port, self.handle.clone()).await;
Ok(())
}

async fn get_port_for_instance(&mut self, id: InstanceId) -> eyre::Result<u16> {
let port = loop {
let port = get_port().await?;
if !self.ports.contains(&port) {
break port;
}
};
self.instances.insert(id, port);
self.ports.insert(port);
Ok(port)
}
}

#[derive(Clone)]
pub struct RunnerHandle(mpsc::Sender<Msg>);

impl RunnerHandle {
#[allow(dead_code)]
async fn send(&self, msg: Msg) {
_ = self.0.send(msg).await;
}

#[allow(dead_code)]
pub async fn deploy_instance(&self, spec: InstanceSpec) -> Result<(), Report> {
let (tx, rx) = oneshot::channel();
self.send(Msg::InstanceDeploy(spec, tx)).await;
rx.await.unwrap()
}
}

#[allow(clippy::enum_variant_names)] // remove this once more variants are added
#[allow(dead_code)]
pub enum Msg {
InstanceDeploy(InstanceSpec, oneshot::Sender<Result<(), Report>>),
InstanceTerminate(InstanceId, oneshot::Sender<Result<(), Report>>),
InstanceKill(InstanceId, oneshot::Sender<Result<(), Report>>),
}

async fn get_port() -> eyre::Result<u16> {
let listener = TcpListener::bind(("0.0.0.0", 0))
.await
.wrap_err("failed to bind while deciding port")?;
let port = listener.local_addr().expect("must have local_addr").port();
drop(listener);
task::yield_now().await;
Ok(port)
}

0 comments on commit 68ae363

Please sign in to comment.