From 230e2c46482c0b4828d3abd9bf7b79d69ebfa70e Mon Sep 17 00:00:00 2001 From: Andre da Silva Date: Wed, 22 Nov 2023 12:36:45 -0300 Subject: [PATCH] linera net up --kubernetes multiple validators (#1255) ## Motivation We need to be able to run multiple validators for feature parity with `linera net up` as well as being able to pass the end-to-end tests eventually ## Proposal Implement that functionality ## Test Plan Ran it: https://gist.github.com/andresilva91/8cb04b2918b4cbad771ff76ac09174f2 Then ran `sync-balance` against it, and it succeeded. --- .../linera-validator/templates/scylladb.yaml | 10 ++ .../linera-validator/templates/server.yaml | 2 +- kubernetes/linera-validator/values-local.yaml | 1 + linera-service/src/cli_wrappers/helm.rs | 13 +- linera-service/src/cli_wrappers/kind.rs | 26 ++-- linera-service/src/cli_wrappers/kubectl.rs | 35 ++++-- .../src/cli_wrappers/local_kubernetes_net.rs | 119 ++++++++++++------ linera-service/src/linera.rs | 11 +- 8 files changed, 136 insertions(+), 81 deletions(-) diff --git a/kubernetes/linera-validator/templates/scylladb.yaml b/kubernetes/linera-validator/templates/scylladb.yaml index 6c80cf824f9..83ee162a92c 100644 --- a/kubernetes/linera-validator/templates/scylladb.yaml +++ b/kubernetes/linera-validator/templates/scylladb.yaml @@ -29,6 +29,16 @@ spec: labels: app: scylladb spec: + # In local runs we noticed that subsequent validators don't reset this value + # so we need to set it to a large enough value before initializing ScyllaDB, + # or the initialization will fail. 
1048576 seems to be a commonly used constant + # for this for ScyllaDB + initContainers: + - name: increase-aio-max-nr + image: busybox + command: ['sh', '-c', 'echo 1048576 > /proc/sys/fs/aio-max-nr'] + securityContext: + privileged: true containers: - name: scylladb image: scylladb/scylla:5.2 diff --git a/kubernetes/linera-validator/templates/server.yaml b/kubernetes/linera-validator/templates/server.yaml index d32076f6796..fec6301c4b8 100644 --- a/kubernetes/linera-validator/templates/server.yaml +++ b/kubernetes/linera-validator/templates/server.yaml @@ -20,7 +20,7 @@ metadata: name: shards spec: serviceName: "shards" - replicas: 10 + replicas: {{ .Values.numShards }} selector: matchLabels: app: shards diff --git a/kubernetes/linera-validator/values-local.yaml b/kubernetes/linera-validator/values-local.yaml index e54b3179ed6..3c851e7302f 100644 --- a/kubernetes/linera-validator/values-local.yaml +++ b/kubernetes/linera-validator/values-local.yaml @@ -5,6 +5,7 @@ lineraImage: linera-test:latest lineraImagePullPolicy: Never logLevel: "debug" proxyPort: 19100 +numShards: 10 # Loki loki-stack: diff --git a/linera-service/src/cli_wrappers/helm.rs b/linera-service/src/cli_wrappers/helm.rs index a0194a46bbc..b72d678b1fe 100644 --- a/linera-service/src/cli_wrappers/helm.rs +++ b/linera-service/src/cli_wrappers/helm.rs @@ -12,16 +12,15 @@ pub struct HelmRelease { } impl HelmRelease { - pub fn new(name: String) -> Self { - Self { name } - } - pub async fn install( - &self, + name: String, configs_dir: &PathBuf, server_config_id: usize, github_root: &Path, + num_shards: usize, + cluster_id: u32, ) -> Result<()> { + let helm_release = Self { name }; let execution_dir = format!("{}/kubernetes/linera-validator", github_root.display()); let configs_dir = diff_paths(configs_dir, execution_dir.clone()) @@ -31,7 +30,7 @@ impl HelmRelease { Command::new("helm") .current_dir(&execution_dir) .arg("install") - .arg(&self.name) + .arg(&helm_release.name) .arg(".") .args(["--values", 
"values-local.yaml"]) .arg("--wait") @@ -44,6 +43,8 @@ impl HelmRelease { "--set", &format!("validator.genesisConfig={configs_dir}/genesis.json"), ]) + .args(["--set", &format!("numShards={num_shards}")]) + .args(["--kube-context", &format!("kind-{}", cluster_id)]) .spawn_and_wait_for_stdout() .await?; Ok(()) diff --git a/linera-service/src/cli_wrappers/kind.rs b/linera-service/src/cli_wrappers/kind.rs index 1454d0c5140..3ef0bf6c98a 100644 --- a/linera-service/src/cli_wrappers/kind.rs +++ b/linera-service/src/cli_wrappers/kind.rs @@ -11,29 +11,25 @@ pub struct KindCluster { } impl KindCluster { - pub async fn new(cluster_id: Option) -> Result { - let cluster = match cluster_id { - Some(id) => Self { id }, - None => Self { - id: Self::get_random_cluster_id(), - }, - }; - - cluster.create().await?; - Ok(cluster) - } - fn get_random_cluster_id() -> u32 { rand::thread_rng().gen_range(0..99999) } - async fn create(&self) -> Result<()> { + pub async fn create() -> Result { + let cluster = Self { + id: Self::get_random_cluster_id(), + }; + Command::new("kind") .args(["create", "cluster"]) - .args(["--name", self.id.to_string().as_str()]) + .args(["--name", cluster.id().to_string().as_str()]) .spawn_and_wait_for_stdout() .await?; - Ok(()) + Ok(cluster) + } + + pub fn id(&self) -> u32 { + self.id } pub async fn delete(&self) -> Result<()> { diff --git a/linera-service/src/cli_wrappers/kubectl.rs b/linera-service/src/cli_wrappers/kubectl.rs index da80c81e220..99bda126b0b 100644 --- a/linera-service/src/cli_wrappers/kubectl.rs +++ b/linera-service/src/cli_wrappers/kubectl.rs @@ -6,30 +6,39 @@ use anyhow::{Context, Result}; use tokio::process::{Child, Command}; pub struct KubectlInstance { - pub port_forward_child: Option, + pub port_forward_children: Vec, } impl KubectlInstance { - pub fn new(port_forward_child: Option) -> Self { - Self { port_forward_child } + pub fn new(port_forward_children: Vec) -> Self { + Self { + port_forward_children, + } } - pub async fn 
port_forward(&mut self, pod_name: &str, ports: &str) -> Result<()> { - self.port_forward_child = Some( - Command::new("kubectl") - .arg("port-forward") - .arg(pod_name) - .arg(ports) - .spawn() - .context("Port forwarding failed")?, - ); + pub async fn port_forward( + &mut self, + pod_name: &str, + ports: &str, + cluster_id: u32, + ) -> Result<()> { + let port_forward_child = Command::new("kubectl") + .arg("port-forward") + .arg(pod_name) + .arg(ports) + .args(["--context", &format!("kind-{}", cluster_id)]) + .spawn() + .context("Port forwarding failed")?; + + self.port_forward_children.push(port_forward_child); Ok(()) } - pub async fn get_pods(&mut self) -> Result { + pub async fn get_pods(&mut self, cluster_id: u32) -> Result { Command::new("kubectl") .arg("get") .arg("pods") + .args(["--context", &format!("kind-{}", cluster_id)]) .spawn_and_wait_for_stdout() .await } diff --git a/linera-service/src/cli_wrappers/local_kubernetes_net.rs b/linera-service/src/cli_wrappers/local_kubernetes_net.rs index ef8b493a28a..ee06dee7805 100644 --- a/linera-service/src/cli_wrappers/local_kubernetes_net.rs +++ b/linera-service/src/cli_wrappers/local_kubernetes_net.rs @@ -9,8 +9,12 @@ use crate::{ cli_wrappers::{ClientWrapper, LineraNet, LineraNetConfig, Network}, util::{self, current_binary_parent, CommandExt}, }; -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, bail, ensure, Result}; use async_trait::async_trait; +use futures::{ + future::{self, join_all}, + FutureExt, +}; use k8s_openapi::api::core::v1::Pod; use kube::{ api::{Api, ListParams}, @@ -24,8 +28,9 @@ use tokio::process::Command; pub struct LocalKubernetesNetConfig { pub network: Network, pub testing_prng_seed: Option, + pub num_initial_validators: usize, + pub num_shards: usize, pub binaries_dir: Option, - pub cluster_id: Option, } /// A set of Linera validators running locally as native processes. 
@@ -36,7 +41,9 @@ pub struct LocalKubernetesNet { tmp_dir: Arc, binaries_dir: Option, kubectl_instance: KubectlInstance, - kind_cluster: KindCluster, + kind_clusters: Vec, + num_initial_validators: usize, + num_shards: usize, } #[async_trait] @@ -48,10 +55,24 @@ impl LineraNetConfig for LocalKubernetesNetConfig { self.network, self.testing_prng_seed, self.binaries_dir, - KubectlInstance::new(None), - KindCluster::new(self.cluster_id).await?, + KubectlInstance::new(Vec::new()), + future::ready( + (0..self.num_initial_validators) + .map(|_| async { KindCluster::create().await }) + .collect::>(), + ) + .then(join_all) + .await + .into_iter() + .collect::, _>>()?, + self.num_initial_validators, + self.num_shards, )?; let client = net.make_client(); + ensure!( + self.num_initial_validators > 0, + "There should be at least one initial validator" + ); net.generate_initial_validator_config().await.unwrap(); client.create_genesis_config().await.unwrap(); net.run().await.unwrap(); @@ -115,13 +136,13 @@ impl LineraNet for LocalKubernetesNet { } async fn terminate(mut self) -> Result<()> { - let mut port_forward_child = self - .kubectl_instance - .port_forward_child - .expect("Child should be set"); - port_forward_child.kill().await?; + for mut port_forward_child in self.kubectl_instance.port_forward_children { + port_forward_child.kill().await?; + } - self.kind_cluster.delete().await?; + for kind_cluster in self.kind_clusters { + kind_cluster.delete().await?; + } Ok(()) } } @@ -132,7 +153,9 @@ impl LocalKubernetesNet { testing_prng_seed: Option, binaries_dir: Option, kubectl_instance: KubectlInstance, - kind_cluster: KindCluster, + kind_clusters: Vec, + num_initial_validators: usize, + num_shards: usize, ) -> Result { Ok(Self { network, @@ -141,7 +164,9 @@ impl LocalKubernetesNet { tmp_dir: Arc::new(tempdir()?), binaries_dir, kubectl_instance, - kind_cluster, + kind_clusters, + num_initial_validators, + num_shards, }) } @@ -155,9 +180,9 @@ impl LocalKubernetesNet { fn 
configuration_string(&self, server_number: usize) -> Result { let n = server_number; let path = self.tmp_dir.path().join(format!("validator_{n}.toml")); - let port = 19100; - let internal_port = 20100; - let metrics_port = 21100; + let port = 19100 + server_number; + let internal_port = 20100 + server_number; + let metrics_port = 21100 + server_number; let mut content = format!( r#" server_config_path = "server_{n}.json" @@ -173,9 +198,9 @@ impl LocalKubernetesNet { Grpc = "ClearText" "# ); - for k in 0..10 { - let shard_port = 19100; - let shard_metrics_port = 21100; + for k in 0..self.num_shards { + let shard_port = 19100 + server_number; + let shard_metrics_port = 21100 + server_number; content.push_str(&format!( r#" @@ -204,7 +229,9 @@ impl LocalKubernetesNet { self.testing_prng_seed = Some(seed + 1); } command.arg("--validators"); - command.arg(&self.configuration_string(0)?); + for i in 0..self.num_initial_validators { + command.arg(&self.configuration_string(i)?); + } command .args(["--committee", "committee.json"]) .spawn_and_wait_for_stdout() @@ -230,36 +257,54 @@ impl LocalKubernetesNet { ) .await?; - self.kind_cluster - .load_docker_image(docker_image.get_name()) - .await?; - let base_dir = github_root .join("kubernetes") .join("linera-validator") .join("working"); - fs::copy( - self.tmp_dir.path().join("server_0.json"), - base_dir.join("server_0.json"), - )?; fs::copy( self.tmp_dir.path().join("genesis.json"), base_dir.join("genesis.json"), )?; - let helm_release = HelmRelease::new(String::from("linera-core")); - helm_release.install(&base_dir, 0, &github_root).await?; + for i in 0..self.num_initial_validators { + let cluster_id = self.kind_clusters[i].id(); + self.kind_clusters[i] + .load_docker_image(docker_image.get_name()) + .await?; - let output = self.kubectl_instance.get_pods().await?; - let validator_pod_name = output - .split_whitespace() - .find(|&t| t.contains("proxy")) - .expect("Getting validator pod name should not fail"); + let 
server_config_filename = format!("server_{}.json", i); + fs::copy( + self.tmp_dir.path().join(&server_config_filename), + base_dir.join(&server_config_filename), + )?; - self.kubectl_instance - .port_forward(validator_pod_name, "19100:19100") + HelmRelease::install( + String::from("linera-core"), + &base_dir, + i, + &github_root, + self.num_shards, + cluster_id, + ) .await?; + // Query the cluster + let output = self.kubectl_instance.get_pods(cluster_id).await?; + let validator_pod_name = output + .split_whitespace() + .find(|&t| t.contains("proxy")) + .expect("Getting validator pod name should not fail"); + + let local_port = 19100 + i; + self.kubectl_instance + .port_forward( + validator_pod_name, + &format!("{local_port}:{local_port}"), + cluster_id, + ) + .await?; + } + Ok(()) } } diff --git a/linera-service/src/linera.rs b/linera-service/src/linera.rs index b4119c24ae0..9923b32cdc0 100644 --- a/linera-service/src/linera.rs +++ b/linera-service/src/linera.rs @@ -1153,12 +1153,6 @@ enum NetCommand { #[cfg(feature = "kubernetes")] #[structopt(long)] binaries_dir: Option, - - /// Kind name for the cluster that will be created for the Kubernetes deployment. - /// Must be a number. If not specified, a random number will be generated. - #[cfg(feature = "kubernetes")] - #[structopt(long)] - cluster_id: Option, }, /// Print a bash helper script to make `linera net up` easier to use. 
The script is @@ -2281,8 +2275,6 @@ async fn run(options: ClientOptions) -> Result<(), anyhow::Error> { kubernetes, #[cfg(feature = "kubernetes")] binaries_dir, - #[cfg(feature = "kubernetes")] - cluster_id, } => { if *validators < 1 { panic!("The local test network must have at least one validator."); @@ -2296,8 +2288,9 @@ async fn run(options: ClientOptions) -> Result<(), anyhow::Error> { let config = LocalKubernetesNetConfig { network: Network::Grpc, testing_prng_seed: *testing_prng_seed, + num_initial_validators: *validators, + num_shards: *shards, binaries_dir: binaries_dir.clone(), - cluster_id: *cluster_id, }; let (net, client1) = config.instantiate().await?; Ok(net_up(extra_wallets, net, client1).await?)