Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): persist hummock version checkpoint in object store #8631

Merged
merged 11 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions grafana/risingwave-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -2260,6 +2260,30 @@ def section_hummock_manager(outer_panels):
"stale SST total number"),
],
),
panels.timeseries_count(
"Delta Log Total Number",
"total number of hummock version delta log",
[
panels.target(f"{metric('storage_delta_log_count')}",
"delta log total number"),
],
),
panels.timeseries_latency(
"Version Checkpoint Latency",
"hummock version checkpoint latency",
quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('storage_version_checkpoint_latency_bucket')}[$__rate_interval])) by (le))",
f"version_checkpoint_latency_p{legend}",
),
[50, 90, 99, 999, "max"],
) + [
panels.target(
f"rate({metric('storage_version_checkpoint_latency_sum')}[$__rate_interval]) / rate({metric('storage_version_checkpoint_latency_count')}[$__rate_interval])",
"version_checkpoint_latency_avg",
),
],
),
],
)
]
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ message HummockVersionDeltas {
repeated HummockVersionDelta version_deltas = 1;
}

message HummockVersionCheckpoint {
message StaleObjects {
repeated uint64 id = 1;
}
HummockVersion version = 1;
map<uint64, StaleObjects> stale_objects = 2;
}

// We will have two epoch after decouple
message HummockSnapshot {
// Epoch with checkpoint, we will read durable data with it.
Expand Down
17 changes: 17 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,15 @@ pub struct MetaConfig {
#[serde(default = "default::meta::vacuum_interval_sec")]
pub vacuum_interval_sec: u64,

/// Interval of hummock version checkpoint.
#[serde(default = "default::meta::hummock_version_checkpoint_interval_sec")]
pub hummock_version_checkpoint_interval_sec: u64,

/// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint
/// attempt is rejected.
#[serde(default = "default::meta::min_delta_log_num_for_hummock_version_checkpoint")]
pub min_delta_log_num_for_hummock_version_checkpoint: u64,

/// Maximum allowed heartbeat interval in seconds.
#[serde(default = "default::meta::max_heartbeat_interval_sec")]
pub max_heartbeat_interval_secs: u32,
Expand Down Expand Up @@ -552,6 +561,14 @@ mod default {
30
}

pub fn hummock_version_checkpoint_interval_sec() -> u64 {
30
}

pub fn min_delta_log_num_for_hummock_version_checkpoint() -> u64 {
10
}

pub fn max_heartbeat_interval_sec() -> u32 {
300
}
Expand Down
41 changes: 23 additions & 18 deletions src/meta/src/backup_restore/backup_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::time::Instant;

use arc_swap::ArcSwap;
use itertools::Itertools;
use prometheus::Registry;
use risingwave_backup::error::BackupError;
use risingwave_backup::storage::{BoxedMetaSnapshotStorage, ObjectStoreMetaSnapshotStorage};
use risingwave_backup::{MetaBackupJobId, MetaSnapshotId, MetaSnapshotManifest};
Expand All @@ -33,6 +32,7 @@ use crate::backup_restore::meta_snapshot_builder::MetaSnapshotBuilder;
use crate::backup_restore::metrics::BackupManagerMetrics;
use crate::hummock::{HummockManagerRef, HummockVersionSafePoint};
use crate::manager::{IdCategory, LocalNotification, MetaSrvEnv};
use crate::rpc::metrics::MetaMetrics;
use crate::storage::MetaStore;
use crate::MetaResult;

Expand Down Expand Up @@ -71,18 +71,20 @@ pub struct BackupManager<S: MetaStore> {
/// Tracks the running backup job. Concurrent jobs is not supported.
running_backup_job: tokio::sync::Mutex<Option<BackupJobHandle>>,
metrics: BackupManagerMetrics,
meta_metrics: Arc<MetaMetrics>,
}

impl<S: MetaStore> BackupManager<S> {
pub async fn new(
env: MetaSrvEnv<S>,
hummock_manager: HummockManagerRef<S>,
registry: Registry,
metrics: Arc<MetaMetrics>,
store_url: &str,
store_dir: &str,
) -> MetaResult<Arc<Self>> {
let store_config = (store_url.to_string(), store_dir.to_string());
let store = create_snapshot_store(&store_config).await?;
let store =
create_snapshot_store(&store_config, metrics.object_store_metric.clone()).await?;
tracing::info!(
"backup manager initialized: url={}, dir={}",
store_config.0,
Expand All @@ -91,7 +93,7 @@ impl<S: MetaStore> BackupManager<S> {
let instance = Arc::new(Self::with_store(
env.clone(),
hummock_manager,
registry,
metrics,
(store, store_config),
));
let (local_notification_tx, mut local_notification_rx) =
Expand Down Expand Up @@ -139,20 +141,22 @@ impl<S: MetaStore> BackupManager<S> {
fn with_store(
env: MetaSrvEnv<S>,
hummock_manager: HummockManagerRef<S>,
registry: Registry,
meta_metrics: Arc<MetaMetrics>,
backup_store: (BoxedMetaSnapshotStorage, StoreConfig),
) -> Self {
Self {
env,
hummock_manager,
backup_store: ArcSwap::from_pointee(backup_store),
running_backup_job: tokio::sync::Mutex::new(None),
metrics: BackupManagerMetrics::new(registry),
metrics: BackupManagerMetrics::new(meta_metrics.registry.clone()),
meta_metrics,
}
}

pub async fn set_store(&self, config: StoreConfig) -> MetaResult<()> {
let new_store = create_snapshot_store(&config).await?;
let new_store =
create_snapshot_store(&config, self.meta_metrics.object_store_metric.clone()).await?;
tracing::info!(
"new backup config is applied: url={}, dir={}",
config.0,
Expand All @@ -167,7 +171,7 @@ impl<S: MetaStore> BackupManager<S> {
Self::with_store(
env,
hummock_manager,
Registry::new(),
Arc::new(MetaMetrics::new()),
(
Box::<risingwave_backup::storage::DummyMetaSnapshotStorage>::default(),
StoreConfig::default(),
Expand Down Expand Up @@ -337,7 +341,12 @@ impl<S: MetaStore> BackupWorker<S> {
let mut snapshot_builder =
MetaSnapshotBuilder::new(backup_manager_clone.env.meta_store_ref());
// Reuse job id as snapshot id.
snapshot_builder.build(job_id).await?;
let hummock_manager = backup_manager_clone.hummock_manager.clone();
snapshot_builder
.build(job_id, async move {
hummock_manager.get_current_version().await
})
.await?;
let snapshot = snapshot_builder.finish()?;
backup_manager_clone
.backup_store
Expand All @@ -356,15 +365,11 @@ impl<S: MetaStore> BackupWorker<S> {
}
}

async fn create_snapshot_store(config: &StoreConfig) -> MetaResult<BoxedMetaSnapshotStorage> {
let object_store = Arc::new(
parse_remote_object_store(
&config.0,
Arc::new(ObjectStoreMetrics::unused()),
"Meta Backup",
)
.await,
);
async fn create_snapshot_store(
config: &StoreConfig,
metric: Arc<ObjectStoreMetrics>,
) -> MetaResult<BoxedMetaSnapshotStorage> {
let object_store = Arc::new(parse_remote_object_store(&config.0, metric, "Meta Backup").await);
let store = ObjectStoreMetaSnapshotStorage::new(&config.1, object_store).await?;
Ok(Box::new(store))
}
59 changes: 40 additions & 19 deletions src/meta/src/backup_restore/meta_snapshot_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
// limitations under the License.

use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;

use anyhow::anyhow;
use risingwave_backup::error::BackupResult;
use risingwave_backup::error::{BackupError, BackupResult};
use risingwave_backup::meta_snapshot::{ClusterMetadata, MetaSnapshot};
use risingwave_backup::MetaSnapshotId;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt;
Expand Down Expand Up @@ -44,28 +45,40 @@ impl<S: MetaStore> MetaSnapshotBuilder<S> {
}
}

pub async fn build(&mut self, id: MetaSnapshotId) -> BackupResult<()> {
pub async fn build<D: Future<Output = HummockVersion>>(
&mut self,
id: MetaSnapshotId,
hummock_version_builder: D,
) -> BackupResult<()> {
self.snapshot.format_version = VERSION;
self.snapshot.id = id;
// Get `hummock_version` before `meta_store_snapshot`.
// We have ensure the required delta logs for replay is available, see
// `HummockManager::delete_version_deltas`.
let hummock_version = hummock_version_builder.await;
// Caveat: snapshot impl of etcd meta store doesn't prevent it from expiration.
// So expired snapshot read may return error. If that happens,
// tune auto-compaction-mode and auto-compaction-retention on demand.
let meta_store_snapshot = self.meta_store.snapshot().await;
let default_cf = self.build_default_cf(&meta_store_snapshot).await?;
// hummock_version and version_stats is guaranteed to exist in a initialized cluster.
let hummock_version = {
let mut redo_state = HummockVersion::list_at_snapshot::<S>(&meta_store_snapshot)
.await?
.into_iter()
.next()
.ok_or_else(|| anyhow!("hummock version checkpoint not found in meta store"))?;
let mut redo_state = hummock_version;
let hummock_version_deltas =
HummockVersionDelta::list_at_snapshot::<S>(&meta_store_snapshot).await?;
for version_delta in &hummock_version_deltas {
if version_delta.prev_id == redo_state.id {
redo_state.apply_version_delta(version_delta);
}
}
if let Some(log) = hummock_version_deltas.iter().rev().next() {
if log.id != redo_state.id {
return Err(BackupError::Other(anyhow::anyhow!(format!(
"inconsistent hummock version: expected {}, actual {}",
log.id, redo_state.id
))));
}
}
redo_state
};
let version_stats = HummockVersionStats::list_at_snapshot::<S>(&meta_store_snapshot)
Expand Down Expand Up @@ -157,20 +170,19 @@ mod tests {
let meta_store = Arc::new(MemStore::new());

let mut builder = MetaSnapshotBuilder::new(meta_store.clone());
let err = builder.build(1).await.unwrap_err();
let err = assert_matches!(err, BackupError::Other(e) => e);
assert_eq!(
"hummock version checkpoint not found in meta store",
err.to_error_str()
);

let hummock_version = HummockVersion {
id: 1,
..Default::default()
};
let get_ckpt_builder = |v: &HummockVersion| {
let v_ = v.clone();
async move { v_ }
};
hummock_version.insert(meta_store.deref()).await.unwrap();
let mut builder = MetaSnapshotBuilder::new(meta_store.clone());
let err = builder.build(1).await.unwrap_err();
let err = builder
.build(1, get_ckpt_builder(&hummock_version))
.await
.unwrap_err();
let err = assert_matches!(err, BackupError::Other(e) => e);
assert_eq!(
"hummock version stats not found in meta store",
Expand All @@ -185,7 +197,10 @@ mod tests {
.insert(meta_store.deref())
.await
.unwrap();
let err = builder.build(1).await.unwrap_err();
let err = builder
.build(1, get_ckpt_builder(&hummock_version))
.await
.unwrap_err();
let err = assert_matches!(err, BackupError::Other(e) => e);
assert_eq!("system params not found in meta store", err.to_error_str());

Expand All @@ -194,15 +209,21 @@ mod tests {
.await
.unwrap();
let mut builder = MetaSnapshotBuilder::new(meta_store.clone());
builder.build(1).await.unwrap();
builder
.build(1, get_ckpt_builder(&hummock_version))
.await
.unwrap();

let dummy_key = vec![0u8, 1u8, 2u8];
let mut builder = MetaSnapshotBuilder::new(meta_store.clone());
meta_store
.put_cf(DEFAULT_COLUMN_FAMILY, dummy_key.clone(), vec![100])
.await
.unwrap();
builder.build(1).await.unwrap();
builder
.build(1, get_ckpt_builder(&hummock_version))
.await
.unwrap();
let snapshot = builder.finish().unwrap();
let encoded = snapshot.encode();
let decoded = MetaSnapshot::decode(&encoded).unwrap();
Expand Down
Loading