diff --git a/Cargo.toml b/Cargo.toml index 88ccdbb..42680eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,3 +44,4 @@ wildcard_imports = "allow" module_name_repetitions = "allow" cast_precision_loss = "allow" unused_async = "allow" +enum_glob_use = "allow" diff --git a/proto/src/lib.rs b/proto/src/lib.rs index 8e14142..eba42be 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -1,6 +1,5 @@ pub mod common; - pub mod ctl; -pub mod worker; - pub mod etc; +pub mod well_known; +pub mod worker; diff --git a/proto/src/well_known.rs b/proto/src/well_known.rs new file mode 100644 index 0000000..1e2837d --- /dev/null +++ b/proto/src/well_known.rs @@ -0,0 +1,3 @@ +use std::time::Duration; + +pub const GRACEFUL_SHUTDOWN_DEADLINE: Duration = Duration::from_secs(20); diff --git a/worker/src/main.rs b/worker/src/main.rs index a736891..84f2bc8 100644 --- a/worker/src/main.rs +++ b/worker/src/main.rs @@ -31,7 +31,7 @@ async fn main() -> Result<()> { } }); - let docker = Arc::new(Docker::connect_with_http_defaults().unwrap()); + let docker = Arc::new(Docker::connect_with_defaults().unwrap()); let (runner, runner_handle) = Runner::new(docker, sender); let runner_actor_handle = tokio::spawn(async move { runner.run().await; diff --git a/worker/src/runner/container_rt.rs b/worker/src/runner/container_rt.rs index 2ff454b..079afcd 100644 --- a/worker/src/runner/container_rt.rs +++ b/worker/src/runner/container_rt.rs @@ -1,12 +1,20 @@ use std::{collections::HashMap, sync::Arc}; use bollard::{ - container::{Config, CreateContainerOptions, StartContainerOptions, WaitContainerOptions}, + container::{ + Config, CreateContainerOptions, KillContainerOptions, StartContainerOptions, + WaitContainerOptions, + }, + errors::Error as BollardError, secret::{ContainerCreateResponse, ContainerWaitExitError, ContainerWaitResponse, HostConfig}, Docker, }; use futures_util::stream::StreamExt; -use proto::common::instance::{InstanceSpec, Status}; +use proto::{ + common::instance::{InstanceId, InstanceSpec, Status}, + well_known::GRACEFUL_SHUTDOWN_DEADLINE, +}; +use tracing::error; use super::RunnerHandle; @@ -20,39 +28,66 @@ impl ContainerRuntime { ContainerRuntime { docker } } - pub fn spawn_instance(&self, spec: InstanceSpec, port: u16, handle: RunnerHandle) { - let this = self.clone(); - tokio::spawn(async move { - let container_name = Self::create_container_name(&spec); + pub async fn run_instance_lifecycle( + &self, + spec: InstanceSpec, + port: u16, + handle: RunnerHandle, + ) { + let container_name = Self::create_container_name(spec.instance_id); + + if let Err(e) = self + .create_and_run(&spec, port, container_name.clone()) + .await + { + let error = e.to_string(); + handle + .report_instance_status(spec.instance_id, Status::FailedToStart { error }) + .await; + return; + } - if let Err(e) = this - .create_and_run(&spec, port, container_name.clone()) - .await - { - let error = e.to_string(); + // TODO: Add health check to verify whether the service is running + handle + .report_instance_status(spec.instance_id, Status::Started) + .await; + + match self + .wait_container(spec.instance_id) + .await + .expect("infallible operation") + { + ExitStatus::Terminated => { handle - .report_instance_status(spec.instance_id, Status::FailedToStart { error }) + .report_instance_status(spec.instance_id, Status::Terminated) .await; - return; } - - // TODO: Add health check to verify whether the service is running - handle - .report_instance_status(spec.instance_id, Status::Started) - .await; - - if let Err(e) = this.wait_container(&container_name).await { - let error = e.to_string(); + ExitStatus::Crashed { status, error } => { handle .report_instance_status(spec.instance_id, Status::Crashed { error }) .await; - return; + error!(status, instance_id = %spec.instance_id, "Process exited"); } + } + } - handle - .report_instance_status(spec.instance_id, Status::Terminated) - .await; - }); + pub async fn terminate_instance(&self, id: InstanceId) { + if let Err(e) = self.kill_container(id, "SIGTERM").await { + error!(%e, "error when killing instance (term)"); + } + + let timeout_res = tokio::time::timeout(GRACEFUL_SHUTDOWN_DEADLINE, self.wait_container(id)); + + match timeout_res.await { + // Container has been gracefully terminated. + Ok(_) => (), + // Container failed to terminate within given deadline. + Err(_) => { + if let Err(e) = self.kill_container(id, "SIGKILL").await { + error!(%e, "error when killing instance (kill)"); + } + } + } } async fn create_and_run( @@ -93,32 +128,49 @@ impl ContainerRuntime { Ok(create_response) } - async fn wait_container(&self, name: &str) -> eyre::Result<()> { + async fn wait_container(&self, id: InstanceId) -> eyre::Result { + let ct_name = Self::create_container_name(id); let options = Some(WaitContainerOptions { condition: "not-running", }); - - let mut response_stream = self.docker.wait_container(name, options); + let mut response_stream = self.docker.wait_container(&ct_name, options); let Some(result) = response_stream.next().await else { eyre::bail!("wait_container didn't respond"); }; match result { - Ok(res) if res.status_code == 0 => Ok(()), + Ok(res) if res.status_code == 0 => Ok(ExitStatus::Terminated), + // Although this `Ok` variant is impossible as per the library's + // source code, the type signature still allows it, so we handle + // it here. The library maps the `Ok` with non-0 exit status code + // to the OR-ed `Err` case. Ok(ContainerWaitResponse { - status_code, + status_code: status, error: Some(ContainerWaitExitError { message: Some(m) }), - }) => Err(eyre::eyre!("Container exited due to: {m} - {status_code}")), + }) + | Err(BollardError::DockerContainerWaitError { + error: m, + code: status, + }) => Ok(ExitStatus::Crashed { status, error: m }), Ok(ContainerWaitResponse { status_code, error: _, - }) => Err(eyre::eyre!( - "Container exited due to unknown error - {status_code}" - )), + }) => Ok(ExitStatus::Crashed { + status: status_code, + error: "unknown".into(), + }), Err(e) => Err(e.into()), } } + async fn kill_container(&self, id: InstanceId, signal: &str) -> eyre::Result<()> { + let ct_name = Self::create_container_name(id); + self.docker + .kill_container(&ct_name, Some(KillContainerOptions { signal })) + .await?; + Ok(()) + } + fn create_container_config(spec: InstanceSpec, port: u16) -> Config { const HOST: &str = "0.0.0.0"; @@ -131,6 +183,7 @@ impl ContainerRuntime { )])), env: Some(vec![format!("PORT={port}"), format!("HOST={HOST}")]), host_config: Some(HostConfig { + auto_remove: Some(true), cpu_shares: Some(spec.resource_config.cpu_shares), memory: Some(spec.resource_config.memory_limit), port_bindings: Some({ @@ -150,7 +203,12 @@ impl ContainerRuntime { } } - fn create_container_name(spec: &InstanceSpec) -> String { - format!("instance-{}", spec.instance_id.0) + fn create_container_name(id: InstanceId) -> String { + format!("instance-{id}") } } + +enum ExitStatus { + Terminated, + Crashed { status: i64, error: String }, +} diff --git a/worker/src/runner/handle.rs b/worker/src/runner/handle.rs new file mode 100644 index 0000000..816a991 --- /dev/null +++ b/worker/src/runner/handle.rs @@ -0,0 +1,38 @@ +use eyre::Report; +use proto::common::instance::{self, InstanceId, InstanceSpec}; +use tokio::sync::{mpsc, oneshot}; + +use super::Msg; + +#[derive(Clone)] +pub struct RunnerHandle(pub mpsc::Sender); + +impl RunnerHandle { + async fn send(&self, msg: Msg) { + _ = self.0.send(msg).await; + } + + /// Sends a message and waits for a reply. + async fn send_wait(&self, f: F) -> R + where + F: FnOnce(oneshot::Sender) -> Msg, + { + let (tx, rx) = oneshot::channel(); + self.send(f(tx)).await; + rx.await.expect("actor must be alive") + } + + #[allow(dead_code)] + pub async fn deploy_instance(&self, spec: InstanceSpec) -> Result<(), Report> { + self.send_wait(|tx| Msg::DeployInstance(spec, tx)).await + } + + #[allow(dead_code)] + pub async fn terminate_instance(&self, id: InstanceId) -> Result<(), Report> { + self.send_wait(|tx| Msg::TerminateInstance(id, tx)).await + } + + pub async fn report_instance_status(&self, id: InstanceId, status: instance::Status) { + self.send(Msg::ReportInstanceStatus(id, status)).await; + } +} diff --git a/worker/src/runner/mod.rs b/worker/src/runner/mod.rs index 9608b23..a942cb0 100644 --- a/worker/src/runner/mod.rs +++ b/worker/src/runner/mod.rs @@ -4,6 +4,7 @@ use std::{ }; use bollard::Docker; +use container_rt::ContainerRuntime; use eyre::{Context as _, Ok, Report}; use proto::common::instance::{self, InstanceId, InstanceSpec}; use tokio::{ @@ -12,9 +13,10 @@ use tokio::{ task, }; -mod container_rt; -use container_rt::ContainerRuntime; +mod handle; +pub use handle::RunnerHandle; +mod container_rt; use super::sender; pub struct Runner { @@ -22,7 +24,7 @@ pub struct Runner { instances: HashMap, ports: HashSet, handle: RunnerHandle, - container_runtime: ContainerRuntime, + container_runtime: Arc, ctl_sender: Arc, } @@ -36,7 +38,7 @@ impl Runner { instances: HashMap::default(), ports: HashSet::default(), handle: handle.clone(), - container_runtime: ContainerRuntime::new(docker), + container_runtime: Arc::new(ContainerRuntime::new(docker)), ctl_sender: sender, }; (actor, handle) @@ -54,21 +56,50 @@ impl Runner { let res = self.deploy_instance(spec).await; _ = reply.send(res); } - Msg::TerminateInstance(_id, _reply) => todo!(), - Msg::KillInstance(_id, _report) => todo!(), + Msg::TerminateInstance(id, reply) => { + let res = self.terminate_instance(id); + _ = reply.send(res); + } Msg::ReportInstanceStatus(id, status) => { - let _ = self.ctl_sender.send_status(id, status).await; + self.report_instance_status(id, status); } } } async fn deploy_instance(&mut self, spec: InstanceSpec) -> eyre::Result<()> { let port = self.get_port_for_instance(spec.instance_id).await?; - self.container_runtime - .spawn_instance(spec, port, self.handle.clone()); + let rt = self.container_runtime.clone(); + let handle = self.handle.clone(); + tokio::spawn(async move { + rt.run_instance_lifecycle(spec, port, handle).await; + }); + Ok(()) + } + + fn terminate_instance(&mut self, id: InstanceId) -> eyre::Result<()> { + let rt = self.container_runtime.clone(); + tokio::spawn(async move { + rt.terminate_instance(id).await; + }); Ok(()) } + fn report_instance_status(&mut self, id: InstanceId, status: instance::Status) { + use instance::Status::*; + match &status { + Started => (), + Terminated => self.remove_instance(id), + Crashed { error: _ } | Killed { reason: _ } | FailedToStart { error: _ } => { + self.remove_instance(id); + } + } + + let s = self.ctl_sender.clone(); + tokio::spawn(async move { + let _ = s.send_status(id, status).await; + }); + } + async fn get_port_for_instance(&mut self, id: InstanceId) -> eyre::Result { let port = loop { let port = get_port().await?; @@ -80,44 +111,10 @@ impl Runner { self.ports.insert(port); Ok(port) } -} - -#[derive(Clone)] -pub struct RunnerHandle(mpsc::Sender); - -impl RunnerHandle { - #[allow(dead_code)] - async fn send(&self, msg: Msg) { - _ = self.0.send(msg).await; - } - /// Sends a message and waits for a reply. - async fn send_wait(&self, f: F) -> R - where - F: FnOnce(oneshot::Sender) -> Msg, - { - let (tx, rx) = oneshot::channel(); - self.send(f(tx)).await; - rx.await.expect("actor must be alive") - } - - #[allow(dead_code)] - pub async fn deploy_instance(&self, spec: InstanceSpec) -> Result<(), Report> { - self.send_wait(|tx| Msg::DeployInstance(spec, tx)).await - } - - #[allow(dead_code)] - pub async fn terminate_instance(&self, id: InstanceId) -> Result<(), Report> { - self.send_wait(|tx| Msg::TerminateInstance(id, tx)).await - } - - #[allow(dead_code)] - pub async fn kill_instance(&self, id: InstanceId) -> Result<(), Report> { - self.send_wait(|tx| Msg::KillInstance(id, tx)).await - } - - pub async fn report_instance_status(&self, id: InstanceId, status: instance::Status) { - self.send(Msg::ReportInstanceStatus(id, status)).await; + fn remove_instance(&mut self, id: InstanceId) { + let freed_port = self.instances.remove(&id).unwrap(); + self.ports.remove(&freed_port); } } @@ -125,8 +122,6 @@ impl RunnerHandle { pub enum Msg { DeployInstance(InstanceSpec, oneshot::Sender>), TerminateInstance(InstanceId, oneshot::Sender>), - KillInstance(InstanceId, oneshot::Sender>), - /// Sends a report to `ctl::http` component regarding current /// instance status. Furthermore updating discovery ReportInstanceStatus(InstanceId, instance::Status),