Skip to content

Commit

Permalink
finished container_runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
lemosep authored and lffg committed Jun 18, 2024
1 parent d2a639f commit 58625d8
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 45 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ chrono = { version = "0.4.38", default-features = false, features = [
"serde",
] }
eyre = "0.6"
futures-util = "0.3.30"
reqwest = { version = "0.12", features = ["json"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
Expand Down
2 changes: 2 additions & 0 deletions proto/src/common/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ pub enum Status {
Crashed { error: String },
/// The instance was killed by the System due to an error.
Killed { reason: String },
/// The instance could not be started (it failed before it began running).
FailedToStart { error: String },
}
1 change: 1 addition & 0 deletions worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ axum.workspace = true
bollard.workspace = true
clap.workspace = true
eyre.workspace = true
futures-util.workspace = true
reqwest.workspace = true
sysinfo.workspace = true
tokio.workspace = true
Expand Down
4 changes: 3 additions & 1 deletion worker/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use bollard::Docker;
use eyre::Result;
use http::HttpState;
use runner::Runner;
Expand All @@ -26,7 +27,8 @@ async fn main() -> Result<()> {
}
});

let (runner, runner_handle) = Runner::new();
let docker = Arc::new(Docker::connect_with_http_defaults().unwrap());
let (runner, runner_handle) = Runner::new(docker);
let runner_actor_handle = tokio::spawn(async move {
runner.run().await;
});
Expand Down
119 changes: 91 additions & 28 deletions worker/src/runner/container_rt.rs
Original file line number Diff line number Diff line change
@@ -1,59 +1,117 @@
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};

use bollard::{
container::{Config, CreateContainerOptions, StartContainerOptions},
secret::HostConfig,
container::{Config, CreateContainerOptions, StartContainerOptions, WaitContainerOptions},
secret::{ContainerCreateResponse, ContainerWaitExitError, ContainerWaitResponse, HostConfig},
Docker,
};
use proto::common::instance::{InstanceId, InstanceSpec};
use futures_util::stream::StreamExt;
use proto::common::instance::{InstanceSpec, Status};

use super::RunnerHandle;

#[derive(Clone)]
pub struct ContainerRuntime {
docker: Docker,
containers: HashMap<InstanceId, String>,
docker: Arc<Docker>,
}

impl ContainerRuntime {
pub fn new() -> Self {
let d = Docker::connect_with_defaults().unwrap();
ContainerRuntime {
docker: d,
containers: HashMap::default(),
}
pub fn new(docker: Arc<Docker>) -> Self {
ContainerRuntime { docker }
}

#[allow(clippy::unused_async)]
pub async fn spawn_instance(&mut self, spec: InstanceSpec, port: u16, _handle: RunnerHandle) {
let mut rt_clone = self.clone();
pub fn spawn_instance(&self, spec: InstanceSpec, port: u16, handle: RunnerHandle) {
let this = self.clone();
tokio::spawn(async move {
match rt_clone.run_instance(spec, port).await {
Ok(()) => todo!(),
Err(_) => todo!(),
let container_name = Self::create_container_name(&spec);

if let Err(e) = this
.create_and_run(&spec, port, container_name.clone())
.await
{
let error = e.to_string();
handle
.report_instance_status(Status::FailedToStart { error })
.await;
return;
}

// healthcheck verifies if service is running on established `PORT`
handle.report_instance_status(Status::Started).await;

if let Err(e) = this.wait_container(&container_name).await {
let error = e.to_string();
handle
.report_instance_status(Status::Crashed { error })
.await;
return;
}

handle.report_instance_status(Status::Terminated).await;
});
todo!()
}

#[allow(clippy::unused_async)]
async fn run_instance(&mut self, spec: InstanceSpec, port: u16) -> eyre::Result<()> {
/// Creates the container for `spec` and then starts it.
///
/// `port` and `name` are forwarded to `create_container`; the response it
/// returns is handed to `run_container`.
///
/// # Errors
///
/// Returns the first error produced by either step. When creation fails the
/// start step is never attempted.
async fn create_and_run(
    &self,
    spec: &InstanceSpec,
    port: u16,
    name: String,
) -> eyre::Result<()> {
    // `name` is owned and not needed afterwards, so it is moved rather than cloned.
    let create_response = self.create_container(spec, port, name).await?;
    self.run_container(create_response).await
}

async fn run_container(&self, create_response: ContainerCreateResponse) -> eyre::Result<()> {
self.docker
.start_container(&create_response.id, None::<StartContainerOptions<String>>)
.await?;

Ok(())
}

async fn create_container(
&self,
spec: &InstanceSpec,
port: u16,
name: String,
) -> eyre::Result<ContainerCreateResponse> {
let config = self.create_container_config(spec.clone(), port);
let container_name = format!("instance-{}", spec.instance_id.0);

let options = Some(CreateContainerOptions {
name: container_name,
name,
platform: Some("linux/x86_64".to_string()),
});
let create_response = self.docker.create_container(options, config).await?;

self.docker
.start_container(&create_response.id, None::<StartContainerOptions<String>>)
.await?;
Ok(create_response)
}

self.containers.insert(spec.instance_id, create_response.id);
async fn wait_container(&self, name: &str) -> eyre::Result<()> {
let options = Some(WaitContainerOptions {
condition: "not-running",
});

Ok(())
let mut response_stream = self.docker.wait_container(name, options);
let Some(result) = response_stream.next().await else {
eyre::bail!("wait_container didn't respond");
};

match result {
Ok(res) if res.status_code == 0 => Ok(()),
Ok(ContainerWaitResponse {
status_code,
error: Some(ContainerWaitExitError { message: Some(m) }),
}) => Err(eyre::eyre!("Container exited due to: {m} - {status_code}")),
Ok(ContainerWaitResponse {
status_code,
error: _,
}) => Err(eyre::eyre!(
"Container exited due to unknown error - {status_code}"
)),
Err(e) => Err(e.into()),
}
}

#[allow(clippy::unused_self)]
Expand All @@ -66,6 +124,7 @@ impl ContainerRuntime {
map.insert(format!("{port}/tcp"), HashMap::default());
map
}),
env: Some(vec![format!("PORT={port}")]),
host_config: Some(HostConfig {
cpu_shares: Some(spec.resource_config.cpu_shares),
memory: Some(spec.resource_config.memory_limit),
Expand All @@ -85,4 +144,8 @@ impl ContainerRuntime {
..Default::default()
}
}

/// Derives the container name for `spec`: the literal prefix `instance-`
/// followed by the inner value of the spec's instance id.
fn create_container_name(spec: &InstanceSpec) -> String {
    let id = &spec.instance_id.0;
    format!("instance-{id}")
}
}
58 changes: 42 additions & 16 deletions worker/src/runner/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use std::collections::{HashMap, HashSet};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use bollard::Docker;
use eyre::{Context as _, Ok, Report};
use proto::common::instance::{InstanceId, InstanceSpec};
use proto::common::instance::{self, InstanceId, InstanceSpec};
use tokio::{
net::TcpListener,
sync::{mpsc, oneshot},
Expand All @@ -21,15 +25,15 @@ pub struct Runner {

impl Runner {
#[must_use]
pub fn new() -> (Runner, RunnerHandle) {
pub fn new(docker: Arc<Docker>) -> (Runner, RunnerHandle) {
let (tx, rx) = mpsc::channel(16);
let handle = RunnerHandle(tx);
let actor = Runner {
rx,
instances: HashMap::default(),
ports: HashSet::default(),
handle: handle.clone(),
container_runtime: ContainerRuntime::new(),
container_runtime: ContainerRuntime::new(docker),
};
(actor, handle)
}
Expand All @@ -42,20 +46,20 @@ impl Runner {

async fn handle_msg(&mut self, msg: Msg) {
match msg {
Msg::InstanceDeploy(spec, reply) => {
Msg::DeployInstance(spec, reply) => {
let res = self.instance_deploy(spec).await;
_ = reply.send(res);
}
Msg::InstanceTerminate(_id, _reply) => todo!(),
Msg::InstanceKill(_id, _report) => todo!(),
Msg::TerminateInstance(_id, _reply) => todo!(),
Msg::KillInstance(_id, _report) => todo!(),
Msg::ReportInstanceStatus(_) => todo!(),
}
}

async fn instance_deploy(&mut self, spec: InstanceSpec) -> eyre::Result<()> {
let port = self.get_port_for_instance(spec.instance_id).await?;
self.container_runtime
.spawn_instance(spec, port, self.handle.clone())
.await;
.spawn_instance(spec, port, self.handle.clone());
Ok(())
}

Expand All @@ -81,20 +85,42 @@ impl RunnerHandle {
_ = self.0.send(msg).await;
}

/// Sends a message to the actor and waits for its reply.
///
/// `f` receives the sending half of a fresh oneshot channel and must build
/// the [`Msg`] that carries it; the value later sent through that channel is
/// returned to the caller.
///
/// # Panics
///
/// Panics if the reply sender is dropped without a value being sent.
async fn send_wait<F, R>(&self, f: F) -> R
where
    F: FnOnce(oneshot::Sender<R>) -> Msg,
{
    let (reply_tx, reply_rx) = oneshot::channel();
    let msg = f(reply_tx);
    self.send(msg).await;
    let reply = reply_rx.await;
    reply.expect("actor must be alive")
}

#[allow(dead_code)]
pub async fn deploy_instance(&self, spec: InstanceSpec) -> Result<(), Report> {
let (tx, rx) = oneshot::channel();
self.send(Msg::InstanceDeploy(spec, tx)).await;
rx.await.unwrap()
self.send_wait(|tx| Msg::DeployInstance(spec, tx)).await
}

/// Sends a `Msg::TerminateInstance` for `id` and waits for the reply.
///
/// The returned `Result` is whatever was sent back over the reply channel.
#[allow(dead_code)]
pub async fn terminate_instance(&self, id: InstanceId) -> Result<(), Report> {
    let build_msg = |reply| Msg::TerminateInstance(id, reply);
    self.send_wait(build_msg).await
}

/// Sends a `Msg::KillInstance` for `id` and waits for the reply.
///
/// The returned `Result` is whatever was sent back over the reply channel.
#[allow(dead_code)]
pub async fn kill_instance(&self, id: InstanceId) -> Result<(), Report> {
    let build_msg = |reply| Msg::KillInstance(id, reply);
    self.send_wait(build_msg).await
}

/// Sends `status` to the actor as a `Msg::ReportInstanceStatus`.
///
/// No reply is awaited.
pub async fn report_instance_status(&self, status: instance::Status) {
    let msg = Msg::ReportInstanceStatus(status);
    self.send(msg).await;
}
}

#[allow(clippy::enum_variant_names)] // remove this once more variants are added
#[allow(dead_code)]
pub enum Msg {
InstanceDeploy(InstanceSpec, oneshot::Sender<Result<(), Report>>),
InstanceTerminate(InstanceId, oneshot::Sender<Result<(), Report>>),
InstanceKill(InstanceId, oneshot::Sender<Result<(), Report>>),
DeployInstance(InstanceSpec, oneshot::Sender<Result<(), Report>>),
TerminateInstance(InstanceId, oneshot::Sender<Result<(), Report>>),
KillInstance(InstanceId, oneshot::Sender<Result<(), Report>>),
ReportInstanceStatus(instance::Status),
}

async fn get_port() -> eyre::Result<u16> {
Expand Down

0 comments on commit 58625d8

Please sign in to comment.