Skip to content

Commit

Permalink
chore: formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
Samika Kashyap committed Jul 15, 2024
1 parent bfdd077 commit 025596d
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 113 deletions.
2 changes: 1 addition & 1 deletion runner/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
};

/// Options to Bootstrap command
#[derive(Args, Debug)]
#[derive(Args, Debug, Clone)]
pub struct Opts {
/// Bootstrap method to use.
#[arg(long, value_enum, default_value_t, env = "BOOTSTRAP_METHOD")]
Expand Down
110 changes: 60 additions & 50 deletions runner/src/load_generator/gen.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
use std::collections::HashMap;
use std::path::PathBuf;

use crate::load_generator::utils::generator_utils::CeramicConfig;
use crate::load_generator::utils::generator_utils::CeramicDidType;
use crate::load_generator::utils::generator_utils::CeramicScenarioParameters;
use crate::load_generator::utils::generator_utils::StableLoadUser;
use crate::utils::parse_peers_info;
use crate::CommandResult;
use anyhow::Result;
use ceramic_core::StreamId;
use clap::Args;
use keramik_common::peer_info::Peer;
use tokio::time::{Duration, Instant};use crate::utils::parse_peers_info;
use crate::CommandResult;
use crate::load_generator::utils::generator_utils::StableLoadUser;
use crate::load_generator::utils::generator_utils::CeramicConfig;
use crate::load_generator::utils::generator_utils::CeramicDidType;
use crate::load_generator::utils::generator_utils::CeramicScenarioParameters;
use tokio::time::{Duration, Instant};

// TODO : Use this to envoke a particular scenario, currently we only have one
// TODO : Use this to envoke a particular scenario, currently we only have one
// so this is unused
#[allow(dead_code)]
pub enum WeekLongSimulationScenarios {
CreateModelInstancesSynced,
}

/// Options to Simulate command
#[derive(Args, Debug)]
#[derive(Args, Debug, Clone)]
pub struct WeekLongSimulationOpts {

/// Simulation scenario to run.
#[arg(long, env = "GENERATOR_SCENARIO")]
scenario: String,


/// Path to file containing the list of peers.
/// File should contian JSON encoding of Vec<Peer>.
#[arg(long, env = "GENERATOR_PEERS_PATH")]
Expand All @@ -43,15 +43,13 @@ pub struct WeekLongSimulationOpts {
#[arg(long, env = "GENERATOR_RUN_TIME", default_value = "5h")]
run_time: String,


/// Unique value per test run to ensure uniqueness across different generator runs
#[arg(long, env = "GENERATOR_NONCE")]
nonce: u64,

/// Option to throttle requests (per second) for load control
#[arg(long, env = "GENERATOR_THROTTLE_REQUESTS_RATE")]
throttle_requests_rate: Option<usize>,

}

//TODO : Use week long simulation scenario and separate out the logic which is ties to a particular scenario
Expand All @@ -63,31 +61,49 @@ pub async fn simulate_load(opts: WeekLongSimulationOpts) -> Result<CommandResult
let config_1 = state.initialize_config().await?;
let config_2 = state.initialize_config().await?;

let peer_addr_1 = state.peers[0].ceramic_addr().expect("Peer does not have a ceramic address");
let peer_addr_2 = state.peers[1].ceramic_addr().expect("Peer does not have a ceramic address");
let peer_addr_1 = state.peers[0]
.ceramic_addr()
.expect("Peer does not have a ceramic address");
let peer_addr_2 = state.peers[1]
.ceramic_addr()
.expect("Peer does not have a ceramic address");

// Create two users to simulate two independent nodes
let stable_load_user_1 = StableLoadUser::setup_stability_test(config_1.admin_cli, Some(peer_addr_1.to_string())).await;
let stable_load_user_2 = StableLoadUser::setup_stability_test(config_2.admin_cli, Some(peer_addr_2.to_string())).await;
let stable_load_user_1 =
StableLoadUser::setup_stability_test(config_1.admin_cli, Some(peer_addr_1.to_string()))
.await;
let stable_load_user_2 =
StableLoadUser::setup_stability_test(config_2.admin_cli, Some(peer_addr_2.to_string()))
.await;

// Generate a model for the users to create
let model = stable_load_user_1.ceramic_utils.generate_random_model().await?;
let model = stable_load_user_1
.ceramic_utils
.generate_random_model()
.await?;

// Index the model on the second node
stable_load_user_2.ceramic_utils.index_model(&model).await?;

let run_time: u64 = state.run_time.parse().expect("Failed to parse run_time as u64");
let run_time: u64 = state
.run_time
.parse()
.expect("Failed to parse run_time as u64");

println!("Model: {:?}", model);
let model_instance_creation_result = create_model_instances_continuously(stable_load_user_1, model, run_time).await;
println!("Model instance creation result: {:?}", model_instance_creation_result);

let model_instance_creation_result =
create_model_instances_continuously(stable_load_user_1, model, run_time).await;
println!(
"Model instance creation result: {:?}",
model_instance_creation_result
);

Ok(CommandResult::Success)
}

/**
* Create model instances continuously
*
*
* @param stable_load_user The user to create the model instances
* @param model The model schema to create model instances from
* @param duration_in_hours The duration to run the simulation in hours
Expand All @@ -105,7 +121,7 @@ pub async fn create_model_instances_continuously(
let mut error_map: HashMap<String, u64> = HashMap::new();
// TODO : Make the rps configurable
// TODO : Make the channel size configurable
// TODO : Make the number of tasks configurable : tasks are currently 100 -
// TODO : Make the number of tasks configurable : tasks are currently 100 -
// increasing tasks can help increase throughput
let (tx, mut rx) = tokio::sync::mpsc::channel(10000);
let mut tasks = tokio::task::JoinSet::new();
Expand All @@ -125,30 +141,24 @@ pub async fn create_model_instances_continuously(
)
.await
{
Ok(Ok(mid)) => {
match tx.send(Ok(mid.to_string())).await {
Ok(_) => {}
Err(e) => {
println!("Failed to send MID: {}", e);
}
Ok(Ok(mid)) => match tx.send(Ok(mid.to_string())).await {
Ok(_) => {}
Err(e) => {
println!("Failed to send MID: {}", e);
}
}
Ok(Err(e)) => {
match tx.send(Err(e.to_string())).await {
Ok(_) => {}
Err(e) => {
println!("Failed to send error: {}", e);
}
},
Ok(Err(e)) => match tx.send(Err(e.to_string())).await {
Ok(_) => {}
Err(e) => {
println!("Failed to send error: {}", e);
}
}
Err(e) => {
match tx.send(Err(e.to_string())).await {
Ok(_) => {}
Err(e) => {
println!("Failed to send error: {}", e);
}
},
Err(e) => match tx.send(Err(e.to_string())).await {
Ok(_) => {}
Err(e) => {
println!("Failed to send error: {}", e);
}
}
},
}
}
});
Expand Down Expand Up @@ -182,8 +192,9 @@ pub async fn create_model_instances_continuously(
println!("Created {} MIDs in {} hours", count, duration_in_hours);
println!(
"Failed to create {} MIDs in {} hours",
error_map.values().sum::<u64>(), duration_in_hours
);
error_map.values().sum::<u64>(),
duration_in_hours
);
Ok(())
}

Expand All @@ -192,11 +203,10 @@ struct WeekLongSimulationState {
pub run_time: String,
}


impl WeekLongSimulationState {
/**
* Try to create a new instance of the WeekLongSimulationState from the given options
*
*
* @param opts The options to use
* @return The created instance
*/
Expand All @@ -209,7 +219,7 @@ impl WeekLongSimulationState {

/**
* Initialize the configuration for the WeekLongSimulationState
*
*
* @return The created configuration
*/
async fn initialize_config(&self) -> Result<CeramicConfig> {
Expand All @@ -220,4 +230,4 @@ impl WeekLongSimulationState {

CeramicConfig::initialize_config(params).await
}
}
}
1 change: 0 additions & 1 deletion runner/src/load_generator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@

pub mod gen;
pub mod utils;
49 changes: 29 additions & 20 deletions runner/src/load_generator/utils/ceramic_models_utils.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use crate::scenario::ceramic::models::{RandomModelInstance, SmallModel};
use crate::scenario::ceramic::CeramicClient;
use anyhow::Result;
use ceramic_http_client::{
api::{self},
ceramic_event::StreamId,
ModelAccountRelation, ModelDefinition,
};
use reqwest::Client;
use crate::scenario::ceramic::models::{RandomModelInstance, SmallModel};
use crate::scenario::ceramic::CeramicClient;

#[derive(Clone, Debug)]
pub struct CeramicModelUtil {
pub struct CeramicModelUser {
/**
* The ceramic client
*/
Expand All @@ -24,17 +24,19 @@ pub struct CeramicModelUtil {
pub base_url: Option<String>,
}


impl CeramicModelUtil {
impl CeramicModelUser {
/**
* Index a model
*
*
* @param model_id The model to index
*/
pub async fn index_model(&self, model_id: &StreamId) -> Result<()> {
let admin_code = self.get_admin_code().await?;
println!("Admin code: {:?}", admin_code);
let url = self.build_url(&self.ceramic_client.index_endpoint()).await.unwrap();
let url = self
.build_url(self.ceramic_client.index_endpoint())
.await
.unwrap();
let req = self
.ceramic_client
.create_index_model_request(model_id, &admin_code)
Expand All @@ -49,7 +51,7 @@ impl CeramicModelUtil {

/**
* Generate a random model
*
*
* @return The stream id of the created model
*/
pub async fn generate_random_model(&self) -> Result<StreamId, anyhow::Error> {
Expand All @@ -61,16 +63,20 @@ impl CeramicModelUtil {

/**
* Setup a model
*
*
* @param model The model to setup
* @return The stream id of the created model
*/
async fn setup_model(&self, model: ModelDefinition) -> Result<StreamId, anyhow::Error> {
let url = self
.build_url(&self.ceramic_client.streams_endpoint())
.build_url(self.ceramic_client.streams_endpoint())
.await
.unwrap();
let req = self
.ceramic_client
.create_model_request(&model)
.await
.unwrap();
let req = self.ceramic_client.create_model_request(&model).await.unwrap();
let req = self.http_client.post(url).json(&req);
let resp: reqwest::Response = req.send().await?;
if resp.status() == reqwest::StatusCode::OK {
Expand All @@ -86,26 +92,26 @@ impl CeramicModelUtil {
}

/**
* Create a random model instance
*
* Create a random model instance
*
* @param model The model which defines the schema of the model instance
* @return The stream id of the created model instance
*/
pub async fn create_random_mid(&self, model: &StreamId) -> Result<StreamId> {
let data = SmallModel::random();
return self.create_mid(model, &data).await;
self.create_mid(model, &data).await
}

/**
* Create a model instance
*
*
* @param model The model which defines the schema of the model instance
* @param data The data to create
* @return The stream id of the created model instance
*/
async fn create_mid(&self, model: &StreamId, data: &SmallModel) -> Result<StreamId> {
let url = self
.build_url(&self.ceramic_client.streams_endpoint())
.build_url(self.ceramic_client.streams_endpoint())
.await
.unwrap();
let req = self
Expand All @@ -129,11 +135,14 @@ impl CeramicModelUtil {

/**
* Get the admin code
*
*
* @return The admin code
*/
async fn get_admin_code(&self) -> Result<String, anyhow::Error> {
let url = self.build_url(&self.ceramic_client.admin_code_endpoint()).await.unwrap();
let url = self
.build_url(self.ceramic_client.admin_code_endpoint())
.await
.unwrap();
let resp = self.http_client.get(url).send().await?;
let admin_code_resp: api::AdminCodeResponse = resp.json().await?;
let code = &admin_code_resp.code;
Expand All @@ -142,7 +151,7 @@ impl CeramicModelUtil {

/**
* Build a URL
*
*
* @param path The path to build the URL from
* @return The built URL
*/
Expand All @@ -159,4 +168,4 @@ impl CeramicModelUtil {
let full_url = format!("{}{}{}", base, separator, path);
Ok(full_url)
}
}
}
Loading

0 comments on commit 025596d

Please sign in to comment.