From e2816840317b3a5145c1778f7cb87f78d0b80467 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Thu, 14 Sep 2023 01:51:47 +0800 Subject: [PATCH 1/6] support `pg_sleep` and `pg_sleep_for` Signed-off-by: Runji Wang --- proto/expr.proto | 4 ++ src/expr/macro/src/gen.rs | 6 ++- src/expr/macro/src/lib.rs | 5 ++- src/expr/macro/src/parse.rs | 8 ++-- src/expr/macro/src/types.rs | 4 ++ src/expr/src/vector_op/delay.rs | 52 ++++++++++++++++++++++++ src/expr/src/vector_op/mod.rs | 1 + src/frontend/src/binder/expr/function.rs | 6 ++- src/frontend/src/expr/pure.rs | 6 ++- 9 files changed, 84 insertions(+), 8 deletions(-) create mode 100644 src/expr/src/vector_op/delay.rs diff --git a/proto/expr.proto b/proto/expr.proto index c4779deafacb..6bf17358052e 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -220,6 +220,10 @@ message ExprNode { VNODE = 1101; // Non-deterministic functions PROCTIME = 2023; + // Async functions + PG_SLEEP = 3001; + PG_SLEEP_FOR = 3002; + PG_SLEEP_UNTIL = 3003; } Type function_type = 1; data.DataType return_type = 3; diff --git a/src/expr/macro/src/gen.rs b/src/expr/macro/src/gen.rs index 9dcde3c6ffdc..d1fb0e727b6e 100644 --- a/src/expr/macro/src/gen.rs +++ b/src/expr/macro/src/gen.rs @@ -235,11 +235,13 @@ impl FunctionAttr { true => quote! { &mut writer, }, false => quote! {}, }; + let await_ = user_fn.async_.then(|| quote! { .await }); // call the user defined function // inputs: [ Option ] - let mut output = - quote! { #fn_name #generic(#(#non_prebuilt_inputs,)* #prebuilt_arg #context #writer) }; + let mut output = quote! { #fn_name #generic(#(#non_prebuilt_inputs,)* #prebuilt_arg #context #writer) #await_ }; output = match user_fn.return_type_kind { + // void functions should return null + _ if self.ret == "void" => quote! { { #output; Option::::None } }, ReturnTypeKind::T => quote! { Some(#output) }, ReturnTypeKind::Option => output, ReturnTypeKind::Result => quote! { Some(#output?) }, diff --git a/src/expr/macro/src/lib.rs b/src/expr/macro/src/lib.rs index 0f490b6f1880..8ac2949c77ad 100644 --- a/src/expr/macro/src/lib.rs +++ b/src/expr/macro/src/lib.rs @@ -454,6 +454,8 @@ struct FunctionAttr { struct UserFunctionAttr { /// Function name name: String, + /// Whether the function is async. + async_: bool, /// Whether contains argument `&Context`. context: bool, /// The last argument type is `&mut dyn Write`. @@ -495,7 +497,8 @@ impl FunctionAttr { impl UserFunctionAttr { /// Returns true if the function is like `fn(T1, T2, .., Tn) -> T`. fn is_pure(&self) -> bool { - !self.write + !self.async_ + && !self.write && !self.context && !self.arg_option && self.return_type_kind == ReturnTypeKind::T diff --git a/src/expr/macro/src/parse.rs b/src/expr/macro/src/parse.rs index e45ec20d174f..ef5963db1153 100644 --- a/src/expr/macro/src/parse.rs +++ b/src/expr/macro/src/parse.rs @@ -28,9 +28,10 @@ impl Parse for FunctionAttr { let sig = input.parse::()?; let sig_str = sig.value(); - let (name_args, ret) = sig_str - .split_once("->") - .ok_or_else(|| Error::new_spanned(&sig, "expected '->'"))?; + let (name_args, ret) = match sig_str.split_once("->") { + Some((name_args, ret)) => (name_args, ret), + None => (sig_str.as_str(), "void"), + }; let (name, args) = name_args .split_once('(') .ok_or_else(|| Error::new_spanned(&sig, "expected '('"))?; @@ -106,6 +107,7 @@ impl Parse for UserFunctionAttr { }; Ok(UserFunctionAttr { name: sig.ident.to_string(), + async_: sig.asyncness.is_some(), write: sig.inputs.iter().any(arg_is_write), context: sig.inputs.iter().any(arg_is_context), retract: last_arg_is_retract(sig), diff --git a/src/expr/macro/src/types.rs b/src/expr/macro/src/types.rs index b9e4e9b6ccef..53f224b79a77 100644 --- a/src/expr/macro/src/types.rs +++ b/src/expr/macro/src/types.rs @@ -81,6 +81,10 @@ fn lookup_matrix(mut ty: &str, idx: usize) -> &str { ty = "list"; } else if ty.starts_with("struct") { ty = "struct"; + } else if ty == "void" { + // XXX: we don't support void type yet. + // replace it with int32 for now. + ty = "int32"; } let s = TYPE_MATRIX.trim().lines().find_map(|line| { let mut parts = line.split_whitespace(); diff --git a/src/expr/src/vector_op/delay.rs b/src/expr/src/vector_op/delay.rs new file mode 100644 index 000000000000..0ba2f11007c0 --- /dev/null +++ b/src/expr/src/vector_op/delay.rs @@ -0,0 +1,52 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use risingwave_common::types::{Interval, F64}; +use risingwave_expr_macro::function; + +/// Makes the current session's process sleep until the given number of seconds have elapsed. +/// +/// ```slt +/// query I +/// SELECT pg_sleep(1.5); +/// ---- +/// NULL +/// ``` +#[function("pg_sleep(float64)")] +async fn pg_sleep(second: F64) { + tokio::time::sleep(Duration::from_secs_f64(second.0)).await; +} + +/// Makes the current session's process sleep until the given interval has elapsed. +/// +/// ```slt +/// query I +/// SELECT pg_sleep_for('1 second'); +/// ---- +/// NULL +/// ``` +#[function("pg_sleep_for(interval)")] +async fn pg_sleep_for(interval: Interval) { + // we only use the microsecond part of the interval + let usecs = if interval.is_positive() { + interval.usecs() as u64 + } else { + // return if the interval is not positive + return; + }; + let duration = Duration::from_micros(usecs); + tokio::time::sleep(duration).await; +} diff --git a/src/expr/src/vector_op/mod.rs b/src/expr/src/vector_op/mod.rs index aa0f99a505d6..5e89ea29c804 100644 --- a/src/expr/src/vector_op/mod.rs +++ b/src/expr/src/vector_op/mod.rs @@ -47,6 +47,7 @@ pub mod overlay; pub mod position; pub mod proctime; pub mod regexp; +pub mod delay; pub mod repeat; pub mod replace; pub mod round; diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 6bfa4883c6a5..9cb17c5e3925 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -1142,7 +1142,11 @@ impl Binder { // non-deterministic ("now", now()), ("current_timestamp", now()), - ("proctime", proctime()) + ("proctime", proctime()), + ("pg_sleep", raw_call(ExprType::PgSleep)), + ("pg_sleep_for", raw_call(ExprType::PgSleepFor)), + // TODO: implement pg_sleep_until + // ("pg_sleep_until", raw_call(ExprType::PgSleepUntil)), ] .into_iter() .collect() diff --git a/src/frontend/src/expr/pure.rs b/src/frontend/src/expr/pure.rs index a229fed79b4f..3c48aa656182 100644 --- a/src/frontend/src/expr/pure.rs +++ b/src/frontend/src/expr/pure.rs @@ -207,7 +207,11 @@ impl ExprVisitor for ImpureAnalyzer { x } // expression output is not deterministic - expr_node::Type::Vnode | expr_node::Type::Proctime => true, + expr_node::Type::Vnode + | expr_node::Type::Proctime + | expr_node::Type::PgSleep + | expr_node::Type::PgSleepFor + | expr_node::Type::PgSleepUntil => true, } } } From 81d3fdb52789fe98351dafab3806e2fef7eb9f14 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Thu, 14 Sep 2023 01:56:13 +0800 Subject: [PATCH 2/6] add `volatile` mark to `#[function]` Signed-off-by: Runji Wang --- src/expr/macro/src/lib.rs | 1 + src/expr/macro/src/parse.rs | 2 ++ src/expr/src/vector_op/delay.rs | 4 ++-- src/expr/src/vector_op/proctime.rs | 2 +- 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/expr/macro/src/lib.rs b/src/expr/macro/src/lib.rs index 8ac2949c77ad..55be113d2bc9 100644 --- a/src/expr/macro/src/lib.rs +++ b/src/expr/macro/src/lib.rs @@ -446,6 +446,7 @@ struct FunctionAttr { init_state: Option, prebuild: Option, type_infer: Option, + volatile: bool, deprecated: bool, } diff --git a/src/expr/macro/src/parse.rs b/src/expr/macro/src/parse.rs index ef5963db1153..1a4993ec9419 100644 --- a/src/expr/macro/src/parse.rs +++ b/src/expr/macro/src/parse.rs @@ -75,6 +75,8 @@ impl Parse for FunctionAttr { parsed.prebuild = Some(get_value()?); } else if meta.path().is_ident("type_infer") { parsed.type_infer = Some(get_value()?); + } else if meta.path().is_ident("volatile") { + parsed.volatile = true; } else if meta.path().is_ident("deprecated") { parsed.deprecated = true; } else { diff --git a/src/expr/src/vector_op/delay.rs b/src/expr/src/vector_op/delay.rs index 0ba2f11007c0..b1661e56e75b 100644 --- a/src/expr/src/vector_op/delay.rs +++ b/src/expr/src/vector_op/delay.rs @@ -25,7 +25,7 @@ use risingwave_expr_macro::function; /// ---- /// NULL /// ``` -#[function("pg_sleep(float64)")] +#[function("pg_sleep(float64)", volatile)] async fn pg_sleep(second: F64) { tokio::time::sleep(Duration::from_secs_f64(second.0)).await; } @@ -38,7 +38,7 @@ async fn pg_sleep(second: F64) { /// ---- /// NULL /// ``` -#[function("pg_sleep_for(interval)")] +#[function("pg_sleep_for(interval)", volatile)] async fn pg_sleep_for(interval: Interval) { // we only use the microsecond part of the interval let usecs = if interval.is_positive() { diff --git a/src/expr/src/vector_op/proctime.rs b/src/expr/src/vector_op/proctime.rs index f0ad130cd6d3..12b0aa6e5ef5 100644 --- a/src/expr/src/vector_op/proctime.rs +++ b/src/expr/src/vector_op/proctime.rs @@ -19,7 +19,7 @@ use risingwave_expr_macro::function; use crate::{ExprError, Result}; /// Get the processing time in Timestamptz scalar from the task-local epoch. -#[function("proctime() -> timestamptz")] +#[function("proctime() -> timestamptz", volatile)] fn proctime() -> Result { let epoch = epoch::task_local::curr_epoch().ok_or(ExprError::Context)?; Ok(epoch.as_timestamptz()) From f6ecf21b29337f0428e2e57b7850f505567efeb8 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Thu, 14 Sep 2023 10:53:55 +0800 Subject: [PATCH 3/6] change function numbers Signed-off-by: Runji Wang --- proto/expr.proto | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/proto/expr.proto b/proto/expr.proto index 6bf17358052e..e81036f92190 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -220,10 +220,9 @@ message ExprNode { VNODE = 1101; // Non-deterministic functions PROCTIME = 2023; - // Async functions - PG_SLEEP = 3001; - PG_SLEEP_FOR = 3002; - PG_SLEEP_UNTIL = 3003; + PG_SLEEP = 2024; + PG_SLEEP_FOR = 2025; + PG_SLEEP_UNTIL = 2026; } Type function_type = 1; data.DataType return_type = 3; From 9f6b61ca1aea6cb13fac1f49d6f4e50114973879 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Thu, 14 Sep 2023 11:02:53 +0800 Subject: [PATCH 4/6] update comment Signed-off-by: Runji Wang --- src/expr/macro/src/gen.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/expr/macro/src/gen.rs b/src/expr/macro/src/gen.rs index d1fb0e727b6e..e280f9dca2b7 100644 --- a/src/expr/macro/src/gen.rs +++ b/src/expr/macro/src/gen.rs @@ -240,7 +240,7 @@ impl FunctionAttr { // inputs: [ Option ] let mut output = quote! { #fn_name #generic(#(#non_prebuilt_inputs,)* #prebuilt_arg #context #writer) #await_ }; output = match user_fn.return_type_kind { - // void functions should return null + // XXX: we don't support void type yet. return null::int for now. _ if self.ret == "void" => quote! { { #output; Option::::None } }, ReturnTypeKind::T => quote! { Some(#output) }, ReturnTypeKind::Option => output, From 315765965c74ecccf08b407f57ac0243b8ec6401 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Thu, 14 Sep 2023 11:35:32 +0800 Subject: [PATCH 5/6] update doc of `#[function]` macro Signed-off-by: Runji Wang --- src/expr/macro/src/lib.rs | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/src/expr/macro/src/lib.rs b/src/expr/macro/src/lib.rs index 55be113d2bc9..9922531d56cd 100644 --- a/src/expr/macro/src/lib.rs +++ b/src/expr/macro/src/lib.rs @@ -30,18 +30,19 @@ mod utils; /// /// # Table of Contents /// -/// - [Function Signature](#function-signature) +/// - [SQL Function Signature](#sql-function-signature) /// - [Multiple Function Definitions](#multiple-function-definitions) /// - [Type Expansion](#type-expansion) /// - [Automatic Type Inference](#automatic-type-inference) /// - [Custom Type Inference Function](#custom-type-inference-function) -/// - [Rust Function Requirements](#rust-function-requirements) +/// - [Rust Function Signature](#rust-function-signature) /// - [Nullable Arguments](#nullable-arguments) /// - [Return Value](#return-value) /// - [Optimization](#optimization) /// - [Functions Returning Strings](#functions-returning-strings) /// - [Preprocessing Constant Arguments](#preprocessing-constant-arguments) /// - [Context](#context) +/// - [Async Function](#async-function) /// - [Table Function](#table-function) /// - [Registration and Invocation](#registration-and-invocation) /// - [Appendix: Type Matrix](#appendix-type-matrix) @@ -55,13 +56,13 @@ mod utils; /// } /// ``` /// -/// # Function Signature +/// # SQL Function Signature /// /// Each function must have a signature, specified in the `function("...")` part of the macro /// invocation. The signature follows this pattern: /// /// ```text -/// name([arg_types],*) -> [setof] return_type +/// name ( [arg_types],* ) [ -> [setof] return_type ] /// ``` /// /// Where `name` is the function name, which must match the function name defined in `prost`. @@ -72,6 +73,9 @@ mod utils; /// When `setof` appears before the return type, this indicates that the function is a set-returning /// function (table function), meaning it can return multiple values instead of just one. For more /// details, see the section on table functions. +/// +/// If no return type is specified, the function returns `void`. However, the void type is not +/// supported in our type system, so it now returns a null value of type int. /// /// ## Multiple Function Definitions /// @@ -154,7 +158,7 @@ mod utils; /// /// This type inference function will be invoked at the frontend. /// -/// # Rust Function Requirements +/// # Rust Function Signature /// /// The `#[function]` macro can handle various types of Rust functions. /// @@ -277,6 +281,19 @@ mod utils; /// } /// ``` /// +/// ## Async Function +/// +/// Functions can be asynchronous. +/// +/// ```ignore +/// #[function("pg_sleep(float64)")] +/// async fn pg_sleep(second: F64) { +/// tokio::time::sleep(Duration::from_secs_f64(second.0)).await; +/// } +/// ``` +/// +/// Asynchronous functions will be evaluated on rows sequentially. +/// /// # Table Function /// /// A table function is a special kind of function that can return multiple values instead of just From 1acb29931d5deed9b5239f276409918b8ee8928d Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Thu, 14 Sep 2023 11:37:25 +0800 Subject: [PATCH 6/6] fix check Signed-off-by: Runji Wang --- src/expr/macro/src/lib.rs | 6 +++--- src/expr/src/vector_op/mod.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/expr/macro/src/lib.rs b/src/expr/macro/src/lib.rs index 9922531d56cd..79371c0679a7 100644 --- a/src/expr/macro/src/lib.rs +++ b/src/expr/macro/src/lib.rs @@ -73,8 +73,8 @@ mod utils; /// When `setof` appears before the return type, this indicates that the function is a set-returning /// function (table function), meaning it can return multiple values instead of just one. For more /// details, see the section on table functions. -/// -/// If no return type is specified, the function returns `void`. However, the void type is not +/// +/// If no return type is specified, the function returns `void`. However, the void type is not /// supported in our type system, so it now returns a null value of type int. /// /// ## Multiple Function Definitions @@ -284,7 +284,7 @@ mod utils; /// ## Async Function /// /// Functions can be asynchronous. -/// +/// /// ```ignore /// #[function("pg_sleep(float64)")] /// async fn pg_sleep(second: F64) { diff --git a/src/expr/src/vector_op/mod.rs b/src/expr/src/vector_op/mod.rs index 5e89ea29c804..acad4f8e7168 100644 --- a/src/expr/src/vector_op/mod.rs +++ b/src/expr/src/vector_op/mod.rs @@ -32,6 +32,7 @@ pub mod cmp; pub mod concat_op; pub mod conjunction; pub mod date_trunc; +pub mod delay; pub mod encdec; pub mod exp; pub mod extract; @@ -47,7 +48,6 @@ pub mod overlay; pub mod position; pub mod proctime; pub mod regexp; -pub mod delay; pub mod repeat; pub mod replace; pub mod round;