Skip to content

Commit

Permalink
chore: reduce the number of requests for meta (#1647)
Browse files Browse the repository at this point in the history
  • Loading branch information
fengys1996 authored May 26, 2023
1 parent 89366ba commit f0a519b
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 10 deletions.
38 changes: 30 additions & 8 deletions src/frontend/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ impl FrontendCatalogManager {
self.datanode_clients.clone()
}

pub async fn invalidate_schema(&self, catalog: &str, schema: &str) {
let schema_key = SchemaKey {
catalog_name: catalog.into(),
schema_name: schema.into(),
}
.to_string();

let key = schema_key.as_bytes();

self.backend_cache_invalidtor.invalidate_key(key).await;
}

pub async fn invalidate_table(&self, catalog: &str, schema: &str, table: &str) {
let tg_key = TableGlobalKey {
catalog_name: catalog.into(),
Expand Down Expand Up @@ -263,6 +275,7 @@ impl CatalogManager for FrontendCatalogManager {
catalog_name: catalog.to_string(),
}
.to_string();

Ok(self.backend.get(key.as_bytes()).await?.map(|_| {
Arc::new(FrontendCatalogProvider {
catalog_name: catalog.to_string(),
Expand Down Expand Up @@ -340,18 +353,27 @@ impl CatalogProvider for FrontendCatalogProvider {
}

async fn schema(&self, name: &str) -> catalog::error::Result<Option<SchemaProviderRef>> {
let all_schemas = self.schema_names().await?;
if all_schemas.contains(&name.to_string()) {
Ok(Some(Arc::new(FrontendSchemaProvider {
catalog_name: self.catalog_name.clone(),
let catalog = &self.catalog_name;

let schema_key = SchemaKey {
catalog_name: catalog.clone(),
schema_name: name.to_string(),
}
.to_string();

let val = self.backend.get(schema_key.as_bytes()).await?;

let provider = val.map(|_| {
Arc::new(FrontendSchemaProvider {
catalog_name: catalog.clone(),
schema_name: name.to_string(),
backend: self.backend.clone(),
partition_manager: self.partition_manager.clone(),
datanode_clients: self.datanode_clients.clone(),
})))
} else {
Ok(None)
}
}) as Arc<dyn SchemaProvider>
});

Ok(provider)
}
}

Expand Down
14 changes: 12 additions & 2 deletions src/frontend/src/instance/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,8 +463,8 @@ impl DistInstance {
}

let key = SchemaKey {
catalog_name: catalog,
schema_name: expr.database_name,
catalog_name: catalog.clone(),
schema_name: expr.database_name.clone(),
};
let value = SchemaValue {};
let client = self
Expand All @@ -475,17 +475,27 @@ impl DistInstance {
let request = CompareAndPutRequest::new()
.with_key(key.to_string())
.with_value(value.as_bytes().context(CatalogEntrySerdeSnafu)?);

let response = client
.compare_and_put(request.into())
.await
.context(RequestMetaSnafu)?;

ensure!(
response.success,
SchemaExistsSnafu {
name: key.schema_name
}
);

// Since the database created on meta does not go through KvBackend, so we manually
// invalidate the cache here.
//
// TODO(fys): when the meta invalidation cache mechanism is established, remove it.
self.catalog_manager()
.invalidate_schema(&catalog, &expr.database_name)
.await;

Ok(Output::AffectedRows(1))
}

Expand Down

0 comments on commit f0a519b

Please sign in to comment.