Skip to content

Commit

Permalink
test: add e2e test for region failover
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jun 25, 2024
1 parent 8cbe716 commit 71e7dd2
Show file tree
Hide file tree
Showing 14 changed files with 666 additions and 14 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ FUZZ_TARGET ?= fuzz_alter_table
fuzz:
cargo fuzz run ${FUZZ_TARGET} --fuzz-dir tests-fuzz -D -s none -- -runs=${RUNS}

.PHONY: fuzz-ls
fuzz-ls:
cargo fuzz list --fuzz-dir tests-fuzz

.PHONY: check
check: ## Cargo check all the targets.
cargo check --workspace --all-targets --all-features
Expand Down
10 changes: 10 additions & 0 deletions tests-fuzz/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ common-time = { workspace = true }
datatypes = { workspace = true }
derive_builder = { workspace = true }
dotenv = "0.15"
futures = { workspace = true }
humantime = { workspace = true }
k8s-openapi = { version = "0.22", features = ["v1_30"] }
kube = { version = "0.92", features = [
"runtime",
Expand Down Expand Up @@ -54,6 +56,7 @@ sqlx = { version = "0.6", features = [
"postgres",
"chrono",
] }
store-api = { workspace = true }
strum.workspace = true
tinytemplate = "1.2"
tokio = { workspace = true }
Expand Down Expand Up @@ -117,3 +120,10 @@ test = false
bench = false
doc = false
required-features = ["unstable"]

[[bin]]
name = "fuzz_failover_mito_regions"
path = "targets/failover/fuzz_failover_mito_regions.rs"
test = false
bench = false
doc = false
6 changes: 3 additions & 3 deletions tests-fuzz/src/generator/create_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct CreateTableExprGenerator<R: Rng + 'static> {
partition: usize,
if_not_exists: bool,
#[builder(setter(into))]
name: String,
name: Ident,
#[builder(setter(into))]
with_clause: HashMap<String, String>,
name_generator: Box<dyn Random<Ident, R>>,
Expand All @@ -65,7 +65,7 @@ impl<R: Rng + 'static> Default for CreateTableExprGenerator<R> {
engine: DEFAULT_ENGINE.to_string(),
if_not_exists: false,
partition: 0,
name: String::new(),
name: Ident::new(""),
with_clause: HashMap::default(),
name_generator: Box::new(MappedGenerator::new(WordGenerator, random_capitalize_map)),
ts_column_type_generator: Box::new(TsColumnTypeGenerator),
Expand Down Expand Up @@ -190,7 +190,7 @@ impl<R: Rng + 'static> Generator<CreateTableExpr, R> for CreateTableExprGenerato
if self.name.is_empty() {
builder.table_name(self.name_generator.gen(rng));
} else {
builder.table_name(self.name.to_string());
builder.table_name(self.name.clone());
}
if !self.with_clause.is_empty() {
let mut options = HashMap::new();
Expand Down
2 changes: 1 addition & 1 deletion tests-fuzz/src/generator/insert_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl<R: Rng + 'static> Generator<InsertIntoExpr, R> for InsertExprGenerator<R> {
}

Ok(InsertIntoExpr {
table_name: self.table_ctx.name.to_string(),
table_name: self.table_ctx.name.clone(),
omit_column_list: self.omit_column_list,
columns: values_columns,
values_list,
Expand Down
14 changes: 8 additions & 6 deletions tests-fuzz/src/ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use serde::{Deserialize, Serialize};

use self::insert_expr::{RowValue, RowValues};
use crate::context::TableContextRef;
use crate::generator::Random;
use crate::generator::{Random, TsValueGenerator};
use crate::impl_random;
use crate::ir::create_expr::ColumnOption;

Expand Down Expand Up @@ -127,12 +127,10 @@ pub fn generate_random_value<R: Rng>(
}

/// Generate monotonically increasing timestamps for MySQL.
pub fn generate_unique_timestamp_for_mysql<R: Rng>(
base: i64,
) -> impl Fn(&mut R, TimestampType) -> Value {
pub fn generate_unique_timestamp_for_mysql<R: Rng>(base: i64) -> TsValueGenerator<R> {
let base = Arc::new(AtomicI64::new(base));

move |_rng, ts_type| -> Value {
Box::new(move |_rng, ts_type| -> Value {
let value = base.fetch_add(1, Ordering::Relaxed);
let v = match ts_type {
TimestampType::Second(_) => Timestamp::new_second(1 + value),
Expand All @@ -141,7 +139,7 @@ pub fn generate_unique_timestamp_for_mysql<R: Rng>(
TimestampType::Nanosecond(_) => Timestamp::new_nanosecond(1_000_000_000 + value),
};
Value::from(v)
}
})
}

/// Generate random timestamps.
Expand Down Expand Up @@ -253,6 +251,10 @@ impl Ident {
quote_style: Some(quote),
}
}

pub fn is_empty(&self) -> bool {
self.value.is_empty()
}
}

impl From<&str> for Ident {
Expand Down
4 changes: 2 additions & 2 deletions tests-fuzz/src/ir/insert_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ use std::fmt::{Debug, Display};

use datatypes::value::Value;

use crate::ir::Column;
use crate::ir::{Column, Ident};

pub struct InsertIntoExpr {
pub table_name: String,
pub table_name: Ident,
pub omit_column_list: bool,
pub columns: Vec<Column>,
pub values_list: Vec<RowValues>,
Expand Down
6 changes: 6 additions & 0 deletions tests-fuzz/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod cluster_info;
pub mod config;
pub mod crd;
pub mod health;
pub mod partition;
#[cfg(feature = "unstable")]
pub mod process;
pub mod wait;

use std::env;

Expand Down Expand Up @@ -89,6 +92,9 @@ pub fn load_unstable_test_env_variables() -> UnstableTestVariables {
}
}

pub const GT_FUZZ_CLUSTER_NAMESPACE: &str = "GT_FUZZ_CLUSTER_NAMESPACE";
pub const GT_FUZZ_CLUSTER_NAME: &str = "GT_FUZZ_CLUSTER_NAME";

/// Flushes memtable to SST file.
pub async fn flush_memtable(e: &Pool<MySql>, table_name: &Ident) -> Result<()> {
let sql = format!("SELECT flush_table(\"{}\")", table_name);
Expand Down
47 changes: 47 additions & 0 deletions tests-fuzz/src/utils/cluster_info.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use snafu::ResultExt;
use sqlx::database::HasArguments;
use sqlx::{ColumnIndex, Database, Decode, Encode, Executor, IntoArguments, Type};

use crate::error::{self, Result};

pub const PEER_TYPE_DATANODE: &str = "DATANODE";

#[derive(Debug, sqlx::FromRow)]
pub struct NodeInfo {
pub peer_id: i64,
pub peer_addr: String,
pub peer_type: String,
pub active_time: Option<String>,
}

/// Returns all [NodeInfo] in the cluster.
pub async fn fetch_nodes<'a, DB, E>(e: E) -> Result<Vec<NodeInfo>>
where
DB: Database,
<DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
for<'c> E: 'a + Executor<'c, Database = DB>,
for<'c> i64: Decode<'c, DB> + Type<DB>,
for<'c> String: Decode<'c, DB> + Type<DB>,
for<'c> String: Encode<'c, DB> + Type<DB>,
for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
{
let sql = "select * from information_schema.cluster_info;";
sqlx::query_as::<_, NodeInfo>(sql)
.fetch_all(e)
.await
.context(error::ExecuteQuerySnafu { sql })
}
4 changes: 2 additions & 2 deletions tests-fuzz/src/utils/crd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod common;
mod pod;
pub mod common;
pub mod pod;
71 changes: 71 additions & 0 deletions tests-fuzz/src/utils/partition.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use snafu::ResultExt;
use sqlx::database::HasArguments;
use sqlx::{ColumnIndex, Database, Decode, Encode, Executor, IntoArguments, Type};

use crate::error::{self, Result};
use crate::ir::Ident;

#[derive(Debug, sqlx::FromRow)]
pub struct Partition {
pub datanode_id: u64,
pub region_id: u64,
}

#[derive(Debug, sqlx::FromRow)]
pub struct PartitionCount {
pub count: i64,
}

pub async fn count_partitions<'a, DB, E>(e: E, datanode_id: u64) -> Result<PartitionCount>
where
DB: Database,
<DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
for<'c> E: 'a + Executor<'c, Database = DB>,
for<'c> i64: Decode<'c, DB> + Type<DB>,
for<'c> String: Decode<'c, DB> + Type<DB>,
for<'c> u64: Encode<'c, DB> + Type<DB>,
for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
{
let sql = "select count(1) as count from information_schema.region_peers where peer_id == ?";
Ok(sqlx::query_as::<_, PartitionCount>(sql)
.bind(datanode_id)
.fetch_all(e)
.await
.context(error::ExecuteQuerySnafu { sql })?
.remove(0))
}

/// Returns all [Partition] of the specific `table`
pub async fn fetch_partitions<'a, DB, E>(e: E, table_name: Ident) -> Result<Vec<Partition>>
where
DB: Database,
<DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
for<'c> E: 'a + Executor<'c, Database = DB>,
for<'c> u64: Decode<'c, DB> + Type<DB>,
for<'c> String: Decode<'c, DB> + Type<DB>,
for<'c> String: Encode<'c, DB> + Type<DB>,
for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
{
let sql = "select b.peer_id as datanode_id, a.greptime_partition_id as region_id
from information_schema.partitions a left join information_schema.region_peers b
on a.greptime_partition_id = b.region_id where a.table_name= ? order by datanode_id asc;";
sqlx::query_as::<_, Partition>(sql)
.bind(table_name.value.to_string())
.fetch_all(e)
.await
.context(error::ExecuteQuerySnafu { sql })
}
38 changes: 38 additions & 0 deletions tests-fuzz/src/utils/wait.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use futures::future::BoxFuture;

pub async fn wait_condition_fn<F, T, U>(
timeout: Duration,
check: F,
condition: U,
retry_interval: Duration,
) where
F: Fn() -> BoxFuture<'static, T>,
U: Fn(T) -> bool,
{
tokio::time::timeout(timeout, async move {
loop {
if condition(check().await) {
break;
}
tokio::time::sleep(retry_interval).await
}
})
.await
.unwrap();
}
21 changes: 21 additions & 0 deletions tests-fuzz/src/validator/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,27 @@ where
Ok(())
}

#[derive(Debug, sqlx::FromRow)]
pub struct ValueCount {
pub count: i64,
}

pub async fn count_values<'a, DB, E>(e: E, sql: &'a str) -> Result<ValueCount>
where
DB: Database,
<DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
for<'c> E: 'a + Executor<'c, Database = DB>,
for<'c> i64: Decode<'c, DB> + Type<DB>,
for<'c> String: Encode<'c, DB> + Type<DB>,
for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
{
Ok(sqlx::query_as::<_, ValueCount>(sql)
.fetch_all(e)
.await
.context(error::ExecuteQuerySnafu { sql })?
.remove(0))
}

/// Returns all [RowEntry] of the `table_name`.
pub async fn fetch_values<'a, DB, E>(e: E, sql: &'a str) -> Result<Vec<<DB as Database>::Row>>
where
Expand Down
Loading

0 comments on commit 71e7dd2

Please sign in to comment.