From bce63efeba1e83666101908b90ffb01de75fb811 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Thu, 5 Jan 2023 16:21:53 +0800 Subject: [PATCH 01/15] add empty `create_function` Signed-off-by: Runji Wang --- proto/catalog.proto | 7 +++ proto/ddl_service.proto | 21 +++++++ src/common/src/catalog/mod.rs | 33 ++++++++++ src/frontend/src/catalog/catalog_service.rs | 21 +++++-- src/frontend/src/handler/create_function.rs | 70 +++++++++++++++++++++ src/frontend/src/handler/mod.rs | 17 +++++ src/frontend/src/test_utils.rs | 15 ++++- src/meta/src/rpc/service/ddl_service.rs | 14 +++++ src/rpc_client/src/meta_client.rs | 28 ++++++++- 9 files changed, 216 insertions(+), 10 deletions(-) create mode 100644 src/frontend/src/handler/create_function.rs diff --git a/proto/catalog.proto b/proto/catalog.proto index 8bf885f8205c2..45da5c44c752e 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -75,6 +75,13 @@ message Index { repeated int32 original_columns = 9; } +message Function { + uint32 id = 1; + uint32 schema_id = 2; + uint32 database_id = 3; + string name = 4; +} + // See `TableCatalog` struct in frontend crate for more information. message Table { enum TableType { diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 5cbbe530f6d26..c5fc8bd359861 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -138,6 +138,25 @@ message CreateTableResponse { uint64 version = 3; } +message CreateFunctionRequest { + catalog.Function function = 1; +} + +message CreateFunctionResponse { + common.Status status = 1; + uint32 function_id = 2; + uint64 version = 3; +} + +message DropFunctionRequest { + uint32 function_id = 1; +} + +message DropFunctionResponse { + common.Status status = 1; + uint64 version = 2; +} + message DropTableRequest { oneof source_id { uint32 id = 1; @@ -197,4 +216,6 @@ service DdlService { rpc DropView(DropViewRequest) returns (DropViewResponse); rpc CreateIndex(CreateIndexRequest) returns (CreateIndexResponse); rpc DropIndex(DropIndexRequest) returns (DropIndexResponse); + rpc CreateFunction(CreateFunctionRequest) returns (CreateFunctionResponse); + rpc DropFunction(DropFunctionRequest) returns (DropFunctionResponse); } diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index 3e0d9e760232c..397f729e921b6 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -217,3 +217,36 @@ impl From for u32 { id.index_id } } + +#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)] +pub struct FunctionId { + pub function_id: u32, +} + +impl FunctionId { + pub const fn new(function_id: u32) -> Self { + FunctionId { function_id } + } + + pub fn function_id(&self) -> u32 { + self.function_id + } +} + +impl From for FunctionId { + fn from(id: u32) -> Self { + Self::new(id) + } +} + +impl From<&u32> for FunctionId { + fn from(id: &u32) -> Self { + Self::new(*id) + } +} + +impl From for u32 { + fn from(id: FunctionId) -> Self { + id.function_id + } +} diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index 40dd37bde1eb8..1a214425b39c3 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -16,12 +16,13 @@ use std::sync::Arc; use parking_lot::lock_api::ArcRwLockReadGuard; use parking_lot::{RawRwLock, RwLock}; -use risingwave_common::catalog::{CatalogVersion, IndexId, TableId}; +use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId}; use risingwave_common::error::ErrorCode::InternalError; use risingwave_common::error::{Result, RwError}; use risingwave_pb::catalog::{ - Database as ProstDatabase, Index as ProstIndex, Schema as ProstSchema, Sink as ProstSink, - Source as ProstSource, Table as ProstTable, View as ProstView, + Database as ProstDatabase, Function as ProstFunction, Index as ProstIndex, + Schema as ProstSchema, Sink as ProstSink, Source as ProstSource, Table as ProstTable, + View as ProstView, }; use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_rpc_client::MetaClient; @@ -47,7 +48,7 @@ impl CatalogReader { } } -/// [`CatalogWriter`] initiate DDL operations (create table/schema/database). +/// [`CatalogWriter`] initiate DDL operations (create table/schema/database/function). /// It will only send rpc to meta and get the catalog version as response. /// Then it will wait for the local catalog to be synced to the version, which is performed by /// [observer](`crate::observer::FrontendObserverNode`). @@ -88,6 +89,8 @@ pub trait CatalogWriter: Send + Sync { async fn create_sink(&self, sink: ProstSink, graph: StreamFragmentGraph) -> Result<()>; + async fn create_function(&self, function: ProstFunction) -> Result<()>; + async fn drop_table(&self, source_id: Option, table_id: TableId) -> Result<()>; async fn drop_materialized_view(&self, table_id: TableId) -> Result<()>; @@ -103,6 +106,8 @@ pub trait CatalogWriter: Send + Sync { async fn drop_schema(&self, schema_id: u32) -> Result<()>; async fn drop_index(&self, index_id: IndexId) -> Result<()>; + + async fn drop_function(&self, function_id: FunctionId) -> Result<()>; } #[derive(Clone)] @@ -191,6 +196,10 @@ impl CatalogWriter for CatalogWriterImpl { self.wait_version(version).await } + async fn create_function(&self, function: ProstFunction) -> Result<()> { + todo!() + } + async fn drop_table(&self, source_id: Option, table_id: TableId) -> Result<()> { let version = self.meta_client.drop_table(source_id, table_id).await?; self.wait_version(version).await @@ -221,6 +230,10 @@ impl CatalogWriter for CatalogWriterImpl { self.wait_version(version).await } + async fn drop_function(&self, function_id: FunctionId) -> Result<()> { + todo!() + } + async fn drop_schema(&self, schema_id: u32) -> Result<()> { let version = self.meta_client.drop_schema(schema_id).await?; self.wait_version(version).await diff --git a/src/frontend/src/handler/create_function.rs b/src/frontend/src/handler/create_function.rs new file mode 100644 index 0000000000000..d1696992e063d --- /dev/null +++ b/src/frontend/src/handler/create_function.rs @@ -0,0 +1,70 @@ +// Copyright 2023 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_sqlparser::ast::{ + CreateFunctionBody, DataType, FunctionDefinition, ObjectName, OperateFunctionArg, +}; + +use super::*; + +pub fn handle_create_function( + handle_args: HandlerArgs, + or_replace: bool, + temporary: bool, + name: ObjectName, + args: Option>, + return_type: Option, + params: CreateFunctionBody, +) -> Result { + if or_replace { + return Err(ErrorCode::NotImplemented( + "CREATE OR REPLACE FUNCTION".to_string(), + None.into(), + ) + .into()); + } + if temporary { + return Err(ErrorCode::NotImplemented( + "CREATE TEMPORARY FUNCTION".to_string(), + None.into(), + ) + .into()); + } + match params.language { + None => { + return Err( + ErrorCode::InvalidParameterValue("LANGUAGE must be specified".to_string()).into(), + ) + } + Some(lang) if lang.real_value() != "arrow_flight" => { + return Err(ErrorCode::InvalidParameterValue( + "LANGUAGE should be one of: arrow_flight".to_string(), + ) + .into()) + } + _ => {} + } + let Some(FunctionDefinition::SingleQuotedDef(flight_server_addr)) = params.as_ else { + return Err(ErrorCode::InvalidParameterValue( + "AS must be specified".to_string(), + ) + .into()); + }; + let Some(return_type) = return_type else { + return Err( + ErrorCode::InvalidParameterValue("return type must be specified".to_string()).into(), + ) + }; + todo!() +} diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index d44f015781b82..9448a896977ec 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -34,6 +34,7 @@ use crate::utils::WithOptions; pub mod alter_user; mod create_database; +pub mod create_function; pub mod create_index; pub mod create_mv; pub mod create_schema; @@ -164,6 +165,22 @@ pub async fn handle( stmt, } => create_source::handle_create_source(handler_args, is_materialized, stmt).await, Statement::CreateSink { stmt } => create_sink::handle_create_sink(handler_args, stmt).await, + Statement::CreateFunction { + or_replace, + temporary, + name, + args, + return_type, + params, + } => create_function::handle_create_function( + handler_args, + or_replace, + temporary, + name, + args, + return_type, + params, + ), Statement::CreateTable { name, columns, diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index c32e966747cb9..31f95d3150eac 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -23,15 +23,16 @@ use pgwire::pg_response::StatementType; use pgwire::pg_server::{BoxedError, Session, SessionId, SessionManager, UserAuthenticator}; use pgwire::types::Row; use risingwave_common::catalog::{ - IndexId, TableId, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER, + FunctionId, IndexId, TableId, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID, NON_RESERVED_USER_ID, PG_CATALOG_SCHEMA_NAME, }; use risingwave_common::error::Result; use risingwave_pb::backup_service::MetaSnapshotMetadata; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ - Database as ProstDatabase, Index as ProstIndex, Schema as ProstSchema, Sink as ProstSink, - Source as ProstSource, Table as ProstTable, View as ProstView, + Database as ProstDatabase, Function as ProstFunction, Index as ProstIndex, + Schema as ProstSchema, Sink as ProstSink, Source as ProstSource, Table as ProstTable, + View as ProstView, }; use risingwave_pb::hummock::HummockSnapshot; use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo; @@ -273,6 +274,10 @@ impl CatalogWriter for MockCatalogWriter { Ok(()) } + async fn create_function(&self, function: ProstFunction) -> Result<()> { + todo!() + } + async fn drop_table(&self, source_id: Option, table_id: TableId) -> Result<()> { if let Some(source_id) = source_id { self.drop_table_or_source_id(source_id); @@ -358,6 +363,10 @@ impl CatalogWriter for MockCatalogWriter { Ok(()) } + async fn drop_function(&self, function_id: FunctionId) -> Result<()> { + todo!() + } + async fn drop_database(&self, database_id: u32) -> Result<()> { self.catalog.write().drop_database(database_id); Ok(()) diff --git a/src/meta/src/rpc/service/ddl_service.rs b/src/meta/src/rpc/service/ddl_service.rs index d7f4fc937db84..b98990f07c77c 100644 --- a/src/meta/src/rpc/service/ddl_service.rs +++ b/src/meta/src/rpc/service/ddl_service.rs @@ -344,6 +344,20 @@ where })) } + async fn create_function( + &self, + request: Request, + ) -> Result, Status> { + todo!() + } + + async fn drop_function( + &self, + request: Request, + ) -> Result, Status> { + todo!() + } + async fn create_table( &self, request: Request, diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 0ccbe44c6682e..151f7a139be0a 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -18,7 +18,7 @@ use std::time::Duration; use async_trait::async_trait; use futures::stream::BoxStream; -use risingwave_common::catalog::{CatalogVersion, IndexId, TableId}; +use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId}; use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE; use risingwave_common::util::addr::HostAddr; use risingwave_hummock_sdk::compact::CompactorRuntimeConfig; @@ -30,8 +30,9 @@ use risingwave_hummock_sdk::{ use risingwave_pb::backup_service::backup_service_client::BackupServiceClient; use risingwave_pb::backup_service::*; use risingwave_pb::catalog::{ - Database as ProstDatabase, Index as ProstIndex, Schema as ProstSchema, Sink as ProstSink, - Source as ProstSource, Table as ProstTable, View as ProstView, + Database as ProstDatabase, Function as ProstFunction, Index as ProstIndex, + Schema as ProstSchema, Sink as ProstSink, Source as ProstSource, Table as ProstTable, + View as ProstView, }; use risingwave_pb::common::WorkerType; use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient; @@ -230,6 +231,17 @@ impl MetaClient { Ok((resp.sink_id, resp.version)) } + pub async fn create_function( + &self, + function: ProstFunction, + ) -> Result<(FunctionId, CatalogVersion)> { + let request = CreateFunctionRequest { + function: Some(function), + }; + let resp = self.inner.create_function(request).await?; + Ok((resp.function_id.into(), resp.version)) + } + pub async fn create_table( &self, source: Option, @@ -309,6 +321,14 @@ impl MetaClient { Ok(resp.version) } + pub async fn drop_function(&self, function_id: FunctionId) -> Result { + let request = DropFunctionRequest { + function_id: function_id.function_id, + }; + let resp = self.inner.drop_function(request).await?; + Ok(resp.version) + } + pub async fn drop_database(&self, database_id: u32) -> Result { let request = DropDatabaseRequest { database_id }; let resp = self.inner.drop_database(request).await?; @@ -939,6 +959,7 @@ macro_rules! for_all_meta_rpc { ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse } ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse } ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse } + ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse } ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse } ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse } ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse } @@ -947,6 +968,7 @@ macro_rules! for_all_meta_rpc { ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse } ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse } ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse } + ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse } ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse } ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse } ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse } From 65618f3f3b9ecffdc155e132e9f9c5e8bdfbd0ed Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Thu, 5 Jan 2023 17:05:43 +0800 Subject: [PATCH 02/15] handle create function Signed-off-by: Runji Wang --- proto/catalog.proto | 10 ++++ src/common/src/catalog/mod.rs | 16 +++--- src/common/src/types/mod.rs | 6 +++ src/frontend/src/catalog/catalog_service.rs | 6 ++- src/frontend/src/handler/create_function.rs | 54 ++++++++++++++++----- src/frontend/src/handler/mod.rs | 21 ++++---- src/rpc_client/src/meta_client.rs | 2 +- src/utils/pgwire/src/pg_response.rs | 2 + 8 files changed, 87 insertions(+), 30 deletions(-) diff --git a/proto/catalog.proto b/proto/catalog.proto index 45da5c44c752e..ea78dd6595c2d 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -4,6 +4,7 @@ package catalog; import "expr.proto"; import "plan_common.proto"; +import "data.proto"; option optimize_for = SPEED; @@ -80,6 +81,15 @@ message Function { uint32 schema_id = 2; uint32 database_id = 3; string name = 4; + repeated FunctionArgument arguments = 5; + data.DataType return_type = 6; + string language = 7; + string path = 8; +} + +message FunctionArgument { + string name = 1; + data.DataType type = 2; } // See `TableCatalog` struct in frontend crate for more information. diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index 397f729e921b6..efc50fda697e5 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -219,17 +219,19 @@ impl From for u32 { } #[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)] -pub struct FunctionId { - pub function_id: u32, -} +pub struct FunctionId(pub u32); impl FunctionId { - pub const fn new(function_id: u32) -> Self { - FunctionId { function_id } + pub const fn new(id: u32) -> Self { + FunctionId(id) + } + + pub const fn placeholder() -> Self { + FunctionId(u32::MAX - 1) } pub fn function_id(&self) -> u32 { - self.function_id + self.0 } } @@ -247,6 +249,6 @@ impl From<&u32> for FunctionId { impl From for u32 { fn from(id: FunctionId) -> Self { - id.function_id + id.0 } } diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index 5d4dc05ac43d3..cb803796d2166 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -395,6 +395,12 @@ impl DataType { } } +impl From for ProstDataType { + fn from(data_type: DataType) -> Self { + data_type.to_protobuf() + } +} + /// `Scalar` is a trait over all possible owned types in the evaluation /// framework. /// diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index 1a214425b39c3..17e273f129fd5 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -197,7 +197,8 @@ impl CatalogWriter for CatalogWriterImpl { } async fn create_function(&self, function: ProstFunction) -> Result<()> { - todo!() + let (_, version) = self.meta_client.create_function(function).await?; + self.wait_version(version).await } async fn drop_table(&self, source_id: Option, table_id: TableId) -> Result<()> { @@ -231,7 +232,8 @@ impl CatalogWriter for CatalogWriterImpl { } async fn drop_function(&self, function_id: FunctionId) -> Result<()> { - todo!() + let version = self.meta_client.drop_function(function_id).await?; + self.wait_version(version).await } async fn drop_schema(&self, schema_id: u32) -> Result<()> { diff --git a/src/frontend/src/handler/create_function.rs b/src/frontend/src/handler/create_function.rs index d1696992e063d..399d3b5dcf106 100644 --- a/src/frontend/src/handler/create_function.rs +++ b/src/frontend/src/handler/create_function.rs @@ -12,14 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use pgwire::pg_response::StatementType; +use risingwave_common::catalog::FunctionId; +use risingwave_pb::catalog::{Function, FunctionArgument}; use risingwave_sqlparser::ast::{ CreateFunctionBody, DataType, FunctionDefinition, ObjectName, OperateFunctionArg, }; use super::*; +use crate::{bind_data_type, Binder}; -pub fn handle_create_function( - handle_args: HandlerArgs, +pub async fn handle_create_function( + handler_args: HandlerArgs, or_replace: bool, temporary: bool, name: ObjectName, @@ -41,19 +45,19 @@ pub fn handle_create_function( ) .into()); } - match params.language { + let language = match params.language { + Some(lang) => lang.real_value(), None => { return Err( ErrorCode::InvalidParameterValue("LANGUAGE must be specified".to_string()).into(), ) } - Some(lang) if lang.real_value() != "arrow_flight" => { - return Err(ErrorCode::InvalidParameterValue( - "LANGUAGE should be one of: arrow_flight".to_string(), - ) - .into()) - } - _ => {} + }; + if language != "arrow_flight" { + return Err(ErrorCode::InvalidParameterValue( + "LANGUAGE should be one of: arrow_flight".to_string(), + ) + .into()); } let Some(FunctionDefinition::SingleQuotedDef(flight_server_addr)) = params.as_ else { return Err(ErrorCode::InvalidParameterValue( @@ -66,5 +70,33 @@ pub fn handle_create_function( ErrorCode::InvalidParameterValue("return type must be specified".to_string()).into(), ) }; - todo!() + let mut arguments = vec![]; + for arg in args.unwrap_or_default() { + arguments.push(FunctionArgument { + name: arg.name.map_or(String::new(), |ident| ident.real_value()), + r#type: Some(bind_data_type(&arg.data_type)?.into()), + }); + } + + // resolve database and schema id + let session = &handler_args.session; + let db_name = session.database(); + let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, name)?; + let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?; + + let function = Function { + id: FunctionId::placeholder().0, + schema_id, + database_id, + name: function_name, + arguments, + return_type: Some(bind_data_type(&return_type)?.into()), + language, + path: flight_server_addr, + }; + + let catalog_writer = session.env().catalog_writer(); + catalog_writer.create_function(function).await?; + + Ok(PgResponse::empty_result(StatementType::CREATE_FUNCTION)) } diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 9448a896977ec..dd98b7e2497e1 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -172,15 +172,18 @@ pub async fn handle( args, return_type, params, - } => create_function::handle_create_function( - handler_args, - or_replace, - temporary, - name, - args, - return_type, - params, - ), + } => { + create_function::handle_create_function( + handler_args, + or_replace, + temporary, + name, + args, + return_type, + params, + ) + .await + } Statement::CreateTable { name, columns, diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 151f7a139be0a..eacddcf65c099 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -323,7 +323,7 @@ impl MetaClient { pub async fn drop_function(&self, function_id: FunctionId) -> Result { let request = DropFunctionRequest { - function_id: function_id.function_id, + function_id: function_id.0, }; let resp = self.inner.drop_function(request).await?; Ok(resp.version) diff --git a/src/utils/pgwire/src/pg_response.rs b/src/utils/pgwire/src/pg_response.rs index bb028ad4b6cb0..6368020ffdb69 100644 --- a/src/utils/pgwire/src/pg_response.rs +++ b/src/utils/pgwire/src/pg_response.rs @@ -44,12 +44,14 @@ pub enum StatementType { CREATE_SCHEMA, CREATE_USER, CREATE_INDEX, + CREATE_FUNCTION, DESCRIBE_TABLE, GRANT_PRIVILEGE, DROP_TABLE, DROP_MATERIALIZED_VIEW, DROP_VIEW, DROP_INDEX, + DROP_FUNCTION, DROP_SOURCE, DROP_SINK, DROP_SCHEMA, From 2e6c7f1c3737c705bfef9388dab4743f05d02f37 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Thu, 5 Jan 2023 17:33:30 +0800 Subject: [PATCH 03/15] add function to database catalog Signed-off-by: Runji Wang --- proto/meta.proto | 1 + src/meta/src/manager/catalog/database.rs | 17 +++++++++++-- src/meta/src/manager/catalog/mod.rs | 11 +++++++- src/meta/src/manager/id.rs | 1 + src/meta/src/model/catalog.rs | 5 +++- src/meta/src/rpc/service/ddl_service.rs | 25 +++++++++++++++++-- .../src/rpc/service/notification_service.rs | 14 ++++++++--- 7 files changed, 64 insertions(+), 10 deletions(-) diff --git a/proto/meta.proto b/proto/meta.proto index e8af12e17b4e4..68a8cfb4fe5cd 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -200,6 +200,7 @@ message MetaSnapshot { repeated catalog.Table tables = 5; repeated catalog.Index indexes = 6; repeated catalog.View views = 7; + repeated catalog.Function functions = 15; repeated user.UserInfo users = 8; repeated common.ParallelUnitMapping parallel_unit_mappings = 9; repeated common.WorkerNode nodes = 10; diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index ad4f2d0af2b74..fff22e10e5efa 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -16,9 +16,9 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; use itertools::Itertools; -use risingwave_pb::catalog::{Database, Index, Schema, Sink, Source, Table, View}; +use risingwave_pb::catalog::{Database, Function, Index, Schema, Sink, Source, Table, View}; -use super::{DatabaseId, RelationId, SchemaId, SinkId, SourceId, ViewId}; +use super::{DatabaseId, FunctionId, RelationId, SchemaId, SinkId, SourceId, ViewId}; use crate::manager::{IndexId, MetaSrvEnv, TableId}; use crate::model::MetadataModel; use crate::storage::MetaStore; @@ -32,6 +32,7 @@ pub type Catalog = ( Vec, Vec, Vec, + Vec, ); type DatabaseKey = String; @@ -55,6 +56,8 @@ pub struct DatabaseManager { pub(super) tables: BTreeMap, /// Cached view information. pub(super) views: BTreeMap, + /// Cached function information. + pub(super) functions: BTreeMap, /// Relation refer count mapping. // TODO(zehua): avoid key conflicts after distinguishing table's and source's id generator. @@ -78,6 +81,7 @@ impl DatabaseManager { let tables = Table::list(env.meta_store()).await?; let indexes = Index::list(env.meta_store()).await?; let views = View::list(env.meta_store()).await?; + let functions = Function::list(env.meta_store()).await?; let mut relation_ref_count = HashMap::new(); @@ -102,6 +106,7 @@ impl DatabaseManager { } (view.id, view) })); + let functions = BTreeMap::from_iter(functions.into_iter().map(|f| (f.id, f))); Ok(Self { databases, @@ -111,6 +116,7 @@ impl DatabaseManager { views, tables, indexes, + functions, relation_ref_count, in_progress_creation_tracker: HashSet::default(), in_progress_creation_streaming_job: HashSet::default(), @@ -127,6 +133,7 @@ impl DatabaseManager { self.sinks.values().cloned().collect_vec(), self.indexes.values().cloned().collect_vec(), self.views.values().cloned().collect_vec(), + self.functions.values().cloned().collect_vec(), ) } @@ -161,6 +168,12 @@ impl DatabaseManager { && x.name.eq(&relation_key.2) }) { Err(MetaError::catalog_duplicated("view", &relation_key.2)) + } else if self.functions.values().any(|x| { + x.database_id == relation_key.0 + && x.schema_id == relation_key.1 + && x.name.eq(&relation_key.2) + }) { + Err(MetaError::catalog_duplicated("function", &relation_key.2)) } else { Ok(()) } diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index f4159e41ea329..2f19ba5c5cf3c 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -32,7 +32,7 @@ use risingwave_common::catalog::{ }; use risingwave_common::{bail, ensure}; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; -use risingwave_pb::catalog::{Database, Index, Schema, Sink, Source, Table, View}; +use risingwave_pb::catalog::{Database, Function, Index, Schema, Sink, Source, Table, View}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::user::grant_privilege::{ActionWithGrantOption, Object}; use risingwave_pb::user::update_user_request::UpdateField; @@ -53,6 +53,7 @@ pub type SinkId = u32; pub type RelationId = u32; pub type IndexId = u32; pub type ViewId = u32; +pub type FunctionId = u32; pub type UserId = u32; @@ -449,6 +450,14 @@ where } } + pub async fn create_function(&self, function: &Function) -> MetaResult { + todo!("create function") + } + + pub async fn drop_function(&self, function_id: FunctionId) -> MetaResult { + todo!("drop function") + } + pub async fn start_create_stream_job_procedure( &self, stream_job: &StreamingJob, diff --git a/src/meta/src/manager/id.rs b/src/meta/src/manager/id.rs index 549585f9c4268..3993d9714f699 100644 --- a/src/meta/src/manager/id.rs +++ b/src/meta/src/manager/id.rs @@ -140,6 +140,7 @@ pub mod IdCategory { pub const Sink: IdCategoryType = 13; pub const Index: IdCategoryType = 14; pub const CompactionGroup: IdCategoryType = 15; + pub const Function: IdCategoryType = 16; } pub type IdGeneratorManagerRef = Arc>; diff --git a/src/meta/src/model/catalog.rs b/src/meta/src/model/catalog.rs index e3aae4e2bf8a1..f70b25840268c 100644 --- a/src/meta/src/model/catalog.rs +++ b/src/meta/src/model/catalog.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_pb::catalog::{Database, Index, Schema, Sink, Source, Table, View}; +use risingwave_pb::catalog::{Database, Function, Index, Schema, Sink, Source, Table, View}; use crate::model::{MetadataModel, MetadataModelResult}; @@ -24,6 +24,8 @@ const CATALOG_SOURCE_CF_NAME: &str = "cf/catalog_source"; const CATALOG_SINK_CF_NAME: &str = "cf/catalog_sink"; /// Column family name for index catalog. const CATALOG_INDEX_CF_NAME: &str = "cf/catalog_index"; +/// Column family name for function catalog. +const CATALOG_FUNCTION_CF_NAME: &str = "cf/catalog_function"; /// Column family name for table catalog. const CATALOG_TABLE_CF_NAME: &str = "cf/catalog_table"; /// Column family name for schema catalog. @@ -60,6 +62,7 @@ impl_model_for_catalog!(View, CATALOG_VIEW_CF_NAME, u32, get_id); impl_model_for_catalog!(Source, CATALOG_SOURCE_CF_NAME, u32, get_id); impl_model_for_catalog!(Sink, CATALOG_SINK_CF_NAME, u32, get_id); impl_model_for_catalog!(Index, CATALOG_INDEX_CF_NAME, u32, get_id); +impl_model_for_catalog!(Function, CATALOG_FUNCTION_CF_NAME, u32, get_id); impl_model_for_catalog!(Table, CATALOG_TABLE_CF_NAME, u32, get_id); impl_model_for_catalog!(Schema, CATALOG_SCHEMA_CF_NAME, u32, get_id); impl_model_for_catalog!(Database, CATALOG_DATABASE_CF_NAME, u32, get_id); diff --git a/src/meta/src/rpc/service/ddl_service.rs b/src/meta/src/rpc/service/ddl_service.rs index b98990f07c77c..b031a24b20a03 100644 --- a/src/meta/src/rpc/service/ddl_service.rs +++ b/src/meta/src/rpc/service/ddl_service.rs @@ -348,14 +348,35 @@ where &self, request: Request, ) -> Result, Status> { - todo!() + let req = request.into_inner(); + let id = self.gen_unique_id::<{ IdCategory::Function }>().await?; + let mut function = req.get_function()?.clone(); + function.id = id; + let version = self.catalog_manager.create_function(&function).await?; + + Ok(Response::new(CreateFunctionResponse { + status: None, + function_id: id, + version, + })) } async fn drop_function( &self, request: Request, ) -> Result, Status> { - todo!() + self.check_barrier_manager_status().await?; + let request = request.into_inner(); + + let version = self + .catalog_manager + .drop_function(request.function_id.into()) + .await?; + + Ok(Response::new(DropFunctionResponse { + status: None, + version, + })) } async fn create_table( diff --git a/src/meta/src/rpc/service/notification_service.rs b/src/meta/src/rpc/service/notification_service.rs index 945a1df5c1931..fc80e6212d34f 100644 --- a/src/meta/src/rpc/service/notification_service.rs +++ b/src/meta/src/rpc/service/notification_service.rs @@ -67,12 +67,14 @@ where async fn get_catalog_snapshot(&self) -> (Catalog, Vec, NotificationVersion) { let catalog_guard = self.catalog_manager.get_catalog_core_guard().await; - let (databases, schemas, tables, sources, sinks, indexes, views) = + let (databases, schemas, tables, sources, sinks, indexes, views, functions) = catalog_guard.database.get_catalog(); let users = catalog_guard.user.list_users(); let notification_version = self.env.notification_manager().current_version().await; ( - (databases, schemas, tables, sources, sinks, indexes, views), + ( + databases, schemas, tables, sources, sinks, indexes, views, functions, + ), users, notification_version, ) @@ -116,8 +118,11 @@ where } async fn frontend_subscribe(&self) -> MetaSnapshot { - let ((databases, schemas, tables, sources, sinks, indexes, views), users, catalog_version) = - self.get_catalog_snapshot().await; + let ( + (databases, schemas, tables, sources, sinks, indexes, views, functions), + users, + catalog_version, + ) = self.get_catalog_snapshot().await; let (parallel_unit_mappings, parallel_unit_mapping_version) = self.get_parallel_unit_mapping_snapshot().await; let (nodes, worker_node_version) = self.get_worker_node_snapshot().await; @@ -132,6 +137,7 @@ where tables, indexes, views, + functions, users, parallel_unit_mappings, nodes, From 57915329ecb0eaabd491b23a02831c3fcdebc000 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Thu, 5 Jan 2023 18:11:34 +0800 Subject: [PATCH 04/15] create function pass Signed-off-by: Runji Wang --- proto/catalog.proto | 1 + proto/meta.proto | 1 + .../common_service/src/observer_manager.rs | 1 + src/frontend/src/catalog/function_catalog.rs | 75 +++++++++++++++++++ src/frontend/src/catalog/mod.rs | 1 + src/frontend/src/catalog/root_catalog.rs | 28 ++++++- src/frontend/src/catalog/schema_catalog.rs | 31 +++++++- src/frontend/src/handler/create_function.rs | 1 + src/frontend/src/observer/observer_manager.rs | 10 +++ src/meta/src/manager/catalog/mod.rs | 21 +++++- src/meta/src/manager/id.rs | 3 + 11 files changed, 166 insertions(+), 7 deletions(-) create mode 100644 src/frontend/src/catalog/function_catalog.rs diff --git a/proto/catalog.proto b/proto/catalog.proto index ea78dd6595c2d..8502792a5e7b9 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -85,6 +85,7 @@ message Function { data.DataType return_type = 6; string language = 7; string path = 8; + uint32 owner = 9; } message FunctionArgument { diff --git a/proto/meta.proto b/proto/meta.proto index 68a8cfb4fe5cd..4095b3d8e7b1b 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -230,6 +230,7 @@ message SubscribeResponse { catalog.Sink sink = 8; catalog.Index index = 9; catalog.View view = 10; + catalog.Function function = 18; user.UserInfo user = 11; common.ParallelUnitMapping parallel_unit_mapping = 12; common.WorkerNode node = 13; diff --git a/src/common/common_service/src/observer_manager.rs b/src/common/common_service/src/observer_manager.rs index 2e8f3e1c80b27..9a641a141dedc 100644 --- a/src/common/common_service/src/observer_manager.rs +++ b/src/common/common_service/src/observer_manager.rs @@ -123,6 +123,7 @@ where | Info::Sink(_) | Info::Index(_) | Info::View(_) + | Info::Function(_) | Info::User(_) => notification.version > catalog_version, Info::ParallelUnitMapping(_) => notification.version > parallel_unit_mapping_version, Info::Node(_) => notification.version > worker_node_version, diff --git a/src/frontend/src/catalog/function_catalog.rs b/src/frontend/src/catalog/function_catalog.rs new file mode 100644 index 0000000000000..a0bd796ff573d --- /dev/null +++ b/src/frontend/src/catalog/function_catalog.rs @@ -0,0 +1,75 @@ +// Copyright 2023 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::catalog::FunctionId; +use risingwave_common::types::DataType; +use risingwave_pb::catalog::{ + Function as ProstFunction, FunctionArgument as ProstFunctionArgument, +}; + +#[derive(Clone, Debug)] +pub struct FunctionCatalog { + pub id: FunctionId, + pub name: String, + pub owner: u32, + pub arguments: Vec, + pub return_type: DataType, + pub language: String, + pub path: String, +} + +#[derive(Clone, Debug)] +pub struct FunctionArgument { + pub name: Option, + pub data_type: DataType, +} + +impl From<&ProstFunction> for FunctionCatalog { + fn from(prost: &ProstFunction) -> Self { + FunctionCatalog { + id: prost.id.into(), + name: prost.name.clone(), + owner: prost.owner, + arguments: prost.arguments.iter().map(|arg| arg.into()).collect(), + return_type: prost.return_type.as_ref().expect("no return type").into(), + language: prost.language.clone(), + path: prost.path.clone(), + } + } +} + +impl From<&ProstFunctionArgument> for FunctionArgument { + fn from(prost: &ProstFunctionArgument) -> Self { + FunctionArgument { + name: if prost.name.is_empty() { + None + } else { + Some(prost.name.clone()) + }, + data_type: prost.r#type.as_ref().expect("no return type").into(), + } + } +} + +impl FunctionCatalog { + pub fn name(&self) -> &str { + &self.name + } + + /// Returns the SQL statement that can be used to create this view. + pub fn create_sql(&self) -> String { + todo!(); + format!("CREATE FUNCTION {}", self.name) + } +} diff --git a/src/frontend/src/catalog/mod.rs b/src/frontend/src/catalog/mod.rs index 244b84f31876d..654aa8d02c4d8 100644 --- a/src/frontend/src/catalog/mod.rs +++ b/src/frontend/src/catalog/mod.rs @@ -26,6 +26,7 @@ pub(crate) mod catalog_service; pub(crate) mod column_catalog; pub(crate) mod database_catalog; +pub(crate) mod function_catalog; pub(crate) mod index_catalog; pub(crate) mod root_catalog; pub(crate) mod schema_catalog; diff --git a/src/frontend/src/catalog/root_catalog.rs b/src/frontend/src/catalog/root_catalog.rs index d8b27a883d875..608d7c2898f2c 100644 --- a/src/frontend/src/catalog/root_catalog.rs +++ b/src/frontend/src/catalog/root_catalog.rs @@ -16,11 +16,12 @@ use std::collections::HashMap; use std::sync::Arc; use itertools::Itertools; -use risingwave_common::catalog::{CatalogVersion, IndexId, TableId}; +use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId}; use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD}; use risingwave_pb::catalog::{ - Database as ProstDatabase, Index as ProstIndex, Schema as ProstSchema, Sink as ProstSink, - Source as ProstSource, Table as ProstTable, View as ProstView, + Database as ProstDatabase, Function as ProstFunction, Index as ProstIndex, + Schema as ProstSchema, Sink as ProstSink, Source as ProstSource, Table as ProstTable, + View as ProstView, }; use super::source_catalog::SourceCatalog; @@ -163,6 +164,14 @@ impl Catalog { .create_view(proto); } + pub fn create_function(&mut self, proto: &ProstFunction) { + self.get_database_mut(proto.database_id) + .unwrap() + .get_schema_mut(proto.schema_id) + .unwrap() + .create_function(proto); + } + pub fn drop_database(&mut self, db_id: DatabaseId) { let name = self.db_name_by_id.remove(&db_id).unwrap(); let _database = self.database_by_name.remove(&name).unwrap(); @@ -222,6 +231,19 @@ impl Catalog { .drop_view(view_id); } + pub fn drop_function( + &mut self, + db_id: DatabaseId, + schema_id: SchemaId, + function_id: FunctionId, + ) { + self.get_database_mut(db_id) + .unwrap() + .get_schema_mut(schema_id) + .unwrap() + .drop_function(function_id); + } + pub fn get_database_by_name(&self, db_name: &str) -> CatalogResult<&DatabaseCatalog> { self.database_by_name .get(db_name) diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs index f400b74c979ae..f7083fdbc6e0f 100644 --- a/src/frontend/src/catalog/schema_catalog.rs +++ b/src/frontend/src/catalog/schema_catalog.rs @@ -16,14 +16,15 @@ use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::HashMap; use std::sync::Arc; -use risingwave_common::catalog::{valid_table_name, IndexId, TableId}; +use risingwave_common::catalog::{valid_table_name, FunctionId, IndexId, TableId}; use risingwave_pb::catalog::{ - Index as ProstIndex, Schema as ProstSchema, Sink as ProstSink, Source as ProstSource, - Table as ProstTable, View as ProstView, + Function as ProstFunction, Index as ProstIndex, Schema as ProstSchema, Sink as ProstSink, + Source as ProstSource, Table as ProstTable, View as ProstView, }; use super::source_catalog::SourceCatalog; use super::ViewId; +use crate::catalog::function_catalog::FunctionCatalog; use crate::catalog::index_catalog::IndexCatalog; use crate::catalog::sink_catalog::SinkCatalog; use crate::catalog::system_catalog::SystemCatalog; @@ -49,6 +50,9 @@ pub struct SchemaCatalog { indexes_by_table_id: HashMap>>, view_by_name: HashMap>, view_by_id: HashMap>, + // TODO: handle overload functions with the same name + function_by_name: HashMap>, + function_by_id: HashMap>, // This field only available when schema is "pg_catalog". Meanwhile, others will be empty. system_table_by_name: HashMap, @@ -182,6 +186,25 @@ impl SchemaCatalog { self.view_by_name.remove(&view_ref.name).unwrap(); } + pub fn create_function(&mut self, prost: &ProstFunction) { + let name = prost.name.clone(); + let id = prost.id; + let function = FunctionCatalog::from(prost); + let function_ref = Arc::new(function); + + self.function_by_name + .try_insert(name, function_ref.clone()) + .unwrap(); + self.function_by_id + .try_insert(id.into(), function_ref) + .unwrap(); + } + + pub fn drop_function(&mut self, id: FunctionId) { + let function_ref = self.function_by_id.remove(&id).unwrap(); + self.function_by_name.remove(&function_ref.name).unwrap(); + } + pub fn iter_table(&self) -> impl Iterator> { self.table_by_name .iter() @@ -310,6 +333,8 @@ impl From<&ProstSchema> for SchemaCatalog { system_table_by_name: HashMap::new(), view_by_name: HashMap::new(), view_by_id: HashMap::new(), + function_by_name: HashMap::new(), + function_by_id: HashMap::new(), } } } diff --git a/src/frontend/src/handler/create_function.rs b/src/frontend/src/handler/create_function.rs index 399d3b5dcf106..e873844d4c065 100644 --- a/src/frontend/src/handler/create_function.rs +++ b/src/frontend/src/handler/create_function.rs @@ -93,6 +93,7 @@ pub async fn handle_create_function( return_type: Some(bind_data_type(&return_type)?.into()), language, path: flight_server_addr, + owner: session.user_id(), }; let catalog_writer = session.env().catalog_writer(); diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index 0931cb46e9d6b..3e148050954f5 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -53,6 +53,7 @@ impl ObserverState for FrontendObserverNode { | Info::Source(_) | Info::Index(_) | Info::Sink(_) + | Info::Function(_) | Info::View(_) => { self.handle_catalog_notification(resp); } @@ -215,6 +216,15 @@ impl FrontendObserverNode { } _ => panic!("receive an unsupported notify {:?}", resp), }, + Info::Function(function) => match resp.operation() { + Operation::Add => catalog_guard.create_function(function), + Operation::Delete => catalog_guard.drop_function( + function.database_id, + function.schema_id, + function.id.into(), + ), + _ => panic!("receive an unsupported notify {:?}", resp), + }, _ => unreachable!(), } assert!( diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 2f19ba5c5cf3c..32feb74b55c76 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -451,7 +451,26 @@ where } pub async fn create_function(&self, function: &Function) -> MetaResult { - todo!("create function") + let core = &mut *self.core.lock().await; + let database_core = &mut core.database; + let user_core = &mut core.user; + database_core.ensure_database_id(function.database_id)?; + database_core.ensure_schema_id(function.schema_id)?; + + #[cfg(not(test))] + user_core.ensure_user_id(function.owner)?; + + let mut functions = BTreeMapTransaction::new(&mut database_core.functions); + functions.insert(function.id, function.clone()); + commit_meta!(self, functions)?; + + user_core.increase_ref(function.owner); + + let version = self + .notify_frontend(Operation::Add, Info::Function(function.to_owned())) + .await; + + Ok(version) } pub async fn drop_function(&self, function_id: FunctionId) -> MetaResult { diff --git a/src/meta/src/manager/id.rs b/src/meta/src/manager/id.rs index 3993d9714f699..f42dffd22faf3 100644 --- a/src/meta/src/manager/id.rs +++ b/src/meta/src/manager/id.rs @@ -153,6 +153,7 @@ pub struct IdGeneratorManager { database: Arc>, schema: Arc>, table: Arc>, + function: Arc>, worker: Arc>, fragment: Arc>, actor: Arc>, @@ -182,6 +183,7 @@ where ) .await, ), + function: Arc::new(StoredIdGenerator::new(meta_store.clone(), "function", None).await), worker: Arc::new( StoredIdGenerator::new(meta_store.clone(), "worker", Some(META_NODE_ID as u64 + 1)) .await, @@ -227,6 +229,7 @@ where IdCategory::Database => &self.database, IdCategory::Schema => &self.schema, IdCategory::Table => &self.table, + IdCategory::Function => &self.function, IdCategory::Fragment => &self.fragment, IdCategory::Actor => &self.actor, IdCategory::User => &self.user, From 480b426f67aa9fc8e0583bd946c8d3491f35b2af Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Fri, 6 Jan 2023 14:53:30 +0800 Subject: [PATCH 05/15] add empty DROP FUNCTION handler Signed-off-by: Runji Wang --- src/frontend/src/catalog/function_catalog.rs | 25 ++++++++++++++----- src/frontend/src/handler/drop_function.rs | 26 ++++++++++++++++++++ src/frontend/src/handler/mod.rs | 6 +++++ src/sqlparser/src/ast/mod.rs | 2 ++ src/sqlparser/src/parser.rs | 5 ++-- 5 files changed, 56 insertions(+), 8 deletions(-) create mode 100644 src/frontend/src/handler/drop_function.rs diff --git a/src/frontend/src/catalog/function_catalog.rs b/src/frontend/src/catalog/function_catalog.rs index a0bd796ff573d..ef3907a3fad54 100644 --- a/src/frontend/src/catalog/function_catalog.rs +++ b/src/frontend/src/catalog/function_catalog.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use itertools::Itertools; use risingwave_common::catalog::FunctionId; use risingwave_common::types::DataType; use risingwave_pb::catalog::{ @@ -63,13 +64,25 @@ impl From<&ProstFunctionArgument> for FunctionArgument { } impl FunctionCatalog { - pub fn name(&self) -> &str { - &self.name + /// Returns the SQL statement that can be used to create this function. + #[allow(dead_code)] + pub fn create_sql(&self) -> String { + format!( + "CREATE FUNCTION {}({}) RETURNS {} LANGUAGE {} AS '{}'", + self.name, + self.arguments.iter().map(|arg| arg.create_sql()).join(","), + self.return_type, + self.language, + self.path + ) } +} - /// Returns the SQL statement that can be used to create this view. - pub fn create_sql(&self) -> String { - todo!(); - format!("CREATE FUNCTION {}", self.name) +impl FunctionArgument { + fn create_sql(&self) -> String { + match &self.name { + Some(name) => format!("{} {}", name, self.data_type), + None => format!("{}", self.data_type), + } } } diff --git a/src/frontend/src/handler/drop_function.rs b/src/frontend/src/handler/drop_function.rs new file mode 100644 index 0000000000000..df16b039922b2 --- /dev/null +++ b/src/frontend/src/handler/drop_function.rs @@ -0,0 +1,26 @@ +// Copyright 2023 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_sqlparser::ast::{DropFunctionDesc, ReferentialAction}; + +use super::*; + +pub async fn handle_drop_function( + _handler_args: HandlerArgs, + _if_exists: bool, + _func_desc: Vec, + _option: Option, +) -> Result { + Err(ErrorCode::NotImplemented("drop function".to_string(), None.into()).into()) +} diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index dd98b7e2497e1..587d6e0ae5a9d 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -46,6 +46,7 @@ pub mod create_user; mod create_view; mod describe; mod drop_database; +pub mod drop_function; mod drop_index; pub mod drop_mv; mod drop_schema; @@ -303,6 +304,11 @@ pub async fn handle( )) .into()), }, + Statement::DropFunction { + if_exists, + func_desc, + option, + } => drop_function::handle_drop_function(handler_args, if_exists, func_desc, option).await, Statement::Query(_) | Statement::Insert { .. } | Statement::Delete { .. } diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index ebc2493ef5739..65b0abf49b683 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -757,6 +757,7 @@ pub enum ShowCreateType { Index, Source, Sink, + Function, } impl fmt::Display for ShowCreateType { @@ -768,6 +769,7 @@ impl fmt::Display for ShowCreateType { ShowCreateType::Index => f.write_str("INDEX"), ShowCreateType::Source => f.write_str("SOURCE"), ShowCreateType::Sink => f.write_str("SINK"), + ShowCreateType::Function => f.write_str("FUNCTION"), } } } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 71b4c14187de4..f4979b63a0c58 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3078,9 +3078,10 @@ impl Parser { Keyword::INDEX => ShowCreateType::Index, Keyword::SOURCE => ShowCreateType::Source, Keyword::SINK => ShowCreateType::Sink, + Keyword::FUNCTION => ShowCreateType::Function, _ => { return self.expected( - "TABLE, MATERIALIZED VIEW, VIEW, INDEX, SOURCE or SINK", + "TABLE, MATERIALIZED VIEW, VIEW, INDEX, FUNCTION, SOURCE or SINK", self.peek_token(), ) } @@ -3091,7 +3092,7 @@ impl Parser { }); } self.expected( - "TABLE, MATERIALIZED VIEW, VIEW, INDEX, SOURCE or SINK", + "TABLE, MATERIALIZED VIEW, VIEW, INDEX, FUNCTION, SOURCE or SINK", self.peek_token(), ) } From c937bd1f88fb8e111584438b8f8f8c281bc7823a Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Fri, 6 Jan 2023 15:29:13 +0800 Subject: [PATCH 06/15] allow functions with the same name Signed-off-by: Runji Wang --- e2e_test/ddl/function.slt | 29 ++++++++++ src/frontend/src/catalog/function_catalog.rs | 56 +++----------------- src/frontend/src/catalog/schema_catalog.rs | 32 ++++++++--- 3 files changed, 61 insertions(+), 56 deletions(-) create mode 100644 e2e_test/ddl/function.slt diff --git a/e2e_test/ddl/function.slt b/e2e_test/ddl/function.slt new file mode 100644 index 0000000000000..ff7e4dd6f9195 --- /dev/null +++ b/e2e_test/ddl/function.slt @@ -0,0 +1,29 @@ +# TODO: check the service on creation + +# Create a function. +statement ok +create function func(int, int) returns int as 'http://localhost:8815' language arrow_flight; + +# Create a function with the same name but different arguments. +statement ok +create function func(int) returns int as 'http://localhost:8815' language arrow_flight; + +# TODO: check existance + +# # Create a function with the same name and arguments. +# statement error +# create function func(int) returns int as 'http://localhost:8815' language arrow_flight; + +# TODO: drop function + +# # Drop a function but ambiguous. +# statement error is not unique +# drop function func; + +# # Drop a function +# statement ok +# drop function func(int); + +# # Drop a function +# statement ok +# drop function func(int, int); diff --git a/src/frontend/src/catalog/function_catalog.rs b/src/frontend/src/catalog/function_catalog.rs index ef3907a3fad54..6ee2b8991ae5b 100644 --- a/src/frontend/src/catalog/function_catalog.rs +++ b/src/frontend/src/catalog/function_catalog.rs @@ -12,77 +12,35 @@ // See the License for the specific language governing permissions and // limitations under the License. -use itertools::Itertools; use risingwave_common::catalog::FunctionId; use risingwave_common::types::DataType; -use risingwave_pb::catalog::{ - Function as ProstFunction, FunctionArgument as ProstFunctionArgument, -}; +use risingwave_pb::catalog::Function as ProstFunction; #[derive(Clone, Debug)] pub struct FunctionCatalog { pub id: FunctionId, pub name: String, pub owner: u32, - pub arguments: Vec, + pub arg_types: Vec, pub return_type: DataType, pub language: String, pub path: String, } -#[derive(Clone, Debug)] -pub struct FunctionArgument { - pub name: Option, - pub data_type: DataType, -} - impl From<&ProstFunction> for FunctionCatalog { fn from(prost: &ProstFunction) -> Self { FunctionCatalog { id: prost.id.into(), name: prost.name.clone(), owner: prost.owner, - arguments: prost.arguments.iter().map(|arg| arg.into()).collect(), + arg_types: prost + .arguments + .iter() + .map(|arg| arg.r#type.as_ref().expect("no return type").into()) + .collect(), return_type: prost.return_type.as_ref().expect("no return type").into(), language: prost.language.clone(), path: prost.path.clone(), } } } - -impl From<&ProstFunctionArgument> for FunctionArgument { - fn from(prost: &ProstFunctionArgument) -> Self { - FunctionArgument { - name: if prost.name.is_empty() { - None - } else { - Some(prost.name.clone()) - }, - data_type: prost.r#type.as_ref().expect("no return type").into(), - } - } -} - -impl FunctionCatalog { - /// Returns the SQL statement that can be used to create this function. - #[allow(dead_code)] - pub fn create_sql(&self) -> String { - format!( - "CREATE FUNCTION {}({}) RETURNS {} LANGUAGE {} AS '{}'", - self.name, - self.arguments.iter().map(|arg| arg.create_sql()).join(","), - self.return_type, - self.language, - self.path - ) - } -} - -impl FunctionArgument { - fn create_sql(&self) -> String { - match &self.name { - Some(name) => format!("{} {}", name, self.data_type), - None => format!("{}", self.data_type), - } - } -} diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs index f7083fdbc6e0f..5f0eba7cc95fd 100644 --- a/src/frontend/src/catalog/schema_catalog.rs +++ b/src/frontend/src/catalog/schema_catalog.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use std::sync::Arc; use risingwave_common::catalog::{valid_table_name, FunctionId, IndexId, TableId}; +use risingwave_common::types::DataType; use risingwave_pb::catalog::{ Function as ProstFunction, Index as ProstIndex, Schema as ProstSchema, Sink as ProstSink, Source as ProstSource, Table as ProstTable, View as ProstView, @@ -50,8 +51,7 @@ pub struct SchemaCatalog { indexes_by_table_id: HashMap>>, view_by_name: HashMap>, view_by_id: HashMap>, - // TODO: handle overload functions with the same name - function_by_name: HashMap>, + function_by_name: HashMap, Arc>>, function_by_id: HashMap>, // This field only available when schema is "pg_catalog". Meanwhile, others will be empty. @@ -190,19 +190,29 @@ impl SchemaCatalog { let name = prost.name.clone(); let id = prost.id; let function = FunctionCatalog::from(prost); + let args = function.arg_types.clone(); let function_ref = Arc::new(function); self.function_by_name - .try_insert(name, function_ref.clone()) - .unwrap(); + .entry(name) + .or_default() + .try_insert(args, function_ref.clone()) + .expect("function already exists with same argument types"); self.function_by_id .try_insert(id.into(), function_ref) - .unwrap(); + .expect("function id exists"); } pub fn drop_function(&mut self, id: FunctionId) { - let function_ref = self.function_by_id.remove(&id).unwrap(); - self.function_by_name.remove(&function_ref.name).unwrap(); + let function_ref = self + .function_by_id + .remove(&id) + .expect("function not found by id"); + self.function_by_name + .get_mut(&function_ref.name) + .expect("function not found by name") + .remove(&function_ref.arg_types) + .expect("function not found by argument types"); } pub fn iter_table(&self) -> impl Iterator> { @@ -302,6 +312,14 @@ impl SchemaCatalog { self.view_by_name.get(view_name) } + pub fn get_function_by_name_args( + &self, + name: &str, + args: &[DataType], + ) -> Option<&Arc> { + self.function_by_name.get(name)?.get(args) + } + pub fn id(&self) -> SchemaId { self.id } From c89ee4820615943a7f80619c6dc6f9002316c295 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Fri, 6 Jan 2023 16:27:12 +0800 Subject: [PATCH 07/15] add `UserDefinedFunction` expression Signed-off-by: Runji Wang --- proto/expr.proto | 2 + src/frontend/src/binder/expr/function.rs | 21 ++++++- src/frontend/src/catalog/root_catalog.rs | 1 + src/frontend/src/expr/expr_mutator.rs | 9 ++- src/frontend/src/expr/expr_rewriter.rs | 20 ++++++- src/frontend/src/expr/expr_visitor.rs | 11 +++- src/frontend/src/expr/mod.rs | 12 +++- .../src/expr/user_defined_function.rs | 57 +++++++++++++++++++ 8 files changed, 128 insertions(+), 5 deletions(-) create mode 100644 src/frontend/src/expr/user_defined_function.rs diff --git a/proto/expr.proto b/proto/expr.proto index 6ee68879758fe..b7e990a4f544a 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -117,6 +117,8 @@ message ExprNode { VNODE = 1101; // Non-deterministic functions NOW = 2022; + // User defined functions + UDF = 3000; } Type expr_type = 1; data.DataType return_type = 3; diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index ca5ae51efb59a..4f0b5a6484b89 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -28,7 +28,7 @@ use crate::binder::bind_context::Clause; use crate::binder::{Binder, BoundQuery, BoundSetExpr}; use crate::expr::{ AggCall, Expr, ExprImpl, ExprType, FunctionCall, Literal, OrderBy, Subquery, SubqueryKind, - TableFunction, TableFunctionType, WindowFunction, WindowFunctionType, + TableFunction, TableFunctionType, UserDefinedFunction, WindowFunction, WindowFunctionType, }; use crate::utils::Condition; @@ -96,6 +96,25 @@ impl Binder { return Ok(TableFunction::new(function_type, inputs)?.into()); } + // user defined function + // TODO: resolve schema name + if let Some(func) = self + .catalog + .first_valid_schema( + &self.db_name, + &self.search_path, + &self.auth_context.user_name, + )? + .get_function_by_name_args( + &function_name, + &inputs.iter().map(|arg| arg.return_type()).collect_vec(), + ) + { + return Ok( + UserDefinedFunction::new(&func.name, inputs, func.return_type.clone()).into(), + ); + } + // normal function let mut inputs = inputs; let function_type = match function_name.as_str() { diff --git a/src/frontend/src/catalog/root_catalog.rs b/src/frontend/src/catalog/root_catalog.rs index 608d7c2898f2c..521319a771f40 100644 --- a/src/frontend/src/catalog/root_catalog.rs +++ b/src/frontend/src/catalog/root_catalog.rs @@ -63,6 +63,7 @@ impl<'a> SchemaPath<'a> { /// - catalog (root catalog) /// - database catalog /// - schema catalog +/// - function catalog /// - table/sink/source/index/view catalog /// - column catalog pub struct Catalog { diff --git a/src/frontend/src/expr/expr_mutator.rs b/src/frontend/src/expr/expr_mutator.rs index f077606390cef..b7fac163dcc7e 100644 --- a/src/frontend/src/expr/expr_mutator.rs +++ b/src/frontend/src/expr/expr_mutator.rs @@ -14,7 +14,7 @@ use super::{ AggCall, CorrelatedInputRef, ExprImpl, FunctionCall, InputRef, Literal, Subquery, - TableFunction, WindowFunction, + TableFunction, UserDefinedFunction, WindowFunction, }; /// with the same visit logic of `ExprVisitor`, but mutable. @@ -29,6 +29,7 @@ pub trait ExprMutator { ExprImpl::CorrelatedInputRef(inner) => self.visit_correlated_input_ref(inner), ExprImpl::TableFunction(inner) => self.visit_table_function(inner), ExprImpl::WindowFunction(inner) => self.visit_window_function(inner), + ExprImpl::UserDefinedFunction(inner) => self.visit_user_defined_function(inner), } } fn visit_function_call(&mut self, func_call: &mut FunctionCall) { @@ -61,4 +62,10 @@ pub trait ExprMutator { .iter_mut() .for_each(|expr| self.visit_expr(expr)); } + fn visit_user_defined_function(&mut self, func_call: &mut UserDefinedFunction) { + func_call + .args + .iter_mut() + .for_each(|expr| self.visit_expr(expr)); + } } diff --git a/src/frontend/src/expr/expr_rewriter.rs b/src/frontend/src/expr/expr_rewriter.rs index 80b8472d31e25..c47ff9b187079 100644 --- a/src/frontend/src/expr/expr_rewriter.rs +++ b/src/frontend/src/expr/expr_rewriter.rs @@ -14,7 +14,7 @@ use super::{ AggCall, CorrelatedInputRef, ExprImpl, FunctionCall, InputRef, Literal, Subquery, - TableFunction, WindowFunction, + TableFunction, UserDefinedFunction, WindowFunction, }; /// By default, `ExprRewriter` simply traverses the expression tree and leaves nodes unchanged. @@ -31,6 +31,7 @@ pub trait ExprRewriter { ExprImpl::CorrelatedInputRef(inner) => self.rewrite_correlated_input_ref(*inner), ExprImpl::TableFunction(inner) => self.rewrite_table_function(*inner), ExprImpl::WindowFunction(inner) => self.rewrite_window_function(*inner), + ExprImpl::UserDefinedFunction(inner) => self.rewrite_user_defined_function(*inner), } } fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl { @@ -103,4 +104,21 @@ pub trait ExprRewriter { } .into() } + fn rewrite_user_defined_function(&mut self, udf: UserDefinedFunction) -> ExprImpl { + let UserDefinedFunction { + args, + return_type, + name, + } = udf; + let args = args + .into_iter() + .map(|expr| self.rewrite_expr(expr)) + .collect(); + UserDefinedFunction { + args, + return_type, + name, + } + .into() + } } diff --git a/src/frontend/src/expr/expr_visitor.rs b/src/frontend/src/expr/expr_visitor.rs index 05b5c9352d874..4e493f207c88a 100644 --- a/src/frontend/src/expr/expr_visitor.rs +++ b/src/frontend/src/expr/expr_visitor.rs @@ -14,7 +14,7 @@ use super::{ AggCall, CorrelatedInputRef, ExprImpl, FunctionCall, InputRef, Literal, Subquery, - TableFunction, WindowFunction, + TableFunction, UserDefinedFunction, WindowFunction, }; /// Traverse an expression tree. @@ -41,6 +41,7 @@ pub trait ExprVisitor { ExprImpl::CorrelatedInputRef(inner) => self.visit_correlated_input_ref(inner), ExprImpl::TableFunction(inner) => self.visit_table_function(inner), ExprImpl::WindowFunction(inner) => self.visit_window_function(inner), + ExprImpl::UserDefinedFunction(inner) => self.visit_user_defined_function(inner), } } fn visit_function_call(&mut self, func_call: &FunctionCall) -> R { @@ -90,4 +91,12 @@ pub trait ExprVisitor { .reduce(Self::merge) .unwrap_or_default() } + fn visit_user_defined_function(&mut self, func_call: &UserDefinedFunction) -> R { + func_call + .args + .iter() + .map(|expr| self.visit_expr(expr)) + .reduce(Self::merge) + .unwrap_or_default() + } } diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index ae77a201f3048..ca1d175436278 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -28,6 +28,7 @@ mod input_ref; mod literal; mod subquery; mod table_function; +mod user_defined_function; mod window_function; mod order_by_expr; @@ -54,6 +55,7 @@ pub use type_inference::{ agg_func_sigs, align_types, cast_map_array, cast_ok, cast_sigs, func_sigs, infer_type, least_restrictive, AggFuncSig, CastContext, CastSig, FuncSign, }; +pub use user_defined_function::UserDefinedFunction; pub use utils::*; pub use window_function::{WindowFunction, WindowFunctionType}; @@ -90,7 +92,8 @@ impl_expr_impl!( AggCall, Subquery, TableFunction, - WindowFunction + WindowFunction, + UserDefinedFunction ); impl ExprImpl { @@ -662,6 +665,7 @@ impl Expr for ExprImpl { ExprImpl::CorrelatedInputRef(expr) => expr.return_type(), ExprImpl::TableFunction(expr) => expr.return_type(), ExprImpl::WindowFunction(expr) => expr.return_type(), + ExprImpl::UserDefinedFunction(expr) => expr.return_type(), } } @@ -679,6 +683,7 @@ impl Expr for ExprImpl { ExprImpl::WindowFunction(_e) => { unreachable!("Window function should not be converted to ExprNode") } + ExprImpl::UserDefinedFunction(e) => e.to_expr_proto(), } } } @@ -710,6 +715,9 @@ impl std::fmt::Debug for ExprImpl { } Self::TableFunction(arg0) => f.debug_tuple("TableFunction").field(arg0).finish(), Self::WindowFunction(arg0) => f.debug_tuple("WindowFunction").field(arg0).finish(), + Self::UserDefinedFunction(arg0) => { + f.debug_tuple("UserDefinedFunction").field(arg0).finish() + } }; } match self { @@ -721,6 +729,7 @@ impl std::fmt::Debug for ExprImpl { Self::CorrelatedInputRef(x) => write!(f, "{:?}", x), Self::TableFunction(x) => write!(f, "{:?}", x), Self::WindowFunction(x) => write!(f, "{:?}", x), + Self::UserDefinedFunction(x) => write!(f, "{:?}", x), } } } @@ -762,6 +771,7 @@ impl std::fmt::Debug for ExprDisplay<'_> { // TODO: WindowFunctionCallVerboseDisplay write!(f, "{:?}", x) } + ExprImpl::UserDefinedFunction(x) => write!(f, "{:?}", x), } } } diff --git a/src/frontend/src/expr/user_defined_function.rs b/src/frontend/src/expr/user_defined_function.rs new file mode 100644 index 0000000000000..825f6ffd25423 --- /dev/null +++ b/src/frontend/src/expr/user_defined_function.rs @@ -0,0 +1,57 @@ +// Copyright 2023 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use itertools::Itertools; +use risingwave_common::catalog::Schema; +use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::types::DataType; +use risingwave_pb::catalog::Function; + +use super::{cast_ok, infer_type, CastContext, Expr, ExprImpl, Literal}; +use crate::expr::{ExprDisplay, ExprType}; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct UserDefinedFunction { + pub name: String, + pub return_type: DataType, + pub args: Vec, +} + +impl UserDefinedFunction { + pub fn new(name: &str, args: Vec, return_type: DataType) -> Self { + Self { + name: name.into(), + return_type, + args, + } + } +} + +impl Expr for UserDefinedFunction { + fn return_type(&self) -> DataType { + self.return_type.clone() + } + + fn to_expr_proto(&self) -> risingwave_pb::expr::ExprNode { + use risingwave_pb::expr::expr_node::*; + use risingwave_pb::expr::*; + ExprNode { + expr_type: Type::Udf.into(), + return_type: Some(self.return_type().to_protobuf()), + rex_node: Some(RexNode::FuncCall(FunctionCall { + children: self.args.iter().map(Expr::to_expr_proto).collect(), + })), + } + } +} From a462fa2c8460adf18eeb12a324b18ee0615633b2 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Fri, 6 Jan 2023 16:51:09 +0800 Subject: [PATCH 08/15] remove argument names from catalog.proto Signed-off-by: Runji Wang --- proto/catalog.proto | 7 +------ src/frontend/src/catalog/function_catalog.rs | 6 +----- src/frontend/src/handler/create_function.rs | 11 ++++------- 3 files changed, 6 insertions(+), 18 deletions(-) diff --git a/proto/catalog.proto b/proto/catalog.proto index 8502792a5e7b9..f1acc672dd39c 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -81,18 +81,13 @@ message Function { uint32 schema_id = 2; uint32 database_id = 3; string name = 4; - repeated FunctionArgument arguments = 5; + repeated data.DataType arg_types = 5; data.DataType return_type = 6; string language = 7; string path = 8; uint32 owner = 9; } -message FunctionArgument { - string name = 1; - data.DataType type = 2; -} - // See `TableCatalog` struct in frontend crate for more information. message Table { enum TableType { diff --git a/src/frontend/src/catalog/function_catalog.rs b/src/frontend/src/catalog/function_catalog.rs index 6ee2b8991ae5b..e574b526e117c 100644 --- a/src/frontend/src/catalog/function_catalog.rs +++ b/src/frontend/src/catalog/function_catalog.rs @@ -33,11 +33,7 @@ impl From<&ProstFunction> for FunctionCatalog { id: prost.id.into(), name: prost.name.clone(), owner: prost.owner, - arg_types: prost - .arguments - .iter() - .map(|arg| arg.r#type.as_ref().expect("no return type").into()) - .collect(), + arg_types: prost.arg_types.iter().map(|arg| arg.into()).collect(), return_type: prost.return_type.as_ref().expect("no return type").into(), language: prost.language.clone(), path: prost.path.clone(), diff --git a/src/frontend/src/handler/create_function.rs b/src/frontend/src/handler/create_function.rs index e873844d4c065..42c1d51ed060e 100644 --- a/src/frontend/src/handler/create_function.rs +++ b/src/frontend/src/handler/create_function.rs @@ -14,7 +14,7 @@ use pgwire::pg_response::StatementType; use risingwave_common::catalog::FunctionId; -use risingwave_pb::catalog::{Function, FunctionArgument}; +use risingwave_pb::catalog::Function; use risingwave_sqlparser::ast::{ CreateFunctionBody, DataType, FunctionDefinition, ObjectName, OperateFunctionArg, }; @@ -70,12 +70,9 @@ pub async fn handle_create_function( ErrorCode::InvalidParameterValue("return type must be specified".to_string()).into(), ) }; - let mut arguments = vec![]; + let mut arg_types = vec![]; for arg in args.unwrap_or_default() { - arguments.push(FunctionArgument { - name: arg.name.map_or(String::new(), |ident| ident.real_value()), - r#type: Some(bind_data_type(&arg.data_type)?.into()), - }); + arg_types.push(bind_data_type(&arg.data_type)?.into()); } // resolve database and schema id @@ -89,7 +86,7 @@ pub async fn handle_create_function( schema_id, database_id, name: function_name, - arguments, + arg_types, return_type: Some(bind_data_type(&return_type)?.into()), language, path: flight_server_addr, From 66867633a9afbbe06ada0014b42534ba23f2b281 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Fri, 6 Jan 2023 17:12:34 +0800 Subject: [PATCH 09/15] define UDF expression Signed-off-by: Runji Wang --- proto/expr.proto | 9 +++ src/expr/src/expr/expr_udf.rs | 70 +++++++++++++++++++ src/expr/src/expr/mod.rs | 3 + src/frontend/src/binder/expr/function.rs | 4 +- src/frontend/src/catalog/function_catalog.rs | 2 +- src/frontend/src/expr/expr_rewriter.rs | 13 +--- .../src/expr/user_defined_function.rs | 27 ++++--- 7 files changed, 103 insertions(+), 25 deletions(-) create mode 100644 src/expr/src/expr/expr_udf.rs diff --git a/proto/expr.proto b/proto/expr.proto index b7e990a4f544a..64b8463b4841d 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -126,6 +126,7 @@ message ExprNode { InputRefExpr input_ref = 4; data.Datum constant = 5; FunctionCall func_call = 6; + UserDefinedFunction udf = 7; } } @@ -215,3 +216,11 @@ message AggCall { repeated OrderByField order_by_fields = 5; ExprNode filter = 6; } + +message UserDefinedFunction { + repeated ExprNode children = 1; + string name = 2; + repeated data.DataType arg_types = 3; + string language = 4; + string path = 5; +} diff --git a/src/expr/src/expr/expr_udf.rs b/src/expr/src/expr/expr_udf.rs new file mode 100644 index 0000000000000..7c9cb89d5a287 --- /dev/null +++ b/src/expr/src/expr/expr_udf.rs @@ -0,0 +1,70 @@ +// Copyright 2023 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::convert::TryFrom; +use std::sync::Arc; + +use risingwave_common::array::{ArrayBuilder, ArrayBuilderImpl, ArrayRef, DataChunk}; +use risingwave_common::for_all_variants; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{literal_type_match, DataType, Datum, Scalar, ScalarImpl}; +use risingwave_pb::expr::expr_node::{RexNode, Type}; +use risingwave_pb::expr::ExprNode; + +use super::{build_from_prost, BoxedExpression}; +use crate::expr::Expression; +use crate::{bail, ensure, ExprError, Result}; + +#[derive(Debug)] +pub struct UdfExpression { + children: Vec, + name: String, + arg_types: Vec, + return_type: DataType, + // TODO: arrow flight client +} + +impl Expression for UdfExpression { + fn return_type(&self) -> DataType { + self.return_type.clone() + } + + fn eval(&self, input: &DataChunk) -> Result { + todo!("evaluate UDF") + } + + fn eval_row(&self, _input: &OwnedRow) -> Result { + todo!("evaluate UDF") + } +} + +impl UdfExpression {} + +impl<'a> TryFrom<&'a ExprNode> for UdfExpression { + type Error = ExprError; + + fn try_from(prost: &'a ExprNode) -> Result { + ensure!(prost.get_expr_type().unwrap() == Type::Udf); + let ret_type = DataType::from(prost.get_return_type().unwrap()); + let RexNode::Udf(udf) = prost.get_rex_node().unwrap() else { + bail!("expect UDF"); + }; + Ok(Self { + children: udf.children.iter().map(build_from_prost).try_collect()?, + name: udf.name.clone(), + arg_types: udf.arg_types.iter().map(|t| t.into()).collect(), + return_type: ret_type, + }) + } +} diff --git a/src/expr/src/expr/mod.rs b/src/expr/src/expr/mod.rs index 6939c526dcc40..5847957e59335 100644 --- a/src/expr/src/expr/mod.rs +++ b/src/expr/src/expr/mod.rs @@ -33,6 +33,7 @@ pub mod expr_regexp; mod expr_ternary_bytes; mod expr_to_char_const_tmpl; mod expr_to_timestamp_const_tmpl; +mod expr_udf; pub mod expr_unary; mod expr_vnode; mod template; @@ -61,6 +62,7 @@ use crate::expr::expr_field::FieldExpression; use crate::expr::expr_in::InExpression; use crate::expr::expr_nested_construct::NestedConstructExpression; use crate::expr::expr_regexp::RegexpMatchExpression; +use crate::expr::expr_udf::UdfExpression; use crate::expr::expr_vnode::VnodeExpression; use crate::ExprError; @@ -160,6 +162,7 @@ pub fn build_from_prost(prost: &ExprNode) -> Result { }; LiteralExpression::try_from(bind_timestamp).map(Expression::boxed) } + Udf => UdfExpression::try_from(prost).map(Expression::boxed), _ => Err(ExprError::UnsupportedFunction(format!( "{:?}", prost.get_expr_type() diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 4f0b5a6484b89..445fe4320700c 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -110,9 +110,7 @@ impl Binder { &inputs.iter().map(|arg| arg.return_type()).collect_vec(), ) { - return Ok( - UserDefinedFunction::new(&func.name, inputs, func.return_type.clone()).into(), - ); + return Ok(UserDefinedFunction::new(func.clone(), inputs).into()); } // normal function diff --git a/src/frontend/src/catalog/function_catalog.rs b/src/frontend/src/catalog/function_catalog.rs index e574b526e117c..623731d7627e3 100644 --- a/src/frontend/src/catalog/function_catalog.rs +++ b/src/frontend/src/catalog/function_catalog.rs @@ -16,7 +16,7 @@ use risingwave_common::catalog::FunctionId; use risingwave_common::types::DataType; use risingwave_pb::catalog::Function as ProstFunction; -#[derive(Clone, Debug)] +#[derive(Clone, PartialEq, Eq, Hash, Debug)] pub struct FunctionCatalog { pub id: FunctionId, pub name: String, diff --git a/src/frontend/src/expr/expr_rewriter.rs b/src/frontend/src/expr/expr_rewriter.rs index c47ff9b187079..3da8747b7923e 100644 --- a/src/frontend/src/expr/expr_rewriter.rs +++ b/src/frontend/src/expr/expr_rewriter.rs @@ -105,20 +105,11 @@ pub trait ExprRewriter { .into() } fn rewrite_user_defined_function(&mut self, udf: UserDefinedFunction) -> ExprImpl { - let UserDefinedFunction { - args, - return_type, - name, - } = udf; + let UserDefinedFunction { args, catalog } = udf; let args = args .into_iter() .map(|expr| self.rewrite_expr(expr)) .collect(); - UserDefinedFunction { - args, - return_type, - name, - } - .into() + UserDefinedFunction { args, catalog }.into() } } diff --git a/src/frontend/src/expr/user_defined_function.rs b/src/frontend/src/expr/user_defined_function.rs index 825f6ffd25423..7ab0868b93cca 100644 --- a/src/frontend/src/expr/user_defined_function.rs +++ b/src/frontend/src/expr/user_defined_function.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use itertools::Itertools; use risingwave_common::catalog::Schema; use risingwave_common::error::{ErrorCode, Result}; @@ -19,28 +21,24 @@ use risingwave_common::types::DataType; use risingwave_pb::catalog::Function; use super::{cast_ok, infer_type, CastContext, Expr, ExprImpl, Literal}; +use crate::catalog::function_catalog::FunctionCatalog; use crate::expr::{ExprDisplay, ExprType}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct UserDefinedFunction { - pub name: String, - pub return_type: DataType, pub args: Vec, + pub catalog: Arc, } impl UserDefinedFunction { - pub fn new(name: &str, args: Vec, return_type: DataType) -> Self { - Self { - name: name.into(), - return_type, - args, - } + pub fn new(catalog: Arc, args: Vec) -> Self { + Self { args, catalog } } } impl Expr for UserDefinedFunction { fn return_type(&self) -> DataType { - self.return_type.clone() + self.catalog.return_type.clone() } fn to_expr_proto(&self) -> risingwave_pb::expr::ExprNode { @@ -49,8 +47,17 @@ impl Expr for UserDefinedFunction { ExprNode { expr_type: Type::Udf.into(), return_type: Some(self.return_type().to_protobuf()), - rex_node: Some(RexNode::FuncCall(FunctionCall { + rex_node: Some(RexNode::Udf(UserDefinedFunction { children: self.args.iter().map(Expr::to_expr_proto).collect(), + name: self.catalog.name.clone(), + arg_types: self + .catalog + .arg_types + .iter() + .map(|t| t.to_protobuf()) + .collect(), + language: self.catalog.language.clone(), + path: self.catalog.path.clone(), })), } } From d26cbbeca65b875ee044d681d7be996c07297ca6 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Mon, 9 Jan 2023 12:52:00 +0800 Subject: [PATCH 10/15] refactor find catalog with `SchemaPath` Signed-off-by: Runji Wang --- src/frontend/src/catalog/root_catalog.rs | 284 +++++++---------------- 1 file changed, 87 insertions(+), 197 deletions(-) diff --git a/src/frontend/src/catalog/root_catalog.rs b/src/frontend/src/catalog/root_catalog.rs index 521319a771f40..6e7ae1049b371 100644 --- a/src/frontend/src/catalog/root_catalog.rs +++ b/src/frontend/src/catalog/root_catalog.rs @@ -18,12 +18,14 @@ use std::sync::Arc; use itertools::Itertools; use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId}; use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD}; +use risingwave_common::types::DataType; use risingwave_pb::catalog::{ Database as ProstDatabase, Function as ProstFunction, Index as ProstIndex, Schema as ProstSchema, Sink as ProstSink, Source as ProstSource, Table as ProstTable, View as ProstView, }; +use super::function_catalog::FunctionCatalog; use super::source_catalog::SourceCatalog; use super::system_catalog::get_sys_catalogs_in_schema; use super::view_catalog::ViewCatalog; @@ -53,6 +55,25 @@ impl<'a> SchemaPath<'a> { None => SchemaPath::Path(search_path, user_name), } } + + /// Call function `f` for each schema name. Return the first `Some` result. + pub fn find(&self, mut f: impl FnMut(&str) -> Option) -> Option<(T, &'a str)> { + match self { + SchemaPath::Name(schema_name) => f(schema_name).map(|t| (t, *schema_name)), + SchemaPath::Path(search_path, user_name) => { + for schema_name in search_path.path() { + let mut schema_name: &str = schema_name; + if schema_name == USER_NAME_WILD_CARD { + schema_name = user_name; + } + if let Some(res) = f(schema_name) { + return Some((res, schema_name)); + } + } + None + } + } + } } /// Root catalog of database catalog. It manages all database/schema/table in memory on frontend. @@ -327,44 +348,19 @@ impl Catalog { )) } - #[inline(always)] - fn get_table_by_name_with_schema_name( - &self, - db_name: &str, - schema_name: &str, - table_name: &str, - ) -> CatalogResult<&Arc> { - self.get_schema_by_name(db_name, schema_name)? - .get_table_by_name(table_name) - .ok_or_else(|| CatalogError::NotFound("table", table_name.to_string())) - } - pub fn get_table_by_name<'a>( &self, db_name: &str, schema_path: SchemaPath<'a>, table_name: &str, ) -> CatalogResult<(&Arc, &'a str)> { - match schema_path { - SchemaPath::Name(schema_name) => self - .get_table_by_name_with_schema_name(db_name, schema_name, table_name) - .map(|table_catalog| (table_catalog, schema_name)), - SchemaPath::Path(search_path, user_name) => { - for path in search_path.path() { - let mut schema_name: &str = path; - if schema_name == USER_NAME_WILD_CARD { - schema_name = user_name; - } - - if let Ok(table_catalog) = - self.get_table_by_name_with_schema_name(db_name, schema_name, table_name) - { - return Ok((table_catalog, schema_name)); - } - } - Err(CatalogError::NotFound("table", table_name.to_string())) - } - } + schema_path + .find(|schema_name| { + self.get_schema_by_name(db_name, schema_name) + .ok()? + .get_table_by_name(table_name) + }) + .ok_or_else(|| CatalogError::NotFound("table", table_name.to_string())) } pub fn get_table_by_id(&self, table_id: &TableId) -> CatalogResult { @@ -397,56 +393,19 @@ impl Catalog { .ok_or_else(|| CatalogError::NotFound("table", table_name.to_string())) } - #[inline(always)] - fn get_source_by_name_with_schema_name( - &self, - db_name: &str, - schema_name: &str, - source_name: &str, - ) -> CatalogResult<&Arc> { - self.get_schema_by_name(db_name, schema_name)? - .get_source_by_name(source_name) - .ok_or_else(|| CatalogError::NotFound("source", source_name.to_string())) - } - pub fn get_source_by_name<'a>( &self, db_name: &str, schema_path: SchemaPath<'a>, source_name: &str, ) -> CatalogResult<(&Arc, &'a str)> { - match schema_path { - SchemaPath::Name(schema_name) => self - .get_source_by_name_with_schema_name(db_name, schema_name, source_name) - .map(|source_catalog| (source_catalog, schema_name)), - SchemaPath::Path(search_path, user_name) => { - for path in search_path.path() { - let mut schema_name: &str = path; - if schema_name == USER_NAME_WILD_CARD { - schema_name = user_name; - } - - if let Ok(source_catalog) = - self.get_source_by_name_with_schema_name(db_name, schema_name, source_name) - { - return Ok((source_catalog, schema_name)); - } - } - Err(CatalogError::NotFound("source", source_name.to_string())) - } - } - } - - #[inline(always)] - fn get_sink_by_name_with_schema_name( - &self, - db_name: &str, - schema_name: &str, - sink_name: &str, - ) -> CatalogResult<&Arc> { - self.get_schema_by_name(db_name, schema_name)? - .get_sink_by_name(sink_name) - .ok_or_else(|| CatalogError::NotFound("sink", sink_name.to_string())) + schema_path + .find(|schema_name| { + self.get_schema_by_name(db_name, schema_name) + .ok()? + .get_source_by_name(source_name) + }) + .ok_or_else(|| CatalogError::NotFound("source", source_name.to_string())) } pub fn get_sink_by_name<'a>( @@ -455,38 +414,13 @@ impl Catalog { schema_path: SchemaPath<'a>, sink_name: &str, ) -> CatalogResult<(&Arc, &'a str)> { - match schema_path { - SchemaPath::Name(schema_name) => self - .get_sink_by_name_with_schema_name(db_name, schema_name, sink_name) - .map(|sink_catalog| (sink_catalog, schema_name)), - SchemaPath::Path(search_path, user_name) => { - for path in search_path.path() { - let mut schema_name: &str = path; - if schema_name == USER_NAME_WILD_CARD { - schema_name = user_name; - } - - if let Ok(sink_catalog) = - self.get_sink_by_name_with_schema_name(db_name, schema_name, sink_name) - { - return Ok((sink_catalog, schema_name)); - } - } - Err(CatalogError::NotFound("sink", sink_name.to_string())) - } - } - } - - #[inline(always)] - fn get_index_by_name_with_schema_name( - &self, - db_name: &str, - schema_name: &str, - index_name: &str, - ) -> CatalogResult<&Arc> { - self.get_schema_by_name(db_name, schema_name)? - .get_index_by_name(index_name) - .ok_or_else(|| CatalogError::NotFound("index", index_name.to_string())) + schema_path + .find(|schema_name| { + self.get_schema_by_name(db_name, schema_name) + .ok()? + .get_sink_by_name(sink_name) + }) + .ok_or_else(|| CatalogError::NotFound("sink", sink_name.to_string())) } pub fn get_index_by_name<'a>( @@ -495,66 +429,44 @@ impl Catalog { schema_path: SchemaPath<'a>, index_name: &str, ) -> CatalogResult<(&Arc, &'a str)> { - match schema_path { - SchemaPath::Name(schema_name) => self - .get_index_by_name_with_schema_name(db_name, schema_name, index_name) - .map(|index_catalog| (index_catalog, schema_name)), - SchemaPath::Path(search_path, user_name) => { - for path in search_path.path() { - let mut schema_name: &str = path; - if schema_name == USER_NAME_WILD_CARD { - schema_name = user_name; - } - - if let Ok(index_catalog) = - self.get_index_by_name_with_schema_name(db_name, schema_name, index_name) - { - return Ok((index_catalog, schema_name)); - } - } - Err(CatalogError::NotFound("index", index_name.to_string())) - } - } + schema_path + .find(|schema_name| { + self.get_schema_by_name(db_name, schema_name) + .ok()? + .get_index_by_name(index_name) + }) + .ok_or_else(|| CatalogError::NotFound("index", index_name.to_string())) } - #[inline(always)] - fn get_view_by_name_with_schema_name( + pub fn get_view_by_name<'a>( &self, db_name: &str, - schema_name: &str, + schema_path: SchemaPath<'a>, view_name: &str, - ) -> CatalogResult<&Arc> { - self.get_schema_by_name(db_name, schema_name)? - .get_view_by_name(view_name) + ) -> CatalogResult<(&Arc, &'a str)> { + schema_path + .find(|schema_name| { + self.get_schema_by_name(db_name, schema_name) + .ok()? + .get_view_by_name(view_name) + }) .ok_or_else(|| CatalogError::NotFound("view", view_name.to_string())) } - pub fn get_view_by_name<'a>( + pub fn get_function_by_name_args<'a>( &self, db_name: &str, schema_path: SchemaPath<'a>, - view_name: &str, - ) -> CatalogResult<(&Arc, &'a str)> { - match schema_path { - SchemaPath::Name(schema_name) => self - .get_view_by_name_with_schema_name(db_name, schema_name, view_name) - .map(|view_catalog| (view_catalog, schema_name)), - SchemaPath::Path(search_path, user_name) => { - for path in search_path.path() { - let mut schema_name: &str = path; - if schema_name == USER_NAME_WILD_CARD { - schema_name = user_name; - } - - if let Ok(view_catalog) = - self.get_view_by_name_with_schema_name(db_name, schema_name, view_name) - { - return Ok((view_catalog, schema_name)); - } - } - Err(CatalogError::NotFound("view", view_name.to_string())) - } - } + function_name: &str, + args: &[DataType], + ) -> CatalogResult<(&Arc, &'a str)> { + schema_path + .find(|schema_name| { + self.get_schema_by_name(db_name, schema_name) + .ok()? + .get_function_by_name_args(function_name, args) + }) + .ok_or_else(|| CatalogError::NotFound("function", function_name.to_string())) } /// Check the name if duplicated with existing table, materialized view or source. @@ -616,52 +528,30 @@ impl Catalog { .get_indexes_by_table_id(&mv_id) } - fn get_id_by_class_name_inner( - &self, - db_name: &str, - schema_name: &str, - class_name: &str, - ) -> CatalogResult { - let schema = self.get_schema_by_name(db_name, schema_name)?; - if let Some(item) = schema.get_system_table_by_name(class_name) { - return Ok(item.id().into()); - } else if let Some(item) = schema.get_table_by_name(class_name) { - return Ok(item.id().into()); - } else if let Some(item) = schema.get_index_by_name(class_name) { - return Ok(item.id.into()); - } else if let Some(item) = schema.get_source_by_name(class_name) { - return Ok(item.id); - } else if let Some(item) = schema.get_view_by_name(class_name) { - return Ok(item.id); - } - Err(CatalogError::NotFound("class", class_name.to_string())) - } - pub fn get_id_by_class_name( &self, db_name: &str, schema_path: SchemaPath<'_>, class_name: &str, ) -> CatalogResult { - match schema_path { - SchemaPath::Name(schema_name) => { - self.get_id_by_class_name_inner(db_name, schema_name, class_name) - } - SchemaPath::Path(search_path, user_name) => { - for path in search_path.path() { - let mut schema_name: &str = path; - if schema_name == USER_NAME_WILD_CARD { - schema_name = user_name; - } - - if let Ok(id) = - self.get_id_by_class_name_inner(db_name, schema_name, class_name) - { - return Ok(id); - } + schema_path + .find(|schema_name| { + let schema = self.get_schema_by_name(db_name, schema_name).ok()?; + if let Some(item) = schema.get_system_table_by_name(class_name) { + Some(item.id().into()) + } else if let Some(item) = schema.get_table_by_name(class_name) { + Some(item.id().into()) + } else if let Some(item) = schema.get_index_by_name(class_name) { + Some(item.id.into()) + } else if let Some(item) = schema.get_source_by_name(class_name) { + Some(item.id) + } else if let Some(item) = schema.get_view_by_name(class_name) { + Some(item.id) + } else { + None } - Err(CatalogError::NotFound("class", class_name.to_string())) - } - } + }) + .map(|(id, _)| id) + .ok_or_else(|| CatalogError::NotFound("class", class_name.to_string())) } } From 1ff3089e083d2a62db07d43df731e121e57a9853 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Mon, 9 Jan 2023 13:37:20 +0800 Subject: [PATCH 11/15] support DROP FUNCTION Signed-off-by: Runji Wang --- e2e_test/ddl/function.slt | 14 +++--- proto/user.proto | 1 + src/frontend/src/handler/drop_function.rs | 59 +++++++++++++++++++++-- src/frontend/src/user/user_privilege.rs | 2 + src/meta/src/manager/catalog/mod.rs | 28 ++++++++++- 5 files changed, 92 insertions(+), 12 deletions(-) diff --git a/e2e_test/ddl/function.slt b/e2e_test/ddl/function.slt index ff7e4dd6f9195..617cf0d31c28d 100644 --- a/e2e_test/ddl/function.slt +++ b/e2e_test/ddl/function.slt @@ -14,16 +14,16 @@ create function func(int) returns int as 'http://localhost:8815' language arrow_ # statement error # create function func(int) returns int as 'http://localhost:8815' language arrow_flight; -# TODO: drop function +# TODO: drop function without arguments # # Drop a function but ambiguous. # statement error is not unique # drop function func; -# # Drop a function -# statement ok -# drop function func(int); +# Drop a function +statement ok +drop function func(int); -# # Drop a function -# statement ok -# drop function func(int, int); +# Drop a function +statement ok +drop function func(int, int); diff --git a/proto/user.proto b/proto/user.proto index 2bb1670d3f29a..2b5032ca68685 100644 --- a/proto/user.proto +++ b/proto/user.proto @@ -57,6 +57,7 @@ message GrantPrivilege { uint32 source_id = 4; uint32 sink_id = 5; uint32 view_id = 6; + uint32 function_id = 8; uint32 all_tables_schema_id = 11; uint32 all_sources_schema_id = 12; diff --git a/src/frontend/src/handler/drop_function.rs b/src/frontend/src/handler/drop_function.rs index df16b039922b2..2419c1a2df67e 100644 --- a/src/frontend/src/handler/drop_function.rs +++ b/src/frontend/src/handler/drop_function.rs @@ -12,15 +12,66 @@ // See the License for the specific language governing permissions and // limitations under the License. +use pgwire::pg_response::StatementType; use risingwave_sqlparser::ast::{DropFunctionDesc, ReferentialAction}; use super::*; +use crate::catalog::root_catalog::SchemaPath; +use crate::catalog::CatalogError; +use crate::{bind_data_type, Binder}; pub async fn handle_drop_function( - _handler_args: HandlerArgs, - _if_exists: bool, - _func_desc: Vec, + handler_args: HandlerArgs, + if_exists: bool, + mut func_desc: Vec, _option: Option, ) -> Result { - Err(ErrorCode::NotImplemented("drop function".to_string(), None.into()).into()) + if func_desc.len() != 1 { + return Err(ErrorCode::NotImplemented( + "only support dropping 1 function".to_string(), + None.into(), + ) + .into()); + } + let func_desc = func_desc.remove(0); + + let session = handler_args.session; + let db_name = session.database(); + let (schema_name, function_name) = + Binder::resolve_schema_qualified_name(db_name, func_desc.name)?; + let search_path = session.config().get_search_path(); + let user_name = &session.auth_context().user_name; + let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); + + // TODO: argument is not specified, drop the only function with the name + let mut arg_types = vec![]; + for arg in func_desc.args.unwrap_or_default() { + arg_types.push(bind_data_type(&arg.data_type)?.into()); + } + + let function_id = { + let reader = session.env().catalog_reader().read_guard(); + match reader.get_function_by_name_args(db_name, schema_path, &function_name, &arg_types) { + Ok((function, _)) => { + if session.user_id() != function.owner { + return Err( + ErrorCode::PermissionDenied("Do not have the privilege".into()).into(), + ); + } + function.id + } + Err(CatalogError::NotFound(kind, _)) if kind == "function" && if_exists => { + return Ok(RwPgResponse::empty_result_with_notice( + StatementType::DROP_FUNCTION, + format!("function \"{}\" does not exist, skipping", function_name), + )); + } + Err(e) => return Err(e.into()), + } + }; + + let catalog_writer = session.env().catalog_writer(); + catalog_writer.drop_function(function_id).await?; + + Ok(PgResponse::empty_result(StatementType::DROP_FUNCTION)) } diff --git a/src/frontend/src/user/user_privilege.rs b/src/frontend/src/user/user_privilege.rs index c55d95ceabaf5..c03ed8133888a 100644 --- a/src/frontend/src/user/user_privilege.rs +++ b/src/frontend/src/user/user_privilege.rs @@ -34,6 +34,7 @@ static AVAILABLE_ACTION_ON_SOURCE: &[Action] = &[ static AVAILABLE_ACTION_ON_MVIEW: &[Action] = &[Action::Select { columns: None }]; static AVAILABLE_ACTION_ON_VIEW: &[Action] = AVAILABLE_ACTION_ON_MVIEW; static AVAILABLE_ACTION_ON_SINK: &[Action] = &[]; +static AVAILABLE_ACTION_ON_FUNCTION: &[Action] = &[]; pub fn check_privilege_type(privilege: &Privileges, objects: &GrantObjects) -> Result<()> { match privilege { @@ -113,6 +114,7 @@ pub fn available_prost_privilege(object: ProstObject) -> ProstPrivilege { } ProstObject::ViewId(_) => AVAILABLE_ACTION_ON_VIEW.to_vec(), ProstObject::SinkId(_) => AVAILABLE_ACTION_ON_SINK.to_vec(), + ProstObject::FunctionId(_) => AVAILABLE_ACTION_ON_FUNCTION.to_vec(), }; let actions = actions .iter() diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 32feb74b55c76..f63eb806f89e6 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -474,7 +474,33 @@ where } pub async fn drop_function(&self, function_id: FunctionId) -> MetaResult { - todo!("drop function") + let core = &mut *self.core.lock().await; + let database_core = &mut core.database; + let user_core = &mut core.user; + let mut functions = BTreeMapTransaction::new(&mut database_core.functions); + let mut users = BTreeMapTransaction::new(&mut user_core.user_info); + + let function = functions + .remove(function_id) + .ok_or_else(|| anyhow!("function not found"))?; + + let objects = &[Object::FunctionId(function_id)]; + let users_need_update = Self::update_user_privileges(&mut users, objects); + + commit_meta!(self, functions, users)?; + + user_core.decrease_ref(function.owner); + + for user in users_need_update { + self.notify_frontend(Operation::Update, Info::User(user)) + .await; + } + + let version = self + .notify_frontend(Operation::Delete, Info::Function(function)) + .await; + + Ok(version) } pub async fn start_create_stream_job_procedure( From 293a9f55a80d20c538d79348d8917ef359db90f7 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Mon, 9 Jan 2023 13:51:39 +0800 Subject: [PATCH 12/15] check duplicated function on creation Signed-off-by: Runji Wang --- e2e_test/ddl/function.slt | 8 +++----- src/frontend/src/handler/create_function.rs | 19 +++++++++++++++++-- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/e2e_test/ddl/function.slt b/e2e_test/ddl/function.slt index 617cf0d31c28d..5f7dea9487f72 100644 --- a/e2e_test/ddl/function.slt +++ b/e2e_test/ddl/function.slt @@ -8,11 +8,9 @@ create function func(int, int) returns int as 'http://localhost:8815' language a statement ok create function func(int) returns int as 'http://localhost:8815' language arrow_flight; -# TODO: check existance - -# # Create a function with the same name and arguments. -# statement error -# create function func(int) returns int as 'http://localhost:8815' language arrow_flight; +# Create a function with the same name and arguments. +statement error exists +create function func(int) returns int as 'http://localhost:8815' language arrow_flight; # TODO: drop function without arguments diff --git a/src/frontend/src/handler/create_function.rs b/src/frontend/src/handler/create_function.rs index 42c1d51ed060e..45190941fa7a7 100644 --- a/src/frontend/src/handler/create_function.rs +++ b/src/frontend/src/handler/create_function.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use itertools::Itertools; use pgwire::pg_response::StatementType; use risingwave_common::catalog::FunctionId; use risingwave_pb::catalog::Function; @@ -20,6 +21,7 @@ use risingwave_sqlparser::ast::{ }; use super::*; +use crate::catalog::CatalogError; use crate::{bind_data_type, Binder}; pub async fn handle_create_function( @@ -72,7 +74,7 @@ pub async fn handle_create_function( }; let mut arg_types = vec![]; for arg in args.unwrap_or_default() { - arg_types.push(bind_data_type(&arg.data_type)?.into()); + arg_types.push(bind_data_type(&arg.data_type)?); } // resolve database and schema id @@ -81,12 +83,25 @@ pub async fn handle_create_function( let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, name)?; let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?; + // check if function exists + if (session.env().catalog_reader().read_guard()) + .get_schema_by_id(&database_id, &schema_id)? + .get_function_by_name_args(&function_name, &arg_types) + .is_some() + { + let name = format!( + "{function_name}({})", + arg_types.iter().map(|t| t.to_string()).join(",") + ); + return Err(CatalogError::Duplicated("function", name).into()); + } + let function = Function { id: FunctionId::placeholder().0, schema_id, database_id, name: function_name, - arg_types, + arg_types: arg_types.into_iter().map(|t| t.into()).collect(), return_type: Some(bind_data_type(&return_type)?.into()), language, path: flight_server_addr, From 45839b71463827f95c7fb9a0e8263a5787c03e40 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Mon, 9 Jan 2023 14:17:38 +0800 Subject: [PATCH 13/15] fix clippy Signed-off-by: Runji Wang --- src/expr/src/expr/expr_udf.rs | 2 ++ src/frontend/src/catalog/root_catalog.rs | 1 + src/frontend/src/expr/user_defined_function.rs | 7 +------ src/frontend/src/handler/drop_function.rs | 2 +- src/frontend/src/test_utils.rs | 4 ++-- src/meta/src/rpc/service/ddl_service.rs | 2 +- 6 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/expr/src/expr/expr_udf.rs b/src/expr/src/expr/expr_udf.rs index 7c9cb89d5a287..8858996be4121 100644 --- a/src/expr/src/expr/expr_udf.rs +++ b/src/expr/src/expr/expr_udf.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![allow(warnings)] // unfinished + use std::convert::TryFrom; use std::sync::Arc; diff --git a/src/frontend/src/catalog/root_catalog.rs b/src/frontend/src/catalog/root_catalog.rs index 6e7ae1049b371..bcb20bf02fe4b 100644 --- a/src/frontend/src/catalog/root_catalog.rs +++ b/src/frontend/src/catalog/root_catalog.rs @@ -537,6 +537,7 @@ impl Catalog { schema_path .find(|schema_name| { let schema = self.get_schema_by_name(db_name, schema_name).ok()?; + #[allow(clippy::manual_map)] if let Some(item) = schema.get_system_table_by_name(class_name) { Some(item.id().into()) } else if let Some(item) = schema.get_table_by_name(class_name) { diff --git a/src/frontend/src/expr/user_defined_function.rs b/src/frontend/src/expr/user_defined_function.rs index 7ab0868b93cca..7600478be1249 100644 --- a/src/frontend/src/expr/user_defined_function.rs +++ b/src/frontend/src/expr/user_defined_function.rs @@ -14,15 +14,10 @@ use std::sync::Arc; -use itertools::Itertools; -use risingwave_common::catalog::Schema; -use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::DataType; -use risingwave_pb::catalog::Function; -use super::{cast_ok, infer_type, CastContext, Expr, ExprImpl, Literal}; +use super::{Expr, ExprImpl}; use crate::catalog::function_catalog::FunctionCatalog; -use crate::expr::{ExprDisplay, ExprType}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct UserDefinedFunction { diff --git a/src/frontend/src/handler/drop_function.rs b/src/frontend/src/handler/drop_function.rs index 2419c1a2df67e..99849a339ceb9 100644 --- a/src/frontend/src/handler/drop_function.rs +++ b/src/frontend/src/handler/drop_function.rs @@ -46,7 +46,7 @@ pub async fn handle_drop_function( // TODO: argument is not specified, drop the only function with the name let mut arg_types = vec![]; for arg in func_desc.args.unwrap_or_default() { - arg_types.push(bind_data_type(&arg.data_type)?.into()); + arg_types.push(bind_data_type(&arg.data_type)?); } let function_id = { diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 31f95d3150eac..b729df2605a80 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -274,7 +274,7 @@ impl CatalogWriter for MockCatalogWriter { Ok(()) } - async fn create_function(&self, function: ProstFunction) -> Result<()> { + async fn create_function(&self, _function: ProstFunction) -> Result<()> { todo!() } @@ -363,7 +363,7 @@ impl CatalogWriter for MockCatalogWriter { Ok(()) } - async fn drop_function(&self, function_id: FunctionId) -> Result<()> { + async fn drop_function(&self, _function_id: FunctionId) -> Result<()> { todo!() } diff --git a/src/meta/src/rpc/service/ddl_service.rs b/src/meta/src/rpc/service/ddl_service.rs index b031a24b20a03..27bc681bd4b2f 100644 --- a/src/meta/src/rpc/service/ddl_service.rs +++ b/src/meta/src/rpc/service/ddl_service.rs @@ -370,7 +370,7 @@ where let version = self .catalog_manager - .drop_function(request.function_id.into()) + .drop_function(request.function_id) .await?; Ok(Response::new(DropFunctionResponse { From 419dfad8118ebde048e5ec70f5d7eb1cb9c75f38 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Mon, 9 Jan 2023 17:04:38 +0800 Subject: [PATCH 14/15] fix catalog error Signed-off-by: Runji Wang --- src/frontend/src/catalog/root_catalog.rs | 91 ++++++++++++------------ 1 file changed, 47 insertions(+), 44 deletions(-) diff --git a/src/frontend/src/catalog/root_catalog.rs b/src/frontend/src/catalog/root_catalog.rs index bcb20bf02fe4b..c71d193f75731 100644 --- a/src/frontend/src/catalog/root_catalog.rs +++ b/src/frontend/src/catalog/root_catalog.rs @@ -57,20 +57,23 @@ impl<'a> SchemaPath<'a> { } /// Call function `f` for each schema name. Return the first `Some` result. - pub fn find(&self, mut f: impl FnMut(&str) -> Option) -> Option<(T, &'a str)> { + pub fn try_find( + &self, + mut f: impl FnMut(&str) -> Result, E>, + ) -> Result, E> { match self { - SchemaPath::Name(schema_name) => f(schema_name).map(|t| (t, *schema_name)), + SchemaPath::Name(schema_name) => Ok(f(schema_name)?.map(|t| (t, *schema_name))), SchemaPath::Path(search_path, user_name) => { for schema_name in search_path.path() { let mut schema_name: &str = schema_name; if schema_name == USER_NAME_WILD_CARD { schema_name = user_name; } - if let Some(res) = f(schema_name) { - return Some((res, schema_name)); + if let Ok(Some(res)) = f(schema_name) { + return Ok(Some((res, schema_name))); } } - None + Ok(None) } } } @@ -355,11 +358,11 @@ impl Catalog { table_name: &str, ) -> CatalogResult<(&Arc, &'a str)> { schema_path - .find(|schema_name| { - self.get_schema_by_name(db_name, schema_name) - .ok()? - .get_table_by_name(table_name) - }) + .try_find(|schema_name| { + Ok(self + .get_schema_by_name(db_name, schema_name)? + .get_table_by_name(table_name)) + })? .ok_or_else(|| CatalogError::NotFound("table", table_name.to_string())) } @@ -400,11 +403,11 @@ impl Catalog { source_name: &str, ) -> CatalogResult<(&Arc, &'a str)> { schema_path - .find(|schema_name| { - self.get_schema_by_name(db_name, schema_name) - .ok()? - .get_source_by_name(source_name) - }) + .try_find(|schema_name| { + Ok(self + .get_schema_by_name(db_name, schema_name)? + .get_source_by_name(source_name)) + })? .ok_or_else(|| CatalogError::NotFound("source", source_name.to_string())) } @@ -415,11 +418,11 @@ impl Catalog { sink_name: &str, ) -> CatalogResult<(&Arc, &'a str)> { schema_path - .find(|schema_name| { - self.get_schema_by_name(db_name, schema_name) - .ok()? - .get_sink_by_name(sink_name) - }) + .try_find(|schema_name| { + Ok(self + .get_schema_by_name(db_name, schema_name)? + .get_sink_by_name(sink_name)) + })? .ok_or_else(|| CatalogError::NotFound("sink", sink_name.to_string())) } @@ -430,11 +433,11 @@ impl Catalog { index_name: &str, ) -> CatalogResult<(&Arc, &'a str)> { schema_path - .find(|schema_name| { - self.get_schema_by_name(db_name, schema_name) - .ok()? - .get_index_by_name(index_name) - }) + .try_find(|schema_name| { + Ok(self + .get_schema_by_name(db_name, schema_name)? + .get_index_by_name(index_name)) + })? .ok_or_else(|| CatalogError::NotFound("index", index_name.to_string())) } @@ -445,11 +448,11 @@ impl Catalog { view_name: &str, ) -> CatalogResult<(&Arc, &'a str)> { schema_path - .find(|schema_name| { - self.get_schema_by_name(db_name, schema_name) - .ok()? - .get_view_by_name(view_name) - }) + .try_find(|schema_name| { + Ok(self + .get_schema_by_name(db_name, schema_name)? + .get_view_by_name(view_name)) + })? .ok_or_else(|| CatalogError::NotFound("view", view_name.to_string())) } @@ -461,11 +464,11 @@ impl Catalog { args: &[DataType], ) -> CatalogResult<(&Arc, &'a str)> { schema_path - .find(|schema_name| { - self.get_schema_by_name(db_name, schema_name) - .ok()? - .get_function_by_name_args(function_name, args) - }) + .try_find(|schema_name| { + Ok(self + .get_schema_by_name(db_name, schema_name)? + .get_function_by_name_args(function_name, args)) + })? .ok_or_else(|| CatalogError::NotFound("function", function_name.to_string())) } @@ -535,23 +538,23 @@ impl Catalog { class_name: &str, ) -> CatalogResult { schema_path - .find(|schema_name| { - let schema = self.get_schema_by_name(db_name, schema_name).ok()?; + .try_find(|schema_name| { + let schema = self.get_schema_by_name(db_name, schema_name)?; #[allow(clippy::manual_map)] if let Some(item) = schema.get_system_table_by_name(class_name) { - Some(item.id().into()) + Ok(Some(item.id().into())) } else if let Some(item) = schema.get_table_by_name(class_name) { - Some(item.id().into()) + Ok(Some(item.id().into())) } else if let Some(item) = schema.get_index_by_name(class_name) { - Some(item.id.into()) + Ok(Some(item.id.into())) } else if let Some(item) = schema.get_source_by_name(class_name) { - Some(item.id) + Ok(Some(item.id)) } else if let Some(item) = schema.get_view_by_name(class_name) { - Some(item.id) + Ok(Some(item.id)) } else { - None + Ok(None) } - }) + })? .map(|(id, _)| id) .ok_or_else(|| CatalogError::NotFound("class", class_name.to_string())) } From 09dd229984402c176b426dace4bb33ab298d5c08 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Mon, 9 Jan 2023 17:04:45 +0800 Subject: [PATCH 15/15] fix proto format Signed-off-by: Runji Wang --- proto/catalog.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proto/catalog.proto b/proto/catalog.proto index f1acc672dd39c..16b8800ef06fe 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -2,9 +2,9 @@ syntax = "proto3"; package catalog; +import "data.proto"; import "expr.proto"; import "plan_common.proto"; -import "data.proto"; option optimize_for = SPEED;