Skip to content

Commit

Permalink
feat!: impl admin command (GreptimeTeam#4600)
Browse files Browse the repository at this point in the history
* feat: impl admin statement parser

* feat: introduce AsyncFunction and implements it for admin functions

* feat: execute admin functions

* fix: license header

* fix: panic in test

* chore: fixed by code review
  • Loading branch information
killme2008 authored and CookiePieWw committed Sep 17, 2024
1 parent 2ac6d25 commit f15db85
Show file tree
Hide file tree
Showing 39 changed files with 777 additions and 322 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/function/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ table.workspace = true
[dev-dependencies]
ron = "0.7"
serde = { version = "1.0", features = ["derive"] }
tokio.workspace = true
8 changes: 4 additions & 4 deletions src/common/function/src/flush_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ mod test {
use session::context::QueryContext;

use super::*;
use crate::function::{Function, FunctionContext};
use crate::function::{AsyncFunction, FunctionContext};

#[test]
fn test_flush_flow_metadata() {
Expand All @@ -130,8 +130,8 @@ mod test {
);
}

#[test]
fn test_missing_flow_service() {
#[tokio::test]
async fn test_missing_flow_service() {
let f = FlushFlowFunction;

let args = vec!["flow_name"];
Expand All @@ -140,7 +140,7 @@ mod test {
.map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _)
.collect::<Vec<_>>();

let result = f.eval(FunctionContext::default(), &args).unwrap_err();
let result = f.eval(FunctionContext::default(), &args).await.unwrap_err();
assert_eq!(
"Missing FlowServiceHandler, not expected",
result.to_string()
Expand Down
21 changes: 21 additions & 0 deletions src/common/function/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,33 @@ pub trait Function: fmt::Display + Sync + Send {
/// Returns the name of the function, should be unique.
fn name(&self) -> &str;

/// The returned data type of function execution.
fn return_type(&self, input_types: &[ConcreteDataType]) -> Result<ConcreteDataType>;

/// The signature of function.
fn signature(&self) -> Signature;

/// Evaluate the function, e.g. run/execute the function.
fn eval(&self, _func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef>;
}

pub type FunctionRef = Arc<dyn Function>;

/// Async Scalar function trait
#[async_trait::async_trait]
pub trait AsyncFunction: fmt::Display + Sync + Send {
/// Returns the name of the function, should be unique.
fn name(&self) -> &str;

/// The returned data type of function execution.
fn return_type(&self, input_types: &[ConcreteDataType]) -> Result<ConcreteDataType>;

/// The signature of function.
fn signature(&self) -> Signature;

/// Evaluate the function, e.g. run/execute the function.
/// TODO(dennis): simplify the signature and refactor all the admin functions.
async fn eval(&self, _func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef>;
}

pub type AsyncFunctionRef = Arc<dyn AsyncFunction>;
24 changes: 23 additions & 1 deletion src/common/function/src/function_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::{Arc, RwLock};

use once_cell::sync::Lazy;

use crate::function::FunctionRef;
use crate::function::{AsyncFunctionRef, FunctionRef};
use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
use crate::scalars::date::DateFunction;
use crate::scalars::expression::ExpressionFunction;
Expand All @@ -32,6 +32,7 @@ use crate::table::TableFunction;
#[derive(Default)]
pub struct FunctionRegistry {
functions: RwLock<HashMap<String, FunctionRef>>,
async_functions: RwLock<HashMap<String, AsyncFunctionRef>>,
aggregate_functions: RwLock<HashMap<String, AggregateFunctionMetaRef>>,
}

Expand All @@ -44,6 +45,27 @@ impl FunctionRegistry {
.insert(func.name().to_string(), func);
}

pub fn register_async(&self, func: AsyncFunctionRef) {
let _ = self
.async_functions
.write()
.unwrap()
.insert(func.name().to_string(), func);
}

pub fn get_async_function(&self, name: &str) -> Option<AsyncFunctionRef> {
self.async_functions.read().unwrap().get(name).cloned()
}

pub fn async_functions(&self) -> Vec<AsyncFunctionRef> {
self.async_functions
.read()
.unwrap()
.values()
.cloned()
.collect()
}

pub fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
let _ = self
.aggregate_functions
Expand Down
2 changes: 1 addition & 1 deletion src/common/function/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl SystemFunction {
registry.register(Arc::new(VersionFunction));
registry.register(Arc::new(DatabaseFunction));
registry.register(Arc::new(TimezoneFunction));
registry.register(Arc::new(ProcedureStateFunction));
registry.register_async(Arc::new(ProcedureStateFunction));
PGCatalogFunction::register(registry);
}
}
14 changes: 7 additions & 7 deletions src/common/function/src/system/procedure_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ mod tests {
use datatypes::vectors::StringVector;

use super::*;
use crate::function::{Function, FunctionContext};
use crate::function::{AsyncFunction, FunctionContext};

#[test]
fn test_procedure_state_misc() {
Expand All @@ -114,8 +114,8 @@ mod tests {
));
}

#[test]
fn test_missing_procedure_service() {
#[tokio::test]
async fn test_missing_procedure_service() {
let f = ProcedureStateFunction;

let args = vec!["pid"];
Expand All @@ -125,15 +125,15 @@ mod tests {
.map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _)
.collect::<Vec<_>>();

let result = f.eval(FunctionContext::default(), &args).unwrap_err();
let result = f.eval(FunctionContext::default(), &args).await.unwrap_err();
assert_eq!(
"Missing ProcedureServiceHandler, not expected",
result.to_string()
);
}

#[test]
fn test_procedure_state() {
#[tokio::test]
async fn test_procedure_state() {
let f = ProcedureStateFunction;

let args = vec!["pid"];
Expand All @@ -143,7 +143,7 @@ mod tests {
.map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _)
.collect::<Vec<_>>();

let result = f.eval(FunctionContext::mock(), &args).unwrap();
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();

let expect: VectorRef = Arc::new(StringVector::from(vec![
"{\"status\":\"Done\",\"error\":\"OK\"}",
Expand Down
12 changes: 6 additions & 6 deletions src/common/function/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ pub(crate) struct TableFunction;
impl TableFunction {
/// Register all table functions to [`FunctionRegistry`].
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(MigrateRegionFunction));
registry.register(Arc::new(FlushRegionFunction));
registry.register(Arc::new(CompactRegionFunction));
registry.register(Arc::new(FlushTableFunction));
registry.register(Arc::new(CompactTableFunction));
registry.register(Arc::new(FlushFlowFunction));
registry.register_async(Arc::new(MigrateRegionFunction));
registry.register_async(Arc::new(FlushRegionFunction));
registry.register_async(Arc::new(CompactRegionFunction));
registry.register_async(Arc::new(FlushTableFunction));
registry.register_async(Arc::new(CompactTableFunction));
registry.register_async(Arc::new(FlushFlowFunction));
}
}
14 changes: 7 additions & 7 deletions src/common/function/src/table/flush_compact_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ mod tests {
use datatypes::vectors::UInt64Vector;

use super::*;
use crate::function::{Function, FunctionContext};
use crate::function::{AsyncFunction, FunctionContext};

macro_rules! define_region_function_test {
($name: ident, $func: ident) => {
Expand All @@ -97,8 +97,8 @@ mod tests {
} if valid_types == ConcreteDataType::numerics()));
}

#[test]
fn [<test_ $name _missing_table_mutation>]() {
#[tokio::test]
async fn [<test_ $name _missing_table_mutation>]() {
let f = $func;

let args = vec![99];
Expand All @@ -108,15 +108,15 @@ mod tests {
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();

let result = f.eval(FunctionContext::default(), &args).unwrap_err();
let result = f.eval(FunctionContext::default(), &args).await.unwrap_err();
assert_eq!(
"Missing TableMutationHandler, not expected",
result.to_string()
);
}

#[test]
fn [<test_ $name>]() {
#[tokio::test]
async fn [<test_ $name>]() {
let f = $func;


Expand All @@ -127,7 +127,7 @@ mod tests {
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();

let result = f.eval(FunctionContext::mock(), &args).unwrap();
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();

let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42]));
assert_eq!(expect, result);
Expand Down
14 changes: 7 additions & 7 deletions src/common/function/src/table/flush_compact_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ mod tests {
use session::context::QueryContext;

use super::*;
use crate::function::{Function, FunctionContext};
use crate::function::{AsyncFunction, FunctionContext};

macro_rules! define_table_function_test {
($name: ident, $func: ident) => {
Expand All @@ -230,8 +230,8 @@ mod tests {
} if valid_types == vec![ConcreteDataType::string_datatype()]));
}

#[test]
fn [<test_ $name _missing_table_mutation>]() {
#[tokio::test]
async fn [<test_ $name _missing_table_mutation>]() {
let f = $func;

let args = vec!["test"];
Expand All @@ -241,15 +241,15 @@ mod tests {
.map(|arg| Arc::new(StringVector::from(vec![arg])) as _)
.collect::<Vec<_>>();

let result = f.eval(FunctionContext::default(), &args).unwrap_err();
let result = f.eval(FunctionContext::default(), &args).await.unwrap_err();
assert_eq!(
"Missing TableMutationHandler, not expected",
result.to_string()
);
}

#[test]
fn [<test_ $name>]() {
#[tokio::test]
async fn [<test_ $name>]() {
let f = $func;


Expand All @@ -260,7 +260,7 @@ mod tests {
.map(|arg| Arc::new(StringVector::from(vec![arg])) as _)
.collect::<Vec<_>>();

let result = f.eval(FunctionContext::mock(), &args).unwrap();
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();

let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42]));
assert_eq!(expect, result);
Expand Down
14 changes: 7 additions & 7 deletions src/common/function/src/table/migrate_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ mod tests {
use datatypes::vectors::{StringVector, UInt64Vector, VectorRef};

use super::*;
use crate::function::{Function, FunctionContext};
use crate::function::{AsyncFunction, FunctionContext};

#[test]
fn test_migrate_region_misc() {
Expand All @@ -140,8 +140,8 @@ mod tests {
} if sigs.len() == 2));
}

#[test]
fn test_missing_procedure_service() {
#[tokio::test]
async fn test_missing_procedure_service() {
let f = MigrateRegionFunction;

let args = vec![1, 1, 1];
Expand All @@ -151,15 +151,15 @@ mod tests {
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();

let result = f.eval(FunctionContext::default(), &args).unwrap_err();
let result = f.eval(FunctionContext::default(), &args).await.unwrap_err();
assert_eq!(
"Missing ProcedureServiceHandler, not expected",
result.to_string()
);
}

#[test]
fn test_migrate_region() {
#[tokio::test]
async fn test_migrate_region() {
let f = MigrateRegionFunction;

let args = vec![1, 1, 1];
Expand All @@ -169,7 +169,7 @@ mod tests {
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();

let result = f.eval(FunctionContext::mock(), &args).unwrap();
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();

let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"]));
assert_eq!(expect, result);
Expand Down
Loading

0 comments on commit f15db85

Please sign in to comment.