Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

subsystem-bench: Prepare CI output #3158

Merged
merged 11 commits into from
Feb 6, 2024
32 changes: 19 additions & 13 deletions polkadot/node/subsystem-bench/src/availability/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::{core::mock::ChainApiState, TestEnvironment};
use crate::{
core::{environment::CollectedResourceUsage, mock::ChainApiState},
TestEnvironment,
};
use av_store::NetworkAvailabilityState;
use bitvec::bitvec;
use colored::Colorize;
Expand Down Expand Up @@ -417,7 +420,11 @@ impl TestState {
}
}

pub async fn benchmark_availability_read(env: &mut TestEnvironment, mut state: TestState) {
pub async fn benchmark_availability_read(
benchmark_name: &str,
env: &mut TestEnvironment,
mut state: TestState,
) -> CollectedResourceUsage {
let config = env.config().clone();

env.import_block(new_block_import_info(Hash::repeat_byte(1), 1)).await;
Expand Down Expand Up @@ -477,12 +484,15 @@ pub async fn benchmark_availability_read(env: &mut TestEnvironment, mut state: T
format!("{} ms", test_start.elapsed().as_millis() / env.config().num_blocks as u128).red()
);

env.display_network_usage();
env.display_cpu_usage(&["availability-recovery"]);
env.stop().await;
env.collect_resource_usage(benchmark_name, &["availability-recovery"])
}

pub async fn benchmark_availability_write(env: &mut TestEnvironment, mut state: TestState) {
pub async fn benchmark_availability_write(
benchmark_name: &str,
env: &mut TestEnvironment,
mut state: TestState,
) -> CollectedResourceUsage {
let config = env.config().clone();

env.metrics().set_n_validators(config.n_validators);
Expand Down Expand Up @@ -634,15 +644,11 @@ pub async fn benchmark_availability_write(env: &mut TestEnvironment, mut state:
format!("{} ms", test_start.elapsed().as_millis() / env.config().num_blocks as u128).red()
);

env.display_network_usage();

env.display_cpu_usage(&[
"availability-distribution",
"bitfield-distribution",
"availability-store",
]);

env.stop().await;
env.collect_resource_usage(
benchmark_name,
&["availability-distribution", "bitfield-distribution", "availability-store"],
)
}

pub fn peer_bitfield_message_v2(
Expand Down
116 changes: 79 additions & 37 deletions polkadot/node/subsystem-bench/src/core/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use colored::Colorize;
use core::time::Duration;
use futures::{Future, FutureExt};
use polkadot_overseer::{BlockInfo, Handle as OverseerHandle};
use serde::{Deserialize, Serialize};

use polkadot_node_subsystem::{messages::AllMessages, Overseer, SpawnGlue, TimeoutExt};
use polkadot_node_subsystem_types::Hash;
Expand Down Expand Up @@ -328,57 +329,98 @@ impl TestEnvironment {
}
}

/// Display network usage stats.
pub fn display_network_usage(&self) {
let stats = self.network().peer_stats(0);

let total_node_received = stats.received() / 1024;
let total_node_sent = stats.sent() / 1024;

println!(
"\nPayload bytes received from peers: {}, {}",
format!("{:.2} KiB total", total_node_received).blue(),
format!("{:.2} KiB/block", total_node_received / self.config().num_blocks)
.bright_blue()
);
/// Gathers the network and CPU usage recorded by this environment into a single report.
///
/// * `benchmark_name` - label stored in the report; it is printed as the report title and
///   serialized together with the measurements.
/// * `subsystems_under_test` - `task_group` label values for which CPU usage rows are
///   collected (a row for the test environment itself is always appended).
pub fn collect_resource_usage(
    &self,
    benchmark_name: &str,
    subsystems_under_test: &[&str],
) -> CollectedResourceUsage {
    let benchmark_name = benchmark_name.to_string();
    let network = self.network_usage();
    let cpu = self.cpu_usage(subsystems_under_test);

    CollectedResourceUsage { benchmark_name, network, cpu }
}

println!(
"Payload bytes sent to peers: {}, {}",
format!("{:.2} KiB total", total_node_sent).blue(),
format!("{:.2} KiB/block", total_node_sent / self.config().num_blocks).bright_blue()
);
/// Builds the network section of the report: payload received from and sent to peers,
/// in KiB, as a total and as a per-block average.
///
/// The counters are read from `peer_stats(0)` (presumably the node under test — confirm
/// against the network emulator).
fn network_usage(&self) -> Vec<ResourceUsage> {
    let stats = self.network().peer_stats(0);
    // Convert to `f64` *before* scaling to KiB. Dividing the integer byte counters by 1024
    // first would truncate to whole KiB, although the report prints three decimals.
    let total_node_received = stats.received() as f64 / 1024.0;
    let total_node_sent = stats.sent() as f64 / 1024.0;
    let num_blocks = self.config().num_blocks as f64;

    vec![
        ResourceUsage {
            resource: "Received from peers".to_string(),
            total: total_node_received,
            per_block: total_node_received / num_blocks,
        },
        ResourceUsage {
            resource: "Sent to peers".to_string(),
            total: total_node_sent,
            per_block: total_node_sent / num_blocks,
        },
    ]
}

/// Print CPU usage stats in the CLI.
pub fn display_cpu_usage(&self, subsystems_under_test: &[&str]) {
fn cpu_usage(&self, subsystems_under_test: &[&str]) -> Vec<ResourceUsage> {
let test_metrics = super::display::parse_metrics(self.registry());
let mut usage = vec![];
let num_blocks = self.config().num_blocks as f64;

for subsystem in subsystems_under_test.iter() {
let subsystem_cpu_metrics =
test_metrics.subset_with_label_value("task_group", subsystem);
let total_cpu = subsystem_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum");
println!(
"{} CPU usage {}",
subsystem.to_string().bright_green(),
format!("{:.3}s", total_cpu).bright_purple()
);
println!(
"{} CPU usage per block {}",
subsystem.to_string().bright_green(),
format!("{:.3}s", total_cpu / self.config().num_blocks as f64).bright_purple()
);
usage.push(ResourceUsage {
resource: subsystem.to_string(),
total: total_cpu,
per_block: total_cpu / num_blocks,
});
}

let test_env_cpu_metrics =
test_metrics.subset_with_label_value("task_group", "test-environment");
let total_cpu = test_env_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum");
println!(
"Total test environment CPU usage {}",
format!("{:.3}s", total_cpu).bright_purple()
);
println!(
"Test environment CPU usage per block {}",
format!("{:.3}s", total_cpu / self.config().num_blocks as f64).bright_purple()

usage.push(ResourceUsage {
resource: "Test environment".to_string(),
total: total_cpu,
per_block: total_cpu / num_blocks,
});

usage
}
}

/// Resource usage report of a single benchmark run.
///
/// Produced by `TestEnvironment::collect_resource_usage`. It can be printed as a
/// human-readable table through its `Display` implementation, or serialized with serde
/// (the CLI emits it as YAML when the `ci` flag is set).
#[derive(Debug, Serialize, Deserialize)]
pub struct CollectedResourceUsage {
/// Name of the benchmark these measurements belong to.
benchmark_name: String,
/// Network usage rows; rendered under the "Network usage, KiB" header.
network: Vec<ResourceUsage>,
/// CPU usage rows; rendered under the "CPU usage, s" header.
cpu: Vec<ResourceUsage>,
}

impl std::fmt::Display for CollectedResourceUsage {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"\n{}\n\n{}\n{}\n\n{}\n{}\n",
self.benchmark_name.purple(),
format!("{:<32}{:>12}{:>12}", "Network usage, KiB", "total", "per block").blue(),
self.network.iter().map(|v| v.to_string()).collect::<Vec<String>>().join("\n"),
format!("{:<32}{:>12}{:>12}", "CPU usage, s", "total", "per block").blue(),
AndreiEres marked this conversation as resolved.
Show resolved Hide resolved
self.cpu.iter().map(|v| v.to_string()).collect::<Vec<String>>().join("\n")
)
}
}

/// One row of a resource usage report: a labelled total and its per-block average.
#[derive(Debug, Serialize, Deserialize)]
pub struct ResourceUsage {
/// Label of the measured resource, e.g. "Received from peers" or a subsystem name.
resource: String,
/// Total for the whole run. The unit depends on the report section (the section headers
/// say KiB for network rows and seconds for CPU rows).
total: f64,
/// `total` divided by the configured number of blocks.
per_block: f64,
}

impl std::fmt::Display for ResourceUsage {
    /// Formats the row as three columns: the resource label left-aligned in 32 characters,
    /// then the total and the per-block value, each right-aligned in 12 characters with
    /// three decimals.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let Self { resource, total, per_block } = self;
        let label = resource.cyan();

        write!(f, "{:<32}{:>12.3}{:>12.3}", label, total, per_block)
    }
}
63 changes: 45 additions & 18 deletions polkadot/node/subsystem-bench/src/subsystem-bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ struct BenchCli {
/// Enable Cache Misses Profiling with Valgrind. Linux only, Valgrind must be in the PATH
pub cache_misses: bool,

#[clap(long, default_value_t = false)]
/// Shows the output in YAML format
pub ci: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably we can find a better name here like:

--output-format=yaml
--output-format=json

or even simpler

--yaml-output

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can start with the more obvious --yaml-output, because I have no plans for an additional JSON one :-)


#[command(subcommand)]
pub objective: cli::TestObjective,
}
Expand Down Expand Up @@ -163,27 +167,43 @@ impl BenchCli {
format!("Sequence contains {} step(s)", num_steps).bright_purple()
);
for (index, test_config) in test_sequence.into_iter().enumerate() {
let benchmark_name = format!("{} #{}", &options.path, index + 1);
gum::info!(target: LOG_TARGET, "{}", format!("Step {}/{}", index + 1, num_steps).bright_purple(),);
display_configuration(&test_config);

match test_config.objective {
let usage = match test_config.objective {
TestObjective::DataAvailabilityRead(ref _opts) => {
let mut state = TestState::new(&test_config);
let (mut env, _protocol_config) = prepare_test(test_config, &mut state);
env.runtime().block_on(availability::benchmark_availability_read(
&mut env, state,
));
&benchmark_name,
&mut env,
state,
))
},
TestObjective::DataAvailabilityWrite => {
let mut state = TestState::new(&test_config);
let (mut env, _protocol_config) = prepare_test(test_config, &mut state);
env.runtime().block_on(availability::benchmark_availability_write(
&mut env, state,
));
&benchmark_name,
&mut env,
state,
))
},
_ => {
gum::error!("Invalid test objective in sequence");
continue;
},
_ => gum::error!("Invalid test objective in sequence"),
}
};

let output = if self.ci {
serde_yaml::to_string(&vec![usage])?
} else {
usage.to_string()
};
println!("{}", output);
}

return Ok(())
},
TestObjective::DataAvailabilityRead(ref _options) => self.create_test_configuration(),
Expand Down Expand Up @@ -222,23 +242,30 @@ impl BenchCli {
let mut state = TestState::new(&test_config);
let (mut env, _protocol_config) = prepare_test(test_config, &mut state);

match self.objective {
TestObjective::DataAvailabilityRead(_options) => {
env.runtime()
.block_on(availability::benchmark_availability_read(&mut env, state));
},
TestObjective::DataAvailabilityWrite => {
env.runtime()
.block_on(availability::benchmark_availability_write(&mut env, state));
},
TestObjective::TestSequence(_options) => {},
}
let usage = match self.objective {
TestObjective::DataAvailabilityRead(_options) =>
env.runtime().block_on(availability::benchmark_availability_read(
"benchmark_availability_read",
&mut env,
state,
)),
TestObjective::DataAvailabilityWrite =>
env.runtime().block_on(availability::benchmark_availability_write(
"benchmark_availability_write",
&mut env,
state,
)),
TestObjective::TestSequence(_options) => todo!(),
};

if let Some(agent_running) = agent_running {
let agent_ready = agent_running.stop()?;
agent_ready.shutdown();
}

let output = if self.ci { serde_yaml::to_string(&vec![usage])? } else { usage.to_string() };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This logic is duplicated; you can move it into a small function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. But we're going to remove the last branch and keep only the run with test files, so this duplication is going to be removed as well.

println!("{}", output);

Ok(())
}
}
Expand Down
Loading