From f15db85041a3f3aeb932bf85b95364efd8a45135 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Mon, 26 Aug 2024 15:53:40 +0800 Subject: [PATCH] feat!: impl admin command (#4600) * feat: impl admin statement parser * feat: introduce AsyncFunction and implements it for admin functions * feat: execute admin functions * fix: license header * fix: panic in test * chore: fixed by code review --- Cargo.lock | 1 + src/common/function/Cargo.toml | 1 + src/common/function/src/flush_flow.rs | 8 +- src/common/function/src/function.rs | 21 ++ src/common/function/src/function_registry.rs | 24 +- src/common/function/src/system.rs | 2 +- .../function/src/system/procedure_state.rs | 14 +- src/common/function/src/table.rs | 12 +- .../src/table/flush_compact_region.rs | 14 +- .../function/src/table/flush_compact_table.rs | 14 +- .../function/src/table/migrate_region.rs | 14 +- src/common/macro/src/admin_fn.rs | 66 ++--- src/datanode/src/tests.rs | 5 +- src/frontend/src/instance.rs | 3 + src/operator/src/error.rs | 29 +++ src/operator/src/statement.rs | 2 + src/operator/src/statement/admin.rs | 233 ++++++++++++++++++ src/query/src/datafusion.rs | 4 + src/query/src/query_engine.rs | 3 + src/sql/src/parser.rs | 4 +- src/sql/src/parsers.rs | 1 + src/sql/src/parsers/admin_parser.rs | 124 ++++++++++ src/sql/src/statements.rs | 1 + src/sql/src/statements/admin.rs | 34 +++ src/sql/src/statements/statement.rs | 4 + tests-fuzz/src/utils.rs | 4 +- tests-fuzz/src/utils/migration.rs | 5 +- tests-fuzz/src/utils/procedure.rs | 2 +- tests-integration/tests/region_migration.rs | 12 +- .../admin/flush_compact_region.result | 18 +- .../function/admin/flush_compact_region.sql | 4 +- .../common/alter/drop_add_col.result | 4 +- .../standalone/common/alter/drop_add_col.sql | 2 +- .../standalone/common/flow/flow_basic.result | 84 +++---- .../standalone/common/flow/flow_basic.sql | 26 +- .../common/flow/flow_call_df_func.result | 224 ++++++++--------- .../common/flow/flow_call_df_func.sql | 64 ++--- .../function/admin/flush_compact_table.result | 8 +- .../function/admin/flush_compact_table.sql | 4 +- 39 files changed, 777 insertions(+), 322 deletions(-) create mode 100644 src/operator/src/statement/admin.rs create mode 100644 src/sql/src/parsers/admin_parser.rs create mode 100644 src/sql/src/statements/admin.rs diff --git a/Cargo.lock b/Cargo.lock index 24fc59697ef2..837bfdc23c0a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1954,6 +1954,7 @@ dependencies = [ "statrs", "store-api", "table", + "tokio", ] [[package]] diff --git a/src/common/function/Cargo.toml b/src/common/function/Cargo.toml index 67eda1c7a419..e7d6ee870f4f 100644 --- a/src/common/function/Cargo.toml +++ b/src/common/function/Cargo.toml @@ -39,3 +39,4 @@ table.workspace = true [dev-dependencies] ron = "0.7" serde = { version = "1.0", features = ["derive"] } +tokio.workspace = true diff --git a/src/common/function/src/flush_flow.rs b/src/common/function/src/flush_flow.rs index 27944547c349..63fd49ac93b3 100644 --- a/src/common/function/src/flush_flow.rs +++ b/src/common/function/src/flush_flow.rs @@ -110,7 +110,7 @@ mod test { use session::context::QueryContext; use super::*; - use crate::function::{Function, FunctionContext}; + use crate::function::{AsyncFunction, FunctionContext}; #[test] fn test_flush_flow_metadata() { @@ -130,8 +130,8 @@ mod test { ); } - #[test] - fn test_missing_flow_service() { + #[tokio::test] + async fn test_missing_flow_service() { let f = FlushFlowFunction; let args = vec!["flow_name"]; @@ -140,7 +140,7 @@ mod test { .map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _) .collect::>(); - let result = f.eval(FunctionContext::default(), &args).unwrap_err(); + let result = f.eval(FunctionContext::default(), &args).await.unwrap_err(); assert_eq!( "Missing FlowServiceHandler, not expected", result.to_string() diff --git a/src/common/function/src/function.rs b/src/common/function/src/function.rs index 59c64a8d774d..d7e2d310e203 100644 --- a/src/common/function/src/function.rs +++ b/src/common/function/src/function.rs @@ -56,8 +56,10 @@ pub trait Function: fmt::Display + Sync + Send { /// Returns the name of the function, should be unique. fn name(&self) -> &str; + /// The returned data type of function execution. fn return_type(&self, input_types: &[ConcreteDataType]) -> Result; + /// The signature of function. fn signature(&self) -> Signature; /// Evaluate the function, e.g. run/execute the function. @@ -65,3 +67,22 @@ pub trait Function: fmt::Display + Sync + Send { } pub type FunctionRef = Arc; + +/// Async Scalar function trait +#[async_trait::async_trait] +pub trait AsyncFunction: fmt::Display + Sync + Send { + /// Returns the name of the function, should be unique. + fn name(&self) -> &str; + + /// The returned data type of function execution. + fn return_type(&self, input_types: &[ConcreteDataType]) -> Result; + + /// The signature of function. + fn signature(&self) -> Signature; + + /// Evaluate the function, e.g. run/execute the function. + /// TODO(dennis): simplify the signature and refactor all the admin functions. + async fn eval(&self, _func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result; +} + +pub type AsyncFunctionRef = Arc; diff --git a/src/common/function/src/function_registry.rs b/src/common/function/src/function_registry.rs index 31b2b223a67b..c2a315d51dad 100644 --- a/src/common/function/src/function_registry.rs +++ b/src/common/function/src/function_registry.rs @@ -18,7 +18,7 @@ use std::sync::{Arc, RwLock}; use once_cell::sync::Lazy; -use crate::function::FunctionRef; +use crate::function::{AsyncFunctionRef, FunctionRef}; use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions}; use crate::scalars::date::DateFunction; use crate::scalars::expression::ExpressionFunction; @@ -32,6 +32,7 @@ use crate::table::TableFunction; #[derive(Default)] pub struct FunctionRegistry { functions: RwLock>, + async_functions: RwLock>, aggregate_functions: RwLock>, } @@ -44,6 +45,27 @@ impl FunctionRegistry { .insert(func.name().to_string(), func); } + pub fn register_async(&self, func: AsyncFunctionRef) { + let _ = self + .async_functions + .write() + .unwrap() + .insert(func.name().to_string(), func); + } + + pub fn get_async_function(&self, name: &str) -> Option { + self.async_functions.read().unwrap().get(name).cloned() + } + + pub fn async_functions(&self) -> Vec { + self.async_functions + .read() + .unwrap() + .values() + .cloned() + .collect() + } + pub fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) { let _ = self .aggregate_functions diff --git a/src/common/function/src/system.rs b/src/common/function/src/system.rs index 9b90004f4a8d..6d1c1ebb47cc 100644 --- a/src/common/function/src/system.rs +++ b/src/common/function/src/system.rs @@ -38,7 +38,7 @@ impl SystemFunction { registry.register(Arc::new(VersionFunction)); registry.register(Arc::new(DatabaseFunction)); registry.register(Arc::new(TimezoneFunction)); - registry.register(Arc::new(ProcedureStateFunction)); + registry.register_async(Arc::new(ProcedureStateFunction)); PGCatalogFunction::register(registry); } } diff --git a/src/common/function/src/system/procedure_state.rs b/src/common/function/src/system/procedure_state.rs index 3b0ce0195b59..389c55311154 100644 --- a/src/common/function/src/system/procedure_state.rs +++ b/src/common/function/src/system/procedure_state.rs @@ -96,7 +96,7 @@ mod tests { use datatypes::vectors::StringVector; use super::*; - use crate::function::{Function, FunctionContext}; + use crate::function::{AsyncFunction, FunctionContext}; #[test] fn test_procedure_state_misc() { @@ -114,8 +114,8 @@ mod tests { )); } - #[test] - fn test_missing_procedure_service() { + #[tokio::test] + async fn test_missing_procedure_service() { let f = ProcedureStateFunction; let args = vec!["pid"]; @@ -125,15 +125,15 @@ mod tests { .map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _) .collect::>(); - let result = f.eval(FunctionContext::default(), &args).unwrap_err(); + let result = f.eval(FunctionContext::default(), &args).await.unwrap_err(); assert_eq!( "Missing ProcedureServiceHandler, not expected", result.to_string() ); } - #[test] - fn test_procedure_state() { + #[tokio::test] + async fn test_procedure_state() { let f = ProcedureStateFunction; let args = vec!["pid"]; @@ -143,7 +143,7 @@ mod tests { .map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _) .collect::>(); - let result = f.eval(FunctionContext::mock(), &args).unwrap(); + let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); let expect: VectorRef = Arc::new(StringVector::from(vec![ "{\"status\":\"Done\",\"error\":\"OK\"}", diff --git a/src/common/function/src/table.rs b/src/common/function/src/table.rs index d61723375f89..91ee6dd178fd 100644 --- a/src/common/function/src/table.rs +++ b/src/common/function/src/table.rs @@ -31,11 +31,11 @@ pub(crate) struct TableFunction; impl TableFunction { /// Register all table functions to [`FunctionRegistry`]. pub fn register(registry: &FunctionRegistry) { - registry.register(Arc::new(MigrateRegionFunction)); - registry.register(Arc::new(FlushRegionFunction)); - registry.register(Arc::new(CompactRegionFunction)); - registry.register(Arc::new(FlushTableFunction)); - registry.register(Arc::new(CompactTableFunction)); - registry.register(Arc::new(FlushFlowFunction)); + registry.register_async(Arc::new(MigrateRegionFunction)); + registry.register_async(Arc::new(FlushRegionFunction)); + registry.register_async(Arc::new(CompactRegionFunction)); + registry.register_async(Arc::new(FlushTableFunction)); + registry.register_async(Arc::new(CompactTableFunction)); + registry.register_async(Arc::new(FlushFlowFunction)); } } diff --git a/src/common/function/src/table/flush_compact_region.rs b/src/common/function/src/table/flush_compact_region.rs index 6d89c5c12537..17e5ee712ffc 100644 --- a/src/common/function/src/table/flush_compact_region.rs +++ b/src/common/function/src/table/flush_compact_region.rs @@ -77,7 +77,7 @@ mod tests { use datatypes::vectors::UInt64Vector; use super::*; - use crate::function::{Function, FunctionContext}; + use crate::function::{AsyncFunction, FunctionContext}; macro_rules! define_region_function_test { ($name: ident, $func: ident) => { @@ -97,8 +97,8 @@ mod tests { } if valid_types == ConcreteDataType::numerics())); } - #[test] - fn []() { + #[tokio::test] + async fn []() { let f = $func; let args = vec![99]; @@ -108,15 +108,15 @@ mod tests { .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _) .collect::>(); - let result = f.eval(FunctionContext::default(), &args).unwrap_err(); + let result = f.eval(FunctionContext::default(), &args).await.unwrap_err(); assert_eq!( "Missing TableMutationHandler, not expected", result.to_string() ); } - #[test] - fn []() { + #[tokio::test] + async fn []() { let f = $func; @@ -127,7 +127,7 @@ mod tests { .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _) .collect::>(); - let result = f.eval(FunctionContext::mock(), &args).unwrap(); + let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42])); assert_eq!(expect, result); diff --git a/src/common/function/src/table/flush_compact_table.rs b/src/common/function/src/table/flush_compact_table.rs index fe2220b127ee..e946a3819406 100644 --- a/src/common/function/src/table/flush_compact_table.rs +++ b/src/common/function/src/table/flush_compact_table.rs @@ -210,7 +210,7 @@ mod tests { use session::context::QueryContext; use super::*; - use crate::function::{Function, FunctionContext}; + use crate::function::{AsyncFunction, FunctionContext}; macro_rules! define_table_function_test { ($name: ident, $func: ident) => { @@ -230,8 +230,8 @@ mod tests { } if valid_types == vec![ConcreteDataType::string_datatype()])); } - #[test] - fn []() { + #[tokio::test] + async fn []() { let f = $func; let args = vec!["test"]; @@ -241,15 +241,15 @@ mod tests { .map(|arg| Arc::new(StringVector::from(vec![arg])) as _) .collect::>(); - let result = f.eval(FunctionContext::default(), &args).unwrap_err(); + let result = f.eval(FunctionContext::default(), &args).await.unwrap_err(); assert_eq!( "Missing TableMutationHandler, not expected", result.to_string() ); } - #[test] - fn []() { + #[tokio::test] + async fn []() { let f = $func; @@ -260,7 +260,7 @@ mod tests { .map(|arg| Arc::new(StringVector::from(vec![arg])) as _) .collect::>(); - let result = f.eval(FunctionContext::mock(), &args).unwrap(); + let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42])); assert_eq!(expect, result); diff --git a/src/common/function/src/table/migrate_region.rs b/src/common/function/src/table/migrate_region.rs index 4e416d0246a0..b46231eb4523 100644 --- a/src/common/function/src/table/migrate_region.rs +++ b/src/common/function/src/table/migrate_region.rs @@ -123,7 +123,7 @@ mod tests { use datatypes::vectors::{StringVector, UInt64Vector, VectorRef}; use super::*; - use crate::function::{Function, FunctionContext}; + use crate::function::{AsyncFunction, FunctionContext}; #[test] fn test_migrate_region_misc() { @@ -140,8 +140,8 @@ mod tests { } if sigs.len() == 2)); } - #[test] - fn test_missing_procedure_service() { + #[tokio::test] + async fn test_missing_procedure_service() { let f = MigrateRegionFunction; let args = vec![1, 1, 1]; @@ -151,15 +151,15 @@ mod tests { .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _) .collect::>(); - let result = f.eval(FunctionContext::default(), &args).unwrap_err(); + let result = f.eval(FunctionContext::default(), &args).await.unwrap_err(); assert_eq!( "Missing ProcedureServiceHandler, not expected", result.to_string() ); } - #[test] - fn test_migrate_region() { + #[tokio::test] + async fn test_migrate_region() { let f = MigrateRegionFunction; let args = vec![1, 1, 1]; @@ -169,7 +169,7 @@ mod tests { .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _) .collect::>(); - let result = f.eval(FunctionContext::mock(), &args).unwrap(); + let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"])); assert_eq!(expect, result); diff --git a/src/common/macro/src/admin_fn.rs b/src/common/macro/src/admin_fn.rs index 843b2c406613..b95374540cd3 100644 --- a/src/common/macro/src/admin_fn.rs +++ b/src/common/macro/src/admin_fn.rs @@ -187,7 +187,8 @@ fn build_struct( } - impl crate::function::Function for #name { + #[async_trait::async_trait] + impl crate::function::AsyncFunction for #name { fn name(&self) -> &'static str { #display_name } @@ -200,7 +201,7 @@ fn build_struct( #sig_fn() } - fn eval(&self, func_ctx: crate::function::FunctionContext, columns: &[datatypes::vectors::VectorRef]) -> common_query::error::Result { + async fn eval(&self, func_ctx: crate::function::FunctionContext, columns: &[datatypes::vectors::VectorRef]) -> common_query::error::Result { // Ensure under the `greptime` catalog for security crate::ensure_greptime!(func_ctx); @@ -212,51 +213,36 @@ fn build_struct( }; let columns = Vec::from(columns); - // TODO(dennis): DataFusion doesn't support async UDF currently - std::thread::spawn(move || { - use snafu::OptionExt; - use datatypes::data_type::DataType; + use snafu::OptionExt; + use datatypes::data_type::DataType; - let query_ctx = &func_ctx.query_ctx; - let handler = func_ctx - .state - .#handler - .as_ref() - .context(#snafu_type)?; + let query_ctx = &func_ctx.query_ctx; + let handler = func_ctx + .state + .#handler + .as_ref() + .context(#snafu_type)?; - let mut builder = store_api::storage::ConcreteDataType::#ret() - .create_mutable_vector(rows_num); + let mut builder = store_api::storage::ConcreteDataType::#ret() + .create_mutable_vector(rows_num); - if columns_num == 0 { - let result = common_runtime::block_on_global(async move { - #fn_name(handler, query_ctx, &[]).await - })?; + if columns_num == 0 { + let result = #fn_name(handler, query_ctx, &[]).await?; + + builder.push_value_ref(result.as_value_ref()); + } else { + for i in 0..rows_num { + let args: Vec<_> = columns.iter() + .map(|vector| vector.get_ref(i)) + .collect(); + + let result = #fn_name(handler, query_ctx, &args).await?; builder.push_value_ref(result.as_value_ref()); - } else { - for i in 0..rows_num { - let args: Vec<_> = columns.iter() - .map(|vector| vector.get_ref(i)) - .collect(); - - let result = common_runtime::block_on_global(async move { - #fn_name(handler, query_ctx, &args).await - })?; - - builder.push_value_ref(result.as_value_ref()); - } } + } - Ok(builder.to_vector()) - }) - .join() - .map_err(|e| { - common_telemetry::error!(e; "Join thread error"); - common_query::error::Error::ThreadJoin { - location: snafu::Location::default(), - } - })? - + Ok(builder.to_vector()) } } diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 645f871acdec..87285eabfa67 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -27,7 +27,7 @@ use common_runtime::Runtime; use query::dataframe::DataFrame; use query::plan::LogicalPlan; use query::planner::LogicalPlanner; -use query::query_engine::DescribeResult; +use query::query_engine::{DescribeResult, QueryEngineState}; use query::{QueryEngine, QueryEngineContext}; use session::context::QueryContextRef; use store_api::metadata::RegionMetadataRef; @@ -86,6 +86,9 @@ impl QueryEngine for MockQueryEngine { fn engine_context(&self, _query_ctx: QueryContextRef) -> QueryEngineContext { unimplemented!() } + fn engine_state(&self) -> &QueryEngineState { + unimplemented!() + } } /// Create a region server without any engine diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 5f027ebb7fac..b68d1a0b215c 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -441,6 +441,9 @@ pub fn check_permission( } match stmt { + // Will be checked in execution. + // TODO(dennis): add a hook for admin commands. + Statement::Admin(_) => {} // These are executed by query engine, and will be checked there. Statement::Query(_) | Statement::Explain(_) | Statement::Tql(_) | Statement::Delete(_) => {} // database ops won't be checked diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index 8e7991fd06b1..ca5d8564fede 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -42,6 +42,19 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to execute admin function"))] + ExecuteAdminFunction { + #[snafu(implicit)] + location: Location, + source: common_query::error::Error, + }, + + #[snafu(display("Failed to build admin function args: {msg}"))] + BuildAdminFunctionArgs { msg: String }, + + #[snafu(display("Expected {expected} args, but actual {actual}"))] + FunctionArityMismatch { expected: usize, actual: usize }, + #[snafu(display("Failed to invalidate table cache"))] InvalidateTableCache { #[snafu(implicit)] @@ -209,6 +222,9 @@ pub enum Error { #[snafu(display("Table not found: {}", table_name))] TableNotFound { table_name: String }, + #[snafu(display("Admin function not found: {}", name))] + AdminFunctionNotFound { name: String }, + #[snafu(display("Flow not found: {}", flow_name))] FlowNotFound { flow_name: String }, @@ -546,6 +562,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to build record batch"))] + BuildRecordBatch { + #[snafu(implicit)] + location: Location, + source: common_recordbatch::error::Error, + }, + #[snafu(display("Failed to read orc schema"))] ReadOrc { source: common_datasource::error::Error, @@ -792,9 +815,12 @@ impl ErrorExt for Error { | Error::InvalidViewName { .. } | Error::InvalidView { .. } | Error::InvalidExpr { .. } + | Error::AdminFunctionNotFound { .. } | Error::ViewColumnsMismatch { .. } | Error::InvalidViewStmt { .. } | Error::ConvertIdentifier { .. } + | Error::BuildAdminFunctionArgs { .. } + | Error::FunctionArityMismatch { .. } | Error::InvalidPartition { .. } | Error::PhysicalExpr { .. } => StatusCode::InvalidArguments, @@ -902,6 +928,9 @@ impl ErrorExt for Error { | Error::InvalidTimestampRange { .. } => StatusCode::InvalidArguments, Error::CreateLogicalTables { .. } => StatusCode::Unexpected, + + Error::ExecuteAdminFunction { source, .. } => source.status_code(), + Error::BuildRecordBatch { source, .. } => source.status_code(), } } diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 123f794cd2e8..4fcd21028528 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod admin; mod copy_database; mod copy_table_from; mod copy_table_to; @@ -277,6 +278,7 @@ impl StatementExecutor { Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await, Statement::ShowStatus(_) => self.show_status(query_ctx).await, Statement::Use(db) => self.use_database(db, query_ctx).await, + Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await, } } diff --git a/src/operator/src/statement/admin.rs b/src/operator/src/statement/admin.rs new file mode 100644 index 000000000000..37e6acf9665f --- /dev/null +++ b/src/operator/src/statement/admin.rs @@ -0,0 +1,233 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_function::function::FunctionContext; +use common_function::function_registry::FUNCTION_REGISTRY; +use common_query::prelude::TypeSignature; +use common_query::Output; +use common_recordbatch::{RecordBatch, RecordBatches}; +use common_telemetry::tracing; +use common_time::Timezone; +use datatypes::data_type::DataType; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::{ColumnSchema, Schema}; +use datatypes::value::Value; +use datatypes::vectors::VectorRef; +use session::context::QueryContextRef; +use snafu::{ensure, OptionExt, ResultExt}; +use sql::ast::{Expr, FunctionArg, FunctionArgExpr, Value as SqlValue}; +use sql::statements::admin::Admin; +use sql::statements::sql_value_to_value; + +use crate::error::{self, Result}; +use crate::statement::StatementExecutor; + +const DUMMY_COLUMN: &str = ""; + +impl StatementExecutor { + /// Execute the [`Admin`] statement and returns the output. + #[tracing::instrument(skip_all)] + pub(super) async fn execute_admin_command( + &self, + stmt: Admin, + query_ctx: QueryContextRef, + ) -> Result { + let Admin::Func(func) = &stmt; + // the function name should be in lower case. + let func_name = func.name.to_string().to_lowercase(); + let admin_func = FUNCTION_REGISTRY + .get_async_function(&func_name) + .context(error::AdminFunctionNotFoundSnafu { name: func_name })?; + + let signature = admin_func.signature(); + let arg_values = func + .args + .iter() + .map(|arg| { + let FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(value))) = arg else { + return error::BuildAdminFunctionArgsSnafu { + msg: "unsupported function arg {arg}", + } + .fail(); + }; + Ok(value) + }) + .collect::>>()?; + + let args = args_to_vector(&signature.type_signature, &arg_values, &query_ctx)?; + let arg_types = args.iter().map(|arg| arg.data_type()).collect::>(); + + let func_ctx = FunctionContext { + query_ctx, + state: self.query_engine.engine_state().function_state(), + }; + + let result = admin_func + .eval(func_ctx, &args) + .await + .context(error::ExecuteAdminFunctionSnafu)?; + + let column_schemas = vec![ColumnSchema::new( + // Use statement as the result column name + stmt.to_string(), + admin_func + .return_type(&arg_types) + .context(error::ExecuteAdminFunctionSnafu)?, + false, + )]; + let schema = Arc::new(Schema::new(column_schemas)); + let batch = + RecordBatch::new(schema.clone(), vec![result]).context(error::BuildRecordBatchSnafu)?; + let batches = + RecordBatches::try_new(schema, vec![batch]).context(error::BuildRecordBatchSnafu)?; + + Ok(Output::new_with_record_batches(batches)) + } +} + +/// Try to cast the arguments to vectors by function's signature. +fn args_to_vector( + type_signature: &TypeSignature, + args: &Vec<&SqlValue>, + query_ctx: &QueryContextRef, +) -> Result> { + let tz = query_ctx.timezone(); + + match type_signature { + TypeSignature::Variadic(valid_types) => { + values_to_vectors_by_valid_types(valid_types, args, Some(&tz)) + } + + TypeSignature::Uniform(arity, valid_types) => { + ensure!( + *arity == args.len(), + error::FunctionArityMismatchSnafu { + actual: args.len(), + expected: *arity, + } + ); + + values_to_vectors_by_valid_types(valid_types, args, Some(&tz)) + } + + TypeSignature::Exact(data_types) => { + values_to_vectors_by_exact_types(data_types, args, Some(&tz)) + } + + TypeSignature::VariadicAny => { + let data_types = args + .iter() + .map(|value| try_get_data_type_for_sql_value(value)) + .collect::>>()?; + + values_to_vectors_by_exact_types(&data_types, args, Some(&tz)) + } + + TypeSignature::Any(arity) => { + ensure!( + *arity == args.len(), + error::FunctionArityMismatchSnafu { + actual: args.len(), + expected: *arity, + } + ); + + let data_types = args + .iter() + .map(|value| try_get_data_type_for_sql_value(value)) + .collect::>>()?; + + values_to_vectors_by_exact_types(&data_types, args, Some(&tz)) + } + + TypeSignature::OneOf(type_sigs) => { + for type_sig in type_sigs { + if let Ok(vectors) = args_to_vector(type_sig, args, query_ctx) { + return Ok(vectors); + } + } + + error::BuildAdminFunctionArgsSnafu { + msg: "function signature not match", + } + .fail() + } + } +} + +/// Try to cast sql values to vectors by exact data types. +fn values_to_vectors_by_exact_types( + exact_types: &[ConcreteDataType], + args: &[&SqlValue], + tz: Option<&Timezone>, +) -> Result> { + args.iter() + .zip(exact_types.iter()) + .map(|(value, data_type)| { + let value = sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None) + .context(error::ParseSqlValueSnafu)?; + + Ok(value_to_vector(value)) + }) + .collect() +} + +/// Try to cast sql values to vectors by valid data types. +fn values_to_vectors_by_valid_types( + valid_types: &[ConcreteDataType], + args: &[&SqlValue], + tz: Option<&Timezone>, +) -> Result> { + args.iter() + .map(|value| { + for data_type in valid_types { + if let Ok(value) = sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None) { + return Ok(value_to_vector(value)); + } + } + + error::BuildAdminFunctionArgsSnafu { + msg: "failed to cast {value}", + } + .fail() + }) + .collect::>>() +} + +/// Build a [`VectorRef`] from [`Value`] +fn value_to_vector(value: Value) -> VectorRef { + let data_type = value.data_type(); + let mut mutable_vector = data_type.create_mutable_vector(1); + mutable_vector.push_value_ref(value.as_value_ref()); + + mutable_vector.to_vector() +} + +/// Try to infer the data type from sql value. +fn try_get_data_type_for_sql_value(value: &SqlValue) -> Result { + match value { + SqlValue::Number(_, _) => Ok(ConcreteDataType::float64_datatype()), + SqlValue::Null => Ok(ConcreteDataType::null_datatype()), + SqlValue::Boolean(_) => Ok(ConcreteDataType::boolean_datatype()), + SqlValue::HexStringLiteral(_) + | SqlValue::DoubleQuotedString(_) + | SqlValue::SingleQuotedString(_) => Ok(ConcreteDataType::string_datatype()), + _ => error::BuildAdminFunctionArgsSnafu { + msg: format!("unsupported sql value: {value}"), + } + .fail(), + } +} diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index a7dfd17d4921..907b14c20d70 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -447,6 +447,10 @@ impl QueryEngine for DatafusionQueryEngine { state.config_mut().set_extension(query_ctx.clone()); QueryEngineContext::new(state, query_ctx) } + + fn engine_state(&self) -> &QueryEngineState { + &self.state + } } impl QueryExecutor for DatafusionQueryEngine { diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index 1eaff7b732c9..e2d3e01c9114 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -93,6 +93,9 @@ pub trait QueryEngine: Send + Sync { /// Create a [`QueryEngineContext`]. fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext; + + /// Retrieve the query engine state [`QueryEngineState`] + fn engine_state(&self) -> &QueryEngineState; } pub struct QueryEngineFactory { diff --git a/src/sql/src/parser.rs b/src/sql/src/parser.rs index c15c0cd1af1b..56163e378bbb 100644 --- a/src/sql/src/parser.rs +++ b/src/sql/src/parser.rs @@ -159,8 +159,10 @@ impl<'a> ParserContext<'a> { Keyword::SET => self.parse_set_variables(), + Keyword::ADMIN => self.parse_admin_command(), + Keyword::NoKeyword - if w.value.to_uppercase() == tql_parser::TQL && w.quote_style.is_none() => + if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL => { self.parse_tql() } diff --git a/src/sql/src/parsers.rs b/src/sql/src/parsers.rs index e4643da3e90a..2ae0697231c5 100644 --- a/src/sql/src/parsers.rs +++ b/src/sql/src/parsers.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub(crate) mod admin_parser; mod alter_parser; pub(crate) mod copy_parser; pub(crate) mod create_parser; diff --git a/src/sql/src/parsers/admin_parser.rs b/src/sql/src/parsers/admin_parser.rs new file mode 100644 index 000000000000..b9a118b2d415 --- /dev/null +++ b/src/sql/src/parsers/admin_parser.rs @@ -0,0 +1,124 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::ResultExt; + +use crate::ast::Expr; +use crate::error::{Result, SyntaxSnafu}; +use crate::parser::ParserContext; +use crate::statements::admin::Admin; +use crate::statements::statement::Statement; + +/// `admin` extension parser: `admin function(arg1, arg2, ...)` +/// or `admin function` +impl<'a> ParserContext<'a> { + /// Parse `admin function(arg1, arg2, ...)` or `admin function` statement + pub(crate) fn parse_admin_command(&mut self) -> Result { + let _token = self.parser.next_token(); + + let object_name = self.parser.parse_object_name(false).context(SyntaxSnafu)?; + + let func = match self + .parser + .parse_function(object_name) + .context(SyntaxSnafu)? + { + Expr::Function(f) => f, + _ => { + return self.unsupported(self.peek_token_as_string()); + } + }; + + Ok(Statement::Admin(Admin::Func(func))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::ast::{Expr, Function, FunctionArg, FunctionArgExpr, Value}; + use crate::dialect::GreptimeDbDialect; + use crate::parser::ParseOptions; + + #[test] + fn test_parse_admin_function() { + let sql = "ADMIN flush_table('test')"; + + let mut result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(1, result.len()); + let stmt = result.remove(0); + match &stmt { + Statement::Admin(Admin::Func(Function { name, args, .. })) => { + assert_eq!("flush_table", name.to_string()); + assert_eq!(args.len(), 1); + assert!(matches!(&args[0], + FunctionArg::Unnamed(FunctionArgExpr::Expr( + Expr::Value(Value::SingleQuotedString(s)) + )) if s == "test")); + } + _ => unreachable!(), + } + + assert_eq!(sql, stmt.to_string()); + } + + #[test] + fn test_parse_admin_function_without_args() { + let sql = "ADMIN test()"; + + let mut result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(1, result.len()); + let stmt = result.remove(0); + match &stmt { + Statement::Admin(Admin::Func(Function { name, args, .. })) => { + assert_eq!("test", name.to_string()); + assert_eq!(args.len(), 0); + } + _ => unreachable!(), + } + + assert_eq!("ADMIN test()", stmt.to_string()); + } + + #[test] + fn test_invalid_admin_statement() { + let sql = "ADMIN"; + assert!(ParserContext::create_with_dialect( + sql, + &GreptimeDbDialect {}, + ParseOptions::default() + ) + .is_err()); + + let sql = "ADMIN test"; + assert!(ParserContext::create_with_dialect( + sql, + &GreptimeDbDialect {}, + ParseOptions::default() + ) + .is_err()); + + let sql = "ADMIN test test"; + assert!(ParserContext::create_with_dialect( + sql, + &GreptimeDbDialect {}, + ParseOptions::default() + ) + .is_err()); + } +} diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index a011fa581b46..a042473503b8 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod admin; pub mod alter; pub mod copy; pub mod create; diff --git a/src/sql/src/statements/admin.rs b/src/sql/src/statements/admin.rs new file mode 100644 index 000000000000..bbe805a4c163 --- /dev/null +++ b/src/sql/src/statements/admin.rs @@ -0,0 +1,34 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; + +use sqlparser_derive::{Visit, VisitMut}; + +use crate::ast::Function; + +/// `ADMIN` statement to execute some administration commands. +#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)] +pub enum Admin { + /// Run a admin function. + Func(Function), +} + +impl Display for Admin { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Admin::Func(func) => write!(f, "ADMIN {func}"), + } + } +} diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs index 9115902bf548..c858397e85ec 100644 --- a/src/sql/src/statements/statement.rs +++ b/src/sql/src/statements/statement.rs @@ -19,6 +19,7 @@ use sqlparser::ast::Statement as SpStatement; use sqlparser_derive::{Visit, VisitMut}; use crate::error::{ConvertToDfStatementSnafu, Error}; +use crate::statements::admin::Admin; use crate::statements::alter::AlterTable; use crate::statements::create::{ CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, @@ -110,6 +111,8 @@ pub enum Statement { ShowVariables(ShowVariables), // USE Use(String), + // Admin statement(extension) + Admin(Admin), } impl Display for Statement { @@ -154,6 +157,7 @@ impl Display for Statement { } Statement::CreateView(s) => s.fmt(f), Statement::Use(s) => s.fmt(f), + Statement::Admin(admin) => admin.fmt(f), } } } diff --git a/tests-fuzz/src/utils.rs b/tests-fuzz/src/utils.rs index 90ffe0c40e22..743347978924 100644 --- a/tests-fuzz/src/utils.rs +++ b/tests-fuzz/src/utils.rs @@ -99,7 +99,7 @@ pub const GT_FUZZ_CLUSTER_NAME: &str = "GT_FUZZ_CLUSTER_NAME"; /// Flushes memtable to SST file. pub async fn flush_memtable(e: &Pool, table_name: &Ident) -> Result<()> { - let sql = format!("SELECT flush_table(\"{}\")", table_name); + let sql = format!("admin flush_table(\"{}\")", table_name); let result = sqlx::query(&sql) .execute(e) .await @@ -111,7 +111,7 @@ pub async fn flush_memtable(e: &Pool, table_name: &Ident) -> Result<()> { /// Triggers a compaction for table pub async fn compact_table(e: &Pool, table_name: &Ident) -> Result<()> { - let sql = format!("SELECT compact_table(\"{}\")", table_name); + let sql = format!("admin compact_table(\"{}\")", table_name); let result = sqlx::query(&sql) .execute(e) .await diff --git a/tests-fuzz/src/utils/migration.rs b/tests-fuzz/src/utils/migration.rs index e4e7e0d1e39a..043e2271ae92 100644 --- a/tests-fuzz/src/utils/migration.rs +++ b/tests-fuzz/src/utils/migration.rs @@ -35,9 +35,8 @@ pub async fn migrate_region( to_peer_id: u64, timeout_secs: u64, ) -> String { - let sql = format!( - "select migrate_region({region_id}, {from_peer_id}, {to_peer_id}, {timeout_secs}) as output;" - ); + let sql = + format!("admin migrate_region({region_id}, {from_peer_id}, {to_peer_id}, {timeout_secs});"); let result = sqlx::query(&sql) .fetch_one(e) .await diff --git a/tests-fuzz/src/utils/procedure.rs b/tests-fuzz/src/utils/procedure.rs index 0b5e50adb91b..72259272120a 100644 --- a/tests-fuzz/src/utils/procedure.rs +++ b/tests-fuzz/src/utils/procedure.rs @@ -23,7 +23,7 @@ use crate::error; /// Fetches the state of a procedure. pub async fn procedure_state(e: &Pool, procedure_id: &str) -> String { - let sql = format!("select procedure_state(\"{procedure_id}\");"); + let sql = format!("admin procedure_state(\"{procedure_id}\");"); let result = sqlx::query(&sql) .fetch_one(e) .await diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs index 056aa2ab7fde..98e10c8b2ddb 100644 --- a/tests-integration/tests/region_migration.rs +++ b/tests-integration/tests/region_migration.rs @@ -1118,9 +1118,9 @@ async fn trigger_migration_by_sql( from_peer_id: u64, to_peer_id: u64, ) -> String { - let OutputData::Stream(stream) = run_sql( + let OutputData::RecordBatches(recordbatches) = run_sql( &cluster.frontend, - &format!("select migrate_region({region_id}, {from_peer_id}, {to_peer_id})"), + &format!("admin migrate_region({region_id}, {from_peer_id}, {to_peer_id})"), QueryContext::arc(), ) .await @@ -1130,8 +1130,6 @@ async fn trigger_migration_by_sql( unreachable!(); }; - let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - info!("SQL result:\n {}", recordbatches.pretty_print().unwrap()); let Value::String(procedure_id) = recordbatches.take()[0].column(0).get(0) else { @@ -1143,9 +1141,9 @@ async fn trigger_migration_by_sql( /// Query procedure state by SQL. async fn query_procedure_by_sql(instance: &Arc, pid: &str) -> String { - let OutputData::Stream(stream) = run_sql( + let OutputData::RecordBatches(recordbatches) = run_sql( instance, - &format!("select procedure_state('{pid}')"), + &format!("admin procedure_state('{pid}')"), QueryContext::arc(), ) .await @@ -1155,8 +1153,6 @@ async fn query_procedure_by_sql(instance: &Arc, pid: &str) -> String { unreachable!(); }; - let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - info!("SQL result:\n {}", recordbatches.pretty_print().unwrap()); let Value::String(state) = recordbatches.take()[0].column(0).get(0) else { diff --git a/tests/cases/distributed/function/admin/flush_compact_region.result b/tests/cases/distributed/function/admin/flush_compact_region.result index 322b99b76c75..a13755b34824 100644 --- a/tests/cases/distributed/function/admin/flush_compact_region.result +++ b/tests/cases/distributed/function/admin/flush_compact_region.result @@ -31,22 +31,8 @@ SELECT * FROM my_table; | 22 | f | 1970-01-01T00:00:00.005 | +----+---+-------------------------+ -SELECT flush_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table'; - -+-------------------------------------------------------------------+ -| flush_region(information_schema.partitions.greptime_partition_id) | -+-------------------------------------------------------------------+ -| 0 | -+-------------------------------------------------------------------+ - -SELECT compact_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table'; - -+---------------------------------------------------------------------+ -| compact_region(information_schema.partitions.greptime_partition_id) | -+---------------------------------------------------------------------+ -| 0 | -+---------------------------------------------------------------------+ - +-- SELECT flush_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table'; -- +-- SELECT compact_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table'; -- SELECT * FROM my_table; +----+---+-------------------------+ diff --git a/tests/cases/distributed/function/admin/flush_compact_region.sql b/tests/cases/distributed/function/admin/flush_compact_region.sql index f7146c6b20e6..ba9bc442bf01 100644 --- a/tests/cases/distributed/function/admin/flush_compact_region.sql +++ b/tests/cases/distributed/function/admin/flush_compact_region.sql @@ -17,9 +17,9 @@ INSERT INTO my_table VALUES SELECT * FROM my_table; -SELECT flush_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table'; +-- SELECT flush_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table'; -- -SELECT compact_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table'; +-- SELECT compact_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table'; -- SELECT * FROM my_table; diff --git a/tests/cases/standalone/common/alter/drop_add_col.result b/tests/cases/standalone/common/alter/drop_add_col.result index a677c097d73a..62480f3fc15f 100644 --- a/tests/cases/standalone/common/alter/drop_add_col.result +++ b/tests/cases/standalone/common/alter/drop_add_col.result @@ -15,10 +15,10 @@ SELECT * FROM test order by i; | 1970-01-01T00:00:00.002 | 12 | 5 | +-------------------------+----+---+ -SELECT FLUSH_TABLE('test'); +ADMIN FLUSH_TABLE('test'); +---------------------------+ -| flush_table(Utf8("test")) | +| ADMIN FLUSH_TABLE('test') | +---------------------------+ | 0 | +---------------------------+ diff --git a/tests/cases/standalone/common/alter/drop_add_col.sql b/tests/cases/standalone/common/alter/drop_add_col.sql index 0fd9795efd19..379e31b5046a 100644 --- a/tests/cases/standalone/common/alter/drop_add_col.sql +++ b/tests/cases/standalone/common/alter/drop_add_col.sql @@ -4,7 +4,7 @@ INSERT INTO test(i, j, k) VALUES (1, 11, 5), (2, 12, 5); SELECT * FROM test order by i; -SELECT FLUSH_TABLE('test'); +ADMIN FLUSH_TABLE('test'); ALTER TABLE test DROP COLUMN j; diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result index 3c49535b0fd3..d4fb6276bd8a 100644 --- a/tests/cases/standalone/common/flow/flow_basic.result +++ b/tests/cases/standalone/common/flow/flow_basic.result @@ -7,38 +7,38 @@ CREATE TABLE numbers_input_basic ( Affected Rows: 0 -CREATE FLOW test_numbers_basic +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic -AS +AS SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); Affected Rows: 0 -- TODO(discord9): confirm if it's necessary to flush flow here? -- because flush_flow result is at most 1 -select flush_flow('test_numbers_basic')<=1; +admin flush_flow('test_numbers_basic'); -+----------------------------------------------------+ -| flush_flow(Utf8("test_numbers_basic")) <= Int64(1) | -+----------------------------------------------------+ -| true | -+----------------------------------------------------+ ++----------------------------------------+ +| ADMIN flush_flow('test_numbers_basic') | ++----------------------------------------+ +| 0 | ++----------------------------------------+ -- SQLNESS ARG restart=true -INSERT INTO numbers_input_basic +INSERT INTO numbers_input_basic VALUES (20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); Affected Rows: 2 -select flush_flow('test_numbers_basic')<=1; +admin flush_flow('test_numbers_basic'); -+----------------------------------------------------+ -| flush_flow(Utf8("test_numbers_basic")) <= Int64(1) | -+----------------------------------------------------+ -| true | -+----------------------------------------------------+ ++----------------------------------------+ +| ADMIN flush_flow('test_numbers_basic') | ++----------------------------------------+ +| 1 | ++----------------------------------------+ SELECT col_0, window_start, window_end FROM out_num_cnt_basic; @@ -48,28 +48,28 @@ SELECT col_0, window_start, window_end FROM out_num_cnt_basic; | 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | +-------+---------------------+---------------------+ -select flush_flow('test_numbers_basic')<=1; +admin flush_flow('test_numbers_basic'); -+----------------------------------------------------+ -| flush_flow(Utf8("test_numbers_basic")) <= Int64(1) | -+----------------------------------------------------+ -| true | -+----------------------------------------------------+ ++----------------------------------------+ +| ADMIN flush_flow('test_numbers_basic') | ++----------------------------------------+ +| 0 | ++----------------------------------------+ -INSERT INTO numbers_input_basic +INSERT INTO numbers_input_basic VALUES (23,"2021-07-01 00:00:01.000"), (24,"2021-07-01 00:00:01.500"); Affected Rows: 2 -select flush_flow('test_numbers_basic')<=1; +admin flush_flow('test_numbers_basic'); -+----------------------------------------------------+ -| flush_flow(Utf8("test_numbers_basic")) <= Int64(1) | -+----------------------------------------------------+ -| true | -+----------------------------------------------------+ ++----------------------------------------+ +| ADMIN flush_flow('test_numbers_basic') | ++----------------------------------------+ +| 1 | ++----------------------------------------+ SELECT col_0, window_start, window_end FROM out_num_cnt_basic; @@ -158,19 +158,19 @@ SELECT CAST((max(byte) - min(byte)) AS FLOAT)/30.0, date_bin(INTERVAL '30 second Affected Rows: 0 -INSERT INTO bytes_log VALUES +INSERT INTO bytes_log VALUES (101, '2025-01-01 00:00:01'), (300, '2025-01-01 00:00:29'); Affected Rows: 2 -SELECT flush_flow('find_approx_rate')<=1; +admin flush_flow('find_approx_rate'); -+--------------------------------------------------+ -| flush_flow(Utf8("find_approx_rate")) <= Int64(1) | -+--------------------------------------------------+ -| true | -+--------------------------------------------------+ ++--------------------------------------+ +| ADMIN flush_flow('find_approx_rate') | ++--------------------------------------+ +| 1 | ++--------------------------------------+ SELECT rate, time_window FROM approx_rate; @@ -180,19 +180,19 @@ SELECT rate, time_window FROM approx_rate; | 6.633333 | 2025-01-01T00:00:00 | +----------+---------------------+ -INSERT INTO bytes_log VALUES +INSERT INTO bytes_log VALUES (450, '2025-01-01 00:00:32'), (500, '2025-01-01 00:00:37'); Affected Rows: 2 -SELECT flush_flow('find_approx_rate')<=1; +admin flush_flow('find_approx_rate'); -+--------------------------------------------------+ -| flush_flow(Utf8("find_approx_rate")) <= Int64(1) | -+--------------------------------------------------+ -| true | -+--------------------------------------------------+ ++--------------------------------------+ +| ADMIN flush_flow('find_approx_rate') | ++--------------------------------------+ +| 1 | ++--------------------------------------+ SELECT rate, time_window FROM approx_rate; diff --git a/tests/cases/standalone/common/flow/flow_basic.sql b/tests/cases/standalone/common/flow/flow_basic.sql index 1f282cca2e03..e60764463e68 100644 --- a/tests/cases/standalone/common/flow/flow_basic.sql +++ b/tests/cases/standalone/common/flow/flow_basic.sql @@ -5,33 +5,33 @@ CREATE TABLE numbers_input_basic ( TIME INDEX(ts) ); -CREATE FLOW test_numbers_basic +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic -AS +AS SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); -- TODO(discord9): confirm if it's necessary to flush flow here? -- because flush_flow result is at most 1 -select flush_flow('test_numbers_basic')<=1; +admin flush_flow('test_numbers_basic'); -- SQLNESS ARG restart=true -INSERT INTO numbers_input_basic +INSERT INTO numbers_input_basic VALUES (20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); -select flush_flow('test_numbers_basic')<=1; +admin flush_flow('test_numbers_basic'); SELECT col_0, window_start, window_end FROM out_num_cnt_basic; -select flush_flow('test_numbers_basic')<=1; +admin flush_flow('test_numbers_basic'); -INSERT INTO numbers_input_basic +INSERT INTO numbers_input_basic VALUES (23,"2021-07-01 00:00:01.000"), (24,"2021-07-01 00:00:01.500"); -select flush_flow('test_numbers_basic')<=1; +admin flush_flow('test_numbers_basic'); SELECT col_0, window_start, window_end FROM out_num_cnt_basic; @@ -79,22 +79,22 @@ SINK TO approx_rate AS SELECT CAST((max(byte) - min(byte)) AS FLOAT)/30.0, date_bin(INTERVAL '30 second', ts) as time_window from bytes_log GROUP BY time_window; -INSERT INTO bytes_log VALUES +INSERT INTO bytes_log VALUES (101, '2025-01-01 00:00:01'), (300, '2025-01-01 00:00:29'); -SELECT flush_flow('find_approx_rate')<=1; +admin flush_flow('find_approx_rate'); SELECT rate, time_window FROM approx_rate; -INSERT INTO bytes_log VALUES +INSERT INTO bytes_log VALUES (450, '2025-01-01 00:00:32'), (500, '2025-01-01 00:00:37'); -SELECT flush_flow('find_approx_rate')<=1; +admin flush_flow('find_approx_rate'); SELECT rate, time_window FROM approx_rate; DROP TABLE bytes_log; DROP FLOW find_approx_rate; -DROP TABLE approx_rate; \ No newline at end of file +DROP TABLE approx_rate; diff --git a/tests/cases/standalone/common/flow/flow_call_df_func.result b/tests/cases/standalone/common/flow/flow_call_df_func.result index bb4357fa611e..00f659550fb2 100644 --- a/tests/cases/standalone/common/flow/flow_call_df_func.result +++ b/tests/cases/standalone/common/flow/flow_call_df_func.result @@ -8,22 +8,22 @@ CREATE TABLE numbers_input_df_func ( Affected Rows: 0 -- call `sum(abs(number))` where `abs` is DataFusion Function and `sum` is flow function -CREATE FLOW test_numbers_df_func +CREATE FLOW test_numbers_df_func SINK TO out_num_cnt_df_func -AS +AS SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); Affected Rows: 0 -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -+------------------------------------------------------+ -| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | -+------------------------------------------------------+ -| true | -+------------------------------------------------------+ ++------------------------------------------+ +| ADMIN flush_flow('test_numbers_df_func') | ++------------------------------------------+ +| 0 | ++------------------------------------------+ -INSERT INTO numbers_input_df_func +INSERT INTO numbers_input_df_func VALUES (-20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); @@ -31,13 +31,13 @@ VALUES Affected Rows: 2 -- flush flow to make sure that table is created and data is inserted -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -+------------------------------------------------------+ -| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | -+------------------------------------------------------+ -| true | -+------------------------------------------------------+ ++------------------------------------------+ +| ADMIN flush_flow('test_numbers_df_func') | ++------------------------------------------+ +| 1 | ++------------------------------------------+ SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; @@ -47,28 +47,28 @@ SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; | 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | +-------+---------------------+---------------------+ -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -+------------------------------------------------------+ -| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | -+------------------------------------------------------+ -| true | -+------------------------------------------------------+ ++------------------------------------------+ +| ADMIN flush_flow('test_numbers_df_func') | ++------------------------------------------+ +| 0 | ++------------------------------------------+ -INSERT INTO numbers_input_df_func +INSERT INTO numbers_input_df_func VALUES (23,"2021-07-01 00:00:01.000"), (-24,"2021-07-01 00:00:01.500"); Affected Rows: 2 -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -+------------------------------------------------------+ -| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | -+------------------------------------------------------+ -| true | -+------------------------------------------------------+ ++------------------------------------------+ +| ADMIN flush_flow('test_numbers_df_func') | ++------------------------------------------+ +| 1 | ++------------------------------------------+ SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; @@ -101,22 +101,22 @@ CREATE TABLE numbers_input_df_func ( Affected Rows: 0 -- call `abs(sum(number))`to make sure that calling `abs` function(impl by datafusion) on `sum` function(impl by flow) is working -CREATE FLOW test_numbers_df_func +CREATE FLOW test_numbers_df_func SINK TO out_num_cnt_df_func -AS +AS SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); Affected Rows: 0 -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -+------------------------------------------------------+ -| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | -+------------------------------------------------------+ -| true | -+------------------------------------------------------+ ++------------------------------------------+ +| ADMIN flush_flow('test_numbers_df_func') | ++------------------------------------------+ +| 0 | ++------------------------------------------+ -INSERT INTO numbers_input_df_func +INSERT INTO numbers_input_df_func VALUES (-20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); @@ -124,13 +124,13 @@ VALUES Affected Rows: 2 -- flush flow to make sure that table is created and data is inserted -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -+------------------------------------------------------+ -| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | -+------------------------------------------------------+ -| true | -+------------------------------------------------------+ ++------------------------------------------+ +| ADMIN flush_flow('test_numbers_df_func') | ++------------------------------------------+ +| 1 | ++------------------------------------------+ SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; @@ -140,28 +140,28 @@ SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; | 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | +-------+---------------------+---------------------+ -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -+------------------------------------------------------+ -| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | -+------------------------------------------------------+ -| true | -+------------------------------------------------------+ ++------------------------------------------+ +| ADMIN flush_flow('test_numbers_df_func') | ++------------------------------------------+ +| 0 | ++------------------------------------------+ -INSERT INTO numbers_input_df_func +INSERT INTO numbers_input_df_func VALUES (23,"2021-07-01 00:00:01.000"), (-24,"2021-07-01 00:00:01.500"); Affected Rows: 2 -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -+------------------------------------------------------+ -| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | -+------------------------------------------------------+ -| true | -+------------------------------------------------------+ ++------------------------------------------+ +| ADMIN flush_flow('test_numbers_df_func') | ++------------------------------------------+ +| 1 | ++------------------------------------------+ SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; @@ -194,35 +194,35 @@ CREATE TABLE numbers_input_df_func ( Affected Rows: 0 -CREATE FLOW test_numbers_df_func +CREATE FLOW test_numbers_df_func SINK TO out_num_cnt_df_func -AS +AS SELECT max(number) - min(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond) FROM numbers_input_df_func GROUP BY date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond); Affected Rows: 0 -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -+------------------------------------------------------+ -| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | -+------------------------------------------------------+ -| true | -+------------------------------------------------------+ ++------------------------------------------+ +| ADMIN flush_flow('test_numbers_df_func') | ++------------------------------------------+ +| 0 | ++------------------------------------------+ -INSERT INTO numbers_input_df_func +INSERT INTO numbers_input_df_func VALUES (20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); Affected Rows: 2 -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -+------------------------------------------------------+ -| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | -+------------------------------------------------------+ -| true | -+------------------------------------------------------+ ++------------------------------------------+ +| ADMIN flush_flow('test_numbers_df_func') | ++------------------------------------------+ +| 1 | ++------------------------------------------+ SELECT col_0, col_1 FROM out_num_cnt_df_func; @@ -232,28 +232,28 @@ SELECT col_0, col_1 FROM out_num_cnt_df_func; | 2 | 2021-07-01T00:00:00 | +-------+---------------------+ -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -+------------------------------------------------------+ -| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | -+------------------------------------------------------+ -| true | -+------------------------------------------------------+ ++------------------------------------------+ +| ADMIN flush_flow('test_numbers_df_func') | ++------------------------------------------+ +| 0 | ++------------------------------------------+ -INSERT INTO numbers_input_df_func +INSERT INTO numbers_input_df_func VALUES (23,"2021-07-01 00:00:01.000"), (24,"2021-07-01 00:00:01.500"); Affected Rows: 2 -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -+------------------------------------------------------+ -| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | -+------------------------------------------------------+ -| true | -+------------------------------------------------------+ ++------------------------------------------+ +| ADMIN flush_flow('test_numbers_df_func') | ++------------------------------------------+ +| 1 | ++------------------------------------------+ SELECT col_0, col_1 FROM out_num_cnt_df_func; @@ -286,35 +286,35 @@ CREATE TABLE numbers_input_df_func ( Affected Rows: 0 -CREATE FLOW test_numbers_df_func +CREATE FLOW test_numbers_df_func SINK TO out_num_cnt -AS +AS SELECT date_trunc('second', ts), sum(number) FROM numbers_input_df_func GROUP BY date_trunc('second', ts); Affected Rows: 0 -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -+------------------------------------------------------+ -| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | -+------------------------------------------------------+ -| true | -+------------------------------------------------------+ ++------------------------------------------+ +| ADMIN flush_flow('test_numbers_df_func') | ++------------------------------------------+ +| 0 | ++------------------------------------------+ -INSERT INTO numbers_input_df_func +INSERT INTO numbers_input_df_func VALUES (20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); Affected Rows: 2 -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -+------------------------------------------------------+ -| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | -+------------------------------------------------------+ -| true | -+------------------------------------------------------+ ++------------------------------------------+ +| ADMIN flush_flow('test_numbers_df_func') | ++------------------------------------------+ +| 1 | ++------------------------------------------+ SELECT col_0, col_1 FROM out_num_cnt; @@ -324,28 +324,28 @@ SELECT col_0, col_1 FROM out_num_cnt; | 2021-07-01T00:00:00 | 42 | +---------------------+-------+ -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -+------------------------------------------------------+ -| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | -+------------------------------------------------------+ -| true | -+------------------------------------------------------+ ++------------------------------------------+ +| ADMIN flush_flow('test_numbers_df_func') | ++------------------------------------------+ +| 0 | ++------------------------------------------+ -INSERT INTO numbers_input_df_func +INSERT INTO numbers_input_df_func VALUES (23,"2021-07-01 00:00:01.000"), (24,"2021-07-01 00:00:01.500"); Affected Rows: 2 -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -+------------------------------------------------------+ -| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) | -+------------------------------------------------------+ -| true | -+------------------------------------------------------+ ++------------------------------------------+ +| ADMIN flush_flow('test_numbers_df_func') | ++------------------------------------------+ +| 1 | ++------------------------------------------+ SELECT col_0, col_1 FROM out_num_cnt; diff --git a/tests/cases/standalone/common/flow/flow_call_df_func.sql b/tests/cases/standalone/common/flow/flow_call_df_func.sql index 45c316ecee9a..faa9ee1aabc2 100644 --- a/tests/cases/standalone/common/flow/flow_call_df_func.sql +++ b/tests/cases/standalone/common/flow/flow_call_df_func.sql @@ -6,31 +6,31 @@ CREATE TABLE numbers_input_df_func ( ); -- call `sum(abs(number))` where `abs` is DataFusion Function and `sum` is flow function -CREATE FLOW test_numbers_df_func +CREATE FLOW test_numbers_df_func SINK TO out_num_cnt_df_func -AS +AS SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -INSERT INTO numbers_input_df_func +INSERT INTO numbers_input_df_func VALUES (-20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); -- flush flow to make sure that table is created and data is inserted -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -INSERT INTO numbers_input_df_func +INSERT INTO numbers_input_df_func VALUES (23,"2021-07-01 00:00:01.000"), (-24,"2021-07-01 00:00:01.500"); -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; @@ -46,31 +46,31 @@ CREATE TABLE numbers_input_df_func ( ); -- call `abs(sum(number))`to make sure that calling `abs` function(impl by datafusion) on `sum` function(impl by flow) is working -CREATE FLOW test_numbers_df_func +CREATE FLOW test_numbers_df_func SINK TO out_num_cnt_df_func -AS +AS SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -INSERT INTO numbers_input_df_func +INSERT INTO numbers_input_df_func VALUES (-20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); -- flush flow to make sure that table is created and data is inserted -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -INSERT INTO numbers_input_df_func +INSERT INTO numbers_input_df_func VALUES (23,"2021-07-01 00:00:01.000"), (-24,"2021-07-01 00:00:01.500"); -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; @@ -86,30 +86,30 @@ CREATE TABLE numbers_input_df_func ( TIME INDEX(ts) ); -CREATE FLOW test_numbers_df_func +CREATE FLOW test_numbers_df_func SINK TO out_num_cnt_df_func -AS +AS SELECT max(number) - min(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond) FROM numbers_input_df_func GROUP BY date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond); -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -INSERT INTO numbers_input_df_func +INSERT INTO numbers_input_df_func VALUES (20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); SELECT col_0, col_1 FROM out_num_cnt_df_func; -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -INSERT INTO numbers_input_df_func +INSERT INTO numbers_input_df_func VALUES (23,"2021-07-01 00:00:01.000"), (24,"2021-07-01 00:00:01.500"); -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); SELECT col_0, col_1 FROM out_num_cnt_df_func; @@ -126,30 +126,30 @@ CREATE TABLE numbers_input_df_func ( TIME INDEX(ts) ); -CREATE FLOW test_numbers_df_func +CREATE FLOW test_numbers_df_func SINK TO out_num_cnt -AS +AS SELECT date_trunc('second', ts), sum(number) FROM numbers_input_df_func GROUP BY date_trunc('second', ts); -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -INSERT INTO numbers_input_df_func +INSERT INTO numbers_input_df_func VALUES (20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); SELECT col_0, col_1 FROM out_num_cnt; -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); -INSERT INTO numbers_input_df_func +INSERT INTO numbers_input_df_func VALUES (23,"2021-07-01 00:00:01.000"), (24,"2021-07-01 00:00:01.500"); -select flush_flow('test_numbers_df_func')<=1; +admin flush_flow('test_numbers_df_func'); SELECT col_0, col_1 FROM out_num_cnt; diff --git a/tests/cases/standalone/common/function/admin/flush_compact_table.result b/tests/cases/standalone/common/function/admin/flush_compact_table.result index 01bad5f3b1cc..950461f98e6c 100644 --- a/tests/cases/standalone/common/function/admin/flush_compact_table.result +++ b/tests/cases/standalone/common/function/admin/flush_compact_table.result @@ -19,18 +19,18 @@ SELECT * FROM test; | 1970-01-01T00:00:00.005 | +-------------------------+ -SELECT FLUSH_TABLE('test'); +ADMIN FLUSH_TABLE('test'); +---------------------------+ -| flush_table(Utf8("test")) | +| ADMIN FLUSH_TABLE('test') | +---------------------------+ | 0 | +---------------------------+ -SELECT COMPACT_TABLE('test'); +ADMIN COMPACT_TABLE('test'); +-----------------------------+ -| compact_table(Utf8("test")) | +| ADMIN COMPACT_TABLE('test') | +-----------------------------+ | 0 | +-----------------------------+ diff --git a/tests/cases/standalone/common/function/admin/flush_compact_table.sql b/tests/cases/standalone/common/function/admin/flush_compact_table.sql index 42e1ee5f3e6d..8c52862cdb55 100644 --- a/tests/cases/standalone/common/function/admin/flush_compact_table.sql +++ b/tests/cases/standalone/common/function/admin/flush_compact_table.sql @@ -6,9 +6,9 @@ INSERT INTO test VALUES (1), (2), (3), (4), (5); SELECT * FROM test; -SELECT FLUSH_TABLE('test'); +ADMIN FLUSH_TABLE('test'); -SELECT COMPACT_TABLE('test'); +ADMIN COMPACT_TABLE('test'); --- doesn't change anything --- SELECT * FROM test;