Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): support basic CREATE / DROP FUNCTION #7265

Merged
merged 16 commits into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions e2e_test/ddl/function.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# TODO: check the service on creation

# Create a function.
statement ok
create function func(int, int) returns int as 'http://localhost:8815' language arrow_flight;

# Create a function with the same name but different arguments.
statement ok
create function func(int) returns int as 'http://localhost:8815' language arrow_flight;

# Create a function with the same name and arguments.
statement error exists
create function func(int) returns int as 'http://localhost:8815' language arrow_flight;

# TODO: drop function without arguments

# # Drop a function but ambiguous.
# statement error is not unique
# drop function func;

# Drop the one-argument overload.
statement ok
drop function func(int);

# Drop the two-argument overload.
statement ok
drop function func(int, int);
13 changes: 13 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

package catalog;

import "data.proto";
import "expr.proto";
import "plan_common.proto";

Expand Down Expand Up @@ -75,6 +76,18 @@ message Index {
repeated int32 original_columns = 9;
}

// Catalog record describing a function; it is the payload of `CreateFunctionRequest`.
message Function {
// Identifier of this function.
uint32 id = 1;
// NOTE(review): presumably the ids of the schema and database containing the function — confirm.
uint32 schema_id = 2;
uint32 database_id = 3;
// Function name.
string name = 4;
// Data types of the function's arguments.
repeated data.DataType arg_types = 5;
// Data type of the function's result.
data.DataType return_type = 6;
// NOTE(review): presumably the `LANGUAGE ...` and `AS '...'` clauses of `CREATE FUNCTION`
// (e.g. `arrow_flight` and `http://localhost:8815` in the e2e test) — confirm.
string language = 7;
string path = 8;
// NOTE(review): presumably the id of the owning user — confirm.
uint32 owner = 9;
}

// See `TableCatalog` struct in frontend crate for more information.
message Table {
enum TableType {
Expand Down
21 changes: 21 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,25 @@ message CreateTableResponse {
uint64 version = 3;
}

// Request of `DdlService.CreateFunction`.
message CreateFunctionRequest {
// Catalog record of the function to create.
catalog.Function function = 1;
}

// Response of `DdlService.CreateFunction`.
message CreateFunctionResponse {
common.Status status = 1;
// NOTE(review): presumably the id assigned to the newly created function — confirm.
uint32 function_id = 2;
// NOTE(review): presumably the catalog version the caller should wait for — confirm.
uint64 version = 3;
}

// Request of `DdlService.DropFunction`.
message DropFunctionRequest {
// Id of the function to drop.
uint32 function_id = 1;
}

// Response of `DdlService.DropFunction`.
message DropFunctionResponse {
common.Status status = 1;
// NOTE(review): presumably the catalog version the caller should wait for — confirm.
uint64 version = 2;
}

message DropTableRequest {
oneof source_id {
uint32 id = 1;
Expand Down Expand Up @@ -212,5 +231,7 @@ service DdlService {
rpc DropView(DropViewRequest) returns (DropViewResponse);
rpc CreateIndex(CreateIndexRequest) returns (CreateIndexResponse);
rpc DropIndex(DropIndexRequest) returns (DropIndexResponse);
rpc CreateFunction(CreateFunctionRequest) returns (CreateFunctionResponse);
rpc DropFunction(DropFunctionRequest) returns (DropFunctionResponse);
rpc ReplaceTablePlan(ReplaceTablePlanRequest) returns (ReplaceTablePlanResponse);
}
11 changes: 11 additions & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,16 @@ message ExprNode {
VNODE = 1101;
// Non-deterministic functions
NOW = 2022;
// User defined functions
UDF = 3000;
}
Type expr_type = 1;
data.DataType return_type = 3;
oneof rex_node {
InputRefExpr input_ref = 4;
data.Datum constant = 5;
FunctionCall func_call = 6;
UserDefinedFunction udf = 7;
}
}

Expand Down Expand Up @@ -213,3 +216,11 @@ message AggCall {
repeated OrderByField order_by_fields = 5;
ExprNode filter = 6;
}

// Payload of the `udf` variant of `ExprNode.rex_node`: a call to a user-defined function.
message UserDefinedFunction {
// Child expressions of the call.
// NOTE(review): presumably the call's arguments — confirm.
repeated ExprNode children = 1;
// Function name.
string name = 2;
// Data types of the function's arguments.
repeated data.DataType arg_types = 3;
// NOTE(review): presumably copied from the matching `catalog.Function` fields — confirm.
string language = 4;
string path = 5;
}
2 changes: 2 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ message MetaSnapshot {
repeated catalog.Table tables = 5;
repeated catalog.Index indexes = 6;
repeated catalog.View views = 7;
repeated catalog.Function functions = 15;
repeated user.UserInfo users = 8;
repeated common.ParallelUnitMapping parallel_unit_mappings = 9;
repeated common.WorkerNode nodes = 10;
Expand Down Expand Up @@ -229,6 +230,7 @@ message SubscribeResponse {
catalog.Sink sink = 8;
catalog.Index index = 9;
catalog.View view = 10;
catalog.Function function = 18;
user.UserInfo user = 11;
common.ParallelUnitMapping parallel_unit_mapping = 12;
common.WorkerNode node = 13;
Expand Down
1 change: 1 addition & 0 deletions proto/user.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ message GrantPrivilege {
uint32 source_id = 4;
uint32 sink_id = 5;
uint32 view_id = 6;
uint32 function_id = 8;

uint32 all_tables_schema_id = 11;
uint32 all_sources_schema_id = 12;
Expand Down
1 change: 1 addition & 0 deletions src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ where
| Info::Sink(_)
| Info::Index(_)
| Info::View(_)
| Info::Function(_)
| Info::User(_) => notification.version > catalog_version,
Info::ParallelUnitMapping(_) => notification.version > parallel_unit_mapping_version,
Info::Node(_) => notification.version > worker_node_version,
Expand Down
35 changes: 35 additions & 0 deletions src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,38 @@ impl From<IndexId> for u32 {
id.index_id
}
}

/// Newtype wrapper around the raw `u32` id of a function.
#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
pub struct FunctionId(pub u32);

impl FunctionId {
pub const fn new(id: u32) -> Self {
FunctionId(id)
}

pub const fn placeholder() -> Self {
FunctionId(u32::MAX - 1)
}

pub fn function_id(&self) -> u32 {
self.0
}
}

impl From<u32> for FunctionId {
fn from(id: u32) -> Self {
Self::new(id)
}
}

impl From<&u32> for FunctionId {
fn from(id: &u32) -> Self {
Self::new(*id)
}
}

impl From<FunctionId> for u32 {
fn from(id: FunctionId) -> Self {
id.0
}
}
6 changes: 6 additions & 0 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,12 @@ impl DataType {
}
}

/// Converts an owned [`DataType`] into its protobuf form by delegating to
/// `DataType::to_protobuf`.
impl From<DataType> for ProstDataType {
    fn from(data_type: DataType) -> ProstDataType {
        let proto = data_type.to_protobuf();
        proto
    }
}

/// `Scalar` is a trait over all possible owned types in the evaluation
/// framework.
///
Expand Down
72 changes: 72 additions & 0 deletions src/expr/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2023 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![allow(warnings)] // unfinished

use std::convert::TryFrom;
use std::sync::Arc;

use risingwave_common::array::{ArrayBuilder, ArrayBuilderImpl, ArrayRef, DataChunk};
use risingwave_common::for_all_variants;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{literal_type_match, DataType, Datum, Scalar, ScalarImpl};
use risingwave_pb::expr::expr_node::{RexNode, Type};
use risingwave_pb::expr::ExprNode;

use super::{build_from_prost, BoxedExpression};
use crate::expr::Expression;
use crate::{bail, ensure, ExprError, Result};

/// Expression node for a call to a user-defined function.
///
/// Built from a protobuf `ExprNode` whose `rex_node` is the `Udf` variant.
#[derive(Debug)]
pub struct UdfExpression {
// Child expressions built from the protobuf `children` list.
children: Vec<BoxedExpression>,
// Function name copied from the protobuf node.
name: String,
// Argument types converted from the protobuf `arg_types` list.
arg_types: Vec<DataType>,
// Type reported by `Expression::return_type`.
return_type: DataType,
// TODO: arrow flight client
}

impl Expression for UdfExpression {
fn return_type(&self) -> DataType {
self.return_type.clone()
}

fn eval(&self, input: &DataChunk) -> Result<ArrayRef> {
todo!("evaluate UDF")
}

fn eval_row(&self, _input: &OwnedRow) -> Result<Datum> {
todo!("evaluate UDF")
}
}

// No inherent methods are defined yet.
impl UdfExpression {}

impl<'a> TryFrom<&'a ExprNode> for UdfExpression {
type Error = ExprError;

fn try_from(prost: &'a ExprNode) -> Result<Self> {
ensure!(prost.get_expr_type().unwrap() == Type::Udf);
let ret_type = DataType::from(prost.get_return_type().unwrap());
let RexNode::Udf(udf) = prost.get_rex_node().unwrap() else {
bail!("expect UDF");
};
Ok(Self {
children: udf.children.iter().map(build_from_prost).try_collect()?,
name: udf.name.clone(),
arg_types: udf.arg_types.iter().map(|t| t.into()).collect(),
return_type: ret_type,
})
}
}
3 changes: 3 additions & 0 deletions src/expr/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod expr_regexp;
mod expr_ternary_bytes;
mod expr_to_char_const_tmpl;
mod expr_to_timestamp_const_tmpl;
mod expr_udf;
pub mod expr_unary;
mod expr_vnode;
mod template;
Expand Down Expand Up @@ -61,6 +62,7 @@ use crate::expr::expr_field::FieldExpression;
use crate::expr::expr_in::InExpression;
use crate::expr::expr_nested_construct::NestedConstructExpression;
use crate::expr::expr_regexp::RegexpMatchExpression;
use crate::expr::expr_udf::UdfExpression;
use crate::expr::expr_vnode::VnodeExpression;
use crate::ExprError;

Expand Down Expand Up @@ -160,6 +162,7 @@ pub fn build_from_prost(prost: &ExprNode) -> Result<BoxedExpression> {
};
LiteralExpression::try_from(bind_timestamp).map(Expression::boxed)
}
Udf => UdfExpression::try_from(prost).map(Expression::boxed),
_ => Err(ExprError::UnsupportedFunction(format!(
"{:?}",
prost.get_expr_type()
Expand Down
19 changes: 18 additions & 1 deletion src/frontend/src/binder/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::binder::bind_context::Clause;
use crate::binder::{Binder, BoundQuery, BoundSetExpr};
use crate::expr::{
AggCall, Expr, ExprImpl, ExprType, FunctionCall, Literal, OrderBy, Subquery, SubqueryKind,
TableFunction, TableFunctionType, WindowFunction, WindowFunctionType,
TableFunction, TableFunctionType, UserDefinedFunction, WindowFunction, WindowFunctionType,
};
use crate::utils::Condition;

Expand Down Expand Up @@ -96,6 +96,23 @@ impl Binder {
return Ok(TableFunction::new(function_type, inputs)?.into());
}

// user defined function
// TODO: resolve schema name
if let Some(func) = self
.catalog
.first_valid_schema(
&self.db_name,
&self.search_path,
&self.auth_context.user_name,
)?
.get_function_by_name_args(
&function_name,
&inputs.iter().map(|arg| arg.return_type()).collect_vec(),
)
{
return Ok(UserDefinedFunction::new(func.clone(), inputs).into());
}

// normal function
let mut inputs = inputs;
let function_type = match function_name.as_str() {
Expand Down
23 changes: 19 additions & 4 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ use std::sync::Arc;

use parking_lot::lock_api::ArcRwLockReadGuard;
use parking_lot::{RawRwLock, RwLock};
use risingwave_common::catalog::{CatalogVersion, IndexId, TableId};
use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId};
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{Result, RwError};
use risingwave_pb::catalog::{
Database as ProstDatabase, Index as ProstIndex, Schema as ProstSchema, Sink as ProstSink,
Source as ProstSource, Table as ProstTable, View as ProstView,
Database as ProstDatabase, Function as ProstFunction, Index as ProstIndex,
Schema as ProstSchema, Sink as ProstSink, Source as ProstSource, Table as ProstTable,
View as ProstView,
};
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_rpc_client::MetaClient;
Expand All @@ -47,7 +48,7 @@ impl CatalogReader {
}
}

/// [`CatalogWriter`] initiate DDL operations (create table/schema/database).
/// [`CatalogWriter`] initiates DDL operations (create table/schema/database/function).
/// It will only send rpc to meta and get the catalog version as response.
/// Then it will wait for the local catalog to be synced to the version, which is performed by
/// [observer](`crate::observer::FrontendObserverNode`).
Expand Down Expand Up @@ -88,6 +89,8 @@ pub trait CatalogWriter: Send + Sync {

async fn create_sink(&self, sink: ProstSink, graph: StreamFragmentGraph) -> Result<()>;

async fn create_function(&self, function: ProstFunction) -> Result<()>;

async fn drop_table(&self, source_id: Option<u32>, table_id: TableId) -> Result<()>;

async fn drop_materialized_view(&self, table_id: TableId) -> Result<()>;
Expand All @@ -103,6 +106,8 @@ pub trait CatalogWriter: Send + Sync {
async fn drop_schema(&self, schema_id: u32) -> Result<()>;

async fn drop_index(&self, index_id: IndexId) -> Result<()>;

async fn drop_function(&self, function_id: FunctionId) -> Result<()>;
}

#[derive(Clone)]
Expand Down Expand Up @@ -191,6 +196,11 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

/// Sends the create-function request through the meta client, then waits via
/// `wait_version` on the version returned with the response.
async fn create_function(&self, function: ProstFunction) -> Result<()> {
    let response = self.meta_client.create_function(function).await?;
    // Only the version is needed; the first element of the pair is discarded.
    let (_, target_version) = response;
    self.wait_version(target_version).await
}

async fn drop_table(&self, source_id: Option<u32>, table_id: TableId) -> Result<()> {
let version = self.meta_client.drop_table(source_id, table_id).await?;
self.wait_version(version).await
Expand Down Expand Up @@ -221,6 +231,11 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

/// Sends the drop-function request through the meta client, then waits via
/// `wait_version` on the version it returns.
async fn drop_function(&self, function_id: FunctionId) -> Result<()> {
    let target_version = self.meta_client.drop_function(function_id).await?;
    self.wait_version(target_version).await
}

async fn drop_schema(&self, schema_id: u32) -> Result<()> {
let version = self.meta_client.drop_schema(schema_id).await?;
self.wait_version(version).await
Expand Down
Loading