diff --git a/keramik/src/simulation.md b/keramik/src/simulation.md index ac5f4880..01201517 100644 --- a/keramik/src/simulation.md +++ b/keramik/src/simulation.md @@ -7,8 +7,7 @@ To run a simulation, first define a simulation. Available simulation types are - `ceramic-write-only` - A simulation that only performs updates on two different streams - `ceramic-new-streams` - A simulation that only creates new streams - `ceramic-model-reuse` - A simulation that reuses the same model and queries instances across workers -- `recon-event-key-sync` - A simulation that creates event keys for Recon to sync at a fixed rate (~300/s by default). Designed for a 2 node network but should work on any. -- `recon-event-sync` - A simulation that creates events for Recon to sync at a fixed rate (~300/s by default). Designed for a 2 node network but should work on any. Choosing between recon scenarios depends on what version of rust-ceramic you have. Some versions support keys only, some support key/value (maybe we'll support both someday). If you're not sure, try this one first. +- `recon-event-sync` - A simulation that creates events for Recon to sync at a fixed rate (~300/s by default). Designed for a 2 node network but should work on any. Using one of these scenarios, we can then define the configuration for that scenario: diff --git a/runner/src/scenario/ceramic/mod.rs b/runner/src/scenario/ceramic/mod.rs index 4d0bf811..5133cf31 100644 --- a/runner/src/scenario/ceramic/mod.rs +++ b/runner/src/scenario/ceramic/mod.rs @@ -160,7 +160,7 @@ impl From for CeramicScenarioParameters { number_of_documents: 0, store_mids: false, }, - Scenario::CeramicNewStreamsBenchmark => Self { + Scenario::ReconEventSync | Scenario::CeramicNewStreamsBenchmark => Self { did_type: DidType::UserDidKey, model_reuse: ReuseType::Shared, model_instance_reuse: ReuseType::PerUser, @@ -174,10 +174,7 @@ impl From for CeramicScenarioParameters { number_of_documents: 3, store_mids: false, }, - Scenario::IpfsRpc - | Scenario::ReconEventSync - | Scenario::ReconEventKeySync - | Scenario::CASBenchmark => { + Scenario::IpfsRpc | Scenario::CASBenchmark => { panic!("Not supported for non ceramic scenarios") } Scenario::CeramicAnchoringBenchmark => Self { diff --git a/runner/src/scenario/mod.rs b/runner/src/scenario/mod.rs index 75e3c658..4984b331 100644 --- a/runner/src/scenario/mod.rs +++ b/runner/src/scenario/mod.rs @@ -29,8 +29,3 @@ pub(crate) fn is_goose_lead_user() -> bool { pub(crate) fn is_goose_global_leader(lead_user: bool) -> bool { is_goose_lead_worker() && lead_user } - -/// Reset the lead user flag so another process can act as the lead user in the future -pub(crate) fn reset_goose_lead_user() { - FIRST_USER.store(true, std::sync::atomic::Ordering::SeqCst); -} diff --git a/runner/src/scenario/recon_sync.rs b/runner/src/scenario/recon_sync.rs index 64700c2c..eeebeb7e 100644 --- a/runner/src/scenario/recon_sync.rs +++ b/runner/src/scenario/recon_sync.rs @@ -1,7 +1,6 @@ use crate::scenario::ceramic::model_instance::{loop_until_key_value_set, set_key_to_stream_id}; use crate::scenario::{ get_redis_client, is_goose_global_leader, is_goose_lead_user, is_goose_lead_worker, - reset_goose_lead_user, }; use ceramic_core::{Cid, EventId}; use ceramic_http_client::ceramic_event::{StreamId, StreamIdType}; @@ -66,26 +65,6 @@ pub async fn event_sync_scenario() -> Result { .register_transaction(stop)) } -// accept option as goose manager builds the scenario as well, but doesn't need any peers and won't run it so it will always be Some in execution -pub async fn event_key_sync_scenario() -> Result { - let test_start = init_scenario(false).await?; - - let create_new_event = transaction!(create_new_event).set_name(CREATE_EVENT_TX_NAME); - let reset_single_user = transaction!(reset_first_user) - .set_name("reset_first_user") - .set_on_start(); - - Ok(scenario!("ReconKeySync") - .register_transaction(test_start) - .register_transaction(reset_single_user) - .register_transaction(create_new_event)) -} - -async fn reset_first_user(_user: &mut GooseUser) -> TransactionResult { - reset_goose_lead_user(); - Ok(()) -} - /// One user on one node creates a model. /// One user on each node subscribes to the model via Recon #[instrument(skip_all, fields(user.index = user.weighted_users_index), ret)] @@ -104,6 +83,9 @@ async fn setup( cid: random_cid(), }; set_key_to_stream_id(&mut conn, MODEL_ID_KEY, &model_id).await; + + // TODO: set a real model + model_id } else { loop_until_key_value_set(&mut conn, MODEL_ID_KEY).await diff --git a/runner/src/simulate.rs b/runner/src/simulate.rs index 18e76e44..006ba92e 100644 --- a/runner/src/simulate.rs +++ b/runner/src/simulate.rs @@ -161,11 +161,6 @@ pub enum Scenario { CeramicQuery, /// Nodes subscribe to same model. One node generates new events, recon syncs event keys and data to peers. ReconEventSync, - /// Nodes subscribe to same model. One node generates new events, recon syncs event keys to peers. - /// Which of the Recon scenarios you should choose is dictated by the API of the ceramic-one instance - /// being used. Previously, it only supported keys but newer versions support keys and data. - /// This scenario is for the keys only version and will fail on new verisons. - ReconEventKeySync, // Scenario that creates model instance documents and verifies that they have been anchored at the desired rate. // This is a benchmark scenario for e2e testing, simliar to the recon event sync scenario, // but covering using js-ceramic rather than talking directly to the ipfs API. @@ -188,7 +183,6 @@ impl Scenario { Scenario::CeramicQuery => "ceramic_query", Scenario::CeramicModelReuse => "ceramic_model_reuse", Scenario::ReconEventSync => "recon_event_sync", - Scenario::ReconEventKeySync => "recon_event_key_sync", Scenario::CeramicAnchoringBenchmark => "ceramic_anchoring_benchmark", Scenario::CASBenchmark => "cas_benchmark", } @@ -196,9 +190,7 @@ impl Scenario { fn target_addr(&self, peer: &Peer) -> Result { match self { - Self::IpfsRpc | Self::ReconEventSync | Self::ReconEventKeySync => { - Ok(peer.ipfs_rpc_addr().to_owned()) - } + Self::IpfsRpc | Self::ReconEventSync => Ok(peer.ipfs_rpc_addr().to_owned()), Self::CeramicSimple | Self::CeramicWriteOnly | Self::CeramicNewStreams @@ -446,7 +438,6 @@ impl ScenarioState { } Scenario::CeramicQuery => ceramic::query::scenario(self.scenario.into()).await?, Scenario::ReconEventSync => recon_sync::event_sync_scenario().await?, - Scenario::ReconEventKeySync => recon_sync::event_key_sync_scenario().await?, Scenario::CASBenchmark => ceramic::anchor::cas_benchmark().await?, }; self.collect_before_metrics().await?; @@ -519,7 +510,7 @@ impl ScenarioState { // we collect things in the scenario and use redis to decide success/failure of the test Ok(()) } - Scenario::ReconEventSync | Scenario::ReconEventKeySync => { + Scenario::ReconEventSync => { let peers = self .get_peers_counter_metric(EVENT_SYNC_METRIC_NAME, IPFS_SERVICE_METRICS_PORT) .await?; @@ -534,9 +525,11 @@ impl ScenarioState { /// For now, most scenarios are successful if they complete without error and only EventIdSync has a criteria. /// Not a result to ensure we always proceed with cleanup, even if we fail to validate the scenario. /// Should return the Minimum RPS of all peers as the f64 + /// Sometimes goose metrics fail to be collected, so we can try to validate the scenario without them in some cases. pub async fn validate_scenario_success( &self, metrics: &GooseMetrics, + backup_runtime: std::time::Duration, ) -> (CommandResult, Option) { if !self.manager { return (CommandResult::Success, None); @@ -591,7 +584,7 @@ impl ScenarioState { (CommandResult::Failure(anyhow!(errors.join("\n"))), min) } } - Scenario::ReconEventSync | Scenario::ReconEventKeySync => { + Scenario::ReconEventSync => { // It'd be easy to make work for other scenarios if they defined a rate and metric. However, the scenario we're // interested in is asymmetrical in what the workers do, and we're trying to look at what happens to other nodes, // which is not how most scenarios work. It also uses the IPFS metrics endpoint. We could parameterize or use a @@ -599,19 +592,27 @@ impl ScenarioState { // entirely different. Anyway, to avoid generalizing the exception we keep it simple. let req_name = recon_sync::CREATE_EVENT_REQ_NAME; - let metric = match metrics + match metrics .requests .get(req_name) .ok_or_else(|| anyhow!("failed to find goose metrics for request {}", req_name)) .map_err(CommandResult::Failure) { - Ok(v) => v, - Err(e) => return (e, None), + Ok(v) => { + info!("Create events success count: {}", v.success_count); + } + Err(e) => { + warn!( + error=?e, "Failed to get create events success count from goose metrics" + ); + } }; - self.validate_recon_scenario_success_int( - metrics.duration as u64, - metric.success_count as u64, + self.validate_recon_scenario_success( + metrics + .duration + .try_into() + .unwrap_or(backup_runtime.as_secs()), ) .await } @@ -754,10 +755,9 @@ impl ScenarioState { /// Removed from `validate_scenario_success` to make testing easier as constructing the GooseMetrics appropriately is difficult /// Should return the Minimum RPS of all peers as the f64 - async fn validate_recon_scenario_success_int( + async fn validate_recon_scenario_success( &self, run_time_seconds: u64, - request_cnt: u64, ) -> (CommandResult, Option) { if !self.manager { return (CommandResult::Success, None); @@ -772,7 +772,7 @@ impl ScenarioState { | Scenario::CeramicAnchoringBenchmark | Scenario::CASBenchmark | Scenario::CeramicNewStreamsBenchmark => (CommandResult::Success, None), - Scenario::ReconEventSync | Scenario::ReconEventKeySync => { + Scenario::ReconEventSync => { let default_rate = 300; let metric_name = EVENT_SYNC_METRIC_NAME; @@ -787,7 +787,6 @@ impl ScenarioState { // There is no `f64::try_from::` but if these values don't fit, we have bigger problems let threshold = self.target_request_rate.unwrap_or(default_rate) as f64; - let create_rps = request_cnt as f64 / run_time_seconds as f64; let before_metrics = match self .before_metrics @@ -814,7 +813,7 @@ impl ScenarioState { Err(e) => return (CommandResult::Failure(e), None), }; - let mut errors = peer_metrics.iter().flat_map(|p| { + let errors = peer_metrics.iter().flat_map(|p| { let rps = p.rps(); if rps < threshold { warn!(current_req_cnt=%p.count, run_time_seconds=?p.runtime, %threshold, %rps, "rps less than threshold"); @@ -837,26 +836,19 @@ impl ScenarioState { Some(PeerRps::new(peer_metrics)) }; - if create_rps < threshold { - warn!( - ?create_rps, - ?threshold, - "create rps less than threshold on writer node" - ); - errors.push(format!( - "Create event RPS less than threshold on writer node: {} < {}", - create_rps, threshold - )); - } if errors.is_empty() { info!( - ?create_rps, + ?peer_rps, ?threshold, "SUCCESS! All peers met the threshold" ); (CommandResult::Success, peer_rps) } else { - warn!(?errors, "FAILURE! Not all peers met the threshold"); + warn!( + ?errors, + ?peer_rps, + "FAILURE! Not all peers met the threshold" + ); (CommandResult::Failure(anyhow!(errors.join("\n"))), peer_rps) } } @@ -931,6 +923,7 @@ pub async fn simulate(opts: Opts) -> Result { let scenario = state.build_goose_scenario().await?; let config: GooseConfiguration = state.goose_config()?; + let start = std::time::Instant::now(); let goose_metrics = match GooseAttack::initialize_with_config(config)? .register_scenario(scenario) .execute() @@ -942,8 +935,11 @@ pub async fn simulate(opts: Opts) -> Result { return Err(e.into()); } }; + let elapsed = start.elapsed(); - let (success, peer_rps) = state.validate_scenario_success(&goose_metrics).await; + let (success, peer_rps) = state + .validate_scenario_success(&goose_metrics, elapsed) + .await; metrics.record(goose_metrics, peer_rps); Ok(success) @@ -1324,7 +1320,6 @@ mod test { async fn run_event_id_sync_test( manager: bool, run_time: u64, - request_cnt: u64, target_request_rate: Option, metric_start_value: u64, metric_end_value: u64, @@ -1338,10 +1333,7 @@ mod test { .unwrap(); state.collect_before_metrics().await.unwrap(); - state - .validate_recon_scenario_success_int(run_time, run_time * request_cnt) - .await - .0 + state.validate_recon_scenario_success(run_time).await.0 } #[test(tokio::test)] @@ -1355,7 +1347,6 @@ mod test { match run_event_id_sync_test( manager, run_time, - request_cnt, target_rps, metric_start_value, metric_end_value, @@ -1378,7 +1369,6 @@ mod test { match run_event_id_sync_test( manager, run_time, - request_cnt, target_rps, metric_start_value, metric_end_value, @@ -1401,7 +1391,6 @@ mod test { match run_event_id_sync_test( manager, run_time, - request_cnt, target_rps, metric_start_value, metric_end_value, @@ -1426,7 +1415,6 @@ mod test { match run_event_id_sync_test( manager, run_time, - request_cnt, target_rps, metric_start_value, metric_end_value, @@ -1451,7 +1439,6 @@ mod test { match run_event_id_sync_test( manager, run_time, - request_cnt, target_rps, metric_start_value, metric_end_value, @@ -1474,7 +1461,6 @@ mod test { match run_event_id_sync_test( manager, run_time, - request_cnt, target_rps, metric_start_value, metric_end_value,