Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refactor CacheInvalidator #3550

Merged
merged 2 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 13 additions & 20 deletions src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,20 @@ use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context};
use common_meta::error::Result as MetaResult;
use common_meta::instruction::CacheIdent;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::table_name::TableName;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use moka::future::{Cache as AsyncCache, CacheBuilder};
use moka::sync::Cache;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use snafu::prelude::*;
use table::dist_table::DistTable;
use table::metadata::TableId;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::TableRef;

Expand Down Expand Up @@ -79,24 +78,18 @@ fn make_table(table_info_value: TableInfoValue) -> CatalogResult<TableRef> {

#[async_trait::async_trait]
impl CacheInvalidator for KvBackendCatalogManager {
async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> {
self.cache_invalidator
.invalidate_table_id(ctx, table_id)
.await
}

async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
let table_cache_key = format_full_table_name(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
);
self.cache_invalidator
.invalidate_table_name(ctx, table_name)
.await?;
self.table_cache.invalidate(&table_cache_key).await;

Ok(())
async fn invalidate(&self, ctx: &Context, caches: Vec<CacheIdent>) -> MetaResult<()> {
for cache in &caches {
if let CacheIdent::TableName(table_name) = cache {
let table_cache_key = format_full_table_name(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
);
self.table_cache.invalidate(&table_cache_key).await;
}
}
self.cache_invalidator.invalidate(ctx, caches).await
}
}

Expand Down
46 changes: 19 additions & 27 deletions src/common/meta/src/cache_invalidator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@

use std::sync::Arc;

use table::metadata::TableId;

use crate::error::Result;
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoKey;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteKey;
use crate::key::TableMetaKey;
use crate::table_name::TableName;

/// KvBackend cache invalidator
#[async_trait::async_trait]
Expand All @@ -46,10 +44,7 @@ pub struct Context {

#[async_trait::async_trait]
pub trait CacheInvalidator: Send + Sync {
// Invalidates table cache
async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> Result<()>;

async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> Result<()>;
async fn invalidate(&self, ctx: &Context, caches: Vec<CacheIdent>) -> Result<()>;
}

pub type CacheInvalidatorRef = Arc<dyn CacheInvalidator>;
Expand All @@ -58,11 +53,7 @@ pub struct DummyCacheInvalidator;

#[async_trait::async_trait]
impl CacheInvalidator for DummyCacheInvalidator {
async fn invalidate_table_id(&self, _ctx: &Context, _table_id: TableId) -> Result<()> {
Ok(())
}

async fn invalidate_table_name(&self, _ctx: &Context, _table_name: TableName) -> Result<()> {
async fn invalidate(&self, _ctx: &Context, _caches: Vec<CacheIdent>) -> Result<()> {
Ok(())
}
}
Expand All @@ -72,21 +63,22 @@ impl<T> CacheInvalidator for T
where
T: KvCacheInvalidator,
{
async fn invalidate_table_name(&self, _ctx: &Context, table_name: TableName) -> Result<()> {
let key: TableNameKey = (&table_name).into();

self.invalidate_key(&key.as_raw_key()).await;

Ok(())
}

async fn invalidate_table_id(&self, _ctx: &Context, table_id: TableId) -> Result<()> {
let key = TableInfoKey::new(table_id);
self.invalidate_key(&key.as_raw_key()).await;

let key = &TableRouteKey { table_id };
self.invalidate_key(&key.as_raw_key()).await;

async fn invalidate(&self, _ctx: &Context, caches: Vec<CacheIdent>) -> Result<()> {
for cache in caches {
match cache {
CacheIdent::TableId(table_id) => {
let key = TableInfoKey::new(table_id);
self.invalidate_key(&key.as_raw_key()).await;

let key = &TableRouteKey { table_id };
self.invalidate_key(&key.as_raw_key()).await;
}
CacheIdent::TableName(table_name) => {
let key: TableNameKey = (&table_name).into();
self.invalidate_key(&key.as_raw_key()).await
}
}
}
Ok(())
}
}
11 changes: 9 additions & 2 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::cache_invalidator::Context;
use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::DdlContext;
use crate::error::{self, ConvertAlterTableRequestSnafu, Error, InvalidProtoMsgSnafu, Result};
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::DeserializedValueWithBytes;
Expand Down Expand Up @@ -333,11 +334,17 @@ impl AlterTableProcedure {

if matches!(alter_kind, Kind::RenameTable { .. }) {
cache_invalidator
.invalidate_table_name(&Context::default(), self.data.table_ref().into())
.invalidate(
&Context::default(),
vec![CacheIdent::TableName(self.data.table_ref().into())],
)
.await?;
} else {
cache_invalidator
.invalidate_table_id(&Context::default(), self.data.table_id())
.invalidate(
&Context::default(),
vec![CacheIdent::TableId(self.data.table_id())],
)
.await?;
};

Expand Down
14 changes: 8 additions & 6 deletions src/common/meta/src/ddl/drop_table/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::cache_invalidator::Context;
use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
Expand Down Expand Up @@ -120,13 +121,14 @@ impl DropTableExecutor {
subject: Some("Invalidate table cache by dropping table".to_string()),
};

// TODO(weny): merge these two invalidation instructions.
cache_invalidator
.invalidate_table_name(&ctx, self.table.table_ref().into())
.await?;

cache_invalidator
.invalidate_table_id(&ctx, self.table_id)
.invalidate(
&ctx,
vec![
CacheIdent::TableName(self.table.table_ref().into()),
CacheIdent::TableId(self.table_id),
],
)
.await?;

Ok(())
Expand Down
19 changes: 12 additions & 7 deletions src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl OpenRegion {
}

/// The instruction of downgrading leader region.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DowngradeRegion {
/// The [RegionId].
pub region_id: RegionId,
Expand All @@ -137,7 +137,7 @@ impl Display for DowngradeRegion {
}

/// Upgrades a follower region to leader region.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct UpgradeRegion {
/// The [RegionId].
pub region_id: RegionId,
Expand All @@ -151,7 +151,14 @@ pub struct UpgradeRegion {
pub wait_for_replay_timeout: Option<Duration>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Display)]
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq, Eq)]
/// The identifier of cache.
pub enum CacheIdent {
TableId(TableId),
TableName(TableName),
}

#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
pub enum Instruction {
/// Opens a region.
///
Expand All @@ -165,10 +172,8 @@ pub enum Instruction {
UpgradeRegion(UpgradeRegion),
/// Downgrades a region.
DowngradeRegion(DowngradeRegion),
/// Invalidates a specified table cache.
InvalidateTableIdCache(TableId),
/// Invalidates a specified table name index cache.
InvalidateTableNameCache(TableName),
/// Invalidates batch cache.
InvalidateCaches(Vec<CacheIdent>),
}

/// The reply of [UpgradeRegion].
Expand Down
4 changes: 1 addition & 3 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@ impl RegionHeartbeatResponseHandler {
Instruction::UpgradeRegion(upgrade_region) => Ok(Box::new(move |handler_context| {
handler_context.handle_upgrade_region_instruction(upgrade_region)
})),
Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => {
InvalidHeartbeatResponseSnafu.fail()
}
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
}
}
}
Expand Down
25 changes: 6 additions & 19 deletions src/frontend/src/heartbeat/handler/invalidate_table_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use common_meta::heartbeat::handler::{
};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_telemetry::error;
use futures::future::Either;

#[derive(Clone)]
pub struct InvalidateTableCacheHandler {
Expand All @@ -32,8 +31,7 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
matches!(
ctx.incoming_message.as_ref(),
Some((_, Instruction::InvalidateTableIdCache { .. }))
| Some((_, Instruction::InvalidateTableNameCache { .. }))
Some((_, Instruction::InvalidateCaches(_)))
)
}

Expand All @@ -42,22 +40,11 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
let cache_invalidator = self.cache_invalidator.clone();

let (meta, invalidator) = match ctx.incoming_message.take() {
Some((meta, Instruction::InvalidateTableIdCache(table_id))) => (
meta,
Either::Left(async move {
cache_invalidator
.invalidate_table_id(&Context::default(), table_id)
.await
}),
),
Some((meta, Instruction::InvalidateTableNameCache(table_name))) => (
meta,
Either::Right(async move {
cache_invalidator
.invalidate_table_name(&Context::default(), table_name)
.await
}),
),
Some((meta, Instruction::InvalidateCaches(caches))) => (meta, async move {
cache_invalidator
.invalidate(&Context::default(), caches)
.await
}),
_ => unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'"),
};

Expand Down
11 changes: 8 additions & 3 deletions src/frontend/src/heartbeat/handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::instruction::{CacheIdent, Instruction, InstructionReply, SimpleReply};
use common_meta::key::table_info::TableInfoKey;
use common_meta::key::TableMetaKey;
use partition::manager::TableRouteCacheInvalidator;
Expand Down Expand Up @@ -74,7 +74,7 @@ async fn test_invalidate_table_cache_handler() {
handle_instruction(
executor.clone(),
mailbox.clone(),
Instruction::InvalidateTableIdCache(table_id),
Instruction::InvalidateCaches(vec![CacheIdent::TableId(table_id)]),
)
.await;

Expand All @@ -90,7 +90,12 @@ async fn test_invalidate_table_cache_handler() {
.contains_key(&table_info_key.as_raw_key()));

// removes a invalid key
handle_instruction(executor, mailbox, Instruction::InvalidateTableIdCache(0)).await;
handle_instruction(
executor,
mailbox,
Instruction::InvalidateCaches(vec![CacheIdent::TableId(0)]),
)
.await;

let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
Expand Down
13 changes: 3 additions & 10 deletions src/meta-srv/src/cache_invalidator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{CacheInvalidator, Context};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::instruction::Instruction;
use common_meta::table_name::TableName;
use common_meta::instruction::{CacheIdent, Instruction};
use snafu::ResultExt;
use table::metadata::TableId;

use crate::metasrv::MetasrvInfo;
use crate::service::mailbox::{BroadcastChannel, MailboxRef};
Expand Down Expand Up @@ -65,13 +63,8 @@ impl MetasrvCacheInvalidator {

#[async_trait]
impl CacheInvalidator for MetasrvCacheInvalidator {
async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> {
let instruction = Instruction::InvalidateTableIdCache(table_id);
self.broadcast(ctx, instruction).await
}

async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
let instruction = Instruction::InvalidateTableNameCache(table_name);
async fn invalidate(&self, ctx: &Context, caches: Vec<CacheIdent>) -> MetaResult<()> {
let instruction = Instruction::InvalidateCaches(caches);
self.broadcast(ctx, instruction).await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use api::v1::meta::MailboxMessage;
use async_trait::async_trait;
use common_meta::instruction::Instruction;
use common_meta::instruction::{CacheIdent, Instruction};
use common_meta::RegionIdent;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
Expand All @@ -35,7 +35,7 @@ impl InvalidateCache {
ctx: &RegionFailoverContext,
table_id: TableId,
) -> Result<()> {
let instruction = Instruction::InvalidateTableIdCache(table_id);
let instruction = Instruction::InvalidateCaches(vec![CacheIdent::TableId(table_id)]);

let msg = &MailboxMessage::json_message(
"Invalidate Table Cache",
Expand Down Expand Up @@ -133,7 +133,10 @@ mod tests {
assert_eq!(
received.payload,
Some(Payload::Json(
serde_json::to_string(&Instruction::InvalidateTableIdCache(table_id)).unwrap(),
serde_json::to_string(&Instruction::InvalidateCaches(vec![
CacheIdent::TableId(table_id)
]))
.unwrap(),
))
);
}
Expand Down
Loading