From 729552e7e5dade7a96f3fb285aafc12c982b2df3 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Thu, 9 May 2024 14:51:55 +0800 Subject: [PATCH] chore: avoid double future --- src/catalog/src/information_schema/columns.rs | 2 +- src/catalog/src/information_schema/partitions.rs | 1 - src/catalog/src/information_schema/region_peers.rs | 1 - .../src/information_schema/table_constraints.rs | 2 +- src/catalog/src/information_schema/tables.rs | 2 +- src/catalog/src/kvbackend/manager.rs | 6 +----- src/catalog/src/lib.rs | 6 +----- src/catalog/src/memory/manager.rs | 14 ++++---------- src/servers/src/http/prometheus.rs | 2 +- 9 files changed, 10 insertions(+), 26 deletions(-) diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs index d255b2d1441a..b368b2cc33f6 100644 --- a/src/catalog/src/information_schema/columns.rs +++ b/src/catalog/src/information_schema/columns.rs @@ -258,7 +258,7 @@ impl InformationSchemaColumnsBuilder { let predicates = Predicates::from_scan_request(&request); for schema_name in catalog_manager.schema_names(&catalog_name).await? { - let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await; + let mut stream = catalog_manager.tables(&catalog_name, &schema_name); while let Some(table) = stream.try_next().await? { let keys = &table.table_info().meta.primary_key_indices; diff --git a/src/catalog/src/information_schema/partitions.rs b/src/catalog/src/information_schema/partitions.rs index e7b80e2342a1..4729623cdd46 100644 --- a/src/catalog/src/information_schema/partitions.rs +++ b/src/catalog/src/information_schema/partitions.rs @@ -243,7 +243,6 @@ impl InformationSchemaPartitionsBuilder { for schema_name in catalog_manager.schema_names(&catalog_name).await? { let table_info_stream = catalog_manager .tables(&catalog_name, &schema_name) - .await .try_filter_map(|t| async move { let table_info = t.table_info(); if table_info.table_type == TableType::Temporary { diff --git a/src/catalog/src/information_schema/region_peers.rs b/src/catalog/src/information_schema/region_peers.rs index 004941a17cb5..1be1b2225474 100644 --- a/src/catalog/src/information_schema/region_peers.rs +++ b/src/catalog/src/information_schema/region_peers.rs @@ -179,7 +179,6 @@ impl InformationSchemaRegionPeersBuilder { for schema_name in catalog_manager.schema_names(&catalog_name).await? { let table_id_stream = catalog_manager .tables(&catalog_name, &schema_name) - .await .try_filter_map(|t| async move { let table_info = t.table_info(); if table_info.table_type == TableType::Temporary { diff --git a/src/catalog/src/information_schema/table_constraints.rs b/src/catalog/src/information_schema/table_constraints.rs index acac899e49eb..6bb30206d9cb 100644 --- a/src/catalog/src/information_schema/table_constraints.rs +++ b/src/catalog/src/information_schema/table_constraints.rs @@ -177,7 +177,7 @@ impl InformationSchemaTableConstraintsBuilder { let predicates = Predicates::from_scan_request(&request); for schema_name in catalog_manager.schema_names(&catalog_name).await? { - let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await; + let mut stream = catalog_manager.tables(&catalog_name, &schema_name); while let Some(table) = stream.try_next().await? { let keys = &table.table_info().meta.primary_key_indices; diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs index 4103a17b87ba..24d9cf478163 100644 --- a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/information_schema/tables.rs @@ -161,7 +161,7 @@ impl InformationSchemaTablesBuilder { let predicates = Predicates::from_scan_request(&request); for schema_name in catalog_manager.schema_names(&catalog_name).await? { - let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await; + let mut stream = catalog_manager.tables(&catalog_name, &schema_name); while let Some(table) = stream.try_next().await? { let table_info = table.table_info(); diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 305bc915102b..4694d9aae8bb 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -301,11 +301,7 @@ impl CatalogManager for KvBackendCatalogManager { }) } - async fn tables<'a>( - &'a self, - catalog: &'a str, - schema: &'a str, - ) -> BoxStream<'a, Result> { + fn tables<'a>(&'a self, catalog: &'a str, schema: &'a str) -> BoxStream<'a, Result> { let sys_tables = try_stream!({ // System tables let sys_table_names = self.system_catalog.table_names(schema); diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 5a98a0eb0032..494a94df2699 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -59,11 +59,7 @@ pub trait CatalogManager: Send + Sync { ) -> Result>; /// Returns all tables with a stream by catalog and schema. - async fn tables<'a>( - &'a self, - catalog: &'a str, - schema: &'a str, - ) -> BoxStream<'a, Result>; + fn tables<'a>(&'a self, catalog: &'a str, schema: &'a str) -> BoxStream<'a, Result>; } pub type CatalogManagerRef = Arc; diff --git a/src/catalog/src/memory/manager.rs b/src/catalog/src/memory/manager.rs index 7139c8256966..a5513b89048e 100644 --- a/src/catalog/src/memory/manager.rs +++ b/src/catalog/src/memory/manager.rs @@ -117,11 +117,7 @@ impl CatalogManager for MemoryCatalogManager { Ok(result) } - async fn tables<'a>( - &'a self, - catalog: &'a str, - schema: &'a str, - ) -> BoxStream<'a, Result> { + fn tables<'a>(&'a self, catalog: &'a str, schema: &'a str) -> BoxStream<'a, Result> { let catalogs = self.catalogs.read().unwrap(); let Some(schemas) = catalogs.get(catalog) else { @@ -141,11 +137,11 @@ impl CatalogManager for MemoryCatalogManager { let tables = tables.values().cloned().collect::>(); - return Box::pin(try_stream!({ + Box::pin(try_stream!({ for table in tables { yield table; } - })); + })) } } @@ -368,9 +364,7 @@ mod tests { .await .unwrap() .unwrap(); - let stream = catalog_list - .tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME) - .await; + let stream = catalog_list.tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); let tables = stream.try_collect::>().await.unwrap(); assert_eq!(tables.len(), 1); assert_eq!( diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index 5209af11f065..b98b030c7bb2 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -771,7 +771,7 @@ async fn retrieve_field_names( if matches.is_empty() { // query all tables if no matcher is provided - while let Some(table) = manager.tables(catalog, schema).await.next().await { + while let Some(table) = manager.tables(catalog, schema).next().await { let table = table.context(CatalogSnafu)?; for column in table.field_columns() { field_columns.insert(column.name);