Skip to content

Commit

Permalink
feature: add generator code
Browse files Browse the repository at this point in the history
  • Loading branch information
Samika Kashyap authored and Samika Kashyap committed Jul 12, 2024
1 parent 54d1633 commit bfdd077
Show file tree
Hide file tree
Showing 6 changed files with 533 additions and 12 deletions.
223 changes: 223 additions & 0 deletions runner/src/load_generator/gen.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
use std::collections::HashMap;
use std::path::PathBuf;

use anyhow::Result;
use ceramic_core::StreamId;
use clap::Args;
use keramik_common::peer_info::Peer;
use tokio::time::{Duration, Instant};use crate::utils::parse_peers_info;
use crate::CommandResult;
use crate::load_generator::utils::generator_utils::StableLoadUser;
use crate::load_generator::utils::generator_utils::CeramicConfig;
use crate::load_generator::utils::generator_utils::CeramicDidType;
use crate::load_generator::utils::generator_utils::CeramicScenarioParameters;

// TODO : Use this to envoke a particular scenario, currently we only have one
// so this is unused
pub enum WeekLongSimulationScenarios {
CreateModelInstancesSynced,
}

/// Options to Simulate command
#[derive(Args, Debug)]
pub struct WeekLongSimulationOpts {

/// Simulation scenario to run.
#[arg(long, env = "GENERATOR_SCENARIO")]
scenario: String,


/// Path to file containing the list of peers.
/// File should contian JSON encoding of Vec<Peer>.
#[arg(long, env = "GENERATOR_PEERS_PATH")]
peers: PathBuf,

/// Implmentation details: A task corresponds to a tokio task responsible
/// for making requests. They should have low memory overhead, so you can
/// create many tasks and then use `throttle_requests_rate` to constrain the overall
/// throughput on the node (specifically the HTTP requests made).
#[arg(long, default_value_t = 4, env = "GENERATOR_TASKS")]
tasks: usize,

/// Duration of the simulation in hours
#[arg(long, env = "GENERATOR_RUN_TIME", default_value = "5h")]
run_time: String,


/// Unique value per test run to ensure uniqueness across different generator runs
#[arg(long, env = "GENERATOR_NONCE")]
nonce: u64,

/// Option to throttle requests (per second) for load control
#[arg(long, env = "GENERATOR_THROTTLE_REQUESTS_RATE")]
throttle_requests_rate: Option<usize>,

}

//TODO : Use week long simulation scenario and separate out the logic which is ties to a particular scenario
// TODO : This specific behavior is for createModelInstancesSynced scenario
pub async fn simulate_load(opts: WeekLongSimulationOpts) -> Result<CommandResult> {
let state = WeekLongSimulationState::try_from_opts(opts).await?;

// Create two configs to simulate two independent nodes, each having it's own ceramic client
let config_1 = state.initialize_config().await?;
let config_2 = state.initialize_config().await?;

let peer_addr_1 = state.peers[0].ceramic_addr().expect("Peer does not have a ceramic address");
let peer_addr_2 = state.peers[1].ceramic_addr().expect("Peer does not have a ceramic address");

// Create two users to simulate two independent nodes
let stable_load_user_1 = StableLoadUser::setup_stability_test(config_1.admin_cli, Some(peer_addr_1.to_string())).await;
let stable_load_user_2 = StableLoadUser::setup_stability_test(config_2.admin_cli, Some(peer_addr_2.to_string())).await;

// Generate a model for the users to create
let model = stable_load_user_1.ceramic_utils.generate_random_model().await?;

// Index the model on the second node
stable_load_user_2.ceramic_utils.index_model(&model).await?;

let run_time: u64 = state.run_time.parse().expect("Failed to parse run_time as u64");

println!("Model: {:?}", model);
let model_instance_creation_result = create_model_instances_continuously(stable_load_user_1, model, run_time).await;
println!("Model instance creation result: {:?}", model_instance_creation_result);

Ok(CommandResult::Success)
}

/**
* Create model instances continuously
*
* @param stable_load_user The user to create the model instances
* @param model The model schema to create model instances from
* @param duration_in_hours The duration to run the simulation in hours
* @return The result of the simulation
*/
pub async fn create_model_instances_continuously(
stable_load_user: StableLoadUser,
model: StreamId,
duration_in_hours: u64,
) -> Result<()> {
let start_time = Instant::now();

let duration = Duration::from_secs(duration_in_hours * 60 * 60);
let mut count = 0;
let mut error_map: HashMap<String, u64> = HashMap::new();
// TODO : Make the rps configurable
// TODO : Make the channel size configurable
// TODO : Make the number of tasks configurable : tasks are currently 100 -
// increasing tasks can help increase throughput
let (tx, mut rx) = tokio::sync::mpsc::channel(10000);
let mut tasks = tokio::task::JoinSet::new();
for i in 0..100 {
let user_clone = stable_load_user.clone();
let model = model.clone();
let tx = tx.clone();
tasks.spawn(async move {
loop {
if start_time.elapsed() > duration {
println!("loop {i} Duration expired");
break;
}
match tokio::time::timeout(
Duration::from_secs(5),
user_clone.ceramic_utils.create_random_mid(&model),
)
.await
{
Ok(Ok(mid)) => {
match tx.send(Ok(mid.to_string())).await {
Ok(_) => {}
Err(e) => {
println!("Failed to send MID: {}", e);
}
}
}
Ok(Err(e)) => {
match tx.send(Err(e.to_string())).await {
Ok(_) => {}
Err(e) => {
println!("Failed to send error: {}", e);
}
}
}
Err(e) => {
match tx.send(Err(e.to_string())).await {
Ok(_) => {}
Err(e) => {
println!("Failed to send error: {}", e);
}
}
}
}
}
});
}
drop(tx);
loop {
let mut mid_vec: Vec<Result<String, String>> = Vec::new();
if rx.recv_many(&mut mid_vec, 10).await > 0 {
for mid in mid_vec {
match mid {
Ok(_) => {
count += 1;
}
Err(err) => {
*error_map.entry(err).or_insert(0) += 1;
}
}
}
}
if start_time.elapsed() > duration {
tasks.abort_all();
break;
}
}
// After the loop, print the error map
// TODO : Add observability to this, report these errors/counts
println!("Error counts:");
for (error, count) in &error_map {
println!("Error: {}, Count: {}", error, count);
}
println!("Created {} MIDs in {} hours", count, duration_in_hours);
println!(
"Failed to create {} MIDs in {} hours",
error_map.values().sum::<u64>(), duration_in_hours
);
Ok(())
}

struct WeekLongSimulationState {
pub peers: Vec<Peer>,
pub run_time: String,
}


impl WeekLongSimulationState {
/**
* Try to create a new instance of the WeekLongSimulationState from the given options
*
* @param opts The options to use
* @return The created instance
*/
async fn try_from_opts(opts: WeekLongSimulationOpts) -> Result<Self> {
Ok(Self {
peers: parse_peers_info(opts.peers.clone()).await?,
run_time: opts.run_time,
})
}

/**
* Initialize the configuration for the WeekLongSimulationState
*
* @return The created configuration
*/
async fn initialize_config(&self) -> Result<CeramicConfig> {
// Create a CeramicScenarioParameters instance with default values
let params = CeramicScenarioParameters {
did_type: CeramicDidType::EnvInjected,
};

CeramicConfig::initialize_config(params).await
}
}
3 changes: 3 additions & 0 deletions runner/src/load_generator/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

pub mod gen;
pub mod utils;
162 changes: 162 additions & 0 deletions runner/src/load_generator/utils/ceramic_models_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
use anyhow::Result;
use ceramic_http_client::{
api::{self},
ceramic_event::StreamId,
ModelAccountRelation, ModelDefinition,
};
use reqwest::Client;
use crate::scenario::ceramic::models::{RandomModelInstance, SmallModel};
use crate::scenario::ceramic::CeramicClient;

#[derive(Clone, Debug)]
pub struct CeramicModelUtil {
/**
* The ceramic client
*/
pub ceramic_client: CeramicClient,
/**
* The http client
*/
pub http_client: Client,
/**
* The base URL
*/
pub base_url: Option<String>,
}


impl CeramicModelUtil {
/**
* Index a model
*
* @param model_id The model to index
*/
pub async fn index_model(&self, model_id: &StreamId) -> Result<()> {
let admin_code = self.get_admin_code().await?;
println!("Admin code: {:?}", admin_code);
let url = self.build_url(&self.ceramic_client.index_endpoint()).await.unwrap();
let req = self
.ceramic_client
.create_index_model_request(model_id, &admin_code)
.unwrap();
let resp = self.http_client.post(url).json(&req).send().await?;
if resp.status().is_success() {
Ok(())
} else {
Err(anyhow::anyhow!("Failed to index model"))
}
}

/**
* Generate a random model
*
* @return The stream id of the created model
*/
pub async fn generate_random_model(&self) -> Result<StreamId, anyhow::Error> {
let small_model =
ModelDefinition::new::<SmallModel>("load_test_small_model", ModelAccountRelation::List)
.unwrap();
self.setup_model(small_model).await
}

/**
* Setup a model
*
* @param model The model to setup
* @return The stream id of the created model
*/
async fn setup_model(&self, model: ModelDefinition) -> Result<StreamId, anyhow::Error> {
let url = self
.build_url(&self.ceramic_client.streams_endpoint())
.await
.unwrap();
let req = self.ceramic_client.create_model_request(&model).await.unwrap();
let req = self.http_client.post(url).json(&req);
let resp: reqwest::Response = req.send().await?;
if resp.status() == reqwest::StatusCode::OK {
let streams_response: api::StreamsResponse = resp.json().await?;
Ok(streams_response.stream_id)
} else {
Err(anyhow::anyhow!(
"Failed to setup model: status {:?} , resp_text {:?}",
resp.status(),
resp.text().await
))
}
}

/**
* Create a random model instance
*
* @param model The model which defines the schema of the model instance
* @return The stream id of the created model instance
*/
pub async fn create_random_mid(&self, model: &StreamId) -> Result<StreamId> {
let data = SmallModel::random();
return self.create_mid(model, &data).await;
}

/**
* Create a model instance
*
* @param model The model which defines the schema of the model instance
* @param data The data to create
* @return The stream id of the created model instance
*/
async fn create_mid(&self, model: &StreamId, data: &SmallModel) -> Result<StreamId> {
let url = self
.build_url(&self.ceramic_client.streams_endpoint())
.await
.unwrap();
let req = self
.ceramic_client
.create_list_instance_request(model, data)
.await
.unwrap();
let req = self.http_client.post(url).json(&req);
let resp: reqwest::Response = req.send().await?;
if resp.status() == reqwest::StatusCode::OK {
let parsed_resp: api::StreamsResponse = resp.json().await?;
Ok(parsed_resp.stream_id)
} else {
Err(anyhow::anyhow!(
"Failed to create model: status {:?} , resp_text {:?}",
resp.status(),
resp.text().await
))
}
}

/**
* Get the admin code
*
* @return The admin code
*/
async fn get_admin_code(&self) -> Result<String, anyhow::Error> {
let url = self.build_url(&self.ceramic_client.admin_code_endpoint()).await.unwrap();
let resp = self.http_client.get(url).send().await?;
let admin_code_resp: api::AdminCodeResponse = resp.json().await?;
let code = &admin_code_resp.code;
Ok(code.to_string())
}

/**
* Build a URL
*
* @param path The path to build the URL from
* @return The built URL
*/
async fn build_url(&self, path: &str) -> Result<String, anyhow::Error> {
let base = self
.base_url
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Base URL is not set"))?;
let separator = if path.starts_with('/') || base.ends_with('/') {
""
} else {
"/"
};
let full_url = format!("{}{}{}", base, separator, path);
Ok(full_url)
}
}
Loading

0 comments on commit bfdd077

Please sign in to comment.