diff --git a/proto/expr.proto b/proto/expr.proto index c4779deafacb..e81036f92190 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -220,6 +220,9 @@ message ExprNode { VNODE = 1101; // Non-deterministic functions PROCTIME = 2023; + PG_SLEEP = 2024; + PG_SLEEP_FOR = 2025; + PG_SLEEP_UNTIL = 2026; } Type function_type = 1; data.DataType return_type = 3; diff --git a/src/expr/macro/src/gen.rs b/src/expr/macro/src/gen.rs index 6b7d33ac7468..9e8b4d9d3c1b 100644 --- a/src/expr/macro/src/gen.rs +++ b/src/expr/macro/src/gen.rs @@ -235,11 +235,13 @@ impl FunctionAttr { true => quote! { &mut writer, }, false => quote! {}, }; + let await_ = user_fn.async_.then(|| quote! { .await }); // call the user defined function // inputs: [ Option ] - let mut output = - quote! { #fn_name #generic(#(#non_prebuilt_inputs,)* #prebuilt_arg #context #writer) }; + let mut output = quote! { #fn_name #generic(#(#non_prebuilt_inputs,)* #prebuilt_arg #context #writer) #await_ }; output = match user_fn.return_type_kind { + // XXX: we don't support void type yet. return null::int for now. + _ if self.ret == "void" => quote! { { #output; Option::::None } }, ReturnTypeKind::T => quote! { Some(#output) }, ReturnTypeKind::Option => output, ReturnTypeKind::Result => quote! { Some(#output?) }, diff --git a/src/expr/macro/src/lib.rs b/src/expr/macro/src/lib.rs index 82e7c8014220..4d8c48ca9cca 100644 --- a/src/expr/macro/src/lib.rs +++ b/src/expr/macro/src/lib.rs @@ -30,18 +30,19 @@ mod utils; /// /// # Table of Contents /// -/// - [Function Signature](#function-signature) +/// - [SQL Function Signature](#sql-function-signature) /// - [Multiple Function Definitions](#multiple-function-definitions) /// - [Type Expansion](#type-expansion) /// - [Automatic Type Inference](#automatic-type-inference) /// - [Custom Type Inference Function](#custom-type-inference-function) -/// - [Rust Function Requirements](#rust-function-requirements) +/// - [Rust Function Signature](#rust-function-signature) /// - [Nullable Arguments](#nullable-arguments) /// - [Return Value](#return-value) /// - [Optimization](#optimization) /// - [Functions Returning Strings](#functions-returning-strings) /// - [Preprocessing Constant Arguments](#preprocessing-constant-arguments) /// - [Context](#context) +/// - [Async Function](#async-function) /// - [Table Function](#table-function) /// - [Registration and Invocation](#registration-and-invocation) /// - [Appendix: Type Matrix](#appendix-type-matrix) @@ -55,13 +56,13 @@ mod utils; /// } /// ``` /// -/// # Function Signature +/// # SQL Function Signature /// /// Each function must have a signature, specified in the `function("...")` part of the macro /// invocation. The signature follows this pattern: /// /// ```text -/// name([arg_types],*) -> [setof] return_type +/// name ( [arg_types],* ) [ -> [setof] return_type ] /// ``` /// /// Where `name` is the function name, which must match the function name defined in `prost`. @@ -73,6 +74,9 @@ mod utils; /// function (table function), meaning it can return multiple values instead of just one. For more /// details, see the section on table functions. /// +/// If no return type is specified, the function returns `void`. However, the void type is not +/// supported in our type system, so it now returns a null value of type int. +/// /// ## Multiple Function Definitions /// /// Multiple `#[function]` macros can be applied to a single generic Rust function to define @@ -154,7 +158,7 @@ mod utils; /// /// This type inference function will be invoked at the frontend. /// -/// # Rust Function Requirements +/// # Rust Function Signature /// /// The `#[function]` macro can handle various types of Rust functions. /// @@ -277,6 +281,19 @@ mod utils; /// } /// ``` /// +/// ## Async Function +/// +/// Functions can be asynchronous. +/// +/// ```ignore +/// #[function("pg_sleep(float64)")] +/// async fn pg_sleep(second: F64) { +/// tokio::time::sleep(Duration::from_secs_f64(second.0)).await; +/// } +/// ``` +/// +/// Asynchronous functions will be evaluated on rows sequentially. +/// /// # Table Function /// /// A table function is a special kind of function that can return multiple values instead of just @@ -460,6 +477,8 @@ struct FunctionAttr { prebuild: Option, /// Type inference function. type_infer: Option, + /// Whether the function is volatile. + volatile: bool, /// Whether the function is deprecated. deprecated: bool, } @@ -469,6 +488,8 @@ struct FunctionAttr { struct UserFunctionAttr { /// Function name name: String, + /// Whether the function is async. + async_: bool, /// Whether contains argument `&Context`. context: bool, /// The last argument type is `&mut dyn Write`. @@ -556,7 +577,8 @@ impl FunctionAttr { impl UserFunctionAttr { /// Returns true if the function is like `fn(T1, T2, .., Tn) -> T`. fn is_pure(&self) -> bool { - !self.write + !self.async_ + && !self.write && !self.context && !self.arg_option && self.return_type_kind == ReturnTypeKind::T diff --git a/src/expr/macro/src/parse.rs b/src/expr/macro/src/parse.rs index dfcd08aafd3f..be7a6c86df62 100644 --- a/src/expr/macro/src/parse.rs +++ b/src/expr/macro/src/parse.rs @@ -28,9 +28,10 @@ impl Parse for FunctionAttr { let sig = input.parse::()?; let sig_str = sig.value(); - let (name_args, ret) = sig_str - .split_once("->") - .ok_or_else(|| Error::new_spanned(&sig, "expected '->'"))?; + let (name_args, ret) = match sig_str.split_once("->") { + Some((name_args, ret)) => (name_args, ret), + None => (sig_str.as_str(), "void"), + }; let (name, args) = name_args .split_once('(') .ok_or_else(|| Error::new_spanned(&sig, "expected '('"))?; @@ -74,6 +75,8 @@ impl Parse for FunctionAttr { parsed.prebuild = Some(get_value()?); } else if meta.path().is_ident("type_infer") { parsed.type_infer = Some(get_value()?); + } else if meta.path().is_ident("volatile") { + parsed.volatile = true; } else if meta.path().is_ident("deprecated") { parsed.deprecated = true; } else if meta.path().is_ident("append_only") { @@ -113,6 +116,7 @@ impl From<&syn::Signature> for UserFunctionAttr { }; UserFunctionAttr { name: sig.ident.to_string(), + async_: sig.asyncness.is_some(), write: sig.inputs.iter().any(arg_is_write), context: sig.inputs.iter().any(arg_is_context), retract: last_arg_is_retract(sig), diff --git a/src/expr/macro/src/types.rs b/src/expr/macro/src/types.rs index b9e4e9b6ccef..53f224b79a77 100644 --- a/src/expr/macro/src/types.rs +++ b/src/expr/macro/src/types.rs @@ -81,6 +81,10 @@ fn lookup_matrix(mut ty: &str, idx: usize) -> &str { ty = "list"; } else if ty.starts_with("struct") { ty = "struct"; + } else if ty == "void" { + // XXX: we don't support void type yet. + // replace it with int32 for now. + ty = "int32"; } let s = TYPE_MATRIX.trim().lines().find_map(|line| { let mut parts = line.split_whitespace(); diff --git a/src/expr/src/vector_op/delay.rs b/src/expr/src/vector_op/delay.rs new file mode 100644 index 000000000000..b1661e56e75b --- /dev/null +++ b/src/expr/src/vector_op/delay.rs @@ -0,0 +1,52 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use risingwave_common::types::{Interval, F64}; +use risingwave_expr_macro::function; + +/// Makes the current session's process sleep until the given number of seconds have elapsed. +/// +/// ```slt +/// query I +/// SELECT pg_sleep(1.5); +/// ---- +/// NULL +/// ``` +#[function("pg_sleep(float64)", volatile)] +async fn pg_sleep(second: F64) { + tokio::time::sleep(Duration::from_secs_f64(second.0)).await; +} + +/// Makes the current session's process sleep until the given interval has elapsed. +/// +/// ```slt +/// query I +/// SELECT pg_sleep_for('1 second'); +/// ---- +/// NULL +/// ``` +#[function("pg_sleep_for(interval)", volatile)] +async fn pg_sleep_for(interval: Interval) { + // we only use the microsecond part of the interval + let usecs = if interval.is_positive() { + interval.usecs() as u64 + } else { + // return if the interval is not positive + return; + }; + let duration = Duration::from_micros(usecs); + tokio::time::sleep(duration).await; +} diff --git a/src/expr/src/vector_op/mod.rs b/src/expr/src/vector_op/mod.rs index aa0f99a505d6..acad4f8e7168 100644 --- a/src/expr/src/vector_op/mod.rs +++ b/src/expr/src/vector_op/mod.rs @@ -32,6 +32,7 @@ pub mod cmp; pub mod concat_op; pub mod conjunction; pub mod date_trunc; +pub mod delay; pub mod encdec; pub mod exp; pub mod extract; diff --git a/src/expr/src/vector_op/proctime.rs b/src/expr/src/vector_op/proctime.rs index f0ad130cd6d3..12b0aa6e5ef5 100644 --- a/src/expr/src/vector_op/proctime.rs +++ b/src/expr/src/vector_op/proctime.rs @@ -19,7 +19,7 @@ use risingwave_expr_macro::function; use crate::{ExprError, Result}; /// Get the processing time in Timestamptz scalar from the task-local epoch. -#[function("proctime() -> timestamptz")] +#[function("proctime() -> timestamptz", volatile)] fn proctime() -> Result { let epoch = epoch::task_local::curr_epoch().ok_or(ExprError::Context)?; Ok(epoch.as_timestamptz()) diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 6bfa4883c6a5..9cb17c5e3925 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -1142,7 +1142,11 @@ impl Binder { // non-deterministic ("now", now()), ("current_timestamp", now()), - ("proctime", proctime()) + ("proctime", proctime()), + ("pg_sleep", raw_call(ExprType::PgSleep)), + ("pg_sleep_for", raw_call(ExprType::PgSleepFor)), + // TODO: implement pg_sleep_until + // ("pg_sleep_until", raw_call(ExprType::PgSleepUntil)), ] .into_iter() .collect() diff --git a/src/frontend/src/expr/pure.rs b/src/frontend/src/expr/pure.rs index a229fed79b4f..3c48aa656182 100644 --- a/src/frontend/src/expr/pure.rs +++ b/src/frontend/src/expr/pure.rs @@ -207,7 +207,11 @@ impl ExprVisitor for ImpureAnalyzer { x } // expression output is not deterministic - expr_node::Type::Vnode | expr_node::Type::Proctime => true, + expr_node::Type::Vnode + | expr_node::Type::Proctime + | expr_node::Type::PgSleep + | expr_node::Type::PgSleepFor + | expr_node::Type::PgSleepUntil => true, } } }