From bfdd07735ec2772cb8173301e3bc4c5136a5a6ba Mon Sep 17 00:00:00 2001 From: Samika Kashyap Date: Fri, 12 Jul 2024 12:37:14 -0700 Subject: [PATCH] feature: add generator code --- runner/src/load_generator/gen.rs | 223 ++++++++++++++++++ runner/src/load_generator/mod.rs | 3 + .../utils/ceramic_models_utils.rs | 162 +++++++++++++ .../load_generator/utils/generator_utils.rs | 114 +++++++++ runner/src/load_generator/utils/mod.rs | 2 + runner/src/main.rs | 41 +++- 6 files changed, 533 insertions(+), 12 deletions(-) create mode 100644 runner/src/load_generator/gen.rs create mode 100644 runner/src/load_generator/mod.rs create mode 100644 runner/src/load_generator/utils/ceramic_models_utils.rs create mode 100644 runner/src/load_generator/utils/generator_utils.rs create mode 100644 runner/src/load_generator/utils/mod.rs diff --git a/runner/src/load_generator/gen.rs b/runner/src/load_generator/gen.rs new file mode 100644 index 00000000..722c66ee --- /dev/null +++ b/runner/src/load_generator/gen.rs @@ -0,0 +1,223 @@ +use std::collections::HashMap; +use std::path::PathBuf; + +use anyhow::Result; +use ceramic_core::StreamId; +use clap::Args; +use keramik_common::peer_info::Peer; +use tokio::time::{Duration, Instant};use crate::utils::parse_peers_info; +use crate::CommandResult; +use crate::load_generator::utils::generator_utils::StableLoadUser; +use crate::load_generator::utils::generator_utils::CeramicConfig; +use crate::load_generator::utils::generator_utils::CeramicDidType; +use crate::load_generator::utils::generator_utils::CeramicScenarioParameters; + +// TODO : Use this to envoke a particular scenario, currently we only have one +// so this is unused +pub enum WeekLongSimulationScenarios { + CreateModelInstancesSynced, +} + +/// Options to Simulate command +#[derive(Args, Debug)] +pub struct WeekLongSimulationOpts { + + /// Simulation scenario to run. + #[arg(long, env = "GENERATOR_SCENARIO")] + scenario: String, + + + /// Path to file containing the list of peers. + /// File should contian JSON encoding of Vec. + #[arg(long, env = "GENERATOR_PEERS_PATH")] + peers: PathBuf, + + /// Implmentation details: A task corresponds to a tokio task responsible + /// for making requests. They should have low memory overhead, so you can + /// create many tasks and then use `throttle_requests_rate` to constrain the overall + /// throughput on the node (specifically the HTTP requests made). + #[arg(long, default_value_t = 4, env = "GENERATOR_TASKS")] + tasks: usize, + + /// Duration of the simulation in hours + #[arg(long, env = "GENERATOR_RUN_TIME", default_value = "5h")] + run_time: String, + + + /// Unique value per test run to ensure uniqueness across different generator runs + #[arg(long, env = "GENERATOR_NONCE")] + nonce: u64, + + /// Option to throttle requests (per second) for load control + #[arg(long, env = "GENERATOR_THROTTLE_REQUESTS_RATE")] + throttle_requests_rate: Option, + +} + +//TODO : Use week long simulation scenario and separate out the logic which is ties to a particular scenario +// TODO : This specific behavior is for createModelInstancesSynced scenario +pub async fn simulate_load(opts: WeekLongSimulationOpts) -> Result { + let state = WeekLongSimulationState::try_from_opts(opts).await?; + + // Create two configs to simulate two independent nodes, each having it's own ceramic client + let config_1 = state.initialize_config().await?; + let config_2 = state.initialize_config().await?; + + let peer_addr_1 = state.peers[0].ceramic_addr().expect("Peer does not have a ceramic address"); + let peer_addr_2 = state.peers[1].ceramic_addr().expect("Peer does not have a ceramic address"); + + // Create two users to simulate two independent nodes + let stable_load_user_1 = StableLoadUser::setup_stability_test(config_1.admin_cli, Some(peer_addr_1.to_string())).await; + let stable_load_user_2 = StableLoadUser::setup_stability_test(config_2.admin_cli, Some(peer_addr_2.to_string())).await; + + // Generate a model for the users to create + let model = stable_load_user_1.ceramic_utils.generate_random_model().await?; + + // Index the model on the second node + stable_load_user_2.ceramic_utils.index_model(&model).await?; + + let run_time: u64 = state.run_time.parse().expect("Failed to parse run_time as u64"); + + println!("Model: {:?}", model); + let model_instance_creation_result = create_model_instances_continuously(stable_load_user_1, model, run_time).await; + println!("Model instance creation result: {:?}", model_instance_creation_result); + + Ok(CommandResult::Success) +} + +/** + * Create model instances continuously + * + * @param stable_load_user The user to create the model instances + * @param model The model schema to create model instances from + * @param duration_in_hours The duration to run the simulation in hours + * @return The result of the simulation + */ +pub async fn create_model_instances_continuously( + stable_load_user: StableLoadUser, + model: StreamId, + duration_in_hours: u64, +) -> Result<()> { + let start_time = Instant::now(); + + let duration = Duration::from_secs(duration_in_hours * 60 * 60); + let mut count = 0; + let mut error_map: HashMap = HashMap::new(); + // TODO : Make the rps configurable + // TODO : Make the channel size configurable + // TODO : Make the number of tasks configurable : tasks are currently 100 - + // increasing tasks can help increase throughput + let (tx, mut rx) = tokio::sync::mpsc::channel(10000); + let mut tasks = tokio::task::JoinSet::new(); + for i in 0..100 { + let user_clone = stable_load_user.clone(); + let model = model.clone(); + let tx = tx.clone(); + tasks.spawn(async move { + loop { + if start_time.elapsed() > duration { + println!("loop {i} Duration expired"); + break; + } + match tokio::time::timeout( + Duration::from_secs(5), + user_clone.ceramic_utils.create_random_mid(&model), + ) + .await + { + Ok(Ok(mid)) => { + match tx.send(Ok(mid.to_string())).await { + Ok(_) => {} + Err(e) => { + println!("Failed to send MID: {}", e); + } + } + } + Ok(Err(e)) => { + match tx.send(Err(e.to_string())).await { + Ok(_) => {} + Err(e) => { + println!("Failed to send error: {}", e); + } + } + } + Err(e) => { + match tx.send(Err(e.to_string())).await { + Ok(_) => {} + Err(e) => { + println!("Failed to send error: {}", e); + } + } + } + } + } + }); + } + drop(tx); + loop { + let mut mid_vec: Vec> = Vec::new(); + if rx.recv_many(&mut mid_vec, 10).await > 0 { + for mid in mid_vec { + match mid { + Ok(_) => { + count += 1; + } + Err(err) => { + *error_map.entry(err).or_insert(0) += 1; + } + } + } + } + if start_time.elapsed() > duration { + tasks.abort_all(); + break; + } + } + // After the loop, print the error map + // TODO : Add observability to this, report these errors/counts + println!("Error counts:"); + for (error, count) in &error_map { + println!("Error: {}, Count: {}", error, count); + } + println!("Created {} MIDs in {} hours", count, duration_in_hours); + println!( + "Failed to create {} MIDs in {} hours", + error_map.values().sum::(), duration_in_hours +); + Ok(()) +} + +struct WeekLongSimulationState { + pub peers: Vec, + pub run_time: String, +} + + +impl WeekLongSimulationState { + /** + * Try to create a new instance of the WeekLongSimulationState from the given options + * + * @param opts The options to use + * @return The created instance + */ + async fn try_from_opts(opts: WeekLongSimulationOpts) -> Result { + Ok(Self { + peers: parse_peers_info(opts.peers.clone()).await?, + run_time: opts.run_time, + }) + } + + /** + * Initialize the configuration for the WeekLongSimulationState + * + * @return The created configuration + */ + async fn initialize_config(&self) -> Result { + // Create a CeramicScenarioParameters instance with default values + let params = CeramicScenarioParameters { + did_type: CeramicDidType::EnvInjected, + }; + + CeramicConfig::initialize_config(params).await + } +} \ No newline at end of file diff --git a/runner/src/load_generator/mod.rs b/runner/src/load_generator/mod.rs new file mode 100644 index 00000000..9dd541b9 --- /dev/null +++ b/runner/src/load_generator/mod.rs @@ -0,0 +1,3 @@ + +pub mod gen; +pub mod utils; diff --git a/runner/src/load_generator/utils/ceramic_models_utils.rs b/runner/src/load_generator/utils/ceramic_models_utils.rs new file mode 100644 index 00000000..1bc143c3 --- /dev/null +++ b/runner/src/load_generator/utils/ceramic_models_utils.rs @@ -0,0 +1,162 @@ +use anyhow::Result; +use ceramic_http_client::{ + api::{self}, + ceramic_event::StreamId, + ModelAccountRelation, ModelDefinition, +}; +use reqwest::Client; +use crate::scenario::ceramic::models::{RandomModelInstance, SmallModel}; +use crate::scenario::ceramic::CeramicClient; + +#[derive(Clone, Debug)] +pub struct CeramicModelUtil { + /** + * The ceramic client + */ + pub ceramic_client: CeramicClient, + /** + * The http client + */ + pub http_client: Client, + /** + * The base URL + */ + pub base_url: Option, +} + + +impl CeramicModelUtil { + /** + * Index a model + * + * @param model_id The model to index + */ + pub async fn index_model(&self, model_id: &StreamId) -> Result<()> { + let admin_code = self.get_admin_code().await?; + println!("Admin code: {:?}", admin_code); + let url = self.build_url(&self.ceramic_client.index_endpoint()).await.unwrap(); + let req = self + .ceramic_client + .create_index_model_request(model_id, &admin_code) + .unwrap(); + let resp = self.http_client.post(url).json(&req).send().await?; + if resp.status().is_success() { + Ok(()) + } else { + Err(anyhow::anyhow!("Failed to index model")) + } + } + + /** + * Generate a random model + * + * @return The stream id of the created model + */ + pub async fn generate_random_model(&self) -> Result { + let small_model = + ModelDefinition::new::("load_test_small_model", ModelAccountRelation::List) + .unwrap(); + self.setup_model(small_model).await + } + + /** + * Setup a model + * + * @param model The model to setup + * @return The stream id of the created model + */ + async fn setup_model(&self, model: ModelDefinition) -> Result { + let url = self + .build_url(&self.ceramic_client.streams_endpoint()) + .await + .unwrap(); + let req = self.ceramic_client.create_model_request(&model).await.unwrap(); + let req = self.http_client.post(url).json(&req); + let resp: reqwest::Response = req.send().await?; + if resp.status() == reqwest::StatusCode::OK { + let streams_response: api::StreamsResponse = resp.json().await?; + Ok(streams_response.stream_id) + } else { + Err(anyhow::anyhow!( + "Failed to setup model: status {:?} , resp_text {:?}", + resp.status(), + resp.text().await + )) + } + } + + /** + * Create a random model instance + * + * @param model The model which defines the schema of the model instance + * @return The stream id of the created model instance + */ + pub async fn create_random_mid(&self, model: &StreamId) -> Result { + let data = SmallModel::random(); + return self.create_mid(model, &data).await; + } + + /** + * Create a model instance + * + * @param model The model which defines the schema of the model instance + * @param data The data to create + * @return The stream id of the created model instance + */ + async fn create_mid(&self, model: &StreamId, data: &SmallModel) -> Result { + let url = self + .build_url(&self.ceramic_client.streams_endpoint()) + .await + .unwrap(); + let req = self + .ceramic_client + .create_list_instance_request(model, data) + .await + .unwrap(); + let req = self.http_client.post(url).json(&req); + let resp: reqwest::Response = req.send().await?; + if resp.status() == reqwest::StatusCode::OK { + let parsed_resp: api::StreamsResponse = resp.json().await?; + Ok(parsed_resp.stream_id) + } else { + Err(anyhow::anyhow!( + "Failed to create model: status {:?} , resp_text {:?}", + resp.status(), + resp.text().await + )) + } + } + + /** + * Get the admin code + * + * @return The admin code + */ + async fn get_admin_code(&self) -> Result { + let url = self.build_url(&self.ceramic_client.admin_code_endpoint()).await.unwrap(); + let resp = self.http_client.get(url).send().await?; + let admin_code_resp: api::AdminCodeResponse = resp.json().await?; + let code = &admin_code_resp.code; + Ok(code.to_string()) + } + + /** + * Build a URL + * + * @param path The path to build the URL from + * @return The built URL + */ + async fn build_url(&self, path: &str) -> Result { + let base = self + .base_url + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Base URL is not set"))?; + let separator = if path.starts_with('/') || base.ends_with('/') { + "" + } else { + "/" + }; + let full_url = format!("{}{}{}", base, separator, path); + Ok(full_url) + } +} \ No newline at end of file diff --git a/runner/src/load_generator/utils/generator_utils.rs b/runner/src/load_generator/utils/generator_utils.rs new file mode 100644 index 00000000..d03d5102 --- /dev/null +++ b/runner/src/load_generator/utils/generator_utils.rs @@ -0,0 +1,114 @@ +use anyhow::Result; +use reqwest::Client; +use std::time::Duration; +use ceramic_http_client::CeramicHttpClient; +use crate::scenario::ceramic::Credentials; +use crate::scenario::ceramic::CeramicClient; + +use super::ceramic_models_utils::CeramicModelUtil; + +pub static HTTP_TIMEOUT: Duration = Duration::from_secs(5); +pub static HTTP_POOL_MAX_IDLE_PER_HOST: usize = 300; + +#[derive(Clone, Debug)] +pub struct CeramicConfig { + pub admin_cli: CeramicClient, + pub user_cli: CeramicClient, + pub params: CeramicScenarioParameters, +} + +#[derive(Clone, Debug)] +pub struct CeramicScenarioParameters { + pub did_type: CeramicDidType, +} + +#[derive(Clone, Debug)] +pub enum CeramicDidType { + // Fetch DID from env + EnvInjected, + // Generate DID from scratch + UserGenerated, +} + + +impl CeramicConfig { + pub async fn initialize_config(params: CeramicScenarioParameters) -> Result { + let creds = Credentials::admin_from_env().await?; + let admin_cli = CeramicHttpClient::new(creds.signer); + + let user_cli = match params.did_type { + CeramicDidType::EnvInjected => { + let creds = Credentials::from_env().await?; + CeramicHttpClient::new(creds.signer) + } + CeramicDidType::UserGenerated => { + let creds = Credentials::new_generate_did_key().await?; + CeramicHttpClient::new(creds.signer) + } + }; + + Ok(Self { + admin_cli, + user_cli, + params, + }) + } +} + +/** + * The StableLoadUser struct with an HTTP client tied to a ceramic client and a throttle rate. + */ +#[derive(Clone)] +pub struct StableLoadUser { + /** + * The ceramic client connected to the target peer + */ + pub ceramic_client: CeramicClient, + /** + * The HTTP client to send the requests + */ + pub http_client: Client, + /** + * Maximum number of requests to send per second + */ + pub throttle_rate: Duration, + /** + * The base URL + */ + pub base_url: Option, + /** + * Methods associated with the ceramic client + */ + pub ceramic_utils: CeramicModelUtil, +} + +// Methods associated with StableLoadUser +impl StableLoadUser { + + // TODO : Write a setup function which creates the struct by accepting a targetPeerAddress and ceramicClient and returns a StabilityTestUtils + pub async fn setup_stability_test( + ceramic_client: CeramicClient, + base_url: Option, + ) -> StableLoadUser { + let http_client = Client::builder() + .timeout(HTTP_TIMEOUT) + .cookie_store(false) + .pool_max_idle_per_host(HTTP_POOL_MAX_IDLE_PER_HOST) + .build() + .unwrap(); + + let ceramic_utils = CeramicModelUtil { + ceramic_client: ceramic_client.clone(), + http_client: http_client.clone(), + base_url: base_url.clone(), + }; + + return StableLoadUser { + ceramic_client, + http_client, + throttle_rate: Duration::from_millis(100), + base_url, + ceramic_utils, + }; + } +} diff --git a/runner/src/load_generator/utils/mod.rs b/runner/src/load_generator/utils/mod.rs new file mode 100644 index 00000000..1657bf8c --- /dev/null +++ b/runner/src/load_generator/utils/mod.rs @@ -0,0 +1,2 @@ +pub mod generator_utils; +pub mod ceramic_models_utils; diff --git a/runner/src/main.rs b/runner/src/main.rs index 7ab68edb..fe0a17d9 100644 --- a/runner/src/main.rs +++ b/runner/src/main.rs @@ -5,16 +5,16 @@ mod bootstrap; mod scenario; mod simulate; mod utils; +mod load_generator; +use crate::gen::simulate_load; use keramik_common::telemetry; - use anyhow::Result; use clap::{Parser, Subcommand}; use opentelemetry::global::{shutdown_meter_provider, shutdown_tracer_provider}; use opentelemetry::{global, KeyValue}; use tracing::info; - -use crate::{bootstrap::bootstrap, simulate::simulate}; +use crate::{bootstrap::bootstrap, simulate::simulate, load_generator::gen}; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -39,6 +39,8 @@ pub enum Command { Simulate(simulate::Opts), /// Do nothing and exit Noop, + // TODO: Generate load, currently this command is not used + GenerateLoad(gen::WeekLongSimulationOpts), } impl Command { @@ -47,6 +49,8 @@ impl Command { Command::Bootstrap(_) => "bootstrap", Command::Simulate(_) => "simulate", Command::Noop => "noop", + // TODO : After making operator changes this command will be used to generate load + Command::GenerateLoad(_) => "generate_load", } } } @@ -63,11 +67,20 @@ pub enum CommandResult { Failure(anyhow::Error), } +// TODO : Enable metrics/tracing for load generator command +// Metrics and tracing have been disabled for load generator due to memory issues. +// Memory grows in the runner when this is enabled not making it live long enough to finish the load generation #[tokio::main] async fn main() -> Result<()> { let args = Cli::parse(); - telemetry::init_tracing(Some(args.otlp_endpoint.clone())).await?; - let metrics_controller = telemetry::init_metrics_otlp(args.otlp_endpoint.clone()).await?; + if !matches!(args.command, Command::GenerateLoad(_)) { + telemetry::init_tracing(Some(args.otlp_endpoint.clone())).await?; + } + let metrics_controller = if matches!(args.command, Command::GenerateLoad(_)) { + None + } else { + Some(telemetry::init_metrics_otlp(args.otlp_endpoint.clone()).await?) + }; info!("starting runner"); let meter = global::meter("keramik"); @@ -79,17 +92,21 @@ async fn main() -> Result<()> { runs.add(1, &[KeyValue::new("command", args.command.name())]); info!(?args.command, ?args.otlp_endpoint, "starting runner"); - let success = match args.command { + let success = match args.command.clone() { Command::Bootstrap(opts) => bootstrap(opts).await?, Command::Simulate(opts) => simulate(opts).await?, + Command::GenerateLoad(opts) => simulate_load(opts).await?, Command::Noop => CommandResult::Success, }; - - // Flush traces and metrics before shutdown - shutdown_tracer_provider(); - metrics_controller.force_flush()?; - drop(metrics_controller); - shutdown_meter_provider(); + if !matches!(args.command, Command::GenerateLoad(_)) { + // Flush traces and metrics before shutdown + shutdown_tracer_provider(); + if let Some(metrics_controller) = metrics_controller { + metrics_controller.force_flush()?; + } + drop(metrics_controller); + shutdown_meter_provider(); + } // This fixes lost metrics not sure why :( // Seems to be related to the inflight gRPC request getting cancelled