Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: invoke flush_table and compact_table in fuzz tests #4045

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_recordbatch::SendableRecordBatchStream;
use mito2::engine::MitoEngine;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{
Expand All @@ -44,7 +45,7 @@ use store_api::storage::{RegionId, ScanRequest};

use self::state::MetricEngineState;
use crate::data_region::DataRegion;
use crate::error::{Result, UnsupportedRegionRequestSnafu};
use crate::error::{self, Result, UnsupportedRegionRequestSnafu};
use crate::metadata_region::MetadataRegion;
use crate::utils;

Expand Down Expand Up @@ -144,10 +145,33 @@ impl RegionEngine for MetricEngine {
.alter_region(region_id, alter, &mut extension_return_value)
.await
}
RegionRequest::Delete(_)
| RegionRequest::Flush(_)
| RegionRequest::Compact(_)
| RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
RegionRequest::Flush(_) => {
if self.inner.is_physical_region(region_id) {
self.inner
.mito
.handle_request(region_id, request)
.await
.context(error::MitoFlushOperationSnafu)
.map(|response| response.affected_rows)
} else {
UnsupportedRegionRequestSnafu { request }.fail()
}
}
RegionRequest::Compact(_) => {
if self.inner.is_physical_region(region_id) {
self.inner
.mito
.handle_request(region_id, request)
.await
.context(error::MitoFlushOperationSnafu)
.map(|response| response.affected_rows)
} else {
UnsupportedRegionRequestSnafu { request }.fail()
}
}
RegionRequest::Delete(_) | RegionRequest::Truncate(_) => {
UnsupportedRegionRequestSnafu { request }.fail()
}
RegionRequest::Catchup(ref req) => self.inner.catchup_region(region_id, *req).await,
};

Expand Down
18 changes: 17 additions & 1 deletion src/metric-engine/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,27 @@ pub enum Error {
location: Location,
},

#[snafu(display("Mito flush operation fails"))]
MitoFlushOperation {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Mito catchup operation fails"))]
MitoCatchupOperation {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Mito compact operation fails"))]
MitoCompactOperation {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to collect record batch stream"))]
CollectRecordBatchStream {
source: common_recordbatch::error::Error,
Expand Down Expand Up @@ -275,7 +289,9 @@ impl ErrorExt for Error {
| CloseMitoRegion { source, .. }
| MitoReadOperation { source, .. }
| MitoWriteOperation { source, .. }
| MitoCatchupOperation { source, .. } => source.status_code(),
| MitoCatchupOperation { source, .. }
| MitoFlushOperation { source, .. }
| MitoCompactOperation { source, .. } => source.status_code(),

CollectRecordBatchStream { source, .. } => source.status_code(),

Expand Down
7 changes: 4 additions & 3 deletions tests-fuzz/src/ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use rand::Rng;
use serde::{Deserialize, Serialize};

use self::insert_expr::{RowValue, RowValues};
use crate::context::TableContextRef;
use crate::generator::Random;
use crate::impl_random;
use crate::ir::create_expr::ColumnOption;
Expand Down Expand Up @@ -442,15 +443,15 @@ pub fn generate_columns<R: Rng + 'static>(
/// Replace Value::Default with the corresponding default value in the rows for comparison.
pub fn replace_default(
rows: &[RowValues],
create_expr: &CreateTableExpr,
table_ctx_ref: &TableContextRef,
insert_expr: &InsertIntoExpr,
) -> Vec<RowValues> {
let index_map: HashMap<usize, usize> = insert_expr
.columns
.iter()
.enumerate()
.map(|(insert_idx, insert_column)| {
let create_idx = create_expr
let create_idx = table_ctx_ref
.columns
.iter()
.position(|create_column| create_column.name == insert_column.name)
Expand All @@ -464,7 +465,7 @@ pub fn replace_default(
let mut new_row = Vec::new();
for (idx, value) in row.iter().enumerate() {
if let RowValue::Default = value {
let column = &create_expr.columns[index_map[&idx]];
let column = &table_ctx_ref.columns[index_map[&idx]];
new_row.push(RowValue::Value(column.default_value().unwrap().clone()));
} else {
new_row.push(value.clone());
Expand Down
28 changes: 28 additions & 0 deletions tests-fuzz/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ pub mod process;
use std::env;

use common_telemetry::info;
use snafu::ResultExt;
use sqlx::mysql::MySqlPoolOptions;
use sqlx::{MySql, Pool};

use crate::error::{self, Result};
use crate::ir::Ident;

/// Database connections
pub struct Connections {
pub mysql: Option<Pool<MySql>>,
Expand Down Expand Up @@ -83,3 +87,27 @@ pub fn load_unstable_test_env_variables() -> UnstableTestVariables {
root_dir,
}
}

/// Flushes memtable to SST file.
pub async fn flush_memtable(e: &Pool<MySql>, table_name: &Ident) -> Result<()> {
let sql = format!("SELECT flush_table(\"{}\")", table_name);
let result = sqlx::query(&sql)
.execute(e)
.await
.context(error::ExecuteQuerySnafu { sql })?;
info!("Flush table: {}\n\nResult: {result:?}\n\n", table_name);

Ok(())
}

/// Triggers a compaction for table
pub async fn compact_table(e: &Pool<MySql>, table_name: &Ident) -> Result<()> {
let sql = format!("SELECT compact_table(\"{}\")", table_name);
let result = sqlx::query(&sql)
.execute(e)
.await
.context(error::ExecuteQuerySnafu { sql })?;
info!("Compact table: {}\n\nResult: {result:?}\n\n", table_name);

Ok(())
}
10 changes: 7 additions & 3 deletions tests-fuzz/targets/fuzz_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use tests_fuzz::ir::{
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator;
use tests_fuzz::translator::DslTranslator;
use tests_fuzz::utils::{init_greptime_connections_via_env, Connections};
use tests_fuzz::utils::{flush_memtable, init_greptime_connections_via_env, Connections};
use tests_fuzz::validator;

struct FuzzContext {
Expand Down Expand Up @@ -120,7 +120,7 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
.context(error::ExecuteQuerySnafu { sql: &sql })?;

let table_ctx = Arc::new(TableContext::from(&create_expr));
let insert_expr = generate_insert_expr(input, &mut rng, table_ctx)?;
let insert_expr = generate_insert_expr(input, &mut rng, table_ctx.clone())?;
let translator = InsertIntoExprTranslator;
let sql = translator.translate(&insert_expr)?;
let result = ctx
Expand All @@ -141,6 +141,10 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
}
);

if rng.gen_bool(0.5) {
flush_memtable(&ctx.greptime, &create_expr.table_name).await?;
}

// Validate inserted rows
// The order of inserted rows are random, so we need to sort the inserted rows by primary keys and time index for comparison
let primary_keys_names = create_expr
Expand Down Expand Up @@ -178,7 +182,7 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
column_list, create_expr.table_name, primary_keys_column_list
);
let fetched_rows = validator::row::fetch_values(&ctx.greptime, select_sql.as_str()).await?;
let mut expected_rows = replace_default(&insert_expr.values_list, &create_expr, &insert_expr);
let mut expected_rows = replace_default(&insert_expr.values_list, &table_ctx, &insert_expr);
expected_rows.sort_by(|a, b| {
let a_keys: Vec<_> = primary_keys_idxs_in_insert_expr
.iter()
Expand Down
Loading