diff --git a/proto/catalog.proto b/proto/catalog.proto
index 8e29b7062c582..37e467f7d01c8 100644
--- a/proto/catalog.proto
+++ b/proto/catalog.proto
@@ -89,11 +89,13 @@ message Sink {
   string definition = 13;
 }
 
+// TODO: add database_id and schema_id
 message Connection {
   message PrivateLinkService {
     string provider = 1;
-    string endpoint_id = 2;
-    map<string, string> dns_entries = 3;
+    string service_name = 2;
+    string endpoint_id = 3;
+    map<string, string> dns_entries = 4;
   }
 
   uint32 id = 1;
diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto
index ab568ee6da27f..7d9d44578f36f 100644
--- a/proto/ddl_service.proto
+++ b/proto/ddl_service.proto
@@ -259,9 +259,9 @@ message CreateConnectionRequest {
     string service_name = 2;
     repeated string availability_zones = 3;
   }
-
+  string name = 1;
   oneof payload {
-    PrivateLink private_link = 1;
+    PrivateLink private_link = 2;
   }
 }
 
@@ -281,7 +281,10 @@ message DropConnectionRequest {
   string connection_name = 1;
 }
 
-message DropConnectionResponse {}
+message DropConnectionResponse {
+  common.Status status = 1;
+  uint64 version = 2;
+}
 
 service DdlService {
   rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse);
diff --git a/proto/meta.proto b/proto/meta.proto
index e3a500ad4ac37..d5fbe119754d0 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -240,6 +240,7 @@ message MetaSnapshot {
   repeated catalog.Index indexes = 6;
   repeated catalog.View views = 7;
   repeated catalog.Function functions = 15;
+  repeated catalog.Connection connections = 17;
   repeated user.UserInfo users = 8;
   repeated FragmentParallelUnitMapping parallel_unit_mappings = 9;
   repeated common.WorkerNode nodes = 10;
@@ -290,6 +291,7 @@ message SubscribeResponse {
     SystemParams system_params = 19;
     hummock.WriteLimits hummock_write_limits = 20;
     RelationGroup relation_group = 21;
+    catalog.Connection connection = 22;
   }
 }
 
diff --git a/src/common/common_service/src/observer_manager.rs b/src/common/common_service/src/observer_manager.rs
index 271fb39675e81..559bfe9076211 100644
--- a/src/common/common_service/src/observer_manager.rs
+++ b/src/common/common_service/src/observer_manager.rs
@@ -120,6 +120,7 @@ where
             | Info::Schema(_)
             | Info::RelationGroup(_)
             | Info::User(_)
+            | Info::Connection(_)
             | Info::Function(_) => {
                 notification.version > info.version.as_ref().unwrap().catalog_version
             }
diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs
index 27e9406119ef6..afd039e922160 100644
--- a/src/connector/src/source/kafka/mod.rs
+++ b/src/connector/src/source/kafka/mod.rs
@@ -30,6 +30,7 @@ use crate::common::KafkaCommon;
 pub const KAFKA_CONNECTOR: &str = "kafka";
 pub const KAFKA_PROPS_BROKER_KEY: &str = "properties.bootstrap.server";
 pub const KAFKA_PROPS_BROKER_KEY_ALIAS: &str = "kafka.brokers";
+pub const PRIVATELINK_CONNECTION: &str = "privatelink";
 
 #[derive(Clone, Debug, Deserialize)]
 pub struct KafkaProperties {
diff --git a/src/ctl/src/cmd_impl/meta/connection.rs b/src/ctl/src/cmd_impl/meta/connection.rs
index 32e1c420c48b3..28e3862cc7006 100644
--- a/src/ctl/src/cmd_impl/meta/connection.rs
+++ b/src/ctl/src/cmd_impl/meta/connection.rs
@@ -20,6 +20,7 @@ use crate::common::CtlContext;
 
 pub async fn create_connection(
     context: &CtlContext,
+    connection_name: String,
     provider: String,
     service_name: String,
     availability_zone: String,
@@ -29,14 +30,15 @@ pub async fn create_connection(
         .split(',')
         .map(|str| str.to_string())
         .collect();
-    let conn_id = meta_client
-        .create_connection(create_connection_request::Payload::PrivateLink(
-            PrivateLink {
+    let (conn_id, _) = meta_client
+        .create_connection(
+            connection_name,
+            create_connection_request::Payload::PrivateLink(PrivateLink {
                 provider,
                 service_name,
                 availability_zones,
-            },
-        ))
+            }),
+        )
         .await?;
 
     println!("Create connection success id#{}", conn_id);
diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs
index 5c6b57f888bf7..7d73d6fe5a316 100644
--- a/src/ctl/src/lib.rs
+++ b/src/ctl/src/lib.rs
@@ -215,6 +215,8 @@ enum MetaCommands {
 
     /// Create a new connection object
     CreateConnection {
+        #[clap(long)]
+        connection_name: String,
         #[clap(long)]
         provider: String,
         #[clap(long)]
@@ -350,12 +352,19 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
             cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
         }
         Commands::Meta(MetaCommands::CreateConnection {
+            connection_name,
             provider,
             service_name,
             availability_zones,
         }) => {
-            cmd_impl::meta::create_connection(context, provider, service_name, availability_zones)
-                .await?
+            cmd_impl::meta::create_connection(
+                context,
+                connection_name,
+                provider,
+                service_name,
+                availability_zones,
+            )
+            .await?
         }
         Commands::Meta(MetaCommands::ListConnections) => {
             cmd_impl::meta::list_connections(context).await?
diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs
index 48fd90bd7da23..e0af5bb533ab0 100644
--- a/src/frontend/src/binder/relation/mod.rs
+++ b/src/frontend/src/binder/relation/mod.rs
@@ -213,6 +213,11 @@ impl Binder {
         Self::resolve_single_name(name.0, "user name")
     }
 
+    /// return the `connection_name`
+    pub fn resolve_connection_name(name: ObjectName) -> Result<String> {
+        Self::resolve_single_name(name.0, "connection name")
+    }
+
     /// Fill the [`BindContext`](super::BindContext) for table.
     pub(super) fn bind_table_to_context(
         &mut self,
diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs
index 04c888cc93872..d442e254ae36b 100644
--- a/src/frontend/src/catalog/catalog_service.rs
+++ b/src/frontend/src/catalog/catalog_service.rs
@@ -24,6 +24,7 @@ use risingwave_pb::catalog::{
     PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView,
 };
 use risingwave_pb::ddl_service::alter_relation_name_request::Relation;
+use risingwave_pb::ddl_service::create_connection_request;
 use risingwave_pb::stream_plan::StreamFragmentGraph;
 use risingwave_rpc_client::MetaClient;
 use tokio::sync::watch::Receiver;
@@ -48,7 +49,7 @@ impl CatalogReader {
     }
 }
 
-/// [`CatalogWriter`] initiate DDL operations (create table/schema/database/function).
+/// [`CatalogWriter`] initiate DDL operations (create table/schema/database/function/connection).
 /// It will only send rpc to meta and get the catalog version as response.
 /// Then it will wait for the local catalog to be synced to the version, which is performed by
 /// [observer](`crate::observer::FrontendObserverNode`).
@@ -98,6 +99,12 @@ pub trait CatalogWriter: Send + Sync {
 
     async fn create_function(&self, function: PbFunction) -> Result<()>;
 
+    async fn create_connection(
+        &self,
+        connection_name: String,
+        connection: create_connection_request::Payload,
+    ) -> Result<()>;
+
     async fn drop_table(&self, source_id: Option<u32>, table_id: TableId) -> Result<()>;
 
     async fn drop_materialized_view(&self, table_id: TableId) -> Result<()>;
@@ -116,6 +123,8 @@ pub trait CatalogWriter: Send + Sync {
 
     async fn drop_function(&self, function_id: FunctionId) -> Result<()>;
 
+    async fn drop_connection(&self, connection_name: &str) -> Result<()>;
+
     async fn alter_table_name(&self, table_id: u32, table_name: &str) -> Result<()>;
 
     async fn alter_view_name(&self, view_id: u32, view_name: &str) -> Result<()>;
@@ -231,6 +240,18 @@ impl CatalogWriter for CatalogWriterImpl {
         self.wait_version(version).await
     }
 
+    async fn create_connection(
+        &self,
+        connection_name: String,
+        connection: create_connection_request::Payload,
+    ) -> Result<()> {
+        let (_, version) = self
+            .meta_client
+            .create_connection(connection_name, connection)
+            .await?;
+        self.wait_version(version).await
+    }
+
     async fn drop_table(&self, source_id: Option<u32>, table_id: TableId) -> Result<()> {
         let version = self.meta_client.drop_table(source_id, table_id).await?;
         self.wait_version(version).await
@@ -276,6 +297,11 @@ impl CatalogWriter for CatalogWriterImpl {
         self.wait_version(version).await
     }
 
+    async fn drop_connection(&self, connection_name: &str) -> Result<()> {
+        let version = self.meta_client.drop_connection(connection_name).await?;
+        self.wait_version(version).await
+    }
+
     async fn alter_table_name(&self, table_id: u32, table_name: &str) -> Result<()> {
         let version = self
             .meta_client
diff --git a/src/frontend/src/catalog/connection_catalog.rs b/src/frontend/src/catalog/connection_catalog.rs
new file mode 100644
index 0000000000000..e0fb2f453eb03
--- /dev/null
+++ b/src/frontend/src/catalog/connection_catalog.rs
@@ -0,0 +1,34 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use risingwave_pb::catalog::{connection, PbConnection};
+
+use super::ConnectionId;
+
+#[derive(Clone, Debug, PartialEq)]
+pub struct ConnectionCatalog {
+    pub id: ConnectionId,
+    pub name: String,
+    pub info: connection::Info,
+}
+
+impl From<&PbConnection> for ConnectionCatalog {
+    fn from(prost: &PbConnection) -> Self {
+        Self {
+            id: prost.id,
+            name: prost.name.clone(),
+            info: prost.info.clone().unwrap(),
+        }
+    }
+}
diff --git a/src/frontend/src/catalog/mod.rs b/src/frontend/src/catalog/mod.rs
index dc4d9e9729683..e75e2920dea79 100644
--- a/src/frontend/src/catalog/mod.rs
+++ b/src/frontend/src/catalog/mod.rs
@@ -24,6 +24,7 @@ use risingwave_connector::sink::catalog::SinkCatalog;
 use thiserror::Error;
 pub(crate) mod catalog_service;
 
+pub(crate) mod connection_catalog;
 pub(crate) mod database_catalog;
 pub(crate) mod function_catalog;
 pub(crate) mod index_catalog;
@@ -39,6 +40,7 @@ pub use table_catalog::TableCatalog;
 
 use crate::user::UserId;
 
+pub(crate) type ConnectionId = u32;
 pub(crate) type SourceId = u32;
 pub(crate) type SinkId = u32;
 pub(crate) type ViewId = u32;
diff --git a/src/frontend/src/catalog/root_catalog.rs b/src/frontend/src/catalog/root_catalog.rs
index b563dd38bb8e4..5d8a32c4a17b0 100644
--- a/src/frontend/src/catalog/root_catalog.rs
+++ b/src/frontend/src/catalog/root_catalog.rs
@@ -21,7 +21,7 @@ use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
 use risingwave_common::types::DataType;
 use risingwave_connector::sink::catalog::SinkCatalog;
 use risingwave_pb::catalog::{
-    PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView,
+    PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView,
 };
 
 use super::function_catalog::FunctionCatalog;
@@ -29,11 +29,12 @@ use super::source_catalog::SourceCatalog;
 use super::system_catalog::get_sys_catalogs_in_schema;
 use super::view_catalog::ViewCatalog;
 use super::{CatalogError, CatalogResult, SinkId, SourceId, ViewId};
+use crate::catalog::connection_catalog::ConnectionCatalog;
 use crate::catalog::database_catalog::DatabaseCatalog;
 use crate::catalog::schema_catalog::SchemaCatalog;
 use crate::catalog::system_catalog::SystemCatalog;
 use crate::catalog::table_catalog::TableCatalog;
-use crate::catalog::{DatabaseId, IndexCatalog, SchemaId};
+use crate::catalog::{ConnectionId, DatabaseId, IndexCatalog, SchemaId};
 
 #[derive(Copy, Clone)]
 pub enum SchemaPath<'a> {
@@ -94,6 +95,8 @@ pub struct Catalog {
     db_name_by_id: HashMap<DatabaseId, String>,
     /// all table catalogs in the cluster identified by universal unique table id.
     table_by_id: HashMap<TableId, TableCatalog>,
+    connection_by_id: HashMap<ConnectionId, ConnectionCatalog>,
+    connection_id_by_name: HashMap<String, ConnectionId>,
 }
 
 #[expect(clippy::derivable_impls)]
@@ -104,6 +107,8 @@ impl Default for Catalog {
             database_by_name: HashMap::new(),
             db_name_by_id: HashMap::new(),
             table_by_id: HashMap::new(),
+            connection_by_id: HashMap::new(), // TODO: move to schema_catalog
+            connection_id_by_name: HashMap::new(), // TODO: move to schema_catalog
         }
     }
 }
@@ -195,6 +200,19 @@ impl Catalog {
             .create_function(proto);
     }
 
+    pub fn create_connection(&mut self, proto: &PbConnection) {
+        let name = proto.name.clone();
+        let id = proto.id;
+
+        self.connection_by_id.try_insert(id, proto.into()).unwrap();
+        self.connection_id_by_name.try_insert(name, id).unwrap();
+    }
+
+    pub fn drop_connection(&mut self, connection_name: &str) {
+        let id = self.connection_id_by_name.remove(connection_name).unwrap();
+        let _connection = self.connection_by_id.remove(&id).unwrap();
+    }
+
     pub fn drop_database(&mut self, db_id: DatabaseId) {
         let name = self.db_name_by_id.remove(&db_id).unwrap();
         let _database = self.database_by_name.remove(&name).unwrap();
@@ -526,6 +544,19 @@ impl Catalog {
             .ok_or_else(|| CatalogError::NotFound("function", function_name.to_string()))
     }
 
+    pub fn get_connection_by_name(
+        &self,
+        connection_name: &str,
+    ) -> CatalogResult<&ConnectionCatalog> {
+        let connection_id = self
+            .connection_id_by_name
+            .get(connection_name)
+            .ok_or_else(|| CatalogError::NotFound("connection", connection_name.to_string()))?;
+        self.connection_by_id
+            .get(connection_id)
+            .ok_or_else(|| CatalogError::NotFound("connection", connection_name.to_string()))
+    }
+
     /// Check the name if duplicated with existing table, materialized view or source.
     pub fn check_relation_name_duplicated(
         &self,
@@ -610,4 +641,8 @@ impl Catalog {
             .map(|(id, _)| id)
             .ok_or_else(|| CatalogError::NotFound("class", class_name.to_string()))
     }
+
+    pub fn get_all_connections(&self) -> Vec<ConnectionCatalog> {
+        self.connection_by_id.values().cloned().collect_vec()
+    }
 }
diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs
index e2bea264d915c..e2783c2afb976 100644
--- a/src/frontend/src/catalog/schema_catalog.rs
+++ b/src/frontend/src/catalog/schema_catalog.rs
@@ -21,18 +21,15 @@ use risingwave_common::types::DataType;
 use risingwave_connector::sink::catalog::SinkCatalog;
 use risingwave_pb::catalog::{PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView};
 
-use super::source_catalog::SourceCatalog;
-use super::ViewId;
+use super::{SinkId, SourceId, ViewId};
 use crate::catalog::function_catalog::FunctionCatalog;
 use crate::catalog::index_catalog::IndexCatalog;
+use crate::catalog::source_catalog::SourceCatalog;
 use crate::catalog::system_catalog::SystemCatalog;
 use crate::catalog::table_catalog::TableCatalog;
 use crate::catalog::view_catalog::ViewCatalog;
 use crate::catalog::SchemaId;
 
-pub type SourceId = u32;
-pub type SinkId = u32;
-
 #[derive(Clone, Debug)]
 pub struct SchemaCatalog {
     id: SchemaId,
diff --git a/src/frontend/src/handler/create_connection.rs b/src/frontend/src/handler/create_connection.rs
new file mode 100644
index 0000000000000..6d15af3133956
--- /dev/null
+++ b/src/frontend/src/handler/create_connection.rs
@@ -0,0 +1,124 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+
+use pgwire::pg_response::{PgResponse, StatementType};
+use risingwave_common::error::ErrorCode::ProtocolError;
+use risingwave_common::error::{Result, RwError};
+use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
+use risingwave_pb::ddl_service::create_connection_request;
+use risingwave_sqlparser::ast::CreateConnectionStatement;
+use serde_json;
+
+use super::RwPgResponse;
+use crate::binder::Binder;
+use crate::catalog::CatalogError;
+use crate::handler::HandlerArgs;
+
+pub(crate) const CONNECTION_TYPE_PROP: &str = "type";
+pub(crate) const CONNECTION_PROVIDER_PROP: &str = "provider";
+pub(crate) const CONNECTION_SERVICE_NAME_PROP: &str = "service.name";
+pub(crate) const CONNECTION_AVAIL_ZONE_PROP: &str = "availability.zones";
+
+#[inline(always)]
+fn get_connection_property_required(
+    with_properties: &HashMap<String, String>,
+    property: &str,
+) -> Result<String> {
+    with_properties
+        .get(property)
+        .map(|s| s.to_lowercase())
+        .ok_or(RwError::from(ProtocolError(format!(
+            "Required property \"{property}\" was not provided"
+        ))))
+}
+
+fn resolve_private_link_properties(
+    with_properties: &HashMap<String, String>,
+) -> Result<create_connection_request::PrivateLink> {
+    let provider = get_connection_property_required(with_properties, CONNECTION_PROVIDER_PROP)?;
+    let service_name =
+        get_connection_property_required(with_properties, CONNECTION_SERVICE_NAME_PROP)?;
+    let availability_zones_str =
+        get_connection_property_required(with_properties, CONNECTION_AVAIL_ZONE_PROP)?;
+    let availability_zones: Vec<String> =
+        serde_json::from_str(&availability_zones_str).map_err(|e| {
+            RwError::from(ProtocolError(format!(
+                "Can not parse {}: {}",
+                CONNECTION_AVAIL_ZONE_PROP, e
+            )))
+        })?;
+    Ok(create_connection_request::PrivateLink {
+        provider,
+        service_name,
+        availability_zones,
+    })
+}
+
+fn resolve_create_connection_payload(
+    with_properties: &HashMap<String, String>,
+) -> Result<create_connection_request::Payload> {
+    let connection_type = get_connection_property_required(with_properties, CONNECTION_TYPE_PROP)?;
+    let create_connection_payload = match connection_type.as_str() {
+        PRIVATELINK_CONNECTION => create_connection_request::Payload::PrivateLink(
+            resolve_private_link_properties(with_properties)?,
+        ),
+        _ => {
+            return Err(RwError::from(ProtocolError(format!(
+                "Connection type \"{connection_type}\" is not supported"
+            ))));
+        }
+    };
+    Ok(create_connection_payload)
+}
+
+pub async fn handle_create_connection(
+    handler_args: HandlerArgs,
+    stmt: CreateConnectionStatement,
+) -> Result<RwPgResponse> {
+    let session = handler_args.session.clone();
+    let connection_name = Binder::resolve_connection_name(stmt.connection_name)?;
+
+    {
+        let catalog_reader = session.env().catalog_reader();
+        let reader = catalog_reader.read_guard();
+        if reader.get_connection_by_name(&connection_name).is_ok() {
+            return if stmt.if_not_exists {
+                Ok(PgResponse::empty_result_with_notice(
+                    StatementType::CREATE_CONNECTION,
+                    format!("connection \"{}\" exists, skipping", connection_name),
+                ))
+            } else {
+                Err(CatalogError::Duplicated("connection", connection_name).into())
+            };
+        }
+    }
+
+    let with_properties = handler_args
+        .with_options
+        .inner()
+        .clone()
+        .into_iter()
+        .collect();
+
+    let create_connection_payload = resolve_create_connection_payload(&with_properties)?;
+
+    let catalog_writer = session.env().catalog_writer();
+    catalog_writer
+        .create_connection(connection_name, create_connection_payload)
+        .await?;
+
+    Ok(PgResponse::empty_result(StatementType::CREATE_CONNECTION))
+}
diff --git a/src/frontend/src/handler/drop_connection.rs b/src/frontend/src/handler/drop_connection.rs
new file mode 100644
index 0000000000000..2a7038458a81e
--- /dev/null
+++ b/src/frontend/src/handler/drop_connection.rs
@@ -0,0 +1,30 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use risingwave_common::error::{ErrorCode, Result, RwError};
+use risingwave_sqlparser::ast::ObjectName;
+
+use super::RwPgResponse;
+use crate::handler::HandlerArgs;
+
+pub fn handle_drop_connection(
+    _handler_args: HandlerArgs,
+    _connection_name: ObjectName,
+    _if_exists: bool,
+) -> Result<RwPgResponse> {
+    Err(RwError::from(ErrorCode::NotImplemented(
+        "DROP CONNECTION is not implemented\n".to_string(),
+        None.into(),
+    )))
+}
diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs
index cd17124d20600..35b9f46f1fa6a 100644
--- a/src/frontend/src/handler/mod.rs
+++ b/src/frontend/src/handler/mod.rs
@@ -38,6 +38,7 @@ mod alter_relation_rename;
 mod alter_system;
 mod alter_table_column;
 pub mod alter_user;
+pub mod create_connection;
 mod create_database;
 pub mod create_function;
 pub mod create_index;
@@ -50,6 +51,7 @@ pub mod create_table_as;
 pub mod create_user;
 pub mod create_view;
 mod describe;
+mod drop_connection;
 mod drop_database;
 pub mod drop_function;
 mod drop_index;
@@ -148,6 +150,11 @@ impl HandlerArgs {
             } => {
                 *if_not_exists = false;
             }
+            Statement::CreateConnection {
+                stmt: CreateConnectionStatement { if_not_exists, .. },
+            } => {
+                *if_not_exists = false;
+            }
             _ => {}
         }
         stmt.to_string()
@@ -172,6 +179,9 @@ pub async fn handle(
             create_source::handle_create_source(handler_args, stmt).await
         }
         Statement::CreateSink { stmt } => create_sink::handle_create_sink(handler_args, stmt).await,
+        Statement::CreateConnection { stmt } => {
+            create_connection::handle_create_connection(handler_args, stmt).await
+        }
         Statement::CreateFunction {
             or_replace,
             temporary,
@@ -309,6 +319,9 @@ pub async fn handle(
             ObjectType::View => {
                 drop_view::handle_drop_view(handler_args, object_name, if_exists).await
             }
+            ObjectType::Connection => {
+                drop_connection::handle_drop_connection(handler_args, object_name, if_exists)
+            }
         },
         Statement::DropFunction {
             if_exists,
diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs
index ba239a7b2b70d..ee8281619b48c 100644
--- a/src/frontend/src/handler/show.rs
+++ b/src/frontend/src/handler/show.rs
@@ -19,7 +19,10 @@ use pgwire::types::Row;
 use risingwave_common::catalog::{ColumnDesc, DEFAULT_SCHEMA_NAME};
 use risingwave_common::error::{ErrorCode, Result};
 use risingwave_common::types::DataType;
+use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
+use risingwave_pb::catalog::connection;
 use risingwave_sqlparser::ast::{Ident, ObjectName, ShowCreateType, ShowObject};
+use serde_json;
 
 use super::RwPgResponse;
 use crate::binder::{Binder, Relation};
@@ -117,6 +120,58 @@ pub fn handle_show_object(handler_args: HandlerArgs, command: ShowObject) -> Res
                 ],
             ));
         }
+        ShowObject::Connection => {
+            let connections = catalog_reader.get_all_connections();
+            let rows = connections
+                .into_iter()
+                .map(|c| {
+                    let name = c.name;
+                    let conn_type = match c.info {
+                        connection::Info::PrivateLinkService(_) => {
+                            PRIVATELINK_CONNECTION.to_string()
+                        }
+                    };
+                    let properties = match c.info {
+                        connection::Info::PrivateLinkService(i) => {
+                            format!(
+                                "provider: {}\nservice_name: {}\nendpoint_id: {}\navailability_zones: {}",
+                                i.provider,
+                                i.service_name,
+                                i.endpoint_id,
+                                serde_json::to_string(&i.dns_entries.keys().collect_vec()).unwrap()
+                            )
+                        }
+                    };
+                    Row::new(vec![
+                        Some(name.into()),
+                        Some(conn_type.into()),
+                        Some(properties.into()),
+                    ])
+                })
+                .collect_vec();
+            return Ok(PgResponse::new_for_stream(
+                StatementType::SHOW_COMMAND,
+                None,
+                rows.into(),
+                vec![
+                    PgFieldDescriptor::new(
+                        "Name".to_owned(),
+                        DataType::Varchar.to_oid(),
+                        DataType::Varchar.type_len(),
+                    ),
+                    PgFieldDescriptor::new(
+                        "Type".to_owned(),
+                        DataType::Varchar.to_oid(),
+                        DataType::Varchar.type_len(),
+                    ),
+                    PgFieldDescriptor::new(
+                        "Properties".to_owned(),
+                        DataType::Varchar.to_oid(),
+                        DataType::Varchar.type_len(),
+                    ),
+                ],
+            ));
+        }
     };
 
     let rows = names
diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs
index db18d2849148b..8006d4b0ec557 100644
--- a/src/frontend/src/observer/observer_manager.rs
+++ b/src/frontend/src/observer/observer_manager.rs
@@ -50,7 +50,11 @@ impl ObserverState for FrontendObserverNode {
         };
 
         match info.to_owned() {
-            Info::Database(_) | Info::Schema(_) | Info::RelationGroup(_) | Info::Function(_) => {
+            Info::Database(_)
+            | Info::Schema(_)
+            | Info::RelationGroup(_)
+            | Info::Function(_)
+            | Info::Connection(_) => {
                 self.handle_catalog_notification(resp);
             }
             Info::Node(node) => {
@@ -102,6 +106,7 @@ impl ObserverState for FrontendObserverNode {
             indexes,
             views,
             functions,
+            connections,
             users,
             parallel_unit_mappings,
             nodes,
@@ -118,27 +123,30 @@ impl ObserverState for FrontendObserverNode {
         for schema in schemas {
             catalog_guard.create_schema(&schema)
         }
-        for table in tables {
-            catalog_guard.create_table(&table)
-        }
         for source in sources {
             catalog_guard.create_source(&source)
         }
-        for user in users {
-            user_guard.create_user(user)
+        for sink in sinks {
+            catalog_guard.create_sink(&sink)
+        }
+        for table in tables {
+            catalog_guard.create_table(&table)
         }
         for index in indexes {
             catalog_guard.create_index(&index)
         }
-        for sink in sinks {
-            catalog_guard.create_sink(&sink)
-        }
         for view in views {
             catalog_guard.create_view(&view)
         }
         for function in functions {
             catalog_guard.create_function(&function)
         }
+        for connection in connections {
+            catalog_guard.create_connection(&connection)
+        }
+        for user in users {
+            user_guard.create_user(user)
+        }
         self.worker_node_manager.refresh(
             nodes,
             parallel_unit_mappings
@@ -281,6 +289,11 @@ impl FrontendObserverNode {
                 ),
                 _ => panic!("receive an unsupported notify {:?}", resp),
             },
+            Info::Connection(connection) => match resp.operation() {
+                Operation::Add => catalog_guard.create_connection(connection),
+                Operation::Delete => catalog_guard.drop_connection(connection.get_name().as_str()),
+                _ => panic!("receive an unsupported notify {:?}", resp),
+            },
             _ => unreachable!(),
         }
         assert!(
diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs
index 0aa52f8d3eba6..eb1b15b3bf941 100644
--- a/src/frontend/src/test_utils.rs
+++ b/src/frontend/src/test_utils.rs
@@ -34,7 +34,7 @@ use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
 use risingwave_pb::catalog::{
     PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView,
 };
-use risingwave_pb::ddl_service::DdlProgress;
+use risingwave_pb::ddl_service::{create_connection_request, DdlProgress};
 use risingwave_pb::hummock::HummockSnapshot;
 use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
 use risingwave_pb::meta::{CreatingJobInfo, SystemParams};
@@ -298,6 +298,14 @@ impl CatalogWriter for MockCatalogWriter {
         unreachable!()
     }
 
+    async fn create_connection(
+        &self,
+        _connection_name: String,
+        _connection: create_connection_request::Payload,
+    ) -> Result<()> {
+        unreachable!()
+    }
+
     async fn drop_table(&self, source_id: Option<u32>, table_id: TableId) -> Result<()> {
         if let Some(source_id) = source_id {
             self.drop_table_or_source_id(source_id);
@@ -387,6 +395,10 @@ impl CatalogWriter for MockCatalogWriter {
         unreachable!()
     }
 
+    async fn drop_connection(&self, _connection_name: &str) -> Result<()> {
+        unreachable!()
+    }
+
     async fn drop_database(&self, database_id: u32) -> Result<()> {
         self.catalog.write().drop_database(database_id);
         Ok(())
diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs
index 91429d184e697..d402127a87334 100644
--- a/src/frontend/src/utils/with_options.rs
+++ b/src/frontend/src/utils/with_options.rs
@@ -19,7 +19,8 @@ use std::num::NonZeroU32;
 use itertools::Itertools;
 use risingwave_common::error::{ErrorCode, RwError};
 use risingwave_sqlparser::ast::{
-    CreateSinkStatement, CreateSourceStatement, SqlOption, Statement, Value,
+    CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement, SqlOption, Statement,
+    Value,
 };
 
 mod options {
@@ -146,6 +147,12 @@ impl TryFrom<&Statement> for WithOptions {
                     CreateSinkStatement {
                         with_properties, ..
                     },
+            }
+            | Statement::CreateConnection {
+                stmt:
+                    CreateConnectionStatement {
+                        with_properties, ..
+                    },
             } => Self::try_from(with_properties.0.as_slice()),
 
             _ => Ok(Default::default()),
diff --git a/src/meta/src/backup_restore/meta_snapshot_builder.rs b/src/meta/src/backup_restore/meta_snapshot_builder.rs
index dda21b1a1752c..01b1af7882409 100644
--- a/src/meta/src/backup_restore/meta_snapshot_builder.rs
+++ b/src/meta/src/backup_restore/meta_snapshot_builder.rs
@@ -21,7 +21,9 @@ use risingwave_backup::error::{BackupError, BackupResult};
 use risingwave_backup::meta_snapshot::{ClusterMetadata, MetaSnapshot};
 use risingwave_backup::MetaSnapshotId;
 use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt;
-use risingwave_pb::catalog::{Database, Function, Index, Schema, Sink, Source, Table, View};
+use risingwave_pb::catalog::{
+    Connection, Database, Function, Index, Schema, Sink, Source, Table, View,
+};
 use risingwave_pb::hummock::{HummockVersion, HummockVersionDelta, HummockVersionStats};
 use risingwave_pb::meta::SystemParams;
 use risingwave_pb::user::UserInfo;
@@ -109,6 +111,7 @@ impl<S: MetaStore> MetaSnapshotBuilder<S> {
         let source = Source::list_at_snapshot::<S>(&meta_store_snapshot).await?;
         let view = View::list_at_snapshot::<S>(&meta_store_snapshot).await?;
         let function = Function::list_at_snapshot::<S>(&meta_store_snapshot).await?;
+        let connection = Connection::list_at_snapshot::<S>(&meta_store_snapshot).await?;
         let system_param = SystemParams::get_at_snapshot::<S>(&meta_store_snapshot)
             .await?
             .ok_or_else(|| anyhow!("system params not found in meta store"))?;
@@ -134,6 +137,7 @@ impl<S: MetaStore> MetaSnapshotBuilder<S> {
             table_fragments,
             user_info,
             function,
+            connection,
             system_param,
             tracking_id,
         };
diff --git a/src/meta/src/backup_restore/restore.rs b/src/meta/src/backup_restore/restore.rs
index 97ed1f442934d..48061dd4c417a 100644
--- a/src/meta/src/backup_restore/restore.rs
+++ b/src/meta/src/backup_restore/restore.rs
@@ -173,6 +173,7 @@ async fn restore_metadata<S: MetaStore>(meta_store: S, snapshot: MetaSnapshot) -
     restore_metadata_model(&meta_store, &snapshot.metadata.view).await?;
     restore_metadata_model(&meta_store, &snapshot.metadata.source).await?;
     restore_metadata_model(&meta_store, &snapshot.metadata.function).await?;
+    restore_metadata_model(&meta_store, &snapshot.metadata.connection).await?;
     restore_system_param_model(&meta_store, &[snapshot.metadata.system_param]).await?;
     Ok(())
 }
diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index d84862cc7778d..1dd5c8b79304f 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -353,12 +353,15 @@ where
         let conn_id = connection.id;
         let conn_name = connection.name.clone();
         let mut connections = BTreeMapTransaction::new(&mut core.connections);
-        connections.insert(conn_id, connection);
+        connections.insert(conn_id, connection.to_owned());
         commit_meta!(self, connections)?;
 
         core.connection_by_name.insert(conn_name, conn_id);
-        // Currently we don't need to notify frontend, so just fill 0 here
-        Ok(0)
+
+        let version = self
+            .notify_frontend(Operation::Add, Info::Connection(connection))
+            .await;
+        Ok(version)
     }
 
     pub async fn drop_connection(&self, conn_name: &str) -> MetaResult<NotificationVersion> {
@@ -370,10 +373,13 @@ where
             .ok_or_else(|| anyhow!("connection {} not found", conn_name))?;
 
         let mut connections = BTreeMapTransaction::new(&mut core.connections);
-        connections.remove(conn_id);
+        let connection = connections.remove(conn_id).unwrap();
         commit_meta!(self, connections)?;
-        // Currently we don't need to notify frontend, so just fill 0 here
-        Ok(0)
+
+        let version = self
+            .notify_frontend(Operation::Delete, Info::Connection(connection))
+            .await;
+        Ok(version)
     }
 
     pub async fn create_schema(&self, schema: &Schema) -> MetaResult<NotificationVersion> {
diff --git a/src/meta/src/rpc/cloud_provider.rs b/src/meta/src/rpc/cloud_provider.rs
index 7a53e3cf6ee07..34dcdc47ec514 100644
--- a/src/meta/src/rpc/cloud_provider.rs
+++ b/src/meta/src/rpc/cloud_provider.rs
@@ -84,6 +84,7 @@ impl AwsEc2Client {
 
         Ok(PrivateLinkService {
             provider: CLOUD_PROVIDER_AWS.to_string(),
+            service_name: service_name.to_string(),
             endpoint_id,
             dns_entries: azid_to_dns_map,
         })
diff --git a/src/meta/src/rpc/service/ddl_service.rs b/src/meta/src/rpc/service/ddl_service.rs
index 4d32fd756a015..d1865451a5b41 100644
--- a/src/meta/src/rpc/service/ddl_service.rs
+++ b/src/meta/src/rpc/service/ddl_service.rs
@@ -617,7 +617,7 @@ where
                 let id = self.gen_unique_id::<{ IdCategory::Connection }>().await?;
                 let connection = Connection {
                     id,
-                    name: link.service_name.clone(),
+                    name: req.name,
                     info: Some(connection::Info::PrivateLinkService(private_link_svc)),
                 };
 
@@ -650,11 +650,15 @@ where
     ) -> Result<Response<DropConnectionResponse>, Status> {
         let req = request.into_inner();
 
-        self.ddl_controller
+        let version = self
+            .ddl_controller
             .run_command(DdlCommand::DropConnection(req.connection_name))
             .await?;
 
-        Ok(Response::new(DropConnectionResponse {}))
+        Ok(Response::new(DropConnectionResponse {
+            status: None,
+            version,
+        }))
     }
 }
 
diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs
index 911fe6006992a..58602b6780625 100644
--- a/src/rpc_client/src/meta_client.rs
+++ b/src/rpc_client/src/meta_client.rs
@@ -80,6 +80,7 @@ use crate::error::{Result, RpcError};
 use crate::hummock_meta_client::{CompactTaskItem, HummockMetaClient};
 use crate::{meta_rpc_client_method_impl, ExtraInfoSourceRef};
 
+type ConnectionId = u32;
 type DatabaseId = u32;
 type SchemaId = u32;
 
@@ -125,10 +126,17 @@ impl MetaClient {
         .await
     }
 
-    pub async fn create_connection(&self, req: create_connection_request::Payload) -> Result<u32> {
-        let request = CreateConnectionRequest { payload: Some(req) };
+    pub async fn create_connection(
+        &self,
+        connection_name: String,
+        req: create_connection_request::Payload,
+    ) -> Result<(ConnectionId, CatalogVersion)> {
+        let request = CreateConnectionRequest {
+            name: connection_name,
+            payload: Some(req),
+        };
         let resp = self.inner.create_connection(request).await?;
-        Ok(resp.connection_id)
+        Ok((resp.connection_id, resp.version))
     }
 
     pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
@@ -137,12 +145,12 @@ impl MetaClient {
         Ok(resp.connections)
     }
 
-    pub async fn drop_connection(&self, connection_name: &str) -> Result<()> {
+    pub async fn drop_connection(&self, connection_name: &str) -> Result<CatalogVersion> {
         let request = DropConnectionRequest {
             connection_name: connection_name.to_string(),
         };
-        let _ = self.inner.drop_connection(request).await?;
-        Ok(())
+        let resp = self.inner.drop_connection(request).await?;
+        Ok(resp.version)
     }
 
     pub(crate) fn parse_meta_addr(meta_addr: &str) -> Result<MetaAddressStrategy> {
diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs
index b1f5619e4a874..7e2f69f9131a8 100644
--- a/src/sqlparser/src/ast/mod.rs
+++ b/src/sqlparser/src/ast/mod.rs
@@ -746,6 +746,7 @@ pub enum ShowObject {
     Source { schema: Option<Ident> },
     Sink { schema: Option<Ident> },
     Columns { table: ObjectName },
+    Connection,
 }
 
 impl fmt::Display for ShowObject {
@@ -776,6 +777,9 @@ impl fmt::Display for ShowObject {
             ShowObject::Source { schema } => write!(f, "SOURCES{}", fmt_schema(schema)),
             ShowObject::Sink { schema } => write!(f, "SINKS{}", fmt_schema(schema)),
             ShowObject::Columns { table } => write!(f, "COLUMNS FROM {}", table),
+            ShowObject::Connection => f.write_str("CONNECTIONS"), /* TODO: format schema after
+                                                                   * adding database_id and
+                                                                   * schema_id */
         }
     }
 }
@@ -976,6 +980,8 @@ pub enum Statement {
     CreateSource { stmt: CreateSourceStatement },
     /// CREATE SINK
     CreateSink { stmt: CreateSinkStatement },
+    /// CREATE CONNECTION
+    CreateConnection { stmt: CreateConnectionStatement },
     /// CREATE FUNCTION
     ///
     /// Postgres: https://www.postgresql.org/docs/15/sql-createfunction.html
@@ -1411,6 +1417,7 @@ impl fmt::Display for Statement {
                 stmt,
             ),
             Statement::CreateSink { stmt } => write!(f, "CREATE SINK {}", stmt,),
+            Statement::CreateConnection { stmt } => write!(f, "CREATE CONNECTION {}", stmt,),
             Statement::AlterTable { name, operation } => {
                 write!(f, "ALTER TABLE {} {}", name, operation)
             }
@@ -1969,6 +1976,7 @@ pub enum ObjectType {
     Sink,
     Database,
     User,
+    Connection,
 }
 
 impl fmt::Display for ObjectType {
@@ -1983,6 +1991,7 @@ impl fmt::Display for ObjectType {
             ObjectType::Sink => "SINK",
             ObjectType::Database => "DATABASE",
             ObjectType::User => "USER",
+            ObjectType::Connection => "CONNECTION",
         })
     }
 }
@@ -2007,9 +2016,11 @@ impl ParseTo for ObjectType {
             ObjectType::Database
         } else if parser.parse_keyword(Keyword::USER) {
             ObjectType::User
+        } else if parser.parse_keyword(Keyword::CONNECTION) {
+            ObjectType::Connection
         } else {
             return parser.expected(
-                "TABLE, VIEW, INDEX, MATERIALIZED VIEW, SOURCE, SINK, SCHEMA, DATABASE or USER after DROP",
+                "TABLE, VIEW, INDEX, MATERIALIZED VIEW, SOURCE, SINK, SCHEMA, DATABASE, USER or CONNECTION after DROP",
                 parser.peek_token(),
             );
         };
diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs
index 3494bd9db92e2..05d6820e89934 100644
--- a/src/sqlparser/src/ast/statement.rs
+++ b/src/sqlparser/src/ast/statement.rs
@@ -492,6 +492,48 @@ impl fmt::Display for CreateSinkStatement {
     }
 }
 
+// sql_grammar!(CreateConnectionStatement {
+//     if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
+//     connection_name: Ident,
+//     with_properties: AstOption<WithProperties>,
+// });
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+pub struct CreateConnectionStatement {
+    pub if_not_exists: bool,
+    pub connection_name: ObjectName,
+    pub with_properties: WithProperties,
+}
+
+impl ParseTo for CreateConnectionStatement {
+    fn parse_to(p: &mut Parser) -> Result<Self, ParserError> {
+        impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
+        impl_parse_to!(connection_name: ObjectName, p);
+        impl_parse_to!(with_properties: WithProperties, p);
+        if with_properties.0.is_empty() {
+            return Err(ParserError::ParserError(
+                "connection properties not provided".to_string(),
+            ));
+        }
+
+        Ok(Self {
+            if_not_exists,
+            connection_name,
+            with_properties,
+        })
+    }
+}
+
+impl fmt::Display for CreateConnectionStatement {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let mut v: Vec<String> = vec![];
+        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
+        impl_fmt_display!(connection_name, v, self);
+        impl_fmt_display!(with_properties, v, self);
+        v.iter().join(" ").fmt(f)
+    }
+}
+
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct AstVec<T>(pub Vec<T>);
diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs
index aa1c08f1574a6..95064d0605c4d 100644
--- a/src/sqlparser/src/keywords.rs
+++ b/src/sqlparser/src/keywords.rs
@@ -136,6 +136,8 @@ define_keywords!(
     CONDITION,
     CONFLUENT,
     CONNECT,
+    CONNECTION,
+    CONNECTIONS,
     CONSTRAINT,
     CONTAINS,
     CONVERT,
diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs
index ea0799e463015..be3f257318a7f 100644
--- a/src/sqlparser/src/parser.rs
+++ b/src/sqlparser/src/parser.rs
@@ -1725,6 +1725,8 @@ impl Parser {
             self.parse_create_source(or_replace)
         } else if self.parse_keyword(Keyword::SINK) {
             self.parse_create_sink(or_replace)
+        } else if self.parse_keyword(Keyword::CONNECTION) {
+            self.parse_create_connection()
         } else if self.parse_keyword(Keyword::FUNCTION) {
             self.parse_create_function(or_replace, temporary)
         } else if or_replace {
@@ -1821,6 +1823,17 @@ impl Parser {
         })
     }
 
+    // CREATE
+    // CONNECTION
+    // [IF NOT EXISTS]?
+    // <connection_name: Ident>
+    // [WITH (properties)]?
+    pub fn parse_create_connection(&mut self) -> Result<Statement, ParserError> {
+        Ok(Statement::CreateConnection {
+            stmt: CreateConnectionStatement::parse_to(self)?,
+        })
+    }
+
     pub fn parse_create_function(
         &mut self,
         or_replace: bool,
@@ -3574,6 +3587,9 @@ impl Parser {
                         return self.expected("from after columns", self.peek_token());
                     }
                 }
+                Keyword::CONNECTIONS => {
+                    return Ok(Statement::ShowObjects(ShowObject::Connection));
+                }
                 _ => {}
             }
         }
diff --git a/src/storage/backup/src/meta_snapshot.rs b/src/storage/backup/src/meta_snapshot.rs
index ea8d84266fee1..7872c09cb930c 100644
--- a/src/storage/backup/src/meta_snapshot.rs
+++ b/src/storage/backup/src/meta_snapshot.rs
@@ -18,7 +18,9 @@ use std::fmt::{Display, Formatter};
 use bytes::{Buf, BufMut};
 use itertools::Itertools;
 use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_pb::catalog::{Database, Function, Index, Schema, Sink, Source, Table, View};
+use risingwave_pb::catalog::{
+    Connection, Database, Function, Index, Schema, Sink, Source, Table, View,
+};
 use risingwave_pb::hummock::{CompactionGroup, HummockVersion, HummockVersionStats};
 use risingwave_pb::meta::{SystemParams, TableFragments};
 use risingwave_pb::user::UserInfo;
@@ -88,6 +90,8 @@ impl Display for MetaSnapshot {
         writeln!(f, "{:#?}", self.metadata.source)?;
         writeln!(f, "view:")?;
         writeln!(f, "{:#?}", self.metadata.view)?;
+        writeln!(f, "connection:")?;
+        writeln!(f, "{:#?}", self.metadata.connection)?;
         writeln!(f, "table_fragments:")?;
         writeln!(f, "{:#?}", self.metadata.table_fragments)?;
         writeln!(f, "user_info:")?;
@@ -120,6 +124,7 @@ pub struct ClusterMetadata {
     pub table_fragments: Vec<TableFragments>,
     pub user_info: Vec<UserInfo>,
     pub function: Vec<Function>,
+    pub connection: Vec<Connection>,
     pub system_param: SystemParams,
     pub tracking_id: String,
 }
@@ -143,6 +148,7 @@ impl ClusterMetadata {
         Self::encode_prost_message_list(&self.source.iter().collect_vec(), buf);
         Self::encode_prost_message_list(&self.view.iter().collect_vec(), buf);
         Self::encode_prost_message_list(&self.function.iter().collect_vec(), buf);
+        Self::encode_prost_message_list(&self.connection.iter().collect_vec(), buf);
         Self::encode_prost_message(&self.system_param, buf);
         Self::encode_prost_message(&self.tracking_id, buf);
     }
@@ -167,6 +173,7 @@ impl ClusterMetadata {
         let source: Vec<Source> = Self::decode_prost_message_list(&mut buf)?;
         let view: Vec<View> = Self::decode_prost_message_list(&mut buf)?;
         let function: Vec<Function> = Self::decode_prost_message_list(&mut buf)?;
+        let connection: Vec<Connection> = Self::decode_prost_message_list(&mut buf)?;
         let system_param: SystemParams = Self::decode_prost_message(&mut buf)?;
         let tracking_id: String = Self::decode_prost_message(&mut buf)?;
 
@@ -185,6 +192,7 @@ impl ClusterMetadata {
             table_fragments,
             user_info,
             function,
+            connection,
             system_param,
             tracking_id,
         })
diff --git a/src/utils/pgwire/src/pg_response.rs b/src/utils/pgwire/src/pg_response.rs
index cc2abfd9900dd..5229a96125b81 100644
--- a/src/utils/pgwire/src/pg_response.rs
+++ b/src/utils/pgwire/src/pg_response.rs
@@ -51,6 +51,7 @@ pub enum StatementType {
     CREATE_USER,
     CREATE_INDEX,
     CREATE_FUNCTION,
+    CREATE_CONNECTION,
     DESCRIBE,
     GRANT_PRIVILEGE,
     DROP_TABLE,
@@ -63,6 +64,7 @@ pub enum StatementType {
     DROP_SCHEMA,
     DROP_DATABASE,
     DROP_USER,
+    DROP_CONNECTION,
     ALTER_INDEX,
     ALTER_VIEW,
     ALTER_TABLE,
@@ -193,6 +195,9 @@ impl StatementType {
                 risingwave_sqlparser::ast::ObjectType::Sink => Ok(StatementType::DROP_SINK),
                 risingwave_sqlparser::ast::ObjectType::Database => Ok(StatementType::DROP_DATABASE),
                 risingwave_sqlparser::ast::ObjectType::User => Ok(StatementType::DROP_USER),
+                risingwave_sqlparser::ast::ObjectType::Connection => {
+                    Ok(StatementType::DROP_CONNECTION)
+                }
             },
             Statement::Explain { .. } => Ok(StatementType::EXPLAIN),
             Statement::Flush => Ok(StatementType::FLUSH),