From a8477e4142e5b6a959b919942ac770b697784ce4 Mon Sep 17 00:00:00 2001 From: JohnsonLee <53596783+J0HN50N133@users.noreply.github.com> Date: Mon, 9 Sep 2024 08:47:59 +0800 Subject: [PATCH] fix: table resolving logic related to pg_catalog (#4580) * fix: table resolving logic related to pg_catalog refer to https://github.com/GreptimeTeam/greptimedb/issues/3560#issuecomment-2287794348 and #4543 * refactor: remove CatalogProtocol type * fix: sqlness * fix: forbid create database pg_catalog with mysql client * refactor: use QueryContext as arguments rather than Channel * refactor: pass None as default behaviour in information_schema * test: fix test --- src/catalog/src/kvbackend/manager.rs | 128 +++++++++++++----- src/catalog/src/lib.rs | 47 ++++++- src/catalog/src/memory/manager.rs | 51 +++++-- .../information_schema/columns.rs | 4 +- .../information_schema/key_column_usage.rs | 4 +- .../information_schema/partitions.rs | 4 +- .../information_schema/region_peers.rs | 4 +- .../information_schema/schemata.rs | 2 +- .../information_schema/table_constraints.rs | 4 +- .../information_schema/tables.rs | 4 +- .../system_schema/information_schema/views.rs | 4 +- src/catalog/src/system_schema/pg_catalog.rs | 14 +- .../src/system_schema/pg_catalog/pg_class.rs | 9 +- .../system_schema/pg_catalog/pg_namespace.rs | 7 +- src/catalog/src/table_source.rs | 26 ++-- src/catalog/src/table_source/dummy_catalog.rs | 2 +- src/frontend/src/instance.rs | 3 +- src/frontend/src/instance/prom_store.rs | 2 +- src/frontend/src/script.rs | 8 +- src/operator/src/delete.rs | 2 +- src/operator/src/insert.rs | 2 +- .../src/req_convert/delete/row_to_region.rs | 2 +- .../src/req_convert/insert/stmt_to_region.rs | 2 +- src/operator/src/request.rs | 2 +- src/operator/src/statement.rs | 6 +- src/operator/src/statement/copy_database.rs | 2 +- src/operator/src/statement/ddl.rs | 26 +++- src/operator/src/statement/describe.rs | 2 +- src/operator/src/statement/show.rs | 2 +- src/pipeline/src/manager/pipeline_operator.rs | 9 +- src/query/src/datafusion.rs | 26 ++-- src/query/src/datafusion/planner.rs | 2 +- src/query/src/dist_plan/planner.rs | 1 + src/query/src/planner.rs | 13 +- src/query/src/promql/planner.rs | 6 +- src/query/src/sql.rs | 1 + src/servers/src/http/prometheus.rs | 15 +- src/session/src/context.rs | 7 +- tests-integration/src/grpc.rs | 5 +- tests-integration/src/instance.rs | 2 +- tests-integration/src/tests/instance_test.rs | 3 - tests-integration/tests/region_migration.rs | 9 +- .../common/create/create_database.result | 1 - .../common/create/create_database_opts.result | 3 - .../common/information_schema/tables.result | 17 +-- .../common/show/show_databases_tables.result | 2 - .../common/system/information_schema.result | 14 -- .../common/system/pg_catalog.result | 86 ++---------- .../standalone/common/view/create.result | 12 +- 49 files changed, 359 insertions(+), 250 deletions(-) diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index d39e1abdb9f8..feb5e31d09bb 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -36,6 +36,7 @@ use futures_util::{StreamExt, TryStreamExt}; use meta_client::client::MetaClient; use moka::sync::Cache; use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; +use session::context::{Channel, QueryContext}; use snafu::prelude::*; use table::dist_table::DistTable; use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; @@ -152,7 +153,11 @@ impl CatalogManager for KvBackendCatalogManager { Ok(keys) } - async fn schema_names(&self, catalog: &str) -> Result> { + async fn schema_names( + &self, + catalog: &str, + query_ctx: Option<&QueryContext>, + ) -> Result> { let stream = self .table_metadata_manager .schema_manager() @@ -163,12 +168,17 @@ impl CatalogManager for KvBackendCatalogManager { .map_err(BoxedError::new) .context(ListSchemasSnafu { catalog })?; - keys.extend(self.system_catalog.schema_names()); + keys.extend(self.system_catalog.schema_names(query_ctx)); Ok(keys.into_iter().collect()) } - async fn table_names(&self, catalog: &str, schema: &str) -> Result> { + async fn table_names( + &self, + catalog: &str, + schema: &str, + query_ctx: Option<&QueryContext>, + ) -> Result> { let stream = self .table_metadata_manager .table_name_manager() @@ -181,7 +191,7 @@ impl CatalogManager for KvBackendCatalogManager { .into_iter() .map(|(k, _)| k) .collect::>(); - tables.extend_from_slice(&self.system_catalog.table_names(schema)); + tables.extend_from_slice(&self.system_catalog.table_names(schema, query_ctx)); Ok(tables.into_iter().collect()) } @@ -194,8 +204,13 @@ impl CatalogManager for KvBackendCatalogManager { .context(TableMetadataManagerSnafu) } - async fn schema_exists(&self, catalog: &str, schema: &str) -> Result { - if self.system_catalog.schema_exists(schema) { + async fn schema_exists( + &self, + catalog: &str, + schema: &str, + query_ctx: Option<&QueryContext>, + ) -> Result { + if self.system_catalog.schema_exists(schema, query_ctx) { return Ok(true); } @@ -206,8 +221,14 @@ impl CatalogManager for KvBackendCatalogManager { .context(TableMetadataManagerSnafu) } - async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result { - if self.system_catalog.table_exists(schema, table) { + async fn table_exists( + &self, + catalog: &str, + schema: &str, + table: &str, + query_ctx: Option<&QueryContext>, + ) -> Result { + if self.system_catalog.table_exists(schema, table, query_ctx) { return Ok(true); } @@ -225,10 +246,12 @@ impl CatalogManager for KvBackendCatalogManager { catalog_name: &str, schema_name: &str, table_name: &str, + query_ctx: Option<&QueryContext>, ) -> Result> { - if let Some(table) = self - .system_catalog - .table(catalog_name, schema_name, table_name) + let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel()); + if let Some(table) = + self.system_catalog + .table(catalog_name, schema_name, table_name, query_ctx) { return Ok(Some(table)); } @@ -236,23 +259,45 @@ impl CatalogManager for KvBackendCatalogManager { let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu { name: "table_cache", })?; - - table_cache + if let Some(table) = table_cache .get_by_ref(&TableName { catalog_name: catalog_name.to_string(), schema_name: schema_name.to_string(), table_name: table_name.to_string(), }) .await - .context(GetTableCacheSnafu) + .context(GetTableCacheSnafu)? + { + return Ok(Some(table)); + } + + if channel == Channel::Postgres { + // falldown to pg_catalog + if let Some(table) = + self.system_catalog + .table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx) + { + return Ok(Some(table)); + } + } + + return Ok(None); } - fn tables<'a>(&'a self, catalog: &'a str, schema: &'a str) -> BoxStream<'a, Result> { + fn tables<'a>( + &'a self, + catalog: &'a str, + schema: &'a str, + query_ctx: Option<&'a QueryContext>, + ) -> BoxStream<'a, Result> { let sys_tables = try_stream!({ // System tables - let sys_table_names = self.system_catalog.table_names(schema); + let sys_table_names = self.system_catalog.table_names(schema, query_ctx); for table_name in sys_table_names { - if let Some(table) = self.system_catalog.table(catalog, schema, &table_name) { + if let Some(table) = + self.system_catalog + .table(catalog, schema, &table_name, query_ctx) + { yield table; } } @@ -320,18 +365,27 @@ struct SystemCatalog { } impl SystemCatalog { - // TODO(j0hn50n133): remove the duplicated hard-coded table names logic - fn schema_names(&self) -> Vec { - vec![ - INFORMATION_SCHEMA_NAME.to_string(), - PG_CATALOG_NAME.to_string(), - ] + fn schema_names(&self, query_ctx: Option<&QueryContext>) -> Vec { + let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel()); + match channel { + // pg_catalog only visible under postgres protocol + Channel::Postgres => vec![ + INFORMATION_SCHEMA_NAME.to_string(), + PG_CATALOG_NAME.to_string(), + ], + _ => { + vec![INFORMATION_SCHEMA_NAME.to_string()] + } + } } - fn table_names(&self, schema: &str) -> Vec { + fn table_names(&self, schema: &str, query_ctx: Option<&QueryContext>) -> Vec { + let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel()); match schema { INFORMATION_SCHEMA_NAME => self.information_schema_provider.table_names(), - PG_CATALOG_NAME => self.pg_catalog_provider.table_names(), + PG_CATALOG_NAME if channel == Channel::Postgres => { + self.pg_catalog_provider.table_names() + } DEFAULT_SCHEMA_NAME => { vec![NUMBERS_TABLE_NAME.to_string()] } @@ -339,23 +393,35 @@ impl SystemCatalog { } } - fn schema_exists(&self, schema: &str) -> bool { - schema == INFORMATION_SCHEMA_NAME || schema == PG_CATALOG_NAME + fn schema_exists(&self, schema: &str, query_ctx: Option<&QueryContext>) -> bool { + let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel()); + match channel { + Channel::Postgres => schema == PG_CATALOG_NAME || schema == INFORMATION_SCHEMA_NAME, + _ => schema == INFORMATION_SCHEMA_NAME, + } } - fn table_exists(&self, schema: &str, table: &str) -> bool { + fn table_exists(&self, schema: &str, table: &str, query_ctx: Option<&QueryContext>) -> bool { + let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel()); if schema == INFORMATION_SCHEMA_NAME { self.information_schema_provider.table(table).is_some() } else if schema == DEFAULT_SCHEMA_NAME { table == NUMBERS_TABLE_NAME - } else if schema == PG_CATALOG_NAME { + } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres { self.pg_catalog_provider.table(table).is_some() } else { false } } - fn table(&self, catalog: &str, schema: &str, table_name: &str) -> Option { + fn table( + &self, + catalog: &str, + schema: &str, + table_name: &str, + query_ctx: Option<&QueryContext>, + ) -> Option { + let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel()); if schema == INFORMATION_SCHEMA_NAME { let information_schema_provider = self.catalog_cache.get_with_by_ref(catalog, move || { @@ -366,7 +432,7 @@ impl SystemCatalog { )) }); information_schema_provider.table(table_name) - } else if schema == PG_CATALOG_NAME { + } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres { if catalog == DEFAULT_CATALOG_NAME { self.pg_catalog_provider.table(table_name) } else { diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 394500bb757e..3444c0e089e6 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -20,8 +20,10 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use api::v1::CreateTableExpr; +use common_catalog::consts::{INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME}; use futures::future::BoxFuture; use futures_util::stream::BoxStream; +use session::context::QueryContext; use table::metadata::TableId; use table::TableRef; @@ -44,15 +46,35 @@ pub trait CatalogManager: Send + Sync { async fn catalog_names(&self) -> Result>; - async fn schema_names(&self, catalog: &str) -> Result>; + async fn schema_names( + &self, + catalog: &str, + query_ctx: Option<&QueryContext>, + ) -> Result>; - async fn table_names(&self, catalog: &str, schema: &str) -> Result>; + async fn table_names( + &self, + catalog: &str, + schema: &str, + query_ctx: Option<&QueryContext>, + ) -> Result>; async fn catalog_exists(&self, catalog: &str) -> Result; - async fn schema_exists(&self, catalog: &str, schema: &str) -> Result; + async fn schema_exists( + &self, + catalog: &str, + schema: &str, + query_ctx: Option<&QueryContext>, + ) -> Result; - async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result; + async fn table_exists( + &self, + catalog: &str, + schema: &str, + table: &str, + query_ctx: Option<&QueryContext>, + ) -> Result; /// Returns the table by catalog, schema and table name. async fn table( @@ -60,10 +82,25 @@ pub trait CatalogManager: Send + Sync { catalog: &str, schema: &str, table_name: &str, + query_ctx: Option<&QueryContext>, ) -> Result>; /// Returns all tables with a stream by catalog and schema. - fn tables<'a>(&'a self, catalog: &'a str, schema: &'a str) -> BoxStream<'a, Result>; + fn tables<'a>( + &'a self, + catalog: &'a str, + schema: &'a str, + query_ctx: Option<&'a QueryContext>, + ) -> BoxStream<'a, Result>; + + /// Check if `schema` is a reserved schema name + fn is_reserved_schema_name(&self, schema: &str) -> bool { + // We have to check whether a schema name is reserved before create schema. + // We need this rather than use schema_exists directly because `pg_catalog` is + // only visible via postgres protocol. So if we don't check, a mysql client may + // create a schema named `pg_catalog` which is somehow malformed. + schema == INFORMATION_SCHEMA_NAME || schema == PG_CATALOG_NAME + } } pub type CatalogManagerRef = Arc; diff --git a/src/catalog/src/memory/manager.rs b/src/catalog/src/memory/manager.rs index 3c27d4736b61..62ff863c4657 100644 --- a/src/catalog/src/memory/manager.rs +++ b/src/catalog/src/memory/manager.rs @@ -26,6 +26,7 @@ use common_catalog::consts::{ use common_meta::key::flow::FlowMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use futures_util::stream::BoxStream; +use session::context::QueryContext; use snafu::OptionExt; use table::TableRef; @@ -53,7 +54,11 @@ impl CatalogManager for MemoryCatalogManager { Ok(self.catalogs.read().unwrap().keys().cloned().collect()) } - async fn schema_names(&self, catalog: &str) -> Result> { + async fn schema_names( + &self, + catalog: &str, + _query_ctx: Option<&QueryContext>, + ) -> Result> { Ok(self .catalogs .read() @@ -67,7 +72,12 @@ impl CatalogManager for MemoryCatalogManager { .collect()) } - async fn table_names(&self, catalog: &str, schema: &str) -> Result> { + async fn table_names( + &self, + catalog: &str, + schema: &str, + _query_ctx: Option<&QueryContext>, + ) -> Result> { Ok(self .catalogs .read() @@ -87,11 +97,22 @@ impl CatalogManager for MemoryCatalogManager { self.catalog_exist_sync(catalog) } - async fn schema_exists(&self, catalog: &str, schema: &str) -> Result { + async fn schema_exists( + &self, + catalog: &str, + schema: &str, + _query_ctx: Option<&QueryContext>, + ) -> Result { self.schema_exist_sync(catalog, schema) } - async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result { + async fn table_exists( + &self, + catalog: &str, + schema: &str, + table: &str, + _query_ctx: Option<&QueryContext>, + ) -> Result { let catalogs = self.catalogs.read().unwrap(); Ok(catalogs .get(catalog) @@ -108,6 +129,7 @@ impl CatalogManager for MemoryCatalogManager { catalog: &str, schema: &str, table_name: &str, + _query_ctx: Option<&QueryContext>, ) -> Result> { let result = try { self.catalogs @@ -121,7 +143,12 @@ impl CatalogManager for MemoryCatalogManager { Ok(result) } - fn tables<'a>(&'a self, catalog: &'a str, schema: &'a str) -> BoxStream<'a, Result> { + fn tables<'a>( + &'a self, + catalog: &'a str, + schema: &'a str, + _query_ctx: Option<&QueryContext>, + ) -> BoxStream<'a, Result> { let catalogs = self.catalogs.read().unwrap(); let Some(schemas) = catalogs.get(catalog) else { @@ -371,11 +398,12 @@ mod tests { DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_NAME, + None, ) .await .unwrap() .unwrap(); - let stream = catalog_list.tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); + let stream = catalog_list.tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, None); let tables = stream.try_collect::>().await.unwrap(); assert_eq!(tables.len(), 1); assert_eq!( @@ -384,7 +412,12 @@ mod tests { ); assert!(catalog_list - .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "not_exists") + .table( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "not_exists", + None + ) .await .unwrap() .is_none()); @@ -411,7 +444,7 @@ mod tests { }; catalog.register_table_sync(register_table_req).unwrap(); assert!(catalog - .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name) + .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name, None) .await .unwrap() .is_some()); @@ -423,7 +456,7 @@ mod tests { }; catalog.deregister_table_sync(deregister_table_req).unwrap(); assert!(catalog - .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name) + .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name, None) .await .unwrap() .is_none()); diff --git a/src/catalog/src/system_schema/information_schema/columns.rs b/src/catalog/src/system_schema/information_schema/columns.rs index b291e0234137..152fc33a045f 100644 --- a/src/catalog/src/system_schema/information_schema/columns.rs +++ b/src/catalog/src/system_schema/information_schema/columns.rs @@ -257,8 +257,8 @@ impl InformationSchemaColumnsBuilder { .context(UpgradeWeakCatalogManagerRefSnafu)?; let predicates = Predicates::from_scan_request(&request); - for schema_name in catalog_manager.schema_names(&catalog_name).await? { - let mut stream = catalog_manager.tables(&catalog_name, &schema_name); + for schema_name in catalog_manager.schema_names(&catalog_name, None).await? { + let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None); while let Some(table) = stream.try_next().await? { let keys = &table.table_info().meta.primary_key_indices; diff --git a/src/catalog/src/system_schema/information_schema/key_column_usage.rs b/src/catalog/src/system_schema/information_schema/key_column_usage.rs index f7cedfee2a2e..56713dabba28 100644 --- a/src/catalog/src/system_schema/information_schema/key_column_usage.rs +++ b/src/catalog/src/system_schema/information_schema/key_column_usage.rs @@ -212,8 +212,8 @@ impl InformationSchemaKeyColumnUsageBuilder { .context(UpgradeWeakCatalogManagerRefSnafu)?; let predicates = Predicates::from_scan_request(&request); - for schema_name in catalog_manager.schema_names(&catalog_name).await? { - let mut stream = catalog_manager.tables(&catalog_name, &schema_name); + for schema_name in catalog_manager.schema_names(&catalog_name, None).await? { + let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None); while let Some(table) = stream.try_next().await? { let mut primary_constraints = vec![]; diff --git a/src/catalog/src/system_schema/information_schema/partitions.rs b/src/catalog/src/system_schema/information_schema/partitions.rs index 3e49a2ddbd3b..93d60679901e 100644 --- a/src/catalog/src/system_schema/information_schema/partitions.rs +++ b/src/catalog/src/system_schema/information_schema/partitions.rs @@ -240,9 +240,9 @@ impl InformationSchemaPartitionsBuilder { let predicates = Predicates::from_scan_request(&request); - for schema_name in catalog_manager.schema_names(&catalog_name).await? { + for schema_name in catalog_manager.schema_names(&catalog_name, None).await? { let table_info_stream = catalog_manager - .tables(&catalog_name, &schema_name) + .tables(&catalog_name, &schema_name, None) .try_filter_map(|t| async move { let table_info = t.table_info(); if table_info.table_type == TableType::Temporary { diff --git a/src/catalog/src/system_schema/information_schema/region_peers.rs b/src/catalog/src/system_schema/information_schema/region_peers.rs index 4bcc28144701..5496879af0fb 100644 --- a/src/catalog/src/system_schema/information_schema/region_peers.rs +++ b/src/catalog/src/system_schema/information_schema/region_peers.rs @@ -176,9 +176,9 @@ impl InformationSchemaRegionPeersBuilder { let predicates = Predicates::from_scan_request(&request); - for schema_name in catalog_manager.schema_names(&catalog_name).await? { + for schema_name in catalog_manager.schema_names(&catalog_name, None).await? { let table_id_stream = catalog_manager - .tables(&catalog_name, &schema_name) + .tables(&catalog_name, &schema_name, None) .try_filter_map(|t| async move { let table_info = t.table_info(); if table_info.table_type == TableType::Temporary { diff --git a/src/catalog/src/system_schema/information_schema/schemata.rs b/src/catalog/src/system_schema/information_schema/schemata.rs index ca594b61a68f..02d6e606e797 100644 --- a/src/catalog/src/system_schema/information_schema/schemata.rs +++ b/src/catalog/src/system_schema/information_schema/schemata.rs @@ -171,7 +171,7 @@ impl InformationSchemaSchemataBuilder { let table_metadata_manager = utils::table_meta_manager(&self.catalog_manager)?; let predicates = Predicates::from_scan_request(&request); - for schema_name in catalog_manager.schema_names(&catalog_name).await? { + for schema_name in catalog_manager.schema_names(&catalog_name, None).await? { let opts = if let Some(table_metadata_manager) = &table_metadata_manager { table_metadata_manager .schema_manager() diff --git a/src/catalog/src/system_schema/information_schema/table_constraints.rs b/src/catalog/src/system_schema/information_schema/table_constraints.rs index ac3d468c367f..50e2469946f2 100644 --- a/src/catalog/src/system_schema/information_schema/table_constraints.rs +++ b/src/catalog/src/system_schema/information_schema/table_constraints.rs @@ -176,8 +176,8 @@ impl InformationSchemaTableConstraintsBuilder { .context(UpgradeWeakCatalogManagerRefSnafu)?; let predicates = Predicates::from_scan_request(&request); - for schema_name in catalog_manager.schema_names(&catalog_name).await? { - let mut stream = catalog_manager.tables(&catalog_name, &schema_name); + for schema_name in catalog_manager.schema_names(&catalog_name, None).await? { + let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None); while let Some(table) = stream.try_next().await? { let keys = &table.table_info().meta.primary_key_indices; diff --git a/src/catalog/src/system_schema/information_schema/tables.rs b/src/catalog/src/system_schema/information_schema/tables.rs index 638ff073ef82..976c920b9ab9 100644 --- a/src/catalog/src/system_schema/information_schema/tables.rs +++ b/src/catalog/src/system_schema/information_schema/tables.rs @@ -234,8 +234,8 @@ impl InformationSchemaTablesBuilder { .context(UpgradeWeakCatalogManagerRefSnafu)?; let predicates = Predicates::from_scan_request(&request); - for schema_name in catalog_manager.schema_names(&catalog_name).await? { - let mut stream = catalog_manager.tables(&catalog_name, &schema_name); + for schema_name in catalog_manager.schema_names(&catalog_name, None).await? { + let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None); while let Some(table) = stream.try_next().await? { let table_info = table.table_info(); diff --git a/src/catalog/src/system_schema/information_schema/views.rs b/src/catalog/src/system_schema/information_schema/views.rs index daf41f5e5972..082e6c2ff5d3 100644 --- a/src/catalog/src/system_schema/information_schema/views.rs +++ b/src/catalog/src/system_schema/information_schema/views.rs @@ -192,8 +192,8 @@ impl InformationSchemaViewsBuilder { .context(CastManagerSnafu)? .view_info_cache()?; - for schema_name in catalog_manager.schema_names(&catalog_name).await? { - let mut stream = catalog_manager.tables(&catalog_name, &schema_name); + for schema_name in catalog_manager.schema_names(&catalog_name, None).await? { + let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None); while let Some(table) = stream.try_next().await? { let table_info = table.table_info(); diff --git a/src/catalog/src/system_schema/pg_catalog.rs b/src/catalog/src/system_schema/pg_catalog.rs index 36b7d7119eb4..bc9c246e25fc 100644 --- a/src/catalog/src/system_schema/pg_catalog.rs +++ b/src/catalog/src/system_schema/pg_catalog.rs @@ -18,15 +18,16 @@ mod pg_namespace; mod table_names; use std::collections::HashMap; -use std::sync::{Arc, Weak}; +use std::sync::{Arc, LazyLock, Weak}; -use common_catalog::consts::{self, PG_CATALOG_NAME}; +use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, PG_CATALOG_NAME}; use datatypes::schema::ColumnSchema; use lazy_static::lazy_static; use paste::paste; use pg_catalog_memory_table::get_schema_columns; use pg_class::PGClass; use pg_namespace::PGNamespace; +use session::context::{Channel, QueryContext}; use table::TableRef; pub use table_names::*; @@ -142,3 +143,12 @@ impl SystemSchemaProviderInner for PGCatalogProvider { &self.catalog_name } } + +/// Provide query context to call the [`CatalogManager`]'s method. +static PG_QUERY_CTX: LazyLock = LazyLock::new(|| { + QueryContext::with_channel(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, Channel::Postgres) +}); + +fn query_ctx() -> Option<&'static QueryContext> { + Some(&PG_QUERY_CTX) +} diff --git a/src/catalog/src/system_schema/pg_catalog/pg_class.rs b/src/catalog/src/system_schema/pg_catalog/pg_class.rs index d32d56d315c7..30476cc25301 100644 --- a/src/catalog/src/system_schema/pg_catalog/pg_class.rs +++ b/src/catalog/src/system_schema/pg_catalog/pg_class.rs @@ -32,7 +32,7 @@ use store_api::storage::ScanRequest; use table::metadata::TableType; use super::pg_namespace::oid_map::PGNamespaceOidMapRef; -use super::{OID_COLUMN_NAME, PG_CLASS}; +use super::{query_ctx, OID_COLUMN_NAME, PG_CLASS}; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; @@ -202,8 +202,11 @@ impl PGClassBuilder { .upgrade() .context(UpgradeWeakCatalogManagerRefSnafu)?; let predicates = Predicates::from_scan_request(&request); - for schema_name in catalog_manager.schema_names(&catalog_name).await? { - let mut stream = catalog_manager.tables(&catalog_name, &schema_name); + for schema_name in catalog_manager + .schema_names(&catalog_name, query_ctx()) + .await? + { + let mut stream = catalog_manager.tables(&catalog_name, &schema_name, query_ctx()); while let Some(table) = stream.try_next().await? { let table_info = table.table_info(); self.add_class( diff --git a/src/catalog/src/system_schema/pg_catalog/pg_namespace.rs b/src/catalog/src/system_schema/pg_catalog/pg_namespace.rs index e78534b37db1..c6db980de717 100644 --- a/src/catalog/src/system_schema/pg_catalog/pg_namespace.rs +++ b/src/catalog/src/system_schema/pg_catalog/pg_namespace.rs @@ -31,7 +31,7 @@ use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef}; use snafu::{OptionExt, ResultExt}; use store_api::storage::ScanRequest; -use super::{PGNamespaceOidMapRef, OID_COLUMN_NAME, PG_NAMESPACE}; +use super::{query_ctx, PGNamespaceOidMapRef, OID_COLUMN_NAME, PG_NAMESPACE}; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; @@ -180,7 +180,10 @@ impl PGNamespaceBuilder { .upgrade() .context(UpgradeWeakCatalogManagerRefSnafu)?; let predicates = Predicates::from_scan_request(&request); - for schema_name in catalog_manager.schema_names(&catalog_name).await? { + for schema_name in catalog_manager + .schema_names(&catalog_name, query_ctx()) + .await? + { self.add_namespace(&predicates, &schema_name); } self.finish() diff --git a/src/catalog/src/table_source.rs b/src/catalog/src/table_source.rs index 220cc3d5ec43..d6d81fa1342b 100644 --- a/src/catalog/src/table_source.rs +++ b/src/catalog/src/table_source.rs @@ -23,7 +23,7 @@ use datafusion::datasource::view::ViewTable; use datafusion::datasource::{provider_as_source, TableProvider}; use datafusion::logical_expr::TableSource; use itertools::Itertools; -use session::context::QueryContext; +use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; use table::metadata::TableType; use table::table::adapter::DfTableProviderAdapter; @@ -45,6 +45,7 @@ pub struct DfTableSourceProvider { disallow_cross_catalog_query: bool, default_catalog: String, default_schema: String, + query_ctx: QueryContextRef, plan_decoder: SubstraitPlanDecoderRef, enable_ident_normalization: bool, } @@ -53,7 +54,7 @@ impl DfTableSourceProvider { pub fn new( catalog_manager: CatalogManagerRef, disallow_cross_catalog_query: bool, - query_ctx: &QueryContext, + query_ctx: QueryContextRef, plan_decoder: SubstraitPlanDecoderRef, enable_ident_normalization: bool, ) -> Self { @@ -63,6 +64,7 @@ impl DfTableSourceProvider { resolved_tables: HashMap::new(), default_catalog: query_ctx.current_catalog().to_owned(), default_schema: query_ctx.current_schema(), + query_ctx, plan_decoder, enable_ident_normalization, } @@ -71,8 +73,7 @@ impl DfTableSourceProvider { pub fn resolve_table_ref(&self, table_ref: TableReference) -> Result { if self.disallow_cross_catalog_query { match &table_ref { - TableReference::Bare { .. } => (), - TableReference::Partial { .. } => {} + TableReference::Bare { .. } | TableReference::Partial { .. } => {} TableReference::Full { catalog, schema, .. } => { @@ -107,7 +108,7 @@ impl DfTableSourceProvider { let table = self .catalog_manager - .table(catalog_name, schema_name, table_name) + .table(catalog_name, schema_name, table_name, Some(&self.query_ctx)) .await? .with_context(|| TableNotExistSnafu { table: format_full_table_name(catalog_name, schema_name, table_name), @@ -210,12 +211,12 @@ mod tests { #[test] fn test_validate_table_ref() { - let query_ctx = &QueryContext::with("greptime", "public"); + let query_ctx = Arc::new(QueryContext::with("greptime", "public")); let table_provider = DfTableSourceProvider::new( MemoryCatalogManager::with_default_setup(), true, - query_ctx, + query_ctx.clone(), DummyDecoder::arc(), true, ); @@ -308,7 +309,7 @@ mod tests { #[tokio::test] async fn test_resolve_view() { - let query_ctx = &QueryContext::with("greptime", "public"); + let query_ctx = Arc::new(QueryContext::with("greptime", "public")); let backend = Arc::new(MemoryKvBackend::default()); let layered_cache_builder = LayeredCacheRegistryBuilder::default() .add_cache_registry(CacheRegistryBuilder::default().build()); @@ -344,8 +345,13 @@ mod tests { .await .unwrap(); - let mut table_provider = - DfTableSourceProvider::new(catalog_manager, true, query_ctx, MockDecoder::arc(), true); + let mut table_provider = DfTableSourceProvider::new( + catalog_manager, + true, + query_ctx.clone(), + MockDecoder::arc(), + true, + ); // View not found let table_ref = TableReference::bare("not_exists_view"); diff --git a/src/catalog/src/table_source/dummy_catalog.rs b/src/catalog/src/table_source/dummy_catalog.rs index 602a5c9cbe0f..09a703e4ddac 100644 --- a/src/catalog/src/table_source/dummy_catalog.rs +++ b/src/catalog/src/table_source/dummy_catalog.rs @@ -112,7 +112,7 @@ impl SchemaProvider for DummySchemaProvider { async fn table(&self, name: &str) -> datafusion::error::Result>> { let table = self .catalog_manager - .table(&self.catalog_name, &self.schema_name, name) + .table(&self.catalog_name, &self.schema_name, name, None) .await? .with_context(|| TableNotExistSnafu { table: format_full_table_name(&self.catalog_name, &self.schema_name, name), diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 798264b2a254..2c5544c51a8c 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -356,9 +356,10 @@ impl SqlQueryHandler for Instance { async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result { self.catalog_manager - .schema_exists(catalog, schema) + .schema_exists(catalog, schema, None) .await .context(error::CatalogSnafu) + .map(|b| b && !self.catalog_manager.is_reserved_schema_name(schema)) } } diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs index 20f66ae853fa..8f1098b058f1 100644 --- a/src/frontend/src/instance/prom_store.rs +++ b/src/frontend/src/instance/prom_store.rs @@ -102,7 +102,7 @@ impl Instance { ) -> Result { let table = self .catalog_manager - .table(catalog_name, schema_name, table_name) + .table(catalog_name, schema_name, table_name, Some(ctx)) .await .context(CatalogSnafu)? .with_context(|| TableNotFoundSnafu { diff --git a/src/frontend/src/script.rs b/src/frontend/src/script.rs index 91f793b69743..43d28c012a4d 100644 --- a/src/frontend/src/script.rs +++ b/src/frontend/src/script.rs @@ -152,7 +152,12 @@ mod python { if let Some(table) = self .catalog_manager - .table(&expr.catalog_name, &expr.schema_name, &expr.table_name) + .table( + &expr.catalog_name, + &expr.schema_name, + &expr.table_name, + None, + ) .await .context(CatalogSnafu)? { @@ -185,6 +190,7 @@ mod python { &table_name.catalog_name, &table_name.schema_name, &table_name.table_name, + None, ) .await .context(CatalogSnafu)? diff --git a/src/operator/src/delete.rs b/src/operator/src/delete.rs index ac78350a5089..756195c83ae7 100644 --- a/src/operator/src/delete.rs +++ b/src/operator/src/delete.rs @@ -232,7 +232,7 @@ impl Deleter { async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result { self.catalog_manager - .table(catalog, schema, table) + .table(catalog, schema, table, None) .await .context(CatalogSnafu)? .with_context(|| TableNotFoundSnafu { diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 018021f47100..974fa9b58732 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -608,7 +608,7 @@ impl Inserter { table: &str, ) -> Result> { self.catalog_manager - .table(catalog, schema, table) + .table(catalog, schema, table, None) .await .context(CatalogSnafu) } diff --git a/src/operator/src/req_convert/delete/row_to_region.rs b/src/operator/src/req_convert/delete/row_to_region.rs index 1b1316c904a5..d04659c6c8c5 100644 --- a/src/operator/src/req_convert/delete/row_to_region.rs +++ b/src/operator/src/req_convert/delete/row_to_region.rs @@ -64,7 +64,7 @@ impl<'a> RowToRegion<'a> { let catalog_name = self.ctx.current_catalog(); let schema_name = self.ctx.current_schema(); self.catalog_manager - .table(catalog_name, &schema_name, table_name) + .table(catalog_name, &schema_name, table_name, None) .await .context(CatalogSnafu)? .with_context(|| TableNotFoundSnafu { diff --git a/src/operator/src/req_convert/insert/stmt_to_region.rs b/src/operator/src/req_convert/insert/stmt_to_region.rs index 37d55e6c9e90..8124edc19514 100644 --- a/src/operator/src/req_convert/insert/stmt_to_region.rs +++ b/src/operator/src/req_convert/insert/stmt_to_region.rs @@ -139,7 +139,7 @@ impl<'a> StatementToRegion<'a> { async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result { self.catalog_manager - .table(catalog, schema, table) + .table(catalog, schema, table, None) .await .context(CatalogSnafu)? .with_context(|| TableNotFoundSnafu { diff --git a/src/operator/src/request.rs b/src/operator/src/request.rs index 64a6a75c31a7..0c1db682c3da 100644 --- a/src/operator/src/request.rs +++ b/src/operator/src/request.rs @@ -219,7 +219,7 @@ impl Requester { ) -> Result> { let table = self .catalog_manager - .table(catalog, schema, table_name) + .table(catalog, schema, table_name, None) .await .context(CatalogSnafu)?; diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index a2f02514956c..35e6752d08d3 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -286,7 +286,7 @@ impl StatementExecutor { let table_ref = self .catalog_manager - .table(&catalog, &schema, &table) + .table(&catalog, &schema, &table, Some(&query_ctx)) .await .context(CatalogSnafu)? .context(TableNotFoundSnafu { table_name: &table })?; @@ -313,7 +313,7 @@ impl StatementExecutor { let catalog = query_ctx.current_catalog(); ensure!( self.catalog_manager - .schema_exists(catalog, db.as_ref()) + .schema_exists(catalog, db.as_ref(), Some(&query_ctx)) .await .context(CatalogSnafu)?, SchemaNotFoundSnafu { schema_info: &db } @@ -382,7 +382,7 @@ impl StatementExecutor { table, } = table_ref; self.catalog_manager - .table(catalog, schema, table) + .table(catalog, schema, table, None) .await .context(CatalogSnafu)? .with_context(|| TableNotFoundSnafu { diff --git a/src/operator/src/statement/copy_database.rs b/src/operator/src/statement/copy_database.rs index 134dd2355926..662c2a9fbf9d 100644 --- a/src/operator/src/statement/copy_database.rs +++ b/src/operator/src/statement/copy_database.rs @@ -57,7 +57,7 @@ impl StatementExecutor { ); let table_names = self .catalog_manager - .table_names(&req.catalog_name, &req.schema_name) + .table_names(&req.catalog_name, &req.schema_name, Some(&ctx)) .await .context(CatalogSnafu)?; diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index afef6d590d5d..aa1a07087545 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -106,7 +106,7 @@ impl StatementExecutor { .context(error::ExternalSnafu)?; let table_ref = self .catalog_manager - .table(&catalog, &schema, &table) + .table(&catalog, &schema, &table, Some(&ctx)) .await .context(CatalogSnafu)? .context(TableNotFoundSnafu { table_name: &table })?; @@ -207,6 +207,7 @@ impl StatementExecutor { &create_table.catalog_name, &create_table.schema_name, &create_table.table_name, + Some(&query_ctx), ) .await .context(CatalogSnafu)? @@ -487,7 +488,12 @@ impl StatementExecutor { // if view or table exists. if let Some(table) = self .catalog_manager - .table(&expr.catalog_name, &expr.schema_name, &expr.view_name) + .table( + &expr.catalog_name, + &expr.schema_name, + &expr.view_name, + Some(&ctx), + ) .await .context(CatalogSnafu)? { @@ -656,7 +662,7 @@ impl StatementExecutor { ) -> Result { let view_info = if let Some(view) = self .catalog_manager - .table(&catalog, &schema, &view) + .table(&catalog, &schema, &view, None) .await .context(CatalogSnafu)? { @@ -766,6 +772,7 @@ impl StatementExecutor { &table_name.catalog_name, &table_name.schema_name, &table_name.table_name, + Some(&query_context), ) .await .context(CatalogSnafu)? @@ -816,7 +823,7 @@ impl StatementExecutor { if self .catalog_manager - .schema_exists(&catalog, &schema) + .schema_exists(&catalog, &schema, None) .await .context(CatalogSnafu)? { @@ -858,6 +865,7 @@ impl StatementExecutor { &table_name.catalog_name, &table_name.schema_name, &table_name.table_name, + Some(&query_context), ) .await .context(CatalogSnafu)? @@ -944,7 +952,12 @@ impl StatementExecutor { let table = self .catalog_manager - .table(&catalog_name, &schema_name, &table_name) + .table( + &catalog_name, + &schema_name, + &table_name, + Some(&query_context), + ) .await .context(CatalogSnafu)? .with_context(|| TableNotFoundSnafu { @@ -1167,9 +1180,10 @@ impl StatementExecutor { if !self .catalog_manager - .schema_exists(catalog, database) + .schema_exists(catalog, database, None) .await .context(CatalogSnafu)? + && !self.catalog_manager.is_reserved_schema_name(database) { self.create_database_procedure( catalog.to_string(), diff --git a/src/operator/src/statement/describe.rs b/src/operator/src/statement/describe.rs index d40990e4dda7..02dd58dbd4b6 100644 --- a/src/operator/src/statement/describe.rs +++ b/src/operator/src/statement/describe.rs @@ -39,7 +39,7 @@ impl StatementExecutor { let table = self .catalog_manager - .table(&catalog, &schema, &table) + .table(&catalog, &schema, &table, Some(&query_ctx)) .await .context(CatalogSnafu)? .with_context(|| TableNotFoundSnafu { diff --git a/src/operator/src/statement/show.rs b/src/operator/src/statement/show.rs index 5b4817226b6e..eb69983f01c1 100644 --- a/src/operator/src/statement/show.rs +++ b/src/operator/src/statement/show.rs @@ -143,7 +143,7 @@ impl StatementExecutor { let table_ref = self .catalog_manager - .table(&catalog, &schema, &view) + .table(&catalog, &schema, &view, Some(&query_ctx)) .await .context(CatalogSnafu)? .context(ViewNotFoundSnafu { view_name: &view })?; diff --git a/src/pipeline/src/manager/pipeline_operator.rs b/src/pipeline/src/manager/pipeline_operator.rs index 049cd80b452a..2e838144a483 100644 --- a/src/pipeline/src/manager/pipeline_operator.rs +++ b/src/pipeline/src/manager/pipeline_operator.rs @@ -110,7 +110,12 @@ impl PipelineOperator { // exist in catalog, just open if let Some(table) = self .catalog_manager - .table(&expr.catalog_name, &expr.schema_name, &expr.table_name) + .table( + &expr.catalog_name, + &expr.schema_name, + &expr.table_name, + Some(&ctx), + ) .await .context(CatalogSnafu)? { @@ -130,7 +135,7 @@ impl PipelineOperator { // get from catalog let table = self .catalog_manager - .table(catalog, schema, table_name) + .table(catalog, schema, table_name, Some(&ctx)) .await .context(CatalogSnafu)? .context(PipelineTableNotFoundSnafu)?; diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 907b14c20d70..03eadfde970d 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -116,7 +116,7 @@ impl DatafusionQueryEngine { let default_catalog = &query_ctx.current_catalog().to_owned(); let default_schema = &query_ctx.current_schema(); let table_name = dml.table_name.resolve(default_catalog, default_schema); - let table = self.find_table(&table_name).await?; + let table = self.find_table(&table_name, &query_ctx).await?; let output = self .exec_query_plan(LogicalPlan::DfPlan((*dml.input).clone()), query_ctx.clone()) @@ -241,14 +241,18 @@ impl DatafusionQueryEngine { .context(TableMutationSnafu) } - async fn find_table(&self, table_name: &ResolvedTableReference) -> Result { + async fn find_table( + &self, + table_name: &ResolvedTableReference, + query_context: &QueryContextRef, + ) -> Result { let catalog_name = table_name.catalog.as_ref(); let schema_name = table_name.schema.as_ref(); let table_name = table_name.table.as_ref(); self.state .catalog_manager() - .table(catalog_name, schema_name, table_name) + .table(catalog_name, schema_name, table_name, Some(query_context)) .await .context(CatalogSnafu)? .with_context(|| TableNotFoundSnafu { table: table_name }) @@ -529,7 +533,7 @@ mod tests { use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use datatypes::vectors::{Helper, UInt32Vector, UInt64Vector, VectorRef}; - use session::context::QueryContext; + use session::context::{QueryContext, QueryContextBuilder}; use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; use super::*; @@ -618,12 +622,16 @@ mod tests { .as_any() .downcast_ref::() .unwrap(); + let query_ctx = Arc::new(QueryContextBuilder::default().build()); let table = engine - .find_table(&ResolvedTableReference { - catalog: "greptime".into(), - schema: "public".into(), - table: "numbers".into(), - }) + .find_table( + &ResolvedTableReference { + catalog: "greptime".into(), + schema: "public".into(), + table: "numbers".into(), + }, + &query_ctx, + ) .await .unwrap(); diff --git a/src/query/src/datafusion/planner.rs b/src/query/src/datafusion/planner.rs index 3c66efeb18b0..c3e8e1f5442f 100644 --- a/src/query/src/datafusion/planner.rs +++ b/src/query/src/datafusion/planner.rs @@ -61,7 +61,7 @@ impl DfContextProviderAdapter { let mut table_provider = DfTableSourceProvider::new( engine_state.catalog_manager().clone(), engine_state.disallow_cross_catalog_query(), - query_ctx.as_ref(), + query_ctx.clone(), Arc::new(DefaultPlanDecoder::new(session_state.clone(), &query_ctx)?), session_state .config_options() diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs index 73168ff1bda8..a94a79846105 100644 --- a/src/query/src/dist_plan/planner.rs +++ b/src/query/src/dist_plan/planner.rs @@ -128,6 +128,7 @@ impl DistExtensionPlanner { &table_name.catalog_name, &table_name.schema_name, &table_name.table_name, + None, ) .await .context(CatalogSnafu)? diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index d59ee8a72ed7..4c0986033586 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -68,7 +68,7 @@ impl DfLogicalPlanner { let table_provider = DfTableSourceProvider::new( self.engine_state.catalog_manager().clone(), self.engine_state.disallow_cross_catalog_query(), - query_ctx.as_ref(), + query_ctx.clone(), Arc::new(DefaultPlanDecoder::new( self.session_state.clone(), &query_ctx, @@ -144,14 +144,15 @@ impl DfLogicalPlanner { #[tracing::instrument(skip_all)] async fn plan_pql(&self, stmt: EvalStmt, query_ctx: QueryContextRef) -> Result { + let plan_decoder = Arc::new(DefaultPlanDecoder::new( + self.session_state.clone(), + &query_ctx, + )?); let table_provider = DfTableSourceProvider::new( self.engine_state.catalog_manager().clone(), self.engine_state.disallow_cross_catalog_query(), - query_ctx.as_ref(), - Arc::new(DefaultPlanDecoder::new( - self.session_state.clone(), - &query_ctx, - )?), + query_ctx, + plan_decoder, self.session_state .config_options() .sql_parser diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index e42449d0dcb3..ad00c900c803 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -2379,7 +2379,7 @@ mod test { DfTableSourceProvider::new( catalog_list, false, - QueryContext::arc().as_ref(), + QueryContext::arc(), DummyDecoder::arc(), false, ) @@ -3219,7 +3219,7 @@ mod test { DfTableSourceProvider::new( catalog_list.clone(), false, - QueryContext::arc().as_ref(), + QueryContext::arc(), DummyDecoder::arc(), true, ), @@ -3249,7 +3249,7 @@ mod test { DfTableSourceProvider::new( catalog_list.clone(), false, - QueryContext::arc().as_ref(), + QueryContext::arc(), DummyDecoder::arc(), true, ), diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs index 8b0c09bb6238..ca79ef7416cc 100644 --- a/src/query/src/sql.rs +++ b/src/query/src/sql.rs @@ -232,6 +232,7 @@ async fn query_from_information_schema_table( query_ctx.current_catalog(), INFORMATION_SCHEMA_NAME, table_name, + Some(&query_ctx), ) .await .context(error::CatalogSnafu)? diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index 2aef58c48c06..941cac253972 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -405,11 +405,11 @@ async fn get_all_column_names( schema: &str, manager: &CatalogManagerRef, ) -> std::result::Result, catalog::error::Error> { - let table_names = manager.table_names(catalog, schema).await?; + let table_names = manager.table_names(catalog, schema, None).await?; let mut labels = HashSet::new(); for table_name in table_names { - let Some(table) = manager.table(catalog, schema, &table_name).await? else { + let Some(table) = manager.table(catalog, schema, &table_name, None).await? else { continue; }; for column in table.primary_key_columns() { @@ -436,6 +436,7 @@ async fn retrieve_series_from_query_result( query_ctx.current_catalog(), &query_ctx.current_schema(), table_name, + Some(query_ctx), ) .await .context(CatalogSnafu)? @@ -691,7 +692,7 @@ pub async fn label_values_query( if label_name == METRIC_NAME_LABEL { let mut table_names = match handler .catalog_manager() - .table_names(&catalog, &schema) + .table_names(&catalog, &schema, Some(&query_ctx)) .await { Ok(table_names) => table_names, @@ -777,7 +778,11 @@ async fn retrieve_field_names( if matches.is_empty() { // query all tables if no matcher is provided - while let Some(table) = manager.tables(catalog, &schema).next().await { + while let Some(table) = manager + .tables(catalog, &schema, Some(query_ctx)) + .next() + .await + { let table = table.context(CatalogSnafu)?; for column in table.field_columns() { field_columns.insert(column.name); @@ -788,7 +793,7 @@ async fn retrieve_field_names( for table_name in matches { let table = manager - .table(catalog, &schema, &table_name) + .table(catalog, &schema, &table_name, Some(query_ctx)) .await .context(CatalogSnafu)? .with_context(|| TableNotFoundSnafu { diff --git a/src/session/src/context.rs b/src/session/src/context.rs index 28ecca6a3fd9..70168d9498eb 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -261,6 +261,7 @@ impl QueryContext { impl QueryContextBuilder { pub fn build(self) -> QueryContext { + let channel = self.channel.unwrap_or_default(); QueryContext { current_catalog: self .current_catalog @@ -270,8 +271,10 @@ impl QueryContextBuilder { .sql_dialect .unwrap_or_else(|| Arc::new(GreptimeDbDialect {})), extensions: self.extensions.unwrap_or_default(), - configuration_parameter: self.configuration_parameter.unwrap_or_default(), - channel: self.channel.unwrap_or_default(), + configuration_parameter: self + .configuration_parameter + .unwrap_or_else(|| Arc::new(ConfigurationVariables::default())), + channel, } } diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 5975138431ef..fa88de07de4f 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -181,7 +181,8 @@ mod test { .table( "greptime", "database_created_through_grpc", - "table_created_through_grpc" + "table_created_through_grpc", + None, ) .await .unwrap() @@ -510,7 +511,7 @@ CREATE TABLE {table_name} ( let table = instance .frontend() .catalog_manager() - .table("greptime", "public", table_name) + .table("greptime", "public", table_name, None) .await .unwrap() .unwrap(); diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index a456f0a75db4..b3f966c811e5 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -278,7 +278,7 @@ mod tests { assert!(instance .frontend() .catalog_manager() - .table("greptime", "public", "demo") + .table("greptime", "public", "demo", None) .await .unwrap() .is_none()) diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index aefa437532f0..b0bc7f4c881f 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -462,7 +462,6 @@ async fn test_execute_show_databases_tables(instance: Arc) { +--------------------+ | greptime_private | | information_schema | -| pg_catalog | | public | +--------------------+\ "; @@ -1900,7 +1899,6 @@ async fn test_show_databases(instance: Arc) { +--------------------+ | greptime_private | | information_schema | -| pg_catalog | | public | +--------------------+"; check_output_stream(output, expected).await; @@ -1914,7 +1912,6 @@ async fn test_show_databases(instance: Arc) { | Database | +--------------------+ | information_schema | -| pg_catalog | +--------------------+"; check_output_stream(output, expected).await; } diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs index 98e10c8b2ddb..3f72ee0ccad7 100644 --- a/tests-integration/tests/region_migration.rs +++ b/tests-integration/tests/region_migration.rs @@ -1013,7 +1013,7 @@ async fn prepare_testing_metric_table(cluster: &GreptimeDbCluster) -> TableId { let table = cluster .frontend .catalog_manager() - .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy") + .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy", None) .await .unwrap() .unwrap(); @@ -1039,7 +1039,12 @@ async fn prepare_testing_table(cluster: &GreptimeDbCluster) -> TableId { let table = cluster .frontend .catalog_manager() - .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME) + .table( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + TEST_TABLE_NAME, + None, + ) .await .unwrap() .unwrap(); diff --git a/tests/cases/standalone/common/create/create_database.result b/tests/cases/standalone/common/create/create_database.result index f485162edd39..3eafc27d6f47 100644 --- a/tests/cases/standalone/common/create/create_database.result +++ b/tests/cases/standalone/common/create/create_database.result @@ -18,7 +18,6 @@ show databases; | greptime_private | | illegal-database | | information_schema | -| pg_catalog | | public | +--------------------+ diff --git a/tests/cases/standalone/common/create/create_database_opts.result b/tests/cases/standalone/common/create/create_database_opts.result index 93ac8bfceffc..0177a922981c 100644 --- a/tests/cases/standalone/common/create/create_database_opts.result +++ b/tests/cases/standalone/common/create/create_database_opts.result @@ -10,7 +10,6 @@ SHOW DATABASES; | greptime_private | | information_schema | | mydb | -| pg_catalog | | public | +--------------------+ @@ -22,7 +21,6 @@ SHOW FULL DATABASES; | greptime_private | | | information_schema | | | mydb | ttl='1h' | -| pg_catalog | | | public | | +--------------------+----------+ @@ -78,7 +76,6 @@ SHOW DATABASES; +--------------------+ | greptime_private | | information_schema | -| pg_catalog | | public | +--------------------+ diff --git a/tests/cases/standalone/common/information_schema/tables.result b/tests/cases/standalone/common/information_schema/tables.result index 28416fc0720c..93a93a9c9805 100644 --- a/tests/cases/standalone/common/information_schema/tables.result +++ b/tests/cases/standalone/common/information_schema/tables.result @@ -24,16 +24,13 @@ Affected Rows: 0 select table_catalog, table_schema, table_name from information_schema.tables where table_schema != 'information_schema'; -+---------------+--------------+--------------+ -| table_catalog | table_schema | table_name | -+---------------+--------------+--------------+ -| greptime | abc | t | -| greptime | abcde | t | -| greptime | pg_catalog | pg_class | -| greptime | pg_catalog | pg_type | -| greptime | pg_catalog | pg_namespace | -| greptime | public | numbers | -+---------------+--------------+--------------+ ++---------------+--------------+------------+ +| table_catalog | table_schema | table_name | ++---------------+--------------+------------+ +| greptime | abc | t | +| greptime | abcde | t | +| greptime | public | numbers | ++---------------+--------------+------------+ use public; diff --git a/tests/cases/standalone/common/show/show_databases_tables.result b/tests/cases/standalone/common/show/show_databases_tables.result index 706b23905bbf..fa50fb2aab3a 100644 --- a/tests/cases/standalone/common/show/show_databases_tables.result +++ b/tests/cases/standalone/common/show/show_databases_tables.result @@ -5,7 +5,6 @@ SHOW DATABASES; +--------------------+ | greptime_private | | information_schema | -| pg_catalog | | public | +--------------------+ @@ -16,7 +15,6 @@ SHOW FULL DATABASES; +--------------------+---------+ | greptime_private | | | information_schema | | -| pg_catalog | | | public | | +--------------------+---------+ diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index f3ee1db0168b..cbb0c12b6f3e 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -45,9 +45,6 @@ order by table_schema, table_name; |greptime|information_schema|tables|LOCALTEMPORARY|3|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| |greptime|information_schema|triggers|LOCALTEMPORARY|24|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| |greptime|information_schema|views|LOCALTEMPORARY|32|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| -|greptime|pg_catalog|pg_class|LOCALTEMPORARY|256|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| -|greptime|pg_catalog|pg_namespace|LOCALTEMPORARY|258|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| -|greptime|pg_catalog|pg_type|LOCALTEMPORARY|257|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| |greptime|public|numbers|LOCALTEMPORARY|2|0|0|0|0|0|test_engine|11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| +++++++++++++++++++++++++ @@ -413,16 +410,6 @@ select * from information_schema.columns order by table_schema, table_name, colu | greptime | information_schema | views | table_name | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | views | table_schema | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | views | view_definition | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | -| greptime | pg_catalog | pg_class | oid | 1 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | -| greptime | pg_catalog | pg_class | relkind | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | -| greptime | pg_catalog | pg_class | relname | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | -| greptime | pg_catalog | pg_class | relnamespace | 3 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | -| greptime | pg_catalog | pg_class | relowner | 5 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | -| greptime | pg_catalog | pg_namespace | nspname | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | -| greptime | pg_catalog | pg_namespace | oid | 1 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | -| greptime | pg_catalog | pg_type | oid | 1 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | -| greptime | pg_catalog | pg_type | typlen | 3 | | | 5 | 0 | | | | | | select,insert | | Int16 | smallint | FIELD | | No | smallint | | | -| greptime | pg_catalog | pg_type | typname | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | public | numbers | number | 1 | | | 10 | 0 | | | | PRI | | select,insert | | UInt32 | int unsigned | TAG | | No | int unsigned | | | +---------------+--------------------+---------------------------------------+-----------------------------------+------------------+--------------------------+------------------------+-------------------+---------------+--------------------+--------------------+----------------+------------+-------+---------------+-----------------------+----------------------+-----------------+---------------+----------------+-------------+-----------------+----------------+--------+ @@ -596,7 +583,6 @@ select * from schemata where catalog_name = 'greptime' and schema_name != 'publi +--------------+--------------------+----------------------------+------------------------+----------+---------+ | greptime | greptime_private | utf8 | utf8_bin | | | | greptime | information_schema | utf8 | utf8_bin | | | -| greptime | pg_catalog | utf8 | utf8_bin | | | +--------------+--------------------+----------------------------+------------------------+----------+---------+ -- test engines diff --git a/tests/cases/standalone/common/system/pg_catalog.result b/tests/cases/standalone/common/system/pg_catalog.result index 261211902dbb..d30355352fab 100644 --- a/tests/cases/standalone/common/system/pg_catalog.result +++ b/tests/cases/standalone/common/system/pg_catalog.result @@ -5,30 +5,7 @@ Error: 1004(InvalidArguments), Schema pg_catalog already exists select * from pg_catalog.pg_type order by oid; -+-----+-----------+--------+ -| oid | typname | typlen | -+-----+-----------+--------+ -| 1 | String | -1 | -| 2 | Binary | -1 | -| 3 | Int8 | 1 | -| 4 | Int16 | 2 | -| 5 | Int32 | 4 | -| 6 | Int64 | 8 | -| 7 | UInt8 | 1 | -| 8 | UInt16 | 2 | -| 9 | UInt32 | 4 | -| 10 | UInt64 | 8 | -| 11 | Float32 | 4 | -| 12 | Float64 | 8 | -| 13 | Decimal | 16 | -| 14 | Date | 4 | -| 15 | DateTime | 8 | -| 16 | Timestamp | 8 | -| 17 | Time | 8 | -| 18 | Duration | 8 | -| 19 | Interval | 16 | -| 20 | List | -1 | -+-----+-----------+--------+ +Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.pg_catalog.pg_type -- \d SELECT n.nspname as "Schema", @@ -44,11 +21,7 @@ WHERE c.relkind IN ('r','p','v','m','S','f','') AND pg_catalog.pg_table_is_visible(c.oid) ORDER BY 1,2; -+--------+---------+-------+-------+ -| Schema | Name | Type | Owner | -+--------+---------+-------+-------+ -| public | numbers | table | | -+--------+---------+-------+-------+ +Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.pg_catalog.pg_class -- \dt SELECT n.nspname as "Schema", @@ -64,11 +37,7 @@ WHERE c.relkind IN ('r','p','') AND pg_catalog.pg_table_is_visible(c.oid) ORDER BY 1,2; -+--------+---------+-------+-------+ -| Schema | Name | Type | Owner | -+--------+---------+-------+-------+ -| public | numbers | table | | -+--------+---------+-------+-------+ +Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.pg_catalog.pg_class -- make sure oid of namespace keep stable SELECT * FROM pg_namespace ORDER BY oid; @@ -100,11 +69,7 @@ where relnamespace = ( where nspname = 'my_db' ); -+---------+ -| relname | -+---------+ -| foo | -+---------+ +Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.pg_catalog.pg_class -- \dt SELECT n.nspname as "Schema", @@ -120,12 +85,7 @@ WHERE c.relkind IN ('r','p','') AND pg_catalog.pg_table_is_visible(c.oid) ORDER BY 1,2; -+--------+---------+-------+-------+ -| Schema | Name | Type | Owner | -+--------+---------+-------+-------+ -| my_db | foo | table | | -| public | numbers | table | | -+--------+---------+-------+-------+ +Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.pg_catalog.pg_class -- show tables in `my_db`, `public` select relname @@ -137,12 +97,7 @@ where relnamespace in ( ) order by relname; -+---------+ -| relname | -+---------+ -| foo | -| numbers | -+---------+ +Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.pg_catalog.pg_class select relname from pg_catalog.pg_class @@ -152,11 +107,7 @@ where relnamespace in ( where nspname like 'my%' ); -+---------+ -| relname | -+---------+ -| foo | -+---------+ +Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.pg_catalog.pg_class select relnamespace, relname, relkind from pg_catalog.pg_class @@ -169,11 +120,7 @@ where relnamespace in ( ) order by relnamespace, relname; -+--------------+---------+---------+ -| relnamespace | relname | relkind | -+--------------+---------+---------+ -| 434869349 | foo | r | -+--------------+---------+---------+ +Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.pg_catalog.pg_class use public; @@ -190,24 +137,11 @@ Affected Rows: 0 -- pg_class desc table pg_class; -+--------------+--------+-----+------+---------+---------------+ -| Column | Type | Key | Null | Default | Semantic Type | -+--------------+--------+-----+------+---------+---------------+ -| oid | UInt32 | | NO | | FIELD | -| relname | String | | NO | | FIELD | -| relnamespace | UInt32 | | NO | | FIELD | -| relkind | String | | NO | | FIELD | -| relowner | UInt32 | | NO | | FIELD | -+--------------+--------+-----+------+---------+---------------+ +Error: 4001(TableNotFound), Table not found: pg_class desc table pg_namespace; -+---------+--------+-----+------+---------+---------------+ -| Column | Type | Key | Null | Default | Semantic Type | -+---------+--------+-----+------+---------+---------------+ -| oid | UInt32 | | NO | | FIELD | -| nspname | String | | NO | | FIELD | -+---------+--------+-----+------+---------+---------------+ +Error: 4001(TableNotFound), Table not found: pg_namespace drop table my_db.foo; diff --git a/tests/cases/standalone/common/view/create.result b/tests/cases/standalone/common/view/create.result index bb0ea87834e0..4cf8084cd364 100644 --- a/tests/cases/standalone/common/view/create.result +++ b/tests/cases/standalone/common/view/create.result @@ -77,11 +77,7 @@ WHERE c.relkind IN ('v','') AND pg_catalog.pg_table_is_visible(c.oid) ORDER BY 1,2; -+--------+-----------+------+-------+ -| Schema | Name | Type | Owner | -+--------+-----------+------+-------+ -| public | test_view | view | | -+--------+-----------+------+-------+ +Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.pg_catalog.pg_class -- SQLNESS REPLACE (\s\d+\s) ID -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) DATETIME @@ -110,9 +106,6 @@ SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE; |greptime|information_schema|optimizer_trace|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| |greptime|information_schema|parameters|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| |greptime|information_schema|partitions|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| -|greptime|pg_catalog|pg_class|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| -|greptime|pg_catalog|pg_namespace|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| -|greptime|pg_catalog|pg_type|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| |greptime|information_schema|profiling|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| |greptime|information_schema|referential_constraints|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| |greptime|information_schema|region_peers|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| @@ -205,6 +198,5 @@ WHERE c.relkind IN ('v','') AND pg_catalog.pg_table_is_visible(c.oid) ORDER BY 1,2; -++ -++ +Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.pg_catalog.pg_class