chore: Less flaky results from scenario validation #166

Merged · 3 commits · May 1, 2024
keramik/src/simulation.md (1 addition, 2 deletions)

@@ -7,8 +7,7 @@ To run a simulation, first define a simulation. Available simulation types are
 - `ceramic-write-only` - A simulation that only performs updates on two different streams
 - `ceramic-new-streams` - A simulation that only creates new streams
 - `ceramic-model-reuse` - A simulation that reuses the same model and queries instances across workers
-- `recon-event-key-sync` - A simulation that creates event keys for Recon to sync at a fixed rate (~300/s by default). Designed for a 2 node network but should work on any.
-- `recon-event-sync` - A simulation that creates events for Recon to sync at a fixed rate (~300/s by default). Designed for a 2 node network but should work on any. Choosing between recon scenarios depends on what version of rust-ceramic you have. Some versions support keys only, some support key/value (maybe we'll support both someday). If you're not sure, try this one first.
+- `recon-event-sync` - A simulation that creates events for Recon to sync at a fixed rate (~300/s by default). Designed for a 2 node network but should work on any.

 Using one of these scenarios, we can then define the configuration for that scenario:
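For context, the config the doc refers to is a keramik `Simulation` resource. A minimal manifest selecting the surviving scenario might look like the sketch below; the field names follow keramik's documented custom resource, but treat the exact schema (API version, field names, `runTime` units) as assumptions to verify against your keramik version.

```yaml
# Sketch of a keramik Simulation resource; verify the schema for your version.
apiVersion: "keramik.3box.io/v1alpha1"
kind: Simulation
metadata:
  name: recon-sync-example      # hypothetical name
  namespace: keramik-example    # hypothetical namespace
spec:
  scenario: recon-event-sync    # one of the scenario names listed above
  users: 10                     # simulated users
  runTime: 10                   # assumed to be minutes
```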
runner/src/scenario/ceramic/mod.rs (2 additions, 5 deletions)

@@ -160,7 +160,7 @@ impl From<Scenario> for CeramicScenarioParameters {
                 number_of_documents: 0,
                 store_mids: false,
             },
-            Scenario::CeramicNewStreamsBenchmark => Self {
+            Scenario::ReconEventSync | Scenario::CeramicNewStreamsBenchmark => Self {
                 did_type: DidType::UserDidKey,
                 model_reuse: ReuseType::Shared,
                 model_instance_reuse: ReuseType::PerUser,
@@ -174,10 +174,7 @@
                 number_of_documents: 3,
                 store_mids: false,
             },
-            Scenario::IpfsRpc
-            | Scenario::ReconEventSync
-            | Scenario::ReconEventKeySync
-            | Scenario::CASBenchmark => {
+            Scenario::IpfsRpc | Scenario::CASBenchmark => {
                 panic!("Not supported for non ceramic scenarios")
             }
             Scenario::CeramicAnchoringBenchmark => Self {
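The hunk above folds `ReconEventSync` into the same parameter set as `CeramicNewStreamsBenchmark` instead of leaving it in the panicking arm. As a sketch of the pattern only (a simplified enum-to-config `From` impl with made-up variants and fields, not the runner's real types):

```rust
// Simplified illustration of the Scenario -> parameters mapping pattern.
#[derive(Clone, Copy)]
enum Scenario {
    ReconEventSync,
    NewStreamsBenchmark,
    IpfsRpc,
}

struct Params {
    shared_model: bool,
}

impl From<Scenario> for Params {
    fn from(s: Scenario) -> Self {
        match s {
            // Both scenarios now share one parameter set.
            Scenario::ReconEventSync | Scenario::NewStreamsBenchmark => {
                Params { shared_model: true }
            }
            // Non-ceramic scenarios still have no ceramic parameters.
            Scenario::IpfsRpc => panic!("Not supported for non ceramic scenarios"),
        }
    }
}
```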
runner/src/scenario/mod.rs (0 additions, 5 deletions)

@@ -29,8 +29,3 @@ pub(crate) fn is_goose_lead_user() -> bool {
 pub(crate) fn is_goose_global_leader(lead_user: bool) -> bool {
     is_goose_lead_worker() && lead_user
 }
-
-/// Reset the lead user flag so another process can act as the lead user in the future
-pub(crate) fn reset_goose_lead_user() {
-    FIRST_USER.store(true, std::sync::atomic::Ordering::SeqCst);
-}
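Context for this deletion: the lead-user election these helpers build on is a process-wide atomic flag (`FIRST_USER`, the flag the removed reset function stored `true` back into). A minimal sketch of how `is_goose_lead_user` presumably works; the `swap`-based body is an assumption, since only the flag and the reset semantics are visible in the diff:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// The flag the removed reset function stored `true` back into.
static FIRST_USER: AtomicBool = AtomicBool::new(true);

/// Sketch: the first caller in the process takes the lead-user role.
/// `swap` returns the previous value, so exactly one caller observes `true`.
pub(crate) fn is_goose_lead_user() -> bool {
    FIRST_USER.swap(false, Ordering::SeqCst)
}
```

With `reset_goose_lead_user` gone, the flag flips once per process and stays flipped, which is all the remaining scenarios need.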
runner/src/scenario/recon_sync.rs (3 additions, 21 deletions)

@@ -1,7 +1,6 @@
 use crate::scenario::ceramic::model_instance::{loop_until_key_value_set, set_key_to_stream_id};
 use crate::scenario::{
     get_redis_client, is_goose_global_leader, is_goose_lead_user, is_goose_lead_worker,
-    reset_goose_lead_user,
 };
 use ceramic_core::{Cid, EventId};
 use ceramic_http_client::ceramic_event::{StreamId, StreamIdType};
@@ -66,26 +65,6 @@ pub async fn event_sync_scenario() -> Result<Scenario, GooseError> {
         .register_transaction(stop))
 }

-// accept option as goose manager builds the scenario as well, but doesn't need any peers and won't run it so it will always be Some in execution
-pub async fn event_key_sync_scenario() -> Result<Scenario, GooseError> {
-    let test_start = init_scenario(false).await?;
-
-    let create_new_event = transaction!(create_new_event).set_name(CREATE_EVENT_TX_NAME);
-    let reset_single_user = transaction!(reset_first_user)
-        .set_name("reset_first_user")
-        .set_on_start();
-
-    Ok(scenario!("ReconKeySync")
-        .register_transaction(test_start)
-        .register_transaction(reset_single_user)
-        .register_transaction(create_new_event))
-}
-
-async fn reset_first_user(_user: &mut GooseUser) -> TransactionResult {
-    reset_goose_lead_user();
-    Ok(())
-}
-
 /// One user on one node creates a model.
 /// One user on each node subscribes to the model via Recon
 #[instrument(skip_all, fields(user.index = user.weighted_users_index), ret)]
@@ -104,6 +83,9 @@ async fn setup(
             cid: random_cid(),
         };
         set_key_to_stream_id(&mut conn, MODEL_ID_KEY, &model_id).await;
+
+        // TODO: set a real model
+
         model_id
     } else {
         loop_until_key_value_set(&mut conn, MODEL_ID_KEY).await
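The `setup` hunk above shows the scenario's coordination pattern: the global leader writes the model's StreamId to redis and every other user polls until the key appears. A self-contained sketch of that pattern using the `redis` crate directly; the key name, polling interval, and function shape are illustrative, not the runner's actual helpers:

```rust
use std::time::Duration;
use redis::AsyncCommands;

const MODEL_ID_KEY: &str = "model_id"; // illustrative key name

/// Leader publishes the id; followers block until it is visible.
async fn coordinate_model_id(
    conn: &mut redis::aio::MultiplexedConnection,
    is_leader: bool,
    model_id: &str,
) -> redis::RedisResult<String> {
    if is_leader {
        // Leader writes the shared key once.
        let _: () = conn.set(MODEL_ID_KEY, model_id).await?;
        Ok(model_id.to_owned())
    } else {
        // Followers poll until the leader has written it.
        loop {
            let found: Option<String> = conn.get(MODEL_ID_KEY).await?;
            if let Some(id) = found {
                return Ok(id);
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }
}
```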
runner/src/simulate.rs (34 additions, 48 deletions)

@@ -161,11 +161,6 @@ pub enum Scenario {
     CeramicQuery,
     /// Nodes subscribe to same model. One node generates new events, recon syncs event keys and data to peers.
     ReconEventSync,
-    /// Nodes subscribe to same model. One node generates new events, recon syncs event keys to peers.
-    /// Which of the Recon scenarios you should choose is dictated by the API of the ceramic-one instance
-    /// being used. Previously, it only supported keys but newer versions support keys and data.
-    /// This scenario is for the keys only version and will fail on new verisons.
-    ReconEventKeySync,
     // Scenario that creates model instance documents and verifies that they have been anchored at the desired rate.
     // This is a benchmark scenario for e2e testing, simliar to the recon event sync scenario,
     // but covering using js-ceramic rather than talking directly to the ipfs API.
@@ -188,17 +183,14 @@ impl Scenario {
             Scenario::CeramicQuery => "ceramic_query",
             Scenario::CeramicModelReuse => "ceramic_model_reuse",
             Scenario::ReconEventSync => "recon_event_sync",
-            Scenario::ReconEventKeySync => "recon_event_key_sync",
             Scenario::CeramicAnchoringBenchmark => "ceramic_anchoring_benchmark",
             Scenario::CASBenchmark => "cas_benchmark",
         }
     }

     fn target_addr(&self, peer: &Peer) -> Result<String> {
         match self {
-            Self::IpfsRpc | Self::ReconEventSync | Self::ReconEventKeySync => {
-                Ok(peer.ipfs_rpc_addr().to_owned())
-            }
+            Self::IpfsRpc | Self::ReconEventSync => Ok(peer.ipfs_rpc_addr().to_owned()),
             Self::CeramicSimple
             | Self::CeramicWriteOnly
             | Self::CeramicNewStreams
@@ -446,7 +438,6 @@ impl ScenarioState {
             }
             Scenario::CeramicQuery => ceramic::query::scenario(self.scenario.into()).await?,
             Scenario::ReconEventSync => recon_sync::event_sync_scenario().await?,
-            Scenario::ReconEventKeySync => recon_sync::event_key_sync_scenario().await?,
             Scenario::CASBenchmark => ceramic::anchor::cas_benchmark().await?,
         };
         self.collect_before_metrics().await?;
@@ -519,7 +510,7 @@ impl ScenarioState {
                 // we collect things in the scenario and use redis to decide success/failure of the test
                 Ok(())
             }
-            Scenario::ReconEventSync | Scenario::ReconEventKeySync => {
+            Scenario::ReconEventSync => {
                 let peers = self
                     .get_peers_counter_metric(EVENT_SYNC_METRIC_NAME, IPFS_SERVICE_METRICS_PORT)
                     .await?;
@@ -534,9 +525,11 @@
     /// For now, most scenarios are successful if they complete without error and only EventIdSync has a criteria.
     /// Not a result to ensure we always proceed with cleanup, even if we fail to validate the scenario.
     /// Should return the Minimum RPS of all peers as the f64
+    /// Sometimes goose metrics fail to be collected, so we can try to validate the scenario without them in some cases.
     pub async fn validate_scenario_success(
         &self,
         metrics: &GooseMetrics,
+        backup_runtime: std::time::Duration,
     ) -> (CommandResult, Option<PeerRps>) {
         if !self.manager {
             return (CommandResult::Success, None);
@@ -591,27 +584,35 @@
                     (CommandResult::Failure(anyhow!(errors.join("\n"))), min)
                 }
             }
-            Scenario::ReconEventSync | Scenario::ReconEventKeySync => {
+            Scenario::ReconEventSync => {
                 // It'd be easy to make work for other scenarios if they defined a rate and metric. However, the scenario we're
                 // interested in is asymmetrical in what the workers do, and we're trying to look at what happens to other nodes,
                 // which is not how most scenarios work. It also uses the IPFS metrics endpoint. We could parameterize or use a
                 // trait, but we don't yet have a use case, and might need to use transactions, or multiple requests, or something
                 // entirely different. Anyway, to avoid generalizing the exception we keep it simple.
                 let req_name = recon_sync::CREATE_EVENT_REQ_NAME;

-                let metric = match metrics
+                match metrics
                     .requests
                     .get(req_name)
                     .ok_or_else(|| anyhow!("failed to find goose metrics for request {}", req_name))
                     .map_err(CommandResult::Failure)
                 {
-                    Ok(v) => v,
-                    Err(e) => return (e, None),
+                    Ok(v) => {
+                        info!("Create events success count: {}", v.success_count);
+                    }
+                    Err(e) => {
+                        warn!(
+                            error=?e, "Failed to get create events success count from goose metrics"
+                        );
+                    }
                 };

-                self.validate_recon_scenario_success_int(
-                    metrics.duration as u64,
-                    metric.success_count as u64,
+                self.validate_recon_scenario_success(
+                    metrics
+                        .duration
+                        .try_into()
+                        .unwrap_or(backup_runtime.as_secs()),
                 )
                 .await
             }
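This hunk carries the PR's de-flaking idea: a missing goose request metric is now logged instead of failing validation outright, and the runtime comes from `metrics.duration` with a wall-clock measurement as a fallback. A small sketch of that fallback pattern in plain `std` (function names are illustrative, not the runner's API):

```rust
use std::time::{Duration, Instant};

/// Prefer the duration goose reports; fall back to measured wall-clock time
/// when the conversion fails, mirroring `try_into().unwrap_or(...)` above.
fn effective_runtime_secs(reported: usize, measured: Duration) -> u64 {
    u64::try_from(reported).unwrap_or_else(|_| measured.as_secs())
}

fn main() {
    let start = Instant::now();
    // ... run the load test ...
    let elapsed = start.elapsed();
    let runtime = effective_runtime_secs(60, elapsed);
    println!("validating against a {runtime}s runtime");
}
```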
@@ -754,10 +755,9 @@

     /// Removed from `validate_scenario_success` to make testing easier as constructing the GooseMetrics appropriately is difficult
     /// Should return the Minimum RPS of all peers as the f64
-    async fn validate_recon_scenario_success_int(
+    async fn validate_recon_scenario_success(
         &self,
         run_time_seconds: u64,
-        request_cnt: u64,
     ) -> (CommandResult, Option<PeerRps>) {
         if !self.manager {
             return (CommandResult::Success, None);
@@ -772,7 +772,7 @@
             | Scenario::CeramicAnchoringBenchmark
             | Scenario::CASBenchmark
             | Scenario::CeramicNewStreamsBenchmark => (CommandResult::Success, None),
-            Scenario::ReconEventSync | Scenario::ReconEventKeySync => {
+            Scenario::ReconEventSync => {
                 let default_rate = 300;
                 let metric_name = EVENT_SYNC_METRIC_NAME;

@@ -787,7 +786,6 @@

                 // There is no `f64::try_from::<u64 or usize>` but if these values don't fit, we have bigger problems
                 let threshold = self.target_request_rate.unwrap_or(default_rate) as f64;
-                let create_rps = request_cnt as f64 / run_time_seconds as f64;

                 let before_metrics = match self
                     .before_metrics
@@ -814,7 +813,7 @@
                     Err(e) => return (CommandResult::Failure(e), None),
                 };

-                let mut errors = peer_metrics.iter().flat_map(|p| {
+                let errors = peer_metrics.iter().flat_map(|p| {
                     let rps = p.rps();
                     if rps < threshold {
                         warn!(current_req_cnt=%p.count, run_time_seconds=?p.runtime, %threshold, %rps, "rps less than threshold");
@@ -837,26 +836,19 @@
                     Some(PeerRps::new(peer_metrics))
                 };

-                if create_rps < threshold {
-                    warn!(
-                        ?create_rps,
-                        ?threshold,
-                        "create rps less than threshold on writer node"
-                    );
-                    errors.push(format!(
-                        "Create event RPS less than threshold on writer node: {} < {}",
-                        create_rps, threshold
-                    ));
-                }
                 if errors.is_empty() {
                     info!(
-                        ?create_rps,
                         ?peer_rps,
                         ?threshold,
                         "SUCCESS! All peers met the threshold"
                     );
                     (CommandResult::Success, peer_rps)
                 } else {
-                    warn!(?errors, "FAILURE! Not all peers met the threshold");
+                    warn!(
+                        ?errors,
+                        ?peer_rps,
+                        "FAILURE! Not all peers met the threshold"
+                    );
                     (CommandResult::Failure(anyhow!(errors.join("\n"))), peer_rps)
                 }
             }
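With the writer-side `create_rps` check removed, success is decided purely from per-peer counters: sample each peer's event-sync counter before and after the run, divide the delta by the runtime, and require every peer to meet the threshold (300/s by default). A self-contained sketch of that arithmetic; the struct and field names are illustrative:

```rust
/// Illustrative per-peer counter sample.
struct PeerSample {
    before: u64,       // counter value before the run
    after: u64,        // counter value after the run
    runtime_secs: u64, // how long the run lasted
}

impl PeerSample {
    fn rps(&self) -> f64 {
        (self.after - self.before) as f64 / self.runtime_secs as f64
    }
}

/// Every peer must sync events at or above the threshold.
fn all_peers_meet_threshold(peers: &[PeerSample], threshold: f64) -> bool {
    peers.iter().all(|p| p.rps() >= threshold)
}

fn main() {
    let peers = [
        PeerSample { before: 0, after: 18_000, runtime_secs: 60 }, // 300 rps: passes
        PeerSample { before: 0, after: 12_000, runtime_secs: 60 }, // 200 rps: fails
    ];
    assert!(!all_peers_meet_threshold(&peers, 300.0));
}
```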
@@ -931,6 +923,7 @@ pub async fn simulate(opts: Opts) -> Result<CommandResult> {
     let scenario = state.build_goose_scenario().await?;
     let config: GooseConfiguration = state.goose_config()?;

+    let start = std::time::Instant::now();
     let goose_metrics = match GooseAttack::initialize_with_config(config)?
         .register_scenario(scenario)
         .execute()
@@ -942,8 +935,11 @@
             return Err(e.into());
         }
     };
+    let elapsed = start.elapsed();

-    let (success, peer_rps) = state.validate_scenario_success(&goose_metrics).await;
+    let (success, peer_rps) = state
+        .validate_scenario_success(&goose_metrics, elapsed)
+        .await;
     metrics.record(goose_metrics, peer_rps);

     Ok(success)
@@ -1324,7 +1320,6 @@ mod test {
     async fn run_event_id_sync_test(
         manager: bool,
         run_time: u64,
-        request_cnt: u64,
         target_request_rate: Option<usize>,
         metric_start_value: u64,
         metric_end_value: u64,
@@ -1338,10 +1333,7 @@
             .unwrap();

         state.collect_before_metrics().await.unwrap();
-        state
-            .validate_recon_scenario_success_int(run_time, run_time * request_cnt)
-            .await
-            .0
+        state.validate_recon_scenario_success(run_time).await.0
     }

     #[test(tokio::test)]
@@ -1355,7 +1347,6 @@
         match run_event_id_sync_test(
             manager,
             run_time,
-            request_cnt,
             target_rps,
             metric_start_value,
             metric_end_value,
@@ -1378,7 +1369,6 @@
         match run_event_id_sync_test(
             manager,
             run_time,
-            request_cnt,
             target_rps,
             metric_start_value,
             metric_end_value,
@@ -1401,7 +1391,6 @@
         match run_event_id_sync_test(
             manager,
             run_time,
-            request_cnt,
             target_rps,
             metric_start_value,
             metric_end_value,
@@ -1426,7 +1415,6 @@
         match run_event_id_sync_test(
             manager,
             run_time,
-            request_cnt,
             target_rps,
             metric_start_value,
             metric_end_value,
@@ -1451,7 +1439,6 @@
         match run_event_id_sync_test(
             manager,
             run_time,
-            request_cnt,
             target_rps,
             metric_start_value,
             metric_end_value,
@@ -1474,7 +1461,6 @@
         match run_event_id_sync_test(
             manager,
             run_time,
-            request_cnt,
             target_rps,
             metric_start_value,
             metric_end_value,