Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(source): introduce create/show connection statement #8907

Merged
merged 20 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,13 @@ message Sink {
string definition = 13;
}

// TODO: add database_id and schema_id
message Connection {
WillyKidd marked this conversation as resolved.
Show resolved Hide resolved
message PrivateLinkService {
string provider = 1;
string endpoint_id = 2;
map<string, string> dns_entries = 3;
string service_name = 2;
string endpoint_id = 3;
map<string, string> dns_entries = 4;
yezizp2012 marked this conversation as resolved.
Show resolved Hide resolved
}

uint32 id = 1;
Expand Down
9 changes: 6 additions & 3 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,9 @@ message CreateConnectionRequest {
string service_name = 2;
repeated string availability_zones = 3;
}

string name = 1;
oneof payload {
PrivateLink private_link = 1;
PrivateLink private_link = 2;
}
}

Expand All @@ -281,7 +281,10 @@ message DropConnectionRequest {
string connection_name = 1;
}

message DropConnectionResponse {}
message DropConnectionResponse {
common.Status status = 1;
uint64 version = 2;
}

service DdlService {
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse);
Expand Down
2 changes: 2 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ message MetaSnapshot {
repeated catalog.Index indexes = 6;
repeated catalog.View views = 7;
repeated catalog.Function functions = 15;
repeated catalog.Connection connections = 17;
repeated user.UserInfo users = 8;
repeated FragmentParallelUnitMapping parallel_unit_mappings = 9;
repeated common.WorkerNode nodes = 10;
Expand Down Expand Up @@ -290,6 +291,7 @@ message SubscribeResponse {
SystemParams system_params = 19;
hummock.WriteLimits hummock_write_limits = 20;
RelationGroup relation_group = 21;
catalog.Connection connection = 22;
}
}

Expand Down
1 change: 1 addition & 0 deletions src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ where
| Info::Schema(_)
| Info::RelationGroup(_)
| Info::User(_)
| Info::Connection(_)
| Info::Function(_) => {
notification.version > info.version.as_ref().unwrap().catalog_version
}
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/source/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::common::KafkaCommon;
pub const KAFKA_CONNECTOR: &str = "kafka";
pub const KAFKA_PROPS_BROKER_KEY: &str = "properties.bootstrap.server";
pub const KAFKA_PROPS_BROKER_KEY_ALIAS: &str = "kafka.brokers";
pub const PRIVATELINK_CONNECTION: &str = "privatelink";

#[derive(Clone, Debug, Deserialize)]
pub struct KafkaProperties {
Expand Down
12 changes: 7 additions & 5 deletions src/ctl/src/cmd_impl/meta/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::common::CtlContext;

pub async fn create_connection(
context: &CtlContext,
connection_name: String,
provider: String,
service_name: String,
availability_zone: String,
Expand All @@ -29,14 +30,15 @@ pub async fn create_connection(
.split(',')
.map(|str| str.to_string())
.collect();
let conn_id = meta_client
.create_connection(create_connection_request::Payload::PrivateLink(
PrivateLink {
let (conn_id, _) = meta_client
.create_connection(
connection_name,
create_connection_request::Payload::PrivateLink(PrivateLink {
provider,
service_name,
availability_zones,
},
))
}),
)
.await?;

println!("Create connection success id#{}", conn_id);
Expand Down
13 changes: 11 additions & 2 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ enum MetaCommands {

/// Create a new connection object
CreateConnection {
#[clap(long)]
connection_name: String,
#[clap(long)]
provider: String,
#[clap(long)]
Expand Down Expand Up @@ -350,12 +352,19 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
}
Commands::Meta(MetaCommands::CreateConnection {
connection_name,
provider,
service_name,
availability_zones,
}) => {
cmd_impl::meta::create_connection(context, provider, service_name, availability_zones)
.await?
cmd_impl::meta::create_connection(
context,
connection_name,
provider,
service_name,
availability_zones,
)
.await?
}
Commands::Meta(MetaCommands::ListConnections) => {
cmd_impl::meta::list_connections(context).await?
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/binder/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ impl Binder {
Self::resolve_single_name(name.0, "user name")
}

/// return the `connection_name`
pub fn resolve_connection_name(name: ObjectName) -> Result<String> {
Self::resolve_single_name(name.0, "connection name")
}

/// Fill the [`BindContext`](super::BindContext) for table.
pub(super) fn bind_table_to_context(
&mut self,
Expand Down
28 changes: 27 additions & 1 deletion src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use risingwave_pb::catalog::{
PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView,
};
use risingwave_pb::ddl_service::alter_relation_name_request::Relation;
use risingwave_pb::ddl_service::create_connection_request;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_rpc_client::MetaClient;
use tokio::sync::watch::Receiver;
Expand All @@ -48,7 +49,7 @@ impl CatalogReader {
}
}

/// [`CatalogWriter`] initiate DDL operations (create table/schema/database/function).
/// [`CatalogWriter`] initiate DDL operations (create table/schema/database/function/connection).
/// It will only send rpc to meta and get the catalog version as response.
/// Then it will wait for the local catalog to be synced to the version, which is performed by
/// [observer](`crate::observer::FrontendObserverNode`).
Expand Down Expand Up @@ -98,6 +99,12 @@ pub trait CatalogWriter: Send + Sync {

async fn create_function(&self, function: PbFunction) -> Result<()>;

async fn create_connection(
&self,
connection_name: String,
connection: create_connection_request::Payload,
) -> Result<()>;

async fn drop_table(&self, source_id: Option<u32>, table_id: TableId) -> Result<()>;

async fn drop_materialized_view(&self, table_id: TableId) -> Result<()>;
Expand All @@ -116,6 +123,8 @@ pub trait CatalogWriter: Send + Sync {

async fn drop_function(&self, function_id: FunctionId) -> Result<()>;

async fn drop_connection(&self, connection_name: &str) -> Result<()>;

async fn alter_table_name(&self, table_id: u32, table_name: &str) -> Result<()>;

async fn alter_view_name(&self, view_id: u32, view_name: &str) -> Result<()>;
Expand Down Expand Up @@ -231,6 +240,18 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

async fn create_connection(
&self,
connection_name: String,
connection: create_connection_request::Payload,
) -> Result<()> {
let (_, version) = self
.meta_client
.create_connection(connection_name, connection)
.await?;
self.wait_version(version).await
}

async fn drop_table(&self, source_id: Option<u32>, table_id: TableId) -> Result<()> {
let version = self.meta_client.drop_table(source_id, table_id).await?;
self.wait_version(version).await
Expand Down Expand Up @@ -276,6 +297,11 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

async fn drop_connection(&self, connection_name: &str) -> Result<()> {
let version = self.meta_client.drop_connection(connection_name).await?;
self.wait_version(version).await
}

async fn alter_table_name(&self, table_id: u32, table_name: &str) -> Result<()> {
let version = self
.meta_client
Expand Down
34 changes: 34 additions & 0 deletions src/frontend/src/catalog/connection_catalog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::catalog::{connection, PbConnection};

use super::ConnectionId;

#[derive(Clone, Debug, PartialEq)]
pub struct ConnectionCatalog {
pub id: ConnectionId,
pub name: String,
pub info: connection::Info,
}

impl From<&PbConnection> for ConnectionCatalog {
fn from(prost: &PbConnection) -> Self {
Self {
id: prost.id,
name: prost.name.clone(),
info: prost.info.clone().unwrap(),
}
}
}
2 changes: 2 additions & 0 deletions src/frontend/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use risingwave_connector::sink::catalog::SinkCatalog;
use thiserror::Error;
pub(crate) mod catalog_service;

pub(crate) mod connection_catalog;
pub(crate) mod database_catalog;
pub(crate) mod function_catalog;
pub(crate) mod index_catalog;
Expand All @@ -39,6 +40,7 @@ pub use table_catalog::TableCatalog;

use crate::user::UserId;

pub(crate) type ConnectionId = u32;
pub(crate) type SourceId = u32;
pub(crate) type SinkId = u32;
pub(crate) type ViewId = u32;
Expand Down
39 changes: 37 additions & 2 deletions src/frontend/src/catalog/root_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,20 @@ use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
use risingwave_common::types::DataType;
use risingwave_connector::sink::catalog::SinkCatalog;
use risingwave_pb::catalog::{
PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView,
PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView,
};

use super::function_catalog::FunctionCatalog;
use super::source_catalog::SourceCatalog;
use super::system_catalog::get_sys_catalogs_in_schema;
use super::view_catalog::ViewCatalog;
use super::{CatalogError, CatalogResult, SinkId, SourceId, ViewId};
use crate::catalog::connection_catalog::ConnectionCatalog;
use crate::catalog::database_catalog::DatabaseCatalog;
use crate::catalog::schema_catalog::SchemaCatalog;
use crate::catalog::system_catalog::SystemCatalog;
use crate::catalog::table_catalog::TableCatalog;
use crate::catalog::{DatabaseId, IndexCatalog, SchemaId};
use crate::catalog::{ConnectionId, DatabaseId, IndexCatalog, SchemaId};

#[derive(Copy, Clone)]
pub enum SchemaPath<'a> {
Expand Down Expand Up @@ -94,6 +95,8 @@ pub struct Catalog {
db_name_by_id: HashMap<DatabaseId, String>,
/// all table catalogs in the cluster identified by universal unique table id.
table_by_id: HashMap<TableId, TableCatalog>,
connection_by_id: HashMap<ConnectionId, ConnectionCatalog>,
connection_id_by_name: HashMap<String, ConnectionId>,
WillyKidd marked this conversation as resolved.
Show resolved Hide resolved
}

#[expect(clippy::derivable_impls)]
Expand All @@ -104,6 +107,8 @@ impl Default for Catalog {
database_by_name: HashMap::new(),
db_name_by_id: HashMap::new(),
table_by_id: HashMap::new(),
connection_by_id: HashMap::new(), // TODO: move to schema_catalog
connection_id_by_name: HashMap::new(), // TODO: move to schema_catalog
}
}
}
Expand Down Expand Up @@ -195,6 +200,19 @@ impl Catalog {
.create_function(proto);
}

pub fn create_connection(&mut self, proto: &PbConnection) {
let name = proto.name.clone();
let id = proto.id;

self.connection_by_id.try_insert(id, proto.into()).unwrap();
self.connection_id_by_name.try_insert(name, id).unwrap();
}

pub fn drop_connection(&mut self, connection_name: &str) {
let id = self.connection_id_by_name.remove(connection_name).unwrap();
let _connection = self.connection_by_id.remove(&id).unwrap();
}

pub fn drop_database(&mut self, db_id: DatabaseId) {
let name = self.db_name_by_id.remove(&db_id).unwrap();
let _database = self.database_by_name.remove(&name).unwrap();
Expand Down Expand Up @@ -526,6 +544,19 @@ impl Catalog {
.ok_or_else(|| CatalogError::NotFound("function", function_name.to_string()))
}

pub fn get_connection_by_name(
&self,
connection_name: &str,
) -> CatalogResult<&ConnectionCatalog> {
let connection_id = self
.connection_id_by_name
.get(connection_name)
.ok_or_else(|| CatalogError::NotFound("connection", connection_name.to_string()))?;
self.connection_by_id
.get(connection_id)
.ok_or_else(|| CatalogError::NotFound("connection", connection_name.to_string()))
}

/// Check the name if duplicated with existing table, materialized view or source.
pub fn check_relation_name_duplicated(
&self,
Expand Down Expand Up @@ -610,4 +641,8 @@ impl Catalog {
.map(|(id, _)| id)
.ok_or_else(|| CatalogError::NotFound("class", class_name.to_string()))
}

pub fn get_all_connections(&self) -> Vec<ConnectionCatalog> {
self.connection_by_id.values().cloned().collect_vec()
}
}
7 changes: 2 additions & 5 deletions src/frontend/src/catalog/schema_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,15 @@ use risingwave_common::types::DataType;
use risingwave_connector::sink::catalog::SinkCatalog;
use risingwave_pb::catalog::{PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView};

use super::source_catalog::SourceCatalog;
use super::ViewId;
use super::{SinkId, SourceId, ViewId};
use crate::catalog::function_catalog::FunctionCatalog;
use crate::catalog::index_catalog::IndexCatalog;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::system_catalog::SystemCatalog;
use crate::catalog::table_catalog::TableCatalog;
use crate::catalog::view_catalog::ViewCatalog;
use crate::catalog::SchemaId;

pub type SourceId = u32;
pub type SinkId = u32;

#[derive(Clone, Debug)]
pub struct SchemaCatalog {
id: SchemaId,
Expand Down
Loading