Skip to content

Commit

Permalink
linera net up --kubernetes multiple validators (#1255)
Browse files Browse the repository at this point in the history
## Motivation

We need to be able to run multiple validators, both for feature parity with `linera net up` and so that the end-to-end tests can eventually pass against Kubernetes.

## Proposal

Implement that functionality

## Test Plan

Ran it:
https://gist.github.com/andresilva91/8cb04b2918b4cbad771ff76ac09174f2

Then ran `sync-balance` against it, and it succeeded.
  • Loading branch information
Andre da Silva authored Nov 22, 2023
1 parent a844665 commit 230e2c4
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 81 deletions.
10 changes: 10 additions & 0 deletions kubernetes/linera-validator/templates/scylladb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ spec:
labels:
app: scylladb
spec:
# In local runs we noticed that subsequent validators don't reset
# /proc/sys/fs/aio-max-nr, so we need to set it to a large enough value before
# initializing ScyllaDB, or the initialization will fail. 1048576 seems to be a
# commonly used value for this setting with ScyllaDB.
initContainers:
- name: increase-aio-max-nr
image: busybox
command: ['sh', '-c', 'echo 1048576 > /proc/sys/fs/aio-max-nr']
securityContext:
privileged: true
containers:
- name: scylladb
image: scylladb/scylla:5.2
Expand Down
2 changes: 1 addition & 1 deletion kubernetes/linera-validator/templates/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ metadata:
name: shards
spec:
serviceName: "shards"
replicas: 10
replicas: {{ .Values.numShards }}
selector:
matchLabels:
app: shards
Expand Down
1 change: 1 addition & 0 deletions kubernetes/linera-validator/values-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ lineraImage: linera-test:latest
lineraImagePullPolicy: Never
logLevel: "debug"
proxyPort: 19100
numShards: 10

# Loki
loki-stack:
Expand Down
13 changes: 7 additions & 6 deletions linera-service/src/cli_wrappers/helm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ pub struct HelmRelease {
}

impl HelmRelease {
pub fn new(name: String) -> Self {
Self { name }
}

pub async fn install(
&self,
name: String,
configs_dir: &PathBuf,
server_config_id: usize,
github_root: &Path,
num_shards: usize,
cluster_id: u32,
) -> Result<()> {
let helm_release = Self { name };
let execution_dir = format!("{}/kubernetes/linera-validator", github_root.display());

let configs_dir = diff_paths(configs_dir, execution_dir.clone())
Expand All @@ -31,7 +30,7 @@ impl HelmRelease {
Command::new("helm")
.current_dir(&execution_dir)
.arg("install")
.arg(&self.name)
.arg(&helm_release.name)
.arg(".")
.args(["--values", "values-local.yaml"])
.arg("--wait")
Expand All @@ -44,6 +43,8 @@ impl HelmRelease {
"--set",
&format!("validator.genesisConfig={configs_dir}/genesis.json"),
])
.args(["--set", &format!("numShards={num_shards}")])
.args(["--kube-context", &format!("kind-{}", cluster_id)])
.spawn_and_wait_for_stdout()
.await?;
Ok(())
Expand Down
26 changes: 11 additions & 15 deletions linera-service/src/cli_wrappers/kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,25 @@ pub struct KindCluster {
}

impl KindCluster {
pub async fn new(cluster_id: Option<u32>) -> Result<Self> {
let cluster = match cluster_id {
Some(id) => Self { id },
None => Self {
id: Self::get_random_cluster_id(),
},
};

cluster.create().await?;
Ok(cluster)
}

/// Picks a pseudo-random cluster ID from the half-open range `0..99999`,
/// using the thread-local random number generator.
fn get_random_cluster_id() -> u32 {
    let mut rng = rand::thread_rng();
    rng.gen_range(0..99999)
}

async fn create(&self) -> Result<()> {
pub async fn create() -> Result<Self> {
let cluster = Self {
id: Self::get_random_cluster_id(),
};

Command::new("kind")
.args(["create", "cluster"])
.args(["--name", self.id.to_string().as_str()])
.args(["--name", cluster.id().to_string().as_str()])
.spawn_and_wait_for_stdout()
.await?;
Ok(())
Ok(cluster)
}

/// Returns the numeric ID stored in this `KindCluster`.
pub fn id(&self) -> u32 {
self.id
}

pub async fn delete(&self) -> Result<()> {
Expand Down
35 changes: 22 additions & 13 deletions linera-service/src/cli_wrappers/kubectl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,39 @@ use anyhow::{Context, Result};
use tokio::process::{Child, Command};

pub struct KubectlInstance {
pub port_forward_child: Option<Child>,
pub port_forward_children: Vec<Child>,
}

impl KubectlInstance {
pub fn new(port_forward_child: Option<Child>) -> Self {
Self { port_forward_child }
pub fn new(port_forward_children: Vec<Child>) -> Self {
Self {
port_forward_children,
}
}

pub async fn port_forward(&mut self, pod_name: &str, ports: &str) -> Result<()> {
self.port_forward_child = Some(
Command::new("kubectl")
.arg("port-forward")
.arg(pod_name)
.arg(ports)
.spawn()
.context("Port forwarding failed")?,
);
pub async fn port_forward(
&mut self,
pod_name: &str,
ports: &str,
cluster_id: u32,
) -> Result<()> {
let port_forward_child = Command::new("kubectl")
.arg("port-forward")
.arg(pod_name)
.arg(ports)
.args(["--context", &format!("kind-{}", cluster_id)])
.spawn()
.context("Port forwarding failed")?;

self.port_forward_children.push(port_forward_child);
Ok(())
}

pub async fn get_pods(&mut self) -> Result<String> {
pub async fn get_pods(&mut self, cluster_id: u32) -> Result<String> {
Command::new("kubectl")
.arg("get")
.arg("pods")
.args(["--context", &format!("kind-{}", cluster_id)])
.spawn_and_wait_for_stdout()
.await
}
Expand Down
119 changes: 82 additions & 37 deletions linera-service/src/cli_wrappers/local_kubernetes_net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ use crate::{
cli_wrappers::{ClientWrapper, LineraNet, LineraNetConfig, Network},
util::{self, current_binary_parent, CommandExt},
};
use anyhow::{anyhow, bail, Result};
use anyhow::{anyhow, bail, ensure, Result};
use async_trait::async_trait;
use futures::{
future::{self, join_all},
FutureExt,
};
use k8s_openapi::api::core::v1::Pod;
use kube::{
api::{Api, ListParams},
Expand All @@ -24,8 +28,9 @@ use tokio::process::Command;
pub struct LocalKubernetesNetConfig {
pub network: Network,
pub testing_prng_seed: Option<u64>,
pub num_initial_validators: usize,
pub num_shards: usize,
pub binaries_dir: Option<PathBuf>,
pub cluster_id: Option<u32>,
}

/// A set of Linera validators running locally in Kubernetes, using one `kind` cluster per validator.
Expand All @@ -36,7 +41,9 @@ pub struct LocalKubernetesNet {
tmp_dir: Arc<TempDir>,
binaries_dir: Option<PathBuf>,
kubectl_instance: KubectlInstance,
kind_cluster: KindCluster,
kind_clusters: Vec<KindCluster>,
num_initial_validators: usize,
num_shards: usize,
}

#[async_trait]
Expand All @@ -48,10 +55,24 @@ impl LineraNetConfig for LocalKubernetesNetConfig {
self.network,
self.testing_prng_seed,
self.binaries_dir,
KubectlInstance::new(None),
KindCluster::new(self.cluster_id).await?,
KubectlInstance::new(Vec::new()),
future::ready(
(0..self.num_initial_validators)
.map(|_| async { KindCluster::create().await })
.collect::<Vec<_>>(),
)
.then(join_all)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?,
self.num_initial_validators,
self.num_shards,
)?;
let client = net.make_client();
ensure!(
self.num_initial_validators > 0,
"There should be at least one initial validator"
);
net.generate_initial_validator_config().await.unwrap();
client.create_genesis_config().await.unwrap();
net.run().await.unwrap();
Expand Down Expand Up @@ -115,13 +136,13 @@ impl LineraNet for LocalKubernetesNet {
}

async fn terminate(mut self) -> Result<()> {
let mut port_forward_child = self
.kubectl_instance
.port_forward_child
.expect("Child should be set");
port_forward_child.kill().await?;
for mut port_forward_child in self.kubectl_instance.port_forward_children {
port_forward_child.kill().await?;
}

self.kind_cluster.delete().await?;
for kind_cluster in self.kind_clusters {
kind_cluster.delete().await?;
}
Ok(())
}
}
Expand All @@ -132,7 +153,9 @@ impl LocalKubernetesNet {
testing_prng_seed: Option<u64>,
binaries_dir: Option<PathBuf>,
kubectl_instance: KubectlInstance,
kind_cluster: KindCluster,
kind_clusters: Vec<KindCluster>,
num_initial_validators: usize,
num_shards: usize,
) -> Result<Self> {
Ok(Self {
network,
Expand All @@ -141,7 +164,9 @@ impl LocalKubernetesNet {
tmp_dir: Arc::new(tempdir()?),
binaries_dir,
kubectl_instance,
kind_cluster,
kind_clusters,
num_initial_validators,
num_shards,
})
}

Expand All @@ -155,9 +180,9 @@ impl LocalKubernetesNet {
fn configuration_string(&self, server_number: usize) -> Result<String> {
let n = server_number;
let path = self.tmp_dir.path().join(format!("validator_{n}.toml"));
let port = 19100;
let internal_port = 20100;
let metrics_port = 21100;
let port = 19100 + server_number;
let internal_port = 20100 + server_number;
let metrics_port = 21100 + server_number;
let mut content = format!(
r#"
server_config_path = "server_{n}.json"
Expand All @@ -173,9 +198,9 @@ impl LocalKubernetesNet {
Grpc = "ClearText"
"#
);
for k in 0..10 {
let shard_port = 19100;
let shard_metrics_port = 21100;
for k in 0..self.num_shards {
let shard_port = 19100 + server_number;
let shard_metrics_port = 21100 + server_number;
content.push_str(&format!(
r#"
Expand Down Expand Up @@ -204,7 +229,9 @@ impl LocalKubernetesNet {
self.testing_prng_seed = Some(seed + 1);
}
command.arg("--validators");
command.arg(&self.configuration_string(0)?);
for i in 0..self.num_initial_validators {
command.arg(&self.configuration_string(i)?);
}
command
.args(["--committee", "committee.json"])
.spawn_and_wait_for_stdout()
Expand All @@ -230,36 +257,54 @@ impl LocalKubernetesNet {
)
.await?;

self.kind_cluster
.load_docker_image(docker_image.get_name())
.await?;

let base_dir = github_root
.join("kubernetes")
.join("linera-validator")
.join("working");
fs::copy(
self.tmp_dir.path().join("server_0.json"),
base_dir.join("server_0.json"),
)?;
fs::copy(
self.tmp_dir.path().join("genesis.json"),
base_dir.join("genesis.json"),
)?;

let helm_release = HelmRelease::new(String::from("linera-core"));
helm_release.install(&base_dir, 0, &github_root).await?;
for i in 0..self.num_initial_validators {
let cluster_id = self.kind_clusters[i].id();
self.kind_clusters[i]
.load_docker_image(docker_image.get_name())
.await?;

let output = self.kubectl_instance.get_pods().await?;
let validator_pod_name = output
.split_whitespace()
.find(|&t| t.contains("proxy"))
.expect("Getting validator pod name should not fail");
let server_config_filename = format!("server_{}.json", i);
fs::copy(
self.tmp_dir.path().join(&server_config_filename),
base_dir.join(&server_config_filename),
)?;

self.kubectl_instance
.port_forward(validator_pod_name, "19100:19100")
HelmRelease::install(
String::from("linera-core"),
&base_dir,
i,
&github_root,
self.num_shards,
cluster_id,
)
.await?;

// Query the cluster
let output = self.kubectl_instance.get_pods(cluster_id).await?;
let validator_pod_name = output
.split_whitespace()
.find(|&t| t.contains("proxy"))
.expect("Getting validator pod name should not fail");

let local_port = 19100 + i;
self.kubectl_instance
.port_forward(
validator_pod_name,
&format!("{local_port}:{local_port}"),
cluster_id,
)
.await?;
}

Ok(())
}
}
Loading

0 comments on commit 230e2c4

Please sign in to comment.