Skip to content

Commit

Permalink
test(recovery): add recovery test for nexmark stream (risingwavelabs#…
Browse files Browse the repository at this point in the history
…7623)

Signed-off-by: Runji Wang <wangrunji0408@163.com>
Co-authored-by: Liang <44948473+soundOfDestiny@users.noreply.github.com>
  • Loading branch information
wangrunji0408 and soundOfDestiny authored Mar 10, 2023
1 parent 25499e3 commit 2ae019d
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ script = """
#!/usr/bin/env bash
set -e
cargo check -p risingwave_simulation "$@"
cargo check -p risingwave_simulation --all-targets "$@"
"""

[tasks.sslt]
Expand Down
15 changes: 15 additions & 0 deletions src/tests/simulation/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ pub struct Cluster {
}

impl Cluster {
/// Start a RisingWave cluster for testing.
///
/// This function should be called exactly once in a test.
pub async fn start(conf: Configuration) -> Result<Self> {
let handle = madsim::runtime::Handle::current();
println!("seed = {}", handle.seed());
Expand Down Expand Up @@ -514,6 +517,7 @@ impl Cluster {
}
}

/// Options for killing nodes.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct KillOpts {
pub kill_rate: f32,
Expand All @@ -522,3 +526,14 @@ pub struct KillOpts {
pub kill_compute: bool,
pub kill_compactor: bool,
}

impl KillOpts {
/// Killing all kind of nodes.
pub const ALL: Self = KillOpts {
kill_rate: 1.0,
kill_meta: true,
kill_frontend: true,
kill_compute: true,
kill_compactor: true,
};
}
1 change: 1 addition & 0 deletions src/tests/simulation/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod dynamic_filter;
mod hello;
mod nexmark_chaos;
mod nexmark_q4;
mod nexmark_recovery;
mod nexmark_source;
mod singleton_migration;
mod streaming_parallelism;
84 changes: 84 additions & 0 deletions src/tests/simulation/tests/it/nexmark_recovery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![cfg(madsim)]

use std::time::Duration;

use anyhow::Result;
use madsim::time::{sleep, Instant};
use risingwave_simulation::cluster::{Configuration, KillOpts};
use risingwave_simulation::nexmark::{self, NexmarkCluster, THROUGHPUT};
use risingwave_simulation::utils::AssertResult;

/// Setup a nexmark stream, inject failures, and verify results.
async fn nexmark_recovery_common(create: &str, select: &str, drop: &str) -> Result<()> {
// tracing_subscriber::fmt()
// .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
// .init();

let mut cluster =
NexmarkCluster::new(Configuration::for_scale(), 6, Some(THROUGHPUT * 20), false).await?;

// get the output without failures as the standard result
cluster.run(create).await?;
sleep(Duration::from_secs(30)).await;
let expected = cluster.run(select).await?;
cluster.run(drop).await?;
sleep(Duration::from_secs(5)).await;

cluster.run(create).await?;

// kill nodes and trigger recovery
for _ in 0..5 {
sleep(Duration::from_secs(2)).await;
cluster.kill_node(&KillOpts::ALL).await;
}
// wait enough time to make sure the stream is end
sleep(Duration::from_secs(60)).await;

cluster.run(select).await?.assert_result_eq(&expected);

Ok(())
}

macro_rules! test {
($query:ident) => {
paste::paste! {
#[madsim::test]
async fn [< nexmark_recovery_ $query >]() -> Result<()> {
use risingwave_simulation::nexmark::queries::$query::*;
nexmark_recovery_common(CREATE, SELECT, DROP)
.await
}
}
};
}

// q0, q1, q2: too trivial
test!(q3);
test!(q4);
test!(q5);
// q6: cannot plan
test!(q7);
test!(q8);
test!(q9);
// q10+: duplicated or unsupported

// Self made queries.
test!(q101);
test!(q102);
test!(q103);
test!(q104);
test!(q105);

0 comments on commit 2ae019d

Please sign in to comment.