Skip to content

Commit

Permalink
Create new API for local cluster runner CLI using Pkl
Browse files — browse the repository at this point in the history
  • Loading branch information
jackkleeman committed Oct 10, 2024
1 parent 56e9aca commit d99b1b8
Show file tree
Hide file tree
Showing 23 changed files with 1,691 additions and 104 deletions.
2 changes: 2 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ RUST_TEST_THREADS = "1"

[alias]
xtask = "run --package xtask --"
local-cluster-runner = "run --package local-cluster-runner --"
lcr = "run --package local-cluster-runner --"

[build]
rustflags = [
Expand Down
25 changes: 14 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ members = [
"server",
"benchmarks",
"tools/bifrost-benchpress",
"tools/local-cluster-runner",
"tools/mock-service-endpoint",
"tools/restatectl",
"tools/service-protocol-wireshark-dissector",
Expand Down
5 changes: 0 additions & 5 deletions crates/local-cluster-runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,16 @@ restate-metadata-store = { workspace = true }
restate-types = { workspace = true, features = ["unsafe-mutable-config"] }

arc-swap = { workspace = true }
clap = { workspace = true }
clap-verbosity-flag = { workspace = true }
futures = { workspace = true }
enumset = { workspace = true }
http = { workspace = true }
nix = { version = "0.29.0", features = ["signal"] }
regex = "1.1"
reqwest = { workspace = true }
rlimit = { workspace = true }
serde = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["process", "fs"] }
toml = "0.8"
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
typed-builder = "0.20.0"
test_bin = "0.4.0"
35 changes: 20 additions & 15 deletions crates/local-cluster-runner/src/cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
collections::BTreeMap,
io,
path::{Path, PathBuf},
sync::Arc,
Expand All @@ -19,7 +20,10 @@ pub struct Cluster {
#[builder(setter(into), default = default_cluster_name())]
#[serde(default = "default_cluster_name")]
cluster_name: String,
nodes: Vec<Node>,
#[builder(setter(transform = |nodes: impl IntoIterator<Item = Node>| {
nodes.into_iter().map(|n| (n.node_name().to_owned(), n)).collect()
}))]
nodes: BTreeMap<String, Node>,
#[builder(setter(into), default = default_base_dir())]
#[serde(default = "default_base_dir")]
base_dir: MaybeTempDir,
Expand All @@ -45,13 +49,13 @@ fn default_base_dir() -> MaybeTempDir {
}

fn default_cluster_name() -> String {
"local-cluster".to_owned()
"localcluster".to_owned()
}

#[derive(Debug, thiserror::Error)]
pub enum ClusterStartError {
#[error("Failed to start node {0}: {1}")]
NodeStartError(usize, NodeStartError),
NodeStartError(String, NodeStartError),
#[error("Admin node is not healthy after waiting 60 seconds")]
AdminUnhealthy,
#[error("Failed to create cluster base directory: {0}")]
Expand Down Expand Up @@ -79,26 +83,26 @@ impl Cluster {
.map_err(ClusterStartError::CreateDirectory)?;
}

let mut started_nodes = Vec::with_capacity(nodes.len());
let mut started_nodes = BTreeMap::new();

info!(
"Starting cluster {} in {}",
&cluster_name,
base_dir.as_path().display()
);

for (i, node) in nodes.into_iter().enumerate() {
for (node_name, node) in nodes.into_iter() {
let node = node
.start_clustered(base_dir.as_path(), &cluster_name)
.await
.map_err(|err| ClusterStartError::NodeStartError(i, err))?;
.map_err(|err| ClusterStartError::NodeStartError(node_name.clone(), err))?;
if node.admin_address().is_some() {
// admin nodes are needed for later nodes to bootstrap. we should wait until they are serving
if !node.wait_admin_healthy(Duration::from_secs(30)).await {
return Err(ClusterStartError::AdminUnhealthy);
}
}
started_nodes.push(node)
started_nodes.insert(node_name, node);
}

Ok(StartedCluster {
Expand All @@ -112,7 +116,7 @@ impl Cluster {
pub struct StartedCluster {
cluster_name: String,
base_dir: MaybeTempDir,
pub nodes: Vec<StartedNode>,
pub nodes: BTreeMap<String, StartedNode>,
}

impl StartedCluster {
Expand All @@ -126,14 +130,14 @@ impl StartedCluster {

/// Send a SIGKILL to every node in the cluster
pub async fn kill(&mut self) -> io::Result<()> {
future::try_join_all(self.nodes.iter_mut().map(|n| n.kill()))
future::try_join_all(self.nodes.values_mut().map(|n| n.kill()))
.await
.map(drop)
}

/// Send a SIGTERM to every node in the cluster
pub fn terminate(&self) -> io::Result<()> {
for node in &self.nodes {
for node in self.nodes.values() {
node.terminate()?
}
Ok(())
Expand All @@ -142,7 +146,7 @@ impl StartedCluster {
/// Send a SIGTERM to every node in the cluster, then wait for `dur` for them to exit,
/// otherwise send a SIGKILL to nodes that are still running.
pub async fn graceful_shutdown(&mut self, dur: Duration) -> io::Result<()> {
future::try_join_all(self.nodes.iter_mut().map(|n| n.graceful_shutdown(dur)))
future::try_join_all(self.nodes.values_mut().map(|n| n.graceful_shutdown(dur)))
.await
.map(drop)
}
Expand All @@ -152,7 +156,7 @@ impl StartedCluster {
pub async fn wait_admins_healthy(&self, dur: Duration) -> bool {
future::join_all(
self.nodes
.iter()
.values()
.filter(|n| n.admin_address().is_some())
.map(|n| n.wait_admin_healthy(dur)),
)
Expand All @@ -166,7 +170,7 @@ impl StartedCluster {
pub async fn wait_ingresses_healthy(&self, dur: Duration) -> bool {
future::join_all(
self.nodes
.iter()
.values()
.filter(|n| n.ingress_address().is_some())
.map(|n| n.wait_ingress_healthy(dur)),
)
Expand All @@ -180,7 +184,7 @@ impl StartedCluster {
pub async fn wait_logservers_provisioned(&self, dur: Duration) -> bool {
future::join_all(
self.nodes
.iter()
.values()
.filter(|n| n.config().has_role(Role::LogServer))
.map(|n| n.wait_logserver_provisioned(dur)),
)
Expand All @@ -199,7 +203,8 @@ impl StartedCluster {
}

pub async fn push_node(&mut self, node: Node) -> Result<(), NodeStartError> {
self.nodes.push(
self.nodes.insert(
node.node_name().to_owned(),
node.start_clustered(self.base_dir.as_path(), self.cluster_name.clone())
.await?,
);
Expand Down
4 changes: 3 additions & 1 deletion crates/local-cluster-runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ pub fn shutdown() -> impl Future<Output = &'static str> {
}
}

const RANDOM_SOCKET_ADDRESS: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);

pub fn random_socket_address() -> io::Result<SocketAddr> {
let listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::LOCALHOST), 0))?;
let listener = TcpListener::bind(RANDOM_SOCKET_ADDRESS)?;
let socket_addr = listener.local_addr()?;

Ok(socket_addr)
Expand Down
59 changes: 0 additions & 59 deletions crates/local-cluster-runner/src/main.rs

This file was deleted.

38 changes: 30 additions & 8 deletions crates/local-cluster-runner/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use restate_types::{
PlainNodeId,
};

use crate::random_socket_address;
use crate::{random_socket_address, RANDOM_SOCKET_ADDRESS};

#[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)]
pub struct Node {
Expand All @@ -56,12 +56,9 @@ pub struct Node {
}
pub fn with_random_ports(self) {
self.base_config.admin.bind_address =
random_socket_address().expect("to find a random port for the admin server");
self.base_config.admin.query_engine.pgsql_bind_address =
random_socket_address().expect("to find a random port for the pgsql server");
self.base_config.ingress.bind_address =
random_socket_address().expect("to find a random port for the ingress server");
self.base_config.admin.bind_address = RANDOM_SOCKET_ADDRESS;
self.base_config.admin.query_engine.pgsql_bind_address = RANDOM_SOCKET_ADDRESS;
self.base_config.ingress.bind_address = RANDOM_SOCKET_ADDRESS;
}
pub fn with_node_name(self, node_name: impl Into<String>) {
Expand All @@ -79,10 +76,12 @@ pub struct Node {
base_config: Configuration,
binary_source: BinarySource,
#[builder(default)]
#[serde(default)]
args: Vec<String>,
#[builder(default = true)]
inherit_env: bool,
#[builder(default)]
#[serde(default)]
env: Vec<(String, String)>,
}

Expand All @@ -92,6 +91,8 @@ pub enum NodeStartError {
Absolute(io::Error),
#[error(transparent)]
BinarySourceError(#[from] BinarySourceError),
#[error("Failed to find a random socket address: {0}")]
BindRandomSocketAddress(io::Error),
#[error("Failed to create node base directory: {0}")]
CreateDirectory(io::Error),
#[error("Failed to create or truncate node config file: {0}")]
Expand Down Expand Up @@ -217,7 +218,7 @@ impl Node {
/// spawned to process output logs and watch for exit.
pub async fn start(self) -> Result<StartedNode, NodeStartError> {
let Self {
base_config,
mut base_config,
binary_source,
args,
inherit_env,
Expand All @@ -232,6 +233,27 @@ impl Node {
)
.map_err(NodeStartError::Absolute)?;

for addr in [
&mut base_config.admin.bind_address,
&mut base_config.admin.query_engine.pgsql_bind_address,
&mut base_config.ingress.bind_address,
] {
if *addr == RANDOM_SOCKET_ADDRESS {
*addr = random_socket_address().map_err(NodeStartError::BindRandomSocketAddress)?;
}
}

if base_config.common.bind_address == BindAddress::Socket(RANDOM_SOCKET_ADDRESS) {
base_config.common.bind_address = BindAddress::Socket(
random_socket_address().map_err(NodeStartError::BindRandomSocketAddress)?,
);
base_config.common.advertised_address = AdvertisedAddress::Http(
format!("http://{}/", base_config.common.bind_address)
.parse()
.expect("random bind address to be a valid http advertise address"),
)
}

if !node_base_dir.exists() {
std::fs::create_dir_all(&node_base_dir).map_err(NodeStartError::CreateDirectory)?;
}
Expand Down
Loading

0 comments on commit d99b1b8

Please sign in to comment.