Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(expr): add pg_sleep function #12294

Merged
merged 7 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ message ExprNode {
VNODE = 1101;
// Non-deterministic functions
PROCTIME = 2023;
PG_SLEEP = 2024;
PG_SLEEP_FOR = 2025;
PG_SLEEP_UNTIL = 2026;
}
Type function_type = 1;
data.DataType return_type = 3;
Expand Down
6 changes: 4 additions & 2 deletions src/expr/macro/src/gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,13 @@ impl FunctionAttr {
true => quote! { &mut writer, },
false => quote! {},
};
let await_ = user_fn.async_.then(|| quote! { .await });
// call the user defined function
// inputs: [ Option<impl ScalarRef> ]
let mut output =
quote! { #fn_name #generic(#(#non_prebuilt_inputs,)* #prebuilt_arg #context #writer) };
let mut output = quote! { #fn_name #generic(#(#non_prebuilt_inputs,)* #prebuilt_arg #context #writer) #await_ };
output = match user_fn.return_type_kind {
// XXX: we don't support void type yet. return null::int for now.
_ if self.ret == "void" => quote! { { #output; Option::<i32>::None } },
ReturnTypeKind::T => quote! { Some(#output) },
ReturnTypeKind::Option => output,
ReturnTypeKind::Result => quote! { Some(#output?) },
Expand Down
34 changes: 28 additions & 6 deletions src/expr/macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,19 @@ mod utils;
///
/// # Table of Contents
///
/// - [Function Signature](#function-signature)
/// - [SQL Function Signature](#sql-function-signature)
/// - [Multiple Function Definitions](#multiple-function-definitions)
/// - [Type Expansion](#type-expansion)
/// - [Automatic Type Inference](#automatic-type-inference)
/// - [Custom Type Inference Function](#custom-type-inference-function)
/// - [Rust Function Requirements](#rust-function-requirements)
/// - [Rust Function Signature](#rust-function-signature)
/// - [Nullable Arguments](#nullable-arguments)
/// - [Return Value](#return-value)
/// - [Optimization](#optimization)
/// - [Functions Returning Strings](#functions-returning-strings)
/// - [Preprocessing Constant Arguments](#preprocessing-constant-arguments)
/// - [Context](#context)
/// - [Async Function](#async-function)
/// - [Table Function](#table-function)
/// - [Registration and Invocation](#registration-and-invocation)
/// - [Appendix: Type Matrix](#appendix-type-matrix)
Expand All @@ -55,13 +56,13 @@ mod utils;
/// }
/// ```
///
/// # Function Signature
/// # SQL Function Signature
///
/// Each function must have a signature, specified in the `function("...")` part of the macro
/// invocation. The signature follows this pattern:
///
/// ```text
/// name([arg_types],*) -> [setof] return_type
/// name ( [arg_types],* ) [ -> [setof] return_type ]
/// ```
///
/// Where `name` is the function name, which must match the function name defined in `prost`.
Expand All @@ -73,6 +74,9 @@ mod utils;
/// function (table function), meaning it can return multiple values instead of just one. For more
/// details, see the section on table functions.
///
/// If no return type is specified, the function returns `void`. However, the void type is not
/// supported in our type system, so it now returns a null value of type int.
///
/// ## Multiple Function Definitions
///
/// Multiple `#[function]` macros can be applied to a single generic Rust function to define
Expand Down Expand Up @@ -154,7 +158,7 @@ mod utils;
///
/// This type inference function will be invoked at the frontend.
///
/// # Rust Function Requirements
/// # Rust Function Signature
///
/// The `#[function]` macro can handle various types of Rust functions.
///
Expand Down Expand Up @@ -277,6 +281,19 @@ mod utils;
/// }
/// ```
///
/// ## Async Function
///
/// Functions can be asynchronous.
///
/// ```ignore
/// #[function("pg_sleep(float64)")]
/// async fn pg_sleep(second: F64) {
/// tokio::time::sleep(Duration::from_secs_f64(second.0)).await;
/// }
/// ```
///
/// Asynchronous functions will be evaluated on rows sequentially.
///
/// # Table Function
///
/// A table function is a special kind of function that can return multiple values instead of just
Expand Down Expand Up @@ -460,6 +477,8 @@ struct FunctionAttr {
prebuild: Option<String>,
/// Type inference function.
type_infer: Option<String>,
/// Whether the function is volatile.
volatile: bool,
/// Whether the function is deprecated.
deprecated: bool,
}
Expand All @@ -469,6 +488,8 @@ struct FunctionAttr {
struct UserFunctionAttr {
/// Function name
name: String,
/// Whether the function is async.
async_: bool,
/// Whether contains argument `&Context`.
context: bool,
/// The last argument type is `&mut dyn Write`.
Expand Down Expand Up @@ -556,7 +577,8 @@ impl FunctionAttr {
impl UserFunctionAttr {
/// Returns true if the function is like `fn(T1, T2, .., Tn) -> T`.
fn is_pure(&self) -> bool {
!self.write
!self.async_
&& !self.write
&& !self.context
&& !self.arg_option
&& self.return_type_kind == ReturnTypeKind::T
Expand Down
10 changes: 7 additions & 3 deletions src/expr/macro/src/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ impl Parse for FunctionAttr {

let sig = input.parse::<LitStr>()?;
let sig_str = sig.value();
let (name_args, ret) = sig_str
.split_once("->")
.ok_or_else(|| Error::new_spanned(&sig, "expected '->'"))?;
let (name_args, ret) = match sig_str.split_once("->") {
Some((name_args, ret)) => (name_args, ret),
None => (sig_str.as_str(), "void"),
};
let (name, args) = name_args
.split_once('(')
.ok_or_else(|| Error::new_spanned(&sig, "expected '('"))?;
Expand Down Expand Up @@ -74,6 +75,8 @@ impl Parse for FunctionAttr {
parsed.prebuild = Some(get_value()?);
} else if meta.path().is_ident("type_infer") {
parsed.type_infer = Some(get_value()?);
} else if meta.path().is_ident("volatile") {
parsed.volatile = true;
} else if meta.path().is_ident("deprecated") {
parsed.deprecated = true;
} else if meta.path().is_ident("append_only") {
Expand Down Expand Up @@ -113,6 +116,7 @@ impl From<&syn::Signature> for UserFunctionAttr {
};
UserFunctionAttr {
name: sig.ident.to_string(),
async_: sig.asyncness.is_some(),
write: sig.inputs.iter().any(arg_is_write),
context: sig.inputs.iter().any(arg_is_context),
retract: last_arg_is_retract(sig),
Expand Down
4 changes: 4 additions & 0 deletions src/expr/macro/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ fn lookup_matrix(mut ty: &str, idx: usize) -> &str {
ty = "list";
} else if ty.starts_with("struct") {
ty = "struct";
} else if ty == "void" {
// XXX: we don't support void type yet.
// replace it with int32 for now.
ty = "int32";
}
let s = TYPE_MATRIX.trim().lines().find_map(|line| {
let mut parts = line.split_whitespace();
Expand Down
52 changes: 52 additions & 0 deletions src/expr/src/vector_op/delay.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use risingwave_common::types::{Interval, F64};
use risingwave_expr_macro::function;

/// Makes the current session's process sleep until the given number of seconds have elapsed.
///
/// ```slt
/// query I
/// SELECT pg_sleep(1.5);
/// ----
/// NULL
/// ```
#[function("pg_sleep(float64)", volatile)]
async fn pg_sleep(second: F64) {
tokio::time::sleep(Duration::from_secs_f64(second.0)).await;
}

/// Makes the current session's process sleep until the given interval has elapsed.
///
/// ```slt
/// query I
/// SELECT pg_sleep_for('1 second');
/// ----
/// NULL
/// ```
#[function("pg_sleep_for(interval)", volatile)]
async fn pg_sleep_for(interval: Interval) {
// we only use the microsecond part of the interval
let usecs = if interval.is_positive() {
interval.usecs() as u64
} else {
// return if the interval is not positive
return;
};
let duration = Duration::from_micros(usecs);
tokio::time::sleep(duration).await;
}
1 change: 1 addition & 0 deletions src/expr/src/vector_op/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod cmp;
pub mod concat_op;
pub mod conjunction;
pub mod date_trunc;
pub mod delay;
pub mod encdec;
pub mod exp;
pub mod extract;
Expand Down
2 changes: 1 addition & 1 deletion src/expr/src/vector_op/proctime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_expr_macro::function;
use crate::{ExprError, Result};

/// Get the processing time in Timestamptz scalar from the task-local epoch.
#[function("proctime() -> timestamptz")]
#[function("proctime() -> timestamptz", volatile)]
fn proctime() -> Result<Timestamptz> {
let epoch = epoch::task_local::curr_epoch().ok_or(ExprError::Context)?;
Ok(epoch.as_timestamptz())
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/binder/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,11 @@ impl Binder {
// non-deterministic
("now", now()),
("current_timestamp", now()),
("proctime", proctime())
("proctime", proctime()),
("pg_sleep", raw_call(ExprType::PgSleep)),
("pg_sleep_for", raw_call(ExprType::PgSleepFor)),
// TODO: implement pg_sleep_until
// ("pg_sleep_until", raw_call(ExprType::PgSleepUntil)),
]
.into_iter()
.collect()
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/expr/pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,11 @@ impl ExprVisitor<bool> for ImpureAnalyzer {
x
}
// expression output is not deterministic
expr_node::Type::Vnode | expr_node::Type::Proctime => true,
expr_node::Type::Vnode
| expr_node::Type::Proctime
| expr_node::Type::PgSleep
| expr_node::Type::PgSleepFor
| expr_node::Type::PgSleepUntil => true,
}
}
}
Expand Down
Loading