feat(source): introduce create/show connection statement (risingwavel…
WillyKidd authored Apr 6, 2023
1 parent 7c9da96 commit f1b7942
Showing 32 changed files with 530 additions and 49 deletions.
6 changes: 4 additions & 2 deletions proto/catalog.proto
@@ -89,11 +89,13 @@ message Sink {
   string definition = 13;
 }

+// TODO: add database_id and schema_id
 message Connection {
   message PrivateLinkService {
     string provider = 1;
-    string endpoint_id = 2;
-    map<string, string> dns_entries = 3;
+    string service_name = 2;
+    string endpoint_id = 3;
+    map<string, string> dns_entries = 4;
   }

   uint32 id = 1;
9 changes: 6 additions & 3 deletions proto/ddl_service.proto
@@ -259,9 +259,9 @@ message CreateConnectionRequest {
     string service_name = 2;
     repeated string availability_zones = 3;
   }

+  string name = 1;
   oneof payload {
-    PrivateLink private_link = 1;
+    PrivateLink private_link = 2;
   }
 }

@@ -281,7 +281,10 @@ message DropConnectionRequest {
   string connection_name = 1;
 }

-message DropConnectionResponse {}
+message DropConnectionResponse {
+  common.Status status = 1;
+  uint64 version = 2;
+}

 service DdlService {
   rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse);
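
With the new name field and the renumbered payload, a request for an AWS PrivateLink connection can be built from the prost-generated types roughly as follows. This is a minimal sketch, not code from the commit: the provider, service name, and availability zone literals are placeholders, and ..Default::default() is only there in case the message carries fields not visible in this hunk.

use risingwave_pb::ddl_service::create_connection_request::{Payload, PrivateLink};
use risingwave_pb::ddl_service::CreateConnectionRequest;

fn example_request() -> CreateConnectionRequest {
    CreateConnectionRequest {
        // The connection is now named at the request level...
        name: "my_privatelink".to_string(),
        // ...while the PrivateLink payload keeps the provider-specific details.
        payload: Some(Payload::PrivateLink(PrivateLink {
            provider: "aws".to_string(),
            service_name: "com.amazonaws.vpce.us-east-1.vpce-svc-0123456789abcdef0".to_string(),
            availability_zones: vec!["use1-az1".to_string(), "use1-az2".to_string()],
        })),
        ..Default::default()
    }
}
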
2 changes: 2 additions & 0 deletions proto/meta.proto
@@ -240,6 +240,7 @@ message MetaSnapshot {
   repeated catalog.Index indexes = 6;
   repeated catalog.View views = 7;
   repeated catalog.Function functions = 15;
+  repeated catalog.Connection connections = 17;
   repeated user.UserInfo users = 8;
   repeated FragmentParallelUnitMapping parallel_unit_mappings = 9;
   repeated common.WorkerNode nodes = 10;
@@ -290,6 +291,7 @@ message SubscribeResponse {
     SystemParams system_params = 19;
     hummock.WriteLimits hummock_write_limits = 20;
     RelationGroup relation_group = 21;
+    catalog.Connection connection = 22;
   }
 }
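
These two additions are what make connections visible to observers: existing connections ride along in the initial MetaSnapshot, and later CREATE/DROP CONNECTION changes are presumably delivered through the new connection variant of SubscribeResponse, mirroring how the other catalog objects in these messages are propagated.
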
1 change: 1 addition & 0 deletions src/common/common_service/src/observer_manager.rs
@@ -120,6 +120,7 @@ where
            | Info::Schema(_)
            | Info::RelationGroup(_)
            | Info::User(_)
+           | Info::Connection(_)
            | Info::Function(_) => {
                notification.version > info.version.as_ref().unwrap().catalog_version
            }
1 change: 1 addition & 0 deletions src/connector/src/source/kafka/mod.rs
@@ -30,6 +30,7 @@ use crate::common::KafkaCommon;
 pub const KAFKA_CONNECTOR: &str = "kafka";
 pub const KAFKA_PROPS_BROKER_KEY: &str = "properties.bootstrap.server";
 pub const KAFKA_PROPS_BROKER_KEY_ALIAS: &str = "kafka.brokers";
+pub const PRIVATELINK_CONNECTION: &str = "privatelink";

 #[derive(Clone, Debug, Deserialize)]
 pub struct KafkaProperties {
12 changes: 7 additions & 5 deletions src/ctl/src/cmd_impl/meta/connection.rs
@@ -20,6 +20,7 @@ use crate::common::CtlContext;

 pub async fn create_connection(
     context: &CtlContext,
+    connection_name: String,
     provider: String,
     service_name: String,
     availability_zone: String,
@@ -29,14 +30,15 @@ pub async fn create_connection(
         .split(',')
         .map(|str| str.to_string())
         .collect();
-    let conn_id = meta_client
-        .create_connection(create_connection_request::Payload::PrivateLink(
-            PrivateLink {
+    let (conn_id, _) = meta_client
+        .create_connection(
+            connection_name,
+            create_connection_request::Payload::PrivateLink(PrivateLink {
                 provider,
                 service_name,
                 availability_zones,
-            },
-        ))
+            }),
+        )
         .await?;

     println!("Create connection success id#{}", conn_id);
13 changes: 11 additions & 2 deletions src/ctl/src/lib.rs
@@ -215,6 +215,8 @@ enum MetaCommands {

     /// Create a new connection object
     CreateConnection {
+        #[clap(long)]
+        connection_name: String,
         #[clap(long)]
         provider: String,
         #[clap(long)]
@@ -350,12 +352,19 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
             cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
         }
         Commands::Meta(MetaCommands::CreateConnection {
+            connection_name,
             provider,
             service_name,
             availability_zones,
         }) => {
-            cmd_impl::meta::create_connection(context, provider, service_name, availability_zones)
-                .await?
+            cmd_impl::meta::create_connection(
+                context,
+                connection_name,
+                provider,
+                service_name,
+                availability_zones,
+            )
+            .await?
         }
         Commands::Meta(MetaCommands::ListConnections) => {
             cmd_impl::meta::list_connections(context).await?
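
Assuming clap's default kebab-case renaming for subcommands and flags (not shown in this diff), the new argument would surface on the ctl command line as something like: meta create-connection --connection-name my_privatelink --provider aws --service-name com.amazonaws.vpce... --availability-zones use1-az1,use1-az2, with the availability zones comma-separated to match the split(',') in connection.rs above.
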
5 changes: 5 additions & 0 deletions src/frontend/src/binder/relation/mod.rs
@@ -213,6 +213,11 @@ impl Binder {
         Self::resolve_single_name(name.0, "user name")
     }

+    /// return the `connection_name`
+    pub fn resolve_connection_name(name: ObjectName) -> Result<String> {
+        Self::resolve_single_name(name.0, "connection name")
+    }
+
     /// Fill the [`BindContext`](super::BindContext) for table.
     pub(super) fn bind_table_to_context(
         &mut self,
28 changes: 27 additions & 1 deletion src/frontend/src/catalog/catalog_service.rs
@@ -24,6 +24,7 @@ use risingwave_pb::catalog::{
     PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView,
 };
 use risingwave_pb::ddl_service::alter_relation_name_request::Relation;
+use risingwave_pb::ddl_service::create_connection_request;
 use risingwave_pb::stream_plan::StreamFragmentGraph;
 use risingwave_rpc_client::MetaClient;
 use tokio::sync::watch::Receiver;
@@ -48,7 +49,7 @@ impl CatalogReader {
     }
 }

-/// [`CatalogWriter`] initiate DDL operations (create table/schema/database/function).
+/// [`CatalogWriter`] initiate DDL operations (create table/schema/database/function/connection).
 /// It will only send rpc to meta and get the catalog version as response.
 /// Then it will wait for the local catalog to be synced to the version, which is performed by
 /// [observer](`crate::observer::FrontendObserverNode`).
@@ -98,6 +99,12 @@ pub trait CatalogWriter: Send + Sync {

     async fn create_function(&self, function: PbFunction) -> Result<()>;

+    async fn create_connection(
+        &self,
+        connection_name: String,
+        connection: create_connection_request::Payload,
+    ) -> Result<()>;
+
     async fn drop_table(&self, source_id: Option<u32>, table_id: TableId) -> Result<()>;

     async fn drop_materialized_view(&self, table_id: TableId) -> Result<()>;
@@ -116,6 +123,8 @@ pub trait CatalogWriter: Send + Sync {

     async fn drop_function(&self, function_id: FunctionId) -> Result<()>;

+    async fn drop_connection(&self, connection_name: &str) -> Result<()>;
+
     async fn alter_table_name(&self, table_id: u32, table_name: &str) -> Result<()>;

     async fn alter_view_name(&self, view_id: u32, view_name: &str) -> Result<()>;
Expand Down Expand Up @@ -231,6 +240,18 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

async fn create_connection(
&self,
connection_name: String,
connection: create_connection_request::Payload,
) -> Result<()> {
let (_, version) = self
.meta_client
.create_connection(connection_name, connection)
.await?;
self.wait_version(version).await
}

async fn drop_table(&self, source_id: Option<u32>, table_id: TableId) -> Result<()> {
let version = self.meta_client.drop_table(source_id, table_id).await?;
self.wait_version(version).await
@@ -276,6 +297,11 @@ impl CatalogWriter for CatalogWriterImpl {
         self.wait_version(version).await
     }

+    async fn drop_connection(&self, connection_name: &str) -> Result<()> {
+        let version = self.meta_client.drop_connection(connection_name).await?;
+        self.wait_version(version).await
+    }
+
     async fn alter_table_name(&self, table_id: u32, table_name: &str) -> Result<()> {
         let version = self
             .meta_client
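
A minimal sketch of how a frontend handler might drive the new trait methods. The handler itself is not part of this diff, Result refers to the frontend crate's alias, and the payload values are placeholders:

use risingwave_pb::ddl_service::create_connection_request::{Payload, PrivateLink};

async fn create_then_drop(writer: &dyn CatalogWriter) -> Result<()> {
    let payload = Payload::PrivateLink(PrivateLink {
        provider: "aws".to_string(),
        service_name: "com.amazonaws.vpce.us-east-1.vpce-svc-0123456789abcdef0".to_string(),
        availability_zones: vec!["use1-az1".to_string()],
    });
    // Each call returns only after wait_version() has seen the local catalog
    // catch up to the version returned by the meta service.
    writer
        .create_connection("my_privatelink".to_string(), payload)
        .await?;
    writer.drop_connection("my_privatelink").await
}
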
34 changes: 34 additions & 0 deletions src/frontend/src/catalog/connection_catalog.rs
@@ -0,0 +1,34 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use risingwave_pb::catalog::{connection, PbConnection};
+
+use super::ConnectionId;
+
+#[derive(Clone, Debug, PartialEq)]
+pub struct ConnectionCatalog {
+    pub id: ConnectionId,
+    pub name: String,
+    pub info: connection::Info,
+}
+
+impl From<&PbConnection> for ConnectionCatalog {
+    fn from(prost: &PbConnection) -> Self {
+        Self {
+            id: prost.id,
+            name: prost.name.clone(),
+            info: prost.info.clone().unwrap(),
+        }
+    }
+}
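
Note that the From impl calls unwrap() on the info oneof, so the frontend-side catalog relies on the meta service never sending a Connection without its info populated.
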
2 changes: 2 additions & 0 deletions src/frontend/src/catalog/mod.rs
@@ -24,6 +24,7 @@ use risingwave_connector::sink::catalog::SinkCatalog;
 use thiserror::Error;
 pub(crate) mod catalog_service;

+pub(crate) mod connection_catalog;
 pub(crate) mod database_catalog;
 pub(crate) mod function_catalog;
 pub(crate) mod index_catalog;
@@ -39,6 +40,7 @@ pub use table_catalog::TableCatalog;

 use crate::user::UserId;

+pub(crate) type ConnectionId = u32;
 pub(crate) type SourceId = u32;
 pub(crate) type SinkId = u32;
 pub(crate) type ViewId = u32;
39 changes: 37 additions & 2 deletions src/frontend/src/catalog/root_catalog.rs
@@ -21,19 +21,20 @@ use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
 use risingwave_common::types::DataType;
 use risingwave_connector::sink::catalog::SinkCatalog;
 use risingwave_pb::catalog::{
-    PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView,
+    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView,
 };

 use super::function_catalog::FunctionCatalog;
 use super::source_catalog::SourceCatalog;
 use super::system_catalog::get_sys_catalogs_in_schema;
 use super::view_catalog::ViewCatalog;
 use super::{CatalogError, CatalogResult, SinkId, SourceId, ViewId};
+use crate::catalog::connection_catalog::ConnectionCatalog;
 use crate::catalog::database_catalog::DatabaseCatalog;
 use crate::catalog::schema_catalog::SchemaCatalog;
 use crate::catalog::system_catalog::SystemCatalog;
 use crate::catalog::table_catalog::TableCatalog;
-use crate::catalog::{DatabaseId, IndexCatalog, SchemaId};
+use crate::catalog::{ConnectionId, DatabaseId, IndexCatalog, SchemaId};

 #[derive(Copy, Clone)]
 pub enum SchemaPath<'a> {
@@ -94,6 +95,8 @@ pub struct Catalog {
     db_name_by_id: HashMap<DatabaseId, String>,
     /// all table catalogs in the cluster identified by universal unique table id.
     table_by_id: HashMap<TableId, TableCatalog>,
+    connection_by_id: HashMap<ConnectionId, ConnectionCatalog>,
+    connection_id_by_name: HashMap<String, ConnectionId>,
 }

 #[expect(clippy::derivable_impls)]
@@ -104,6 +107,8 @@ impl Default for Catalog {
             database_by_name: HashMap::new(),
             db_name_by_id: HashMap::new(),
             table_by_id: HashMap::new(),
+            connection_by_id: HashMap::new(), // TODO: move to schema_catalog
+            connection_id_by_name: HashMap::new(), // TODO: move to schema_catalog
         }
     }
 }
@@ -195,6 +200,19 @@ impl Catalog {
             .create_function(proto);
     }

+    pub fn create_connection(&mut self, proto: &PbConnection) {
+        let name = proto.name.clone();
+        let id = proto.id;
+
+        self.connection_by_id.try_insert(id, proto.into()).unwrap();
+        self.connection_id_by_name.try_insert(name, id).unwrap();
+    }
+
+    pub fn drop_connection(&mut self, connection_name: &str) {
+        let id = self.connection_id_by_name.remove(connection_name).unwrap();
+        let _connection = self.connection_by_id.remove(&id).unwrap();
+    }
+
     pub fn drop_database(&mut self, db_id: DatabaseId) {
         let name = self.db_name_by_id.remove(&db_id).unwrap();
         let _database = self.database_by_name.remove(&name).unwrap();
@@ -526,6 +544,19 @@ impl Catalog {
             .ok_or_else(|| CatalogError::NotFound("function", function_name.to_string()))
     }

+    pub fn get_connection_by_name(
+        &self,
+        connection_name: &str,
+    ) -> CatalogResult<&ConnectionCatalog> {
+        let connection_id = self
+            .connection_id_by_name
+            .get(connection_name)
+            .ok_or_else(|| CatalogError::NotFound("connection", connection_name.to_string()))?;
+        self.connection_by_id
+            .get(connection_id)
+            .ok_or_else(|| CatalogError::NotFound("connection", connection_name.to_string()))
+    }
+
     /// Check the name if duplicated with existing table, materialized view or source.
     pub fn check_relation_name_duplicated(
         &self,
Expand Down Expand Up @@ -610,4 +641,8 @@ impl Catalog {
.map(|(id, _)| id)
.ok_or_else(|| CatalogError::NotFound("class", class_name.to_string()))
}

pub fn get_all_connections(&self) -> Vec<ConnectionCatalog> {
self.connection_by_id.values().cloned().collect_vec()
}
}
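
The lookup path mirrors the other by-name accessors, except that the two maps are cluster-global for now rather than living in SchemaCatalog (see the TODO comments above). A small sketch, assuming the observer has already applied the connection to this Catalog instance:

fn connection_id_for(catalog: &Catalog, name: &str) -> CatalogResult<ConnectionId> {
    // Name -> id -> catalog entry; a miss at either step surfaces as
    // CatalogError::NotFound("connection", ...).
    let conn = catalog.get_connection_by_name(name)?;
    Ok(conn.id)
}
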
7 changes: 2 additions & 5 deletions src/frontend/src/catalog/schema_catalog.rs
@@ -21,18 +21,15 @@ use risingwave_common::types::DataType;
 use risingwave_connector::sink::catalog::SinkCatalog;
 use risingwave_pb::catalog::{PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView};

-use super::source_catalog::SourceCatalog;
-use super::ViewId;
+use super::{SinkId, SourceId, ViewId};
 use crate::catalog::function_catalog::FunctionCatalog;
 use crate::catalog::index_catalog::IndexCatalog;
+use crate::catalog::source_catalog::SourceCatalog;
 use crate::catalog::system_catalog::SystemCatalog;
 use crate::catalog::table_catalog::TableCatalog;
 use crate::catalog::view_catalog::ViewCatalog;
 use crate::catalog::SchemaId;

-pub type SourceId = u32;
-pub type SinkId = u32;
-
 #[derive(Clone, Debug)]
 pub struct SchemaCatalog {
     id: SchemaId,
(The remaining changed files in this commit are not shown here.)