Skip to content

Commit

Permalink
chore: avoid double future (#3890)
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun authored May 9, 2024
1 parent 7de62ef commit f995f60
Show file tree
Hide file tree
Showing 9 changed files with 10 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ impl InformationSchemaColumnsBuilder {
let predicates = Predicates::from_scan_request(&request);

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;
let mut stream = catalog_manager.tables(&catalog_name, &schema_name);

while let Some(table) = stream.try_next().await? {
let keys = &table.table_info().meta.primary_key_indices;
Expand Down
1 change: 0 additions & 1 deletion src/catalog/src/information_schema/partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ impl InformationSchemaPartitionsBuilder {
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let table_info_stream = catalog_manager
.tables(&catalog_name, &schema_name)
.await
.try_filter_map(|t| async move {
let table_info = t.table_info();
if table_info.table_type == TableType::Temporary {
Expand Down
1 change: 0 additions & 1 deletion src/catalog/src/information_schema/region_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ impl InformationSchemaRegionPeersBuilder {
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let table_id_stream = catalog_manager
.tables(&catalog_name, &schema_name)
.await
.try_filter_map(|t| async move {
let table_info = t.table_info();
if table_info.table_type == TableType::Temporary {
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/table_constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl InformationSchemaTableConstraintsBuilder {
let predicates = Predicates::from_scan_request(&request);

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;
let mut stream = catalog_manager.tables(&catalog_name, &schema_name);

while let Some(table) = stream.try_next().await? {
let keys = &table.table_info().meta.primary_key_indices;
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl InformationSchemaTablesBuilder {
let predicates = Predicates::from_scan_request(&request);

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;
let mut stream = catalog_manager.tables(&catalog_name, &schema_name);

while let Some(table) = stream.try_next().await? {
let table_info = table.table_info();
Expand Down
6 changes: 1 addition & 5 deletions src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,11 +301,7 @@ impl CatalogManager for KvBackendCatalogManager {
})
}

async fn tables<'a>(
&'a self,
catalog: &'a str,
schema: &'a str,
) -> BoxStream<'a, Result<TableRef>> {
fn tables<'a>(&'a self, catalog: &'a str, schema: &'a str) -> BoxStream<'a, Result<TableRef>> {
let sys_tables = try_stream!({
// System tables
let sys_table_names = self.system_catalog.table_names(schema);
Expand Down
6 changes: 1 addition & 5 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,7 @@ pub trait CatalogManager: Send + Sync {
) -> Result<Option<TableRef>>;

/// Returns all tables with a stream by catalog and schema.
async fn tables<'a>(
&'a self,
catalog: &'a str,
schema: &'a str,
) -> BoxStream<'a, Result<TableRef>>;
fn tables<'a>(&'a self, catalog: &'a str, schema: &'a str) -> BoxStream<'a, Result<TableRef>>;
}

pub type CatalogManagerRef = Arc<dyn CatalogManager>;
Expand Down
14 changes: 4 additions & 10 deletions src/catalog/src/memory/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,7 @@ impl CatalogManager for MemoryCatalogManager {
Ok(result)
}

async fn tables<'a>(
&'a self,
catalog: &'a str,
schema: &'a str,
) -> BoxStream<'a, Result<TableRef>> {
fn tables<'a>(&'a self, catalog: &'a str, schema: &'a str) -> BoxStream<'a, Result<TableRef>> {
let catalogs = self.catalogs.read().unwrap();

let Some(schemas) = catalogs.get(catalog) else {
Expand All @@ -141,11 +137,11 @@ impl CatalogManager for MemoryCatalogManager {

let tables = tables.values().cloned().collect::<Vec<_>>();

return Box::pin(try_stream!({
Box::pin(try_stream!({
for table in tables {
yield table;
}
}));
}))
}
}

Expand Down Expand Up @@ -368,9 +364,7 @@ mod tests {
.await
.unwrap()
.unwrap();
let stream = catalog_list
.tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.await;
let stream = catalog_list.tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
let tables = stream.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(tables.len(), 1);
assert_eq!(
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/http/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ async fn retrieve_field_names(

if matches.is_empty() {
// query all tables if no matcher is provided
while let Some(table) = manager.tables(catalog, schema).await.next().await {
while let Some(table) = manager.tables(catalog, schema).next().await {
let table = table.context(CatalogSnafu)?;
for column in table.field_columns() {
field_columns.insert(column.name);
Expand Down

0 comments on commit f995f60

Please sign in to comment.