Skip to content

Commit

Permalink
feat(worker-runner): implemented instance graceful termination (#46)
Browse files Browse the repository at this point in the history
Containers cannot be killed by request, except when termination fails.
  • Loading branch information
lemosep authored Jun 19, 2024
1 parent d861a9f commit 47ca457
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 89 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ wildcard_imports = "allow"
module_name_repetitions = "allow"
cast_precision_loss = "allow"
unused_async = "allow"
enum_glob_use = "allow"
5 changes: 2 additions & 3 deletions proto/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
pub mod common;

pub mod ctl;
pub mod worker;

pub mod etc;
pub mod well_known;
pub mod worker;
3 changes: 3 additions & 0 deletions proto/src/well_known.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
use std::time::Duration;

pub const GRACEFUL_SHUTDOWN_DEADLINE: Duration = Duration::from_secs(20);
2 changes: 1 addition & 1 deletion worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn main() -> Result<()> {
}
});

let docker = Arc::new(Docker::connect_with_http_defaults().unwrap());
let docker = Arc::new(Docker::connect_with_defaults().unwrap());
let (runner, runner_handle) = Runner::new(docker, sender);
let runner_actor_handle = tokio::spawn(async move {
runner.run().await;
Expand Down
132 changes: 95 additions & 37 deletions worker/src/runner/container_rt.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
use std::{collections::HashMap, sync::Arc};

use bollard::{
container::{Config, CreateContainerOptions, StartContainerOptions, WaitContainerOptions},
container::{
Config, CreateContainerOptions, KillContainerOptions, StartContainerOptions,
WaitContainerOptions,
},
errors::Error as BollardError,
secret::{ContainerCreateResponse, ContainerWaitExitError, ContainerWaitResponse, HostConfig},
Docker,
};
use futures_util::stream::StreamExt;
use proto::common::instance::{InstanceSpec, Status};
use proto::{
common::instance::{InstanceId, InstanceSpec, Status},
well_known::GRACEFUL_SHUTDOWN_DEADLINE,
};
use tracing::error;

use super::RunnerHandle;

Expand All @@ -20,39 +28,66 @@ impl ContainerRuntime {
ContainerRuntime { docker }
}

pub fn spawn_instance(&self, spec: InstanceSpec, port: u16, handle: RunnerHandle) {
let this = self.clone();
tokio::spawn(async move {
let container_name = Self::create_container_name(&spec);
pub async fn run_instance_lifecycle(
&self,
spec: InstanceSpec,
port: u16,
handle: RunnerHandle,
) {
let container_name = Self::create_container_name(spec.instance_id);

if let Err(e) = self
.create_and_run(&spec, port, container_name.clone())
.await
{
let error = e.to_string();
handle
.report_instance_status(spec.instance_id, Status::FailedToStart { error })
.await;
return;
}

if let Err(e) = this
.create_and_run(&spec, port, container_name.clone())
.await
{
let error = e.to_string();
// TODO: Add health check to verify whether the service is running
handle
.report_instance_status(spec.instance_id, Status::Started)
.await;

match self
.wait_container(spec.instance_id)
.await
.expect("infallible operation")
{
ExitStatus::Terminated => {
handle
.report_instance_status(spec.instance_id, Status::FailedToStart { error })
.report_instance_status(spec.instance_id, Status::Terminated)
.await;
return;
}

// TODO: Add health check to verify whether the service is running
handle
.report_instance_status(spec.instance_id, Status::Started)
.await;

if let Err(e) = this.wait_container(&container_name).await {
let error = e.to_string();
ExitStatus::Crashed { status, error } => {
handle
.report_instance_status(spec.instance_id, Status::Crashed { error })
.await;
return;
error!(status, instance_id = %spec.instance_id, "Process exited");
}
}
}

handle
.report_instance_status(spec.instance_id, Status::Terminated)
.await;
});
pub async fn terminate_instance(&self, id: InstanceId) {
if let Err(e) = self.kill_container(id, "SIGTERM").await {
error!(%e, "error when killing instance (term)");
}

let timeout_res = tokio::time::timeout(GRACEFUL_SHUTDOWN_DEADLINE, self.wait_container(id));

match timeout_res.await {
// Container has been gracefully terminated.
Ok(_) => (),
// Container failed to terminate within given deadline.
Err(_) => {
if let Err(e) = self.kill_container(id, "SIGKILL").await {
error!(%e, "error when killing instance (kill)");
}
}
}
}

async fn create_and_run(
Expand Down Expand Up @@ -93,32 +128,49 @@ impl ContainerRuntime {
Ok(create_response)
}

async fn wait_container(&self, name: &str) -> eyre::Result<()> {
async fn wait_container(&self, id: InstanceId) -> eyre::Result<ExitStatus> {
let ct_name = Self::create_container_name(id);
let options = Some(WaitContainerOptions {
condition: "not-running",
});

let mut response_stream = self.docker.wait_container(name, options);
let mut response_stream = self.docker.wait_container(&ct_name, options);
let Some(result) = response_stream.next().await else {
eyre::bail!("wait_container didn't respond");
};

match result {
Ok(res) if res.status_code == 0 => Ok(()),
Ok(res) if res.status_code == 0 => Ok(ExitStatus::Terminated),
// Although this `Ok` variant is impossible as per the library's
// source code, the type signature still allows it, so we handle
// it here. The library maps the `Ok` with non-0 exit status code
// to the OR-ed `Err` case.
Ok(ContainerWaitResponse {
status_code,
status_code: status,
error: Some(ContainerWaitExitError { message: Some(m) }),
}) => Err(eyre::eyre!("Container exited due to: {m} - {status_code}")),
})
| Err(BollardError::DockerContainerWaitError {
error: m,
code: status,
}) => Ok(ExitStatus::Crashed { status, error: m }),
Ok(ContainerWaitResponse {
status_code,
error: _,
}) => Err(eyre::eyre!(
"Container exited due to unknown error - {status_code}"
)),
}) => Ok(ExitStatus::Crashed {
status: status_code,
error: "unknown".into(),
}),
Err(e) => Err(e.into()),
}
}

async fn kill_container(&self, id: InstanceId, signal: &str) -> eyre::Result<()> {
let ct_name = Self::create_container_name(id);
self.docker
.kill_container(&ct_name, Some(KillContainerOptions { signal }))
.await?;
Ok(())
}

fn create_container_config(spec: InstanceSpec, port: u16) -> Config<String> {
const HOST: &str = "0.0.0.0";

Expand All @@ -131,6 +183,7 @@ impl ContainerRuntime {
)])),
env: Some(vec![format!("PORT={port}"), format!("HOST={HOST}")]),
host_config: Some(HostConfig {
auto_remove: Some(true),
cpu_shares: Some(spec.resource_config.cpu_shares),
memory: Some(spec.resource_config.memory_limit),
port_bindings: Some({
Expand All @@ -150,7 +203,12 @@ impl ContainerRuntime {
}
}

fn create_container_name(spec: &InstanceSpec) -> String {
format!("instance-{}", spec.instance_id.0)
fn create_container_name(id: InstanceId) -> String {
format!("instance-{id}")
}
}

enum ExitStatus {
Terminated,
Crashed { status: i64, error: String },
}
38 changes: 38 additions & 0 deletions worker/src/runner/handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use eyre::Report;
use proto::common::instance::{self, InstanceId, InstanceSpec};
use tokio::sync::{mpsc, oneshot};

use super::Msg;

#[derive(Clone)]
pub struct RunnerHandle(pub mpsc::Sender<Msg>);

impl RunnerHandle {
async fn send(&self, msg: Msg) {
_ = self.0.send(msg).await;
}

/// Sends a message and waits for a reply.
async fn send_wait<F, R>(&self, f: F) -> R
where
F: FnOnce(oneshot::Sender<R>) -> Msg,
{
let (tx, rx) = oneshot::channel();
self.send(f(tx)).await;
rx.await.expect("actor must be alive")
}

#[allow(dead_code)]
pub async fn deploy_instance(&self, spec: InstanceSpec) -> Result<(), Report> {
self.send_wait(|tx| Msg::DeployInstance(spec, tx)).await
}

#[allow(dead_code)]
pub async fn terminate_instance(&self, id: InstanceId) -> Result<(), Report> {
self.send_wait(|tx| Msg::TerminateInstance(id, tx)).await
}

pub async fn report_instance_status(&self, id: InstanceId, status: instance::Status) {
self.send(Msg::ReportInstanceStatus(id, status)).await;
}
}
Loading

0 comments on commit 47ca457

Please sign in to comment.