From 6ef500f490b1870c3729a605165ff01febf49830 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Thu, 14 Mar 2024 17:23:19 +0100 Subject: [PATCH 1/8] feat: runtime healthcheck, start runtime on 0.0.0.0 running on unspecified ip was necessary for the runner to be able to reach the runtime when they are running in separate containers --- proto/runtime.proto | 5 +++ proto/src/generated/runtime.rs | 63 ++++++++++++++++++++++++++++++++++ runtime/src/alpha.rs | 17 ++++++--- 3 files changed, 80 insertions(+), 5 deletions(-) diff --git a/proto/runtime.proto b/proto/runtime.proto index 3748e6129..fd0c974b0 100644 --- a/proto/runtime.proto +++ b/proto/runtime.proto @@ -13,6 +13,8 @@ service Runtime { // Channel to notify a service has been stopped rpc SubscribeStop(SubscribeStopRequest) returns (stream SubscribeStopResponse); + + rpc HealthCheck(Ping) returns (Pong); } message LoadRequest { @@ -78,3 +80,6 @@ enum StopReason { // Service crashed Crash = 2; } + +message Ping {} +message Pong {} diff --git a/proto/src/generated/runtime.rs b/proto/src/generated/runtime.rs index 09eb3cca5..fa7097a29 100644 --- a/proto/src/generated/runtime.rs +++ b/proto/src/generated/runtime.rs @@ -73,6 +73,12 @@ pub struct SubscribeStopResponse { #[prost(string, tag = "2")] pub message: ::prost::alloc::string::String, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Ping {} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Pong {} #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum StopReason { @@ -264,6 +270,23 @@ pub mod runtime_client { .insert(GrpcMethod::new("runtime.Runtime", "SubscribeStop")); self.inner.server_streaming(req, path, codec).await } + pub async fn health_check( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/runtime.Runtime/HealthCheck"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("runtime.Runtime", "HealthCheck")); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -298,6 +321,10 @@ pub mod runtime_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + async fn health_check( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] pub struct RuntimeServer { @@ -534,6 +561,42 @@ pub mod runtime_server { }; Box::pin(fut) } + "/runtime.Runtime/HealthCheck" => { + #[allow(non_camel_case_types)] + struct HealthCheckSvc(pub Arc); + impl tonic::server::UnaryService for HealthCheckSvc { + type Response = super::Pong; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = + async move { ::health_check(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = HealthCheckSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => Box::pin(async move { Ok(http::Response::builder() .status(200) diff --git a/runtime/src/alpha.rs b/runtime/src/alpha.rs index 51c593878..cddec6601 100644 --- a/runtime/src/alpha.rs +++ b/runtime/src/alpha.rs @@ -12,10 +12,13 @@ use anyhow::Context; use async_trait::async_trait; use core::future::Future; use shuttle_common::{extract_propagation::ExtractPropagationLayer, secrets::Secret}; -use shuttle_proto::runtime::{ - runtime_server::{Runtime, RuntimeServer}, - LoadRequest, LoadResponse, StartRequest, StartResponse, StopReason, StopRequest, StopResponse, - SubscribeStopRequest, SubscribeStopResponse, +use shuttle_proto::{ + runtime::{ + runtime_server::{Runtime, RuntimeServer}, + LoadRequest, LoadResponse, StartRequest, StartResponse, StopReason, StopRequest, + StopResponse, SubscribeStopRequest, SubscribeStopResponse, + }, + runtime::{Ping, Pong}, }; use shuttle_service::{ResourceFactory, Service}; use tokio::sync::{ @@ -99,7 +102,7 @@ pub async fn start(loader: impl Loader + Send + 'static, runner: impl Runner + S } // where to serve the gRPC control layer - let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), args.port); + let addr = SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), args.port); let mut server_builder = Server::builder() .http2_keepalive_interval(Some(Duration::from_secs(60))) @@ -413,4 +416,8 @@ where Ok(Response::new(ReceiverStream::new(rx))) } + + async fn health_check(&self, _request: Request) -> Result, Status> { + Ok(Response::new(Pong {})) + } } From 98bf1026bb8ee5a53175dfdc3153a63cea37cbf2 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Tue, 19 Mar 2024 13:22:29 +0100 Subject: [PATCH 2/8] feat(proto): update runtime::get_client to work with --- proto/src/lib.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/proto/src/lib.rs b/proto/src/lib.rs index 0a8adce4c..4e9fd97c9 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -149,16 +149,14 @@ mod _runtime_client { use tracing::{info, trace}; pub type Client = runtime_client::RuntimeClient< - shuttle_common::claims::ClaimService< - shuttle_common::claims::InjectPropagation, - >, + shuttle_common::claims::InjectPropagation, >; /// Get a runtime client that is correctly configured #[cfg(feature = "client")] pub async fn get_client(port: &str) -> anyhow::Result { info!("connecting runtime client"); - let conn = Endpoint::new(format!("http://127.0.0.1:{port}")) + let conn = Endpoint::new(format!("http://0.0.0.0:{port}")) .context("creating runtime client endpoint")? .connect_timeout(Duration::from_secs(5)); @@ -180,7 +178,6 @@ mod _runtime_client { .context("runtime control port did not open in time")?; let runtime_service = tower::ServiceBuilder::new() - .layer(shuttle_common::claims::ClaimLayer) .layer(shuttle_common::claims::InjectPropagationLayer) .service(channel); From b9879a6b678403254692ee9b973f391677e9c5c1 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Tue, 19 Mar 2024 13:29:15 +0100 Subject: [PATCH 3/8] misc(proto): get client takes u16 port --- proto/src/lib.rs | 2 +- service/src/runner.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/proto/src/lib.rs b/proto/src/lib.rs index 4e9fd97c9..f6b7968e5 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -154,7 +154,7 @@ mod _runtime_client { /// Get a runtime client that is correctly configured #[cfg(feature = "client")] - pub async fn get_client(port: &str) -> anyhow::Result { + pub async fn get_client(port: u16) -> anyhow::Result { info!("connecting runtime client"); let conn = Endpoint::new(format!("http://0.0.0.0:{port}")) .context("creating runtime client endpoint")? diff --git a/service/src/runner.rs b/service/src/runner.rs index 5cb949558..94a4b4d73 100644 --- a/service/src/runner.rs +++ b/service/src/runner.rs @@ -13,8 +13,8 @@ pub async fn start( runtime_executable: PathBuf, project_path: &Path, ) -> anyhow::Result<(process::Child, runtime::Client)> { - let port = &port.to_string(); - let args = vec!["--port", port]; + let port_str = port.to_string(); + let args = vec!["--port", &port_str]; info!( args = %format!("{} {}", runtime_executable.display(), args.join(" ")), From 8691c981d08e76be9369c08a75c2d297671b554c Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Thu, 21 Mar 2024 08:51:48 +0100 Subject: [PATCH 4/8] feat: add health toggle to runtime --- runtime/src/alpha.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/runtime/src/alpha.rs b/runtime/src/alpha.rs index f2da0d83f..96aba75b6 100644 --- a/runtime/src/alpha.rs +++ b/runtime/src/alpha.rs @@ -2,7 +2,7 @@ use std::{ collections::BTreeMap, iter::FromIterator, net::{Ipv4Addr, SocketAddr}, - ops::DerefMut, + ops::{Deref, DerefMut}, str::FromStr, sync::Mutex, time::Duration, @@ -112,6 +112,8 @@ pub struct Alpha { kill_tx: Mutex>>, loader: Mutex>, runner: Mutex>, + /// Whether or not the runtime is healthy, which is checked by the ECS task. + healthy: Mutex, } impl Alpha { @@ -123,6 +125,7 @@ impl Alpha { kill_tx: Mutex::new(None), loader: Mutex::new(Some(loader)), runner: Mutex::new(Some(runner)), + healthy: Mutex::new(false), } } } @@ -226,6 +229,9 @@ where } }; + println!("setting current state to healthy"); + *self.healthy.lock().unwrap() = true; + Ok(Response::new(LoadResponse { success: true, message: String::new(), @@ -403,6 +409,13 @@ where } async fn health_check(&self, _request: Request) -> Result, Status> { + if !self.healthy.lock().unwrap().deref() { + println!("responded negatively to health check"); + return Err(Status::unavailable( + "runtime has not reached a healthy state", + )); + } + println!("responded positively to health check"); Ok(Response::new(Pong {})) } } From 22eda79e53cd7ad7c7695ac2297e0aea0261e079 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Fri, 22 Mar 2024 12:57:36 +0100 Subject: [PATCH 5/8] feat: set runtime to unhealthy if it doesn't start within 60s --- runtime/src/alpha.rs | 40 ++++++++++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/runtime/src/alpha.rs b/runtime/src/alpha.rs index 96aba75b6..6d9ad3e9b 100644 --- a/runtime/src/alpha.rs +++ b/runtime/src/alpha.rs @@ -4,7 +4,7 @@ use std::{ net::{Ipv4Addr, SocketAddr}, ops::{Deref, DerefMut}, str::FromStr, - sync::Mutex, + sync::{Arc, Mutex}, time::Duration, }; @@ -106,14 +106,21 @@ pub async fn start(loader: impl Loader + Send + 'static, runner: impl Runner + S }; } +pub enum State { + Unhealthy, + Loading, + Running, +} + pub struct Alpha { // Mutexes are for interior mutability stopped_tx: Sender<(StopReason, String)>, kill_tx: Mutex>>, loader: Mutex>, runner: Mutex>, - /// Whether or not the runtime is healthy, which is checked by the ECS task. - healthy: Mutex, + /// The current state of the runtime, which is used by the ECS task to determine if the runtime + /// is healthy. + state: Arc>, } impl Alpha { @@ -125,7 +132,7 @@ impl Alpha { kill_tx: Mutex::new(None), loader: Mutex::new(Some(loader)), runner: Mutex::new(Some(runner)), - healthy: Mutex::new(false), + state: Arc::new(Mutex::new(State::Unhealthy)), } } } @@ -230,7 +237,22 @@ where }; println!("setting current state to healthy"); - *self.healthy.lock().unwrap() = true; + *self.state.lock().unwrap() = State::Loading; + + let state = self.state.clone(); + + // Ensure that the runtime is set to unhealthy if it doesn't reach the running state after + // it has sent a load response, so that the ECS task will fail. + tokio::spawn(async move { + // Note: The timeout is quite low as we are not actually provisioning resources after + // sending the load response. + tokio::time::sleep(Duration::from_secs(60)).await; + let mut state = state.lock().unwrap(); + if !matches!(state.deref(), State::Running) { + println!("the runtime failed to enter the running state before timing out"); + *state = State::Unhealthy; + } + }); Ok(Response::new(LoadResponse { success: true, @@ -364,6 +386,8 @@ where ..Default::default() }; + *self.state.lock().unwrap() = State::Running; + Ok(Response::new(message)) } @@ -409,13 +433,13 @@ where } async fn health_check(&self, _request: Request) -> Result, Status> { - if !self.healthy.lock().unwrap().deref() { - println!("responded negatively to health check"); + if matches!(self.state.lock().unwrap().deref(), State::Unhealthy) { + println!("runtime health check failed"); return Err(Status::unavailable( "runtime has not reached a healthy state", )); } - println!("responded positively to health check"); + Ok(Response::new(Pong {})) } } From 03efbd5cb939c924a1174ddf9a49ed97a532733d Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Fri, 22 Mar 2024 13:44:00 +0100 Subject: [PATCH 6/8] feat: change runtime::get_client to take address --- proto/src/lib.rs | 4 ++-- service/src/runner.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/proto/src/lib.rs b/proto/src/lib.rs index f6b7968e5..55d6cbc28 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -154,9 +154,9 @@ mod _runtime_client { /// Get a runtime client that is correctly configured #[cfg(feature = "client")] - pub async fn get_client(port: u16) -> anyhow::Result { + pub async fn get_client(address: String) -> anyhow::Result { info!("connecting runtime client"); - let conn = Endpoint::new(format!("http://0.0.0.0:{port}")) + let conn = Endpoint::new(address) .context("creating runtime client endpoint")? .connect_timeout(Duration::from_secs(5)); diff --git a/service/src/runner.rs b/service/src/runner.rs index 94a4b4d73..b772e573f 100644 --- a/service/src/runner.rs +++ b/service/src/runner.rs @@ -30,7 +30,7 @@ pub async fn start( .spawn() .context("spawning runtime process")?; - let runtime_client = runtime::get_client(port).await?; + let runtime_client = runtime::get_client(format!("http://0.0.0.0:{port}")).await?; Ok((runtime, runtime_client)) } From 862ffd81160852e3d030a32572696dc332f3d5a9 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Fri, 22 Mar 2024 18:58:19 +0100 Subject: [PATCH 7/8] feat: kill runtime if it doesn't become healthy in time --- runtime/src/alpha.rs | 40 +++++++++++++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/runtime/src/alpha.rs b/runtime/src/alpha.rs index 6d9ad3e9b..82111b2bd 100644 --- a/runtime/src/alpha.rs +++ b/runtime/src/alpha.rs @@ -93,17 +93,30 @@ pub async fn start(loader: impl Loader + Send + 'static, runner: impl Runner + S .http2_keepalive_interval(Some(Duration::from_secs(60))) .layer(ExtractPropagationLayer); + // A channel we can use to kill the runtime if it does not become healthy in time. + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + let router = { - let alpha = Alpha::new(loader, runner); + let alpha = Alpha::new(loader, runner, tx); let svc = RuntimeServer::new(alpha); server_builder.add_service(svc) }; - match router.serve(addr).await { - Ok(_) => {} - Err(e) => panic!("Error while serving address {addr}: {e}"), - }; + tokio::select! { + res = router.serve(addr) => { + match res{ + Ok(_) => {} + Err(e) => panic!("Error while serving address {addr}: {e}") + } + } + res = rx => { + match res{ + Ok(_) => panic!("Received runtime kill signal"), + Err(e) => panic!("Receiver error: {e}") + } + } + } } pub enum State { @@ -121,10 +134,11 @@ pub struct Alpha { /// The current state of the runtime, which is used by the ECS task to determine if the runtime /// is healthy. state: Arc>, + runtime_kill_tx: Mutex>>, } impl Alpha { - pub fn new(loader: L, runner: R) -> Self { + pub fn new(loader: L, runner: R, runtime_kill_tx: tokio::sync::oneshot::Sender<()>) -> Self { let (stopped_tx, _stopped_rx) = broadcast::channel(10); Self { @@ -133,6 +147,7 @@ impl Alpha { loader: Mutex::new(Some(loader)), runner: Mutex::new(Some(runner)), state: Arc::new(Mutex::new(State::Unhealthy)), + runtime_kill_tx: Mutex::new(Some(runtime_kill_tx)), } } } @@ -240,6 +255,13 @@ where *self.state.lock().unwrap() = State::Loading; let state = self.state.clone(); + let runtime_kill_tx = self + .runtime_kill_tx + .lock() + .unwrap() + .deref_mut() + .take() + .unwrap(); // Ensure that the runtime is set to unhealthy if it doesn't reach the running state after // it has sent a load response, so that the ECS task will fail. @@ -247,10 +269,10 @@ where // Note: The timeout is quite low as we are not actually provisioning resources after // sending the load response. tokio::time::sleep(Duration::from_secs(60)).await; - let mut state = state.lock().unwrap(); - if !matches!(state.deref(), State::Running) { + if !matches!(state.lock().unwrap().deref(), State::Running) { println!("the runtime failed to enter the running state before timing out"); - *state = State::Unhealthy; + + runtime_kill_tx.send(()).unwrap(); } }); From e80da4b60245cc0866198f9ccd0f52ecd7218af3 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Tue, 2 Apr 2024 12:17:39 +0200 Subject: [PATCH 8/8] feat: increase provisioning timeout duration --- runtime/src/alpha.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/src/alpha.rs b/runtime/src/alpha.rs index 82111b2bd..a3d5070bd 100644 --- a/runtime/src/alpha.rs +++ b/runtime/src/alpha.rs @@ -268,7 +268,7 @@ where tokio::spawn(async move { // Note: The timeout is quite low as we are not actually provisioning resources after // sending the load response. - tokio::time::sleep(Duration::from_secs(60)).await; + tokio::time::sleep(Duration::from_secs(180)).await; if !matches!(state.lock().unwrap().deref(), State::Running) { println!("the runtime failed to enter the running state before timing out");