Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make it easier to debug failing local cluster runner tests #2059

Merged
merged 2 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/local-cluster-runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ clap-verbosity-flag = { workspace = true }
futures = { workspace = true }
enumset = { workspace = true }
http = { workspace = true }
itertools = { workspace = true }
nix = { version = "0.29.0", features = ["signal"] }
regex = "1.1"
reqwest = { workspace = true }
rev_lines = "0.3.0"
rlimit = { workspace = true }
serde = { workspace = true }
tempfile = { workspace = true }
Expand Down
72 changes: 24 additions & 48 deletions crates/local-cluster-runner/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use serde::{Deserialize, Serialize};
use tracing::info;
use typed_builder::TypedBuilder;

use restate_types::{errors::GenericError, nodes_config::Role};
use restate_types::errors::GenericError;

use crate::node::{Node, NodeStartError, StartedNode};
use crate::node::{HealthCheck, HealthError, Node, NodeStartError, StartedNode};

#[derive(Debug, Serialize, Deserialize, TypedBuilder)]
pub struct Cluster {
Expand Down Expand Up @@ -53,7 +53,7 @@ pub enum ClusterStartError {
#[error("Failed to start node {0}: {1}")]
NodeStartError(usize, NodeStartError),
#[error("Admin node is not healthy after waiting 60 seconds")]
AdminUnhealthy,
AdminUnhealthy(#[from] HealthError),
#[error("Failed to create cluster base directory: {0}")]
CreateDirectory(io::Error),
#[error("Failed to create metadata client: {0}")]
Expand Down Expand Up @@ -94,9 +94,9 @@ impl Cluster {
.map_err(|err| ClusterStartError::NodeStartError(i, err))?;
if node.admin_address().is_some() {
// admin nodes are needed for later nodes to bootstrap. we should wait until they are serving
if !node.wait_admin_healthy(Duration::from_secs(30)).await {
return Err(ClusterStartError::AdminUnhealthy);
}
HealthCheck::Admin
.wait_healthy(&node, Duration::from_secs(30))
.await?;
}
started_nodes.push(node)
}
Expand Down Expand Up @@ -147,55 +147,31 @@ impl StartedCluster {
.map(drop)
}

/// For every node in the cluster with an admin role, wait for up to dur for the admin endpoint
/// to respond to health checks, otherwise return false.
pub async fn wait_admins_healthy(&self, dur: Duration) -> bool {
future::join_all(
/// For every node in the cluster that this check applies to, wait for up to `dur` for the check
/// to pass, returning an error if any node's check fails.
pub async fn wait_check_healthy(
&self,
check: HealthCheck,
dur: Duration,
) -> Result<(), HealthError> {
future::try_join_all(
self.nodes
.iter()
.filter(|n| n.admin_address().is_some())
.map(|n| n.wait_admin_healthy(dur)),
.filter(|n| check.applicable(n))
.map(|n| check.wait_healthy(n, dur)),
)
.await
.into_iter()
.all(|b| b)
}

/// For every node in the cluster with an ingress role, wait for up to dur for the ingress endpoint
/// to respond to health checks, otherwise return false.
pub async fn wait_ingresses_healthy(&self, dur: Duration) -> bool {
future::join_all(
self.nodes
.iter()
.filter(|n| n.ingress_address().is_some())
.map(|n| n.wait_ingress_healthy(dur)),
)
.await
.into_iter()
.all(|b| b)
}

/// For every node in the cluster with a logserver role, wait for up to dur for the logserver
/// to be provisioned, otherwise return false.
pub async fn wait_logservers_provisioned(&self, dur: Duration) -> bool {
future::join_all(
self.nodes
.iter()
.filter(|n| n.config().has_role(Role::LogServer))
.map(|n| n.wait_logserver_provisioned(dur)),
)
.await
.into_iter()
.all(|b| b)
.map(drop)
}

/// Wait for all ingress, admin, logserver roles in the cluster to be healthy/provisioned
pub async fn wait_healthy(&self, dur: Duration) -> bool {
tokio::join!(
self.wait_admins_healthy(dur),
self.wait_ingresses_healthy(dur),
self.wait_logservers_provisioned(dur),
) == (true, true, true)
pub async fn wait_healthy(&self, dur: Duration) -> Result<(), HealthError> {
tokio::try_join!(
self.wait_check_healthy(HealthCheck::Admin, dur),
self.wait_check_healthy(HealthCheck::Ingress, dur),
self.wait_check_healthy(HealthCheck::Logserver, dur),
)?;
Ok(())
}

pub async fn push_node(&mut self, node: Node) -> Result<(), NodeStartError> {
Expand Down
Loading
Loading