Skip to content

Commit

Permalink
linera net up --kubernetes multiple validators
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-ds committed Nov 21, 2023
1 parent 85da892 commit 9abb3cb
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 74 deletions.
10 changes: 10 additions & 0 deletions kubernetes/linera-validator/templates/scylladb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ spec:
labels:
app: scylladb
spec:
# In local runs we noticed that subsequent validators don't reset this value
# so we need to set it to a large enough value before initializing ScyllaDB,
# or the initialization will fail. 1048576 seems to be a commonly used constant
# for this for ScyllaDB
initContainers:
- name: increase-aio-max-nr
image: busybox
command: ['sh', '-c', 'echo 1048576 > /proc/sys/fs/aio-max-nr']
securityContext:
privileged: true
containers:
- name: scylladb
image: scylladb/scylla:5.2
Expand Down
2 changes: 1 addition & 1 deletion kubernetes/linera-validator/templates/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ metadata:
name: shards
spec:
serviceName: "shards"
replicas: 10
replicas: {{ .Values.numShards }}
selector:
matchLabels:
app: shards
Expand Down
1 change: 1 addition & 0 deletions kubernetes/linera-validator/values-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ lineraImage: linera-test:latest
lineraImagePullPolicy: Never
logLevel: "debug"
proxyPort: 19100
numShards: 10

# Loki
loki-stack:
Expand Down
2 changes: 2 additions & 0 deletions linera-service/src/cli_wrappers/helm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ impl HelmRelease {
configs_dir: &PathBuf,
server_config_id: usize,
github_root: &Path,
num_shards: usize,
) -> Result<()> {
let execution_dir = format!("{}/kubernetes/linera-validator", github_root.display());

Expand All @@ -44,6 +45,7 @@ impl HelmRelease {
"--set",
&format!("validator.genesisConfig={configs_dir}/genesis.json"),
])
.args(["--set", &format!("numShards={num_shards}")])
.spawn_and_wait_for_stdout()
.await?;
Ok(())
Expand Down
26 changes: 11 additions & 15 deletions linera-service/src/cli_wrappers/kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,25 @@ pub struct KindCluster {
}

impl KindCluster {
pub async fn new(cluster_id: Option<u32>) -> Result<Self> {
let cluster = match cluster_id {
Some(id) => Self { id },
None => Self {
id: Self::get_random_cluster_id(),
},
};

cluster.create().await?;
Ok(cluster)
}

fn get_random_cluster_id() -> u32 {
rand::thread_rng().gen_range(0..99999)
}

async fn create(&self) -> Result<()> {
pub async fn create() -> Result<Self> {
let cluster = Self {
id: Self::get_random_cluster_id(),
};

Command::new("kind")
.args(["create", "cluster"])
.args(["--name", self.id.to_string().as_str()])
.args(["--name", cluster.id().to_string().as_str()])
.spawn_and_wait_for_stdout()
.await?;
Ok(())
Ok(cluster)
}

pub fn id(&self) -> u32 {
self.id
}

pub async fn delete(&self) -> Result<()> {
Expand Down
36 changes: 25 additions & 11 deletions linera-service/src/cli_wrappers/kubectl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,37 @@ use anyhow::{Context, Result};
use tokio::process::{Child, Command};

pub struct KubectlInstance {
pub port_forward_child: Option<Child>,
pub port_forward_children: Vec<Child>,
}

impl KubectlInstance {
pub fn new(port_forward_child: Option<Child>) -> Self {
Self { port_forward_child }
pub fn new(port_forward_children: Vec<Child>) -> Self {
Self {
port_forward_children,
}
}

// If there are multiple clusters, make sure you always properly set the context
// before doing kubectl commands, otherwise you might be querying the wrong cluster
pub async fn config_use_context(&mut self, cluster_id: u32) -> Result<()> {
Command::new("kubectl")
.arg("config")
.arg("use-context")
.arg(&format!("kind-{}", cluster_id))
.spawn_and_wait_for_stdout()
.await?;
Ok(())
}

pub async fn port_forward(&mut self, pod_name: &str, ports: &str) -> Result<()> {
self.port_forward_child = Some(
Command::new("kubectl")
.arg("port-forward")
.arg(pod_name)
.arg(ports)
.spawn()
.context("Port forwarding failed")?,
);
let port_forward_child = Command::new("kubectl")
.arg("port-forward")
.arg(pod_name)
.arg(ports)
.spawn()
.context("Port forwarding failed")?;

self.port_forward_children.push(port_forward_child);
Ok(())
}

Expand Down
116 changes: 78 additions & 38 deletions linera-service/src/cli_wrappers/local_kubernetes_net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ use crate::{
cli_wrappers::{ClientWrapper, LineraNet, LineraNetConfig, Network},
util::{self, current_binary_parent, CommandExt},
};
use anyhow::{anyhow, bail, Result};
use anyhow::{anyhow, bail, ensure, Result};
use async_trait::async_trait;
use futures::{
future::{self, join_all},
FutureExt,
};
use k8s_openapi::api::core::v1::Pod;
use kube::{
api::{Api, ListParams},
Expand All @@ -24,8 +28,9 @@ use tokio::process::Command;
pub struct LocalKubernetesNetConfig {
pub network: Network,
pub testing_prng_seed: Option<u64>,
pub num_initial_validators: usize,
pub num_shards: usize,
pub binaries_dir: Option<PathBuf>,
pub cluster_id: Option<u32>,
}

/// A set of Linera validators running locally as native processes.
Expand All @@ -36,7 +41,9 @@ pub struct LocalKubernetesNet {
tmp_dir: Arc<TempDir>,
binaries_dir: Option<PathBuf>,
kubectl_instance: KubectlInstance,
kind_cluster: KindCluster,
kind_clusters: Vec<KindCluster>,
num_initial_validators: usize,
num_shards: usize,
}

#[async_trait]
Expand All @@ -48,10 +55,24 @@ impl LineraNetConfig for LocalKubernetesNetConfig {
self.network,
self.testing_prng_seed,
self.binaries_dir,
KubectlInstance::new(None),
KindCluster::new(self.cluster_id).await?,
KubectlInstance::new(Vec::new()),
future::ready(
(0..self.num_initial_validators)
.map(|_| async { KindCluster::create().await })
.collect::<Vec<_>>(),
)
.then(join_all)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?,
self.num_initial_validators,
self.num_shards,
)?;
let client = net.make_client();
ensure!(
self.num_initial_validators > 0,
"There should be at least one initial validator"
);
net.generate_initial_validator_config().await.unwrap();
client.create_genesis_config().await.unwrap();
net.run().await.unwrap();
Expand Down Expand Up @@ -115,13 +136,13 @@ impl LineraNet for LocalKubernetesNet {
}

async fn terminate(mut self) -> Result<()> {
let mut port_forward_child = self
.kubectl_instance
.port_forward_child
.expect("Child should be set");
port_forward_child.kill().await?;
for mut port_forward_child in self.kubectl_instance.port_forward_children {
port_forward_child.kill().await?;
}

self.kind_cluster.delete().await?;
for kind_cluster in self.kind_clusters {
kind_cluster.delete().await?;
}
Ok(())
}
}
Expand All @@ -132,7 +153,9 @@ impl LocalKubernetesNet {
testing_prng_seed: Option<u64>,
binaries_dir: Option<PathBuf>,
kubectl_instance: KubectlInstance,
kind_cluster: KindCluster,
kind_clusters: Vec<KindCluster>,
num_initial_validators: usize,
num_shards: usize,
) -> Result<Self> {
Ok(Self {
network,
Expand All @@ -141,7 +164,9 @@ impl LocalKubernetesNet {
tmp_dir: Arc::new(tempdir()?),
binaries_dir,
kubectl_instance,
kind_cluster,
kind_clusters,
num_initial_validators,
num_shards,
})
}

Expand All @@ -155,9 +180,9 @@ impl LocalKubernetesNet {
fn configuration_string(&self, server_number: usize) -> Result<String> {
let n = server_number;
let path = self.tmp_dir.path().join(format!("validator_{n}.toml"));
let port = 19100;
let internal_port = 20100;
let metrics_port = 21100;
let port = 19100 + server_number;
let internal_port = 20100 + server_number;
let metrics_port = 21100 + server_number;
let mut content = format!(
r#"
server_config_path = "server_{n}.json"
Expand All @@ -173,9 +198,9 @@ impl LocalKubernetesNet {
Grpc = "ClearText"
"#
);
for k in 0..10 {
let shard_port = 19100;
let shard_metrics_port = 21100;
for k in 0..self.num_shards {
let shard_port = 19100 + server_number;
let shard_metrics_port = 21100 + server_number;
content.push_str(&format!(
r#"
Expand Down Expand Up @@ -204,7 +229,9 @@ impl LocalKubernetesNet {
self.testing_prng_seed = Some(seed + 1);
}
command.arg("--validators");
command.arg(&self.configuration_string(0)?);
for i in 0..self.num_initial_validators {
command.arg(&self.configuration_string(i)?);
}
command
.args(["--committee", "committee.json"])
.spawn_and_wait_for_stdout()
Expand All @@ -230,35 +257,48 @@ impl LocalKubernetesNet {
)
.await?;

self.kind_cluster
.load_docker_image(docker_image.get_name())
.await?;

let base_dir = github_root
.join("kubernetes")
.join("linera-validator")
.join("working");
fs::copy(
self.tmp_dir.path().join("server_0.json"),
base_dir.join("server_0.json"),
)?;
fs::copy(
self.tmp_dir.path().join("genesis.json"),
base_dir.join("genesis.json"),
)?;

let helm_release = HelmRelease::new(String::from("linera-core"));
helm_release.install(&base_dir, 0, &github_root).await?;
for i in 0..self.num_initial_validators {
// Make sure we'll be querying the right cluster
self.kubectl_instance
.config_use_context(self.kind_clusters[i].id())
.await?;

let output = self.kubectl_instance.get_pods().await?;
let validator_pod_name = output
.split_whitespace()
.find(|&t| t.contains("proxy"))
.expect("Getting validator pod name should not fail");
self.kind_clusters[i]
.load_docker_image(docker_image.get_name())
.await?;

self.kubectl_instance
.port_forward(validator_pod_name, "19100:19100")
.await?;
let server_config_filename = format!("server_{}.json", i);
fs::copy(
self.tmp_dir.path().join(&server_config_filename),
base_dir.join(&server_config_filename),
)?;

let helm_release = HelmRelease::new(String::from("linera-core"));
helm_release
.install(&base_dir, i, &github_root, self.num_shards)
.await?;

// Query the cluster
let output = self.kubectl_instance.get_pods().await?;
let validator_pod_name = output
.split_whitespace()
.find(|&t| t.contains("proxy"))
.expect("Getting validator pod name should not fail");

let local_port = 19100 + i;
self.kubectl_instance
.port_forward(validator_pod_name, &format!("{local_port}:{local_port}"))
.await?;
}

Ok(())
}
Expand Down
11 changes: 2 additions & 9 deletions linera-service/src/linera.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1149,12 +1149,6 @@ enum NetCommand {
#[cfg(feature = "kubernetes")]
#[structopt(long)]
binaries_dir: Option<PathBuf>,

/// Kind name for the cluster that will be created for the Kubernetes deployment.
/// Must be a number. If not specified, a random number will be generated.
#[cfg(feature = "kubernetes")]
#[structopt(long)]
cluster_id: Option<u32>,
},

/// Print a bash helper script to make `linera net up` easier to use. The script is
Expand Down Expand Up @@ -2258,8 +2252,6 @@ async fn main() -> Result<(), anyhow::Error> {
kubernetes,
#[cfg(feature = "kubernetes")]
binaries_dir,
#[cfg(feature = "kubernetes")]
cluster_id,
} => {
if *validators < 1 {
panic!("The local test network must have at least one validator.");
Expand All @@ -2273,8 +2265,9 @@ async fn main() -> Result<(), anyhow::Error> {
let config = LocalKubernetesNetConfig {
network: Network::Grpc,
testing_prng_seed: *testing_prng_seed,
num_initial_validators: *validators,
num_shards: *shards,
binaries_dir: binaries_dir.clone(),
cluster_id: *cluster_id,
};
let (net, client1) = config.instantiate().await?;
Ok(net_up(extra_wallets, net, client1).await?)
Expand Down

0 comments on commit 9abb3cb

Please sign in to comment.