feat(metric): add recovery metrics #9022

Merged · 4 commits · Apr 6, 2023
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

66 changes: 59 additions & 7 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -14,6 +14,7 @@
panels = Panels(datasource)
logging.basicConfig(level=logging.WARN)


def section_cluster_node(panels):
return [
panels.row("Cluster Node"),
@@ -65,6 +66,49 @@ def section_cluster_node(panels):
]


def section_recovery_node(panels):
return [
panels.row("Recovery"),
panels.timeseries_ops(
"Recovery Successful Rate",
"The rate of successful recovery attempts",
[
panels.target(f"sum(rate({metric('recovery_latency_count')}[$__rate_interval])) by (instance)",
"{{instance}}")
],
["last"],
),
panels.timeseries_count(
"Failed recovery attempts",
"Total number of failed reocovery attempts",
[
panels.target(f"sum({metric('recovery_failure_cnt')}) by (instance)",
"{{instance}}")
],
["last"],
),
panels.timeseries_latency(
"Recovery latency",
"Time spent in a successful recovery attempt",
[
*quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('recovery_latency_bucket')}[$__rate_interval])) by (le, instance))",
f"recovery latency p{legend}" +
" - {{instance}}",
),
[50, 90, 99, "max"],
),
panels.target(
f"sum by (le) (rate({metric('recovery_latency_sum')}[$__rate_interval])) / sum by (le) (rate({metric('recovery_latency_count')}[$__rate_interval]))",
"recovery latency avg",
),
],
["last"],
)
]


def section_compaction(outer_panels):
panels = outer_panels.sub_panel()
return [
@@ -330,7 +374,7 @@ def section_compaction(outer_panels):
],
),

panels.timeseries_count(
panels.timeseries_count(
"Hummock Sstable Stat",
"Avg count gotten from sstable_distinct_epoch_count, for observing sstable_distinct_epoch_count",
[
@@ -344,7 +388,7 @@
panels.timeseries_latency(
"Hummock Remote Read Duration",
"Total time of operations which read from remote storage when enable prefetch",
[
[
*quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('state_store_remote_read_time_per_task_bucket')}[$__rate_interval])) by (le, job, instance, table_id))",
@@ -501,7 +545,7 @@ def section_object_storage(outer_panels):
"Estimated S3 Cost (Monthly)",
"This metric uses the total size of data in S3 at this second to derive the cost of storing data "
"for a whole month. The price is 0.023 USD per GB. Please checkout AWS's pricing model for more "
"accurate calculation.",
"accurate calculation.",
[
panels.target(
f"sum({metric('storage_level_total_file_size')}) by (instance) * 0.023 / 1000 / 1000",
@@ -1153,6 +1197,7 @@ def section_batch_exchange(outer_panels):
),
]


def section_frontend(outer_panels):
panels = outer_panels.sub_panel()
return [
@@ -1184,7 +1229,7 @@
"",
[
panels.target(f"{metric('distributed_running_query_num')}",
"The number of running query in distributed execution mode"),
"The number of running query in distributed execution mode"),
],
["last"],
),
@@ -1193,7 +1238,7 @@
"",
[
panels.target(f"{metric('distributed_rejected_query_counter')}",
"The number of rejected query in distributed execution mode"),
"The number of rejected query in distributed execution mode"),
],
["last"],
),
@@ -1202,7 +1247,7 @@
"",
[
panels.target(f"{metric('distributed_completed_query_counter')}",
"The number of completed query in distributed execution mode"),
"The number of completed query in distributed execution mode"),
],
["last"],
),
@@ -1745,6 +1790,7 @@ def section_hummock_tiered_cache(outer_panels):
)
]


def section_hummock_manager(outer_panels):
panels = outer_panels.sub_panel()
total_key_size_filter = "metric='total_key_size'"
@@ -1891,6 +1937,7 @@ def section_hummock_manager(outer_panels):
)
]


def section_backup_manager(outer_panels):
panels = outer_panels.sub_panel()
return [
@@ -1916,7 +1963,7 @@
f"histogram_quantile({quantile}, sum(rate({metric('backup_job_latency_bucket')}[$__rate_interval])) by (le, state))",
f"Job Process Time p{legend}" +
" - {{state}}",
),
),
[50, 99, 999, "max"],
),
],
@@ -1925,6 +1972,7 @@
)
]


def grpc_metrics_target(panels, name, filter):
return panels.timeseries_latency_small(
f"{name} latency",
@@ -2166,6 +2214,7 @@ def section_grpc_hummock_meta_client(outer_panels):
),
]


def section_memory_manager(outer_panels):
panels = outer_panels.sub_panel()
return [
@@ -2236,6 +2285,7 @@ def section_memory_manager(outer_panels):
),
]


def section_connector_node(outer_panels):
panels = outer_panels.sub_panel()
return [
@@ -2256,6 +2306,7 @@
)
]


templating = Templating()
if namespace_filter_enabled:
templating = Templating(
@@ -2295,6 +2346,7 @@ def section_connector_node(outer_panels):
version=dashboard_version,
panels=[
*section_cluster_node(panels),
*section_recovery_node(panels),
*section_streaming(panels),
*section_streaming_actors(panels),
*section_streaming_exchange(panels),
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

166 changes: 90 additions & 76 deletions src/meta/src/barrier/recovery.rs
@@ -24,7 +24,8 @@ use risingwave_pb::common::{ActorInfo, WorkerNode, WorkerType};
use risingwave_pb::stream_plan::barrier::Mutation;
use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::stream_service::{
BroadcastActorInfoTableRequest, BuildActorsRequest, ForceStopActorsRequest, UpdateActorsRequest,
BarrierCompleteResponse, BroadcastActorInfoTableRequest, BuildActorsRequest,
ForceStopActorsRequest, UpdateActorsRequest,
};
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tracing::{debug, error, warn};
@@ -119,90 +120,103 @@ where
.await
.expect("clean dirty fragments");
let retry_strategy = Self::get_retry_strategy();
let (new_epoch, _responses) = tokio_retry::Retry::spawn(retry_strategy, || async {
let mut info = self.resolve_actor_info_for_recovery().await;
let mut new_epoch = prev_epoch.next();

// Migrate actors in expired CN to newly joined one.
let migrated = self.migrate_actors(&info).await.inspect_err(|err| {
error!(err = ?err, "migrate actors failed");
})?;
if migrated {
info = self.resolve_actor_info_for_recovery().await;
}

// Reset all compute nodes, stop and drop existing actors.
self.reset_compute_nodes(&info).await.inspect_err(|err| {
error!(err = ?err, "reset compute nodes failed");
})?;

// update and build all actors.
self.update_actors(&info).await.inspect_err(|err| {
error!(err = ?err, "update actors failed");
})?;
self.build_actors(&info).await.inspect_err(|err| {
error!(err = ?err, "build_actors failed");
})?;

// get split assignments for all actors
let source_split_assignments = self.source_manager.list_assignments().await;
let command = Command::Plain(Some(Mutation::Add(AddMutation {
// Actors built during recovery is not treated as newly added actors.
actor_dispatchers: Default::default(),
added_actors: Default::default(),
actor_splits: build_actor_connector_splits(&source_split_assignments),
})));

let prev_epoch = new_epoch;
new_epoch = prev_epoch.next();
// checkpoint, used as init barrier to initialize all executors.
let command_ctx = Arc::new(CommandContext::new(
self.fragment_manager.clone(),
self.snapshot_manager.clone(),
self.env.stream_client_pool_ref(),
info,
prev_epoch,
new_epoch,
command,
true,
self.source_manager.clone(),
));

#[cfg(not(all(test, feature = "failpoints")))]
{
use risingwave_common::util::epoch::INVALID_EPOCH;

let mce = self
.hummock_manager
.get_current_version()
.await
.max_committed_epoch;

if mce != INVALID_EPOCH {
command_ctx.wait_epoch_commit(mce).await?;
// We take retry into consideration because this is the latency user sees for a cluster to
// get recovered.
let recovery_timer = self.metrics.recovery_latency.start_timer();
let (new_epoch, _responses) = tokio_retry::Retry::spawn(retry_strategy, || async {
let recovery_result: MetaResult<(Epoch, Vec<BarrierCompleteResponse>)> = try {
let mut info = self.resolve_actor_info_for_recovery().await;
let mut new_epoch = prev_epoch.next();

// Migrate actors in expired CN to newly joined one.
let migrated = self.migrate_actors(&info).await.inspect_err(|err| {
error!(err = ?err, "migrate actors failed");
})?;
if migrated {
info = self.resolve_actor_info_for_recovery().await;
}
}

let (barrier_complete_tx, mut barrier_complete_rx) =
tokio::sync::mpsc::unbounded_channel();
self.inject_barrier(command_ctx.clone(), &barrier_complete_tx)
.await;
match barrier_complete_rx.recv().await.unwrap().result {
Ok(response) => {
if let Err(err) = command_ctx.post_collect().await {
error!(err = ?err, "post_collect failed");
return Err(err);
// Reset all compute nodes, stop and drop existing actors.
self.reset_compute_nodes(&info).await.inspect_err(|err| {
error!(err = ?err, "reset compute nodes failed");
})?;

// update and build all actors.
self.update_actors(&info).await.inspect_err(|err| {
error!(err = ?err, "update actors failed");
})?;
self.build_actors(&info).await.inspect_err(|err| {
error!(err = ?err, "build_actors failed");
})?;

// get split assignments for all actors
let source_split_assignments = self.source_manager.list_assignments().await;
let command = Command::Plain(Some(Mutation::Add(AddMutation {
// Actors built during recovery is not treated as newly added actors.
actor_dispatchers: Default::default(),
added_actors: Default::default(),
actor_splits: build_actor_connector_splits(&source_split_assignments),
})));

let prev_epoch = new_epoch;
new_epoch = prev_epoch.next();
// checkpoint, used as init barrier to initialize all executors.
let command_ctx = Arc::new(CommandContext::new(
self.fragment_manager.clone(),
self.snapshot_manager.clone(),
self.env.stream_client_pool_ref(),
info,
prev_epoch,
new_epoch,
command,
true,
self.source_manager.clone(),
));

#[cfg(not(all(test, feature = "failpoints")))]
{
use risingwave_common::util::epoch::INVALID_EPOCH;

let mce = self
.hummock_manager
.get_current_version()
.await
.max_committed_epoch;

if mce != INVALID_EPOCH {
command_ctx.wait_epoch_commit(mce).await?;
}
Ok((new_epoch, response))
}
Err(err) => {
error!(err = ?err, "inject_barrier failed");
Err(err)
}

let (barrier_complete_tx, mut barrier_complete_rx) =
tokio::sync::mpsc::unbounded_channel();
self.inject_barrier(command_ctx.clone(), &barrier_complete_tx)
.await;
let res = match barrier_complete_rx.recv().await.unwrap().result {
Ok(response) => {
if let Err(err) = command_ctx.post_collect().await {
error!(err = ?err, "post_collect failed");
Err(err)
} else {
Ok((new_epoch, response))
}
}
Err(err) => {
error!(err = ?err, "inject_barrier failed");
Err(err)
}
};
res?
};
if recovery_result.is_err() {
self.metrics.recovery_failure_cnt.inc();
}
recovery_result
})
.await
.expect("Retry until recovery success.");
recovery_timer.observe_duration();
tracing::info!("recovery success");

new_epoch
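
A note on the shape of this refactor: the body of the retry closure is now wrapped in a `try` block, so any error raised via `?` is captured into `recovery_result`, the `recovery_failure_cnt` counter is incremented, and the error is then returned so that `tokio_retry` schedules the next attempt. Below is a minimal sketch of that pattern, using illustrative names that are not from the PR (and requiring the unstable `try_blocks` feature on nightly Rust):

    #![feature(try_blocks)] // nightly-only, as in the code above

    fn recover_once() -> Result<u64, String> {
        // Stand-in for one recovery attempt; always fails here for illustration.
        Err("injected failure".to_string())
    }

    fn main() {
        let mut recovery_failure_cnt = 0u64; // stand-in for self.metrics.recovery_failure_cnt

        // Any `?` inside the try block lands in `result` instead of returning early,
        // so the failure counter can be updated before the error is re-surfaced.
        let result: Result<u64, String> = try {
            let epoch = recover_once()?;
            epoch + 1
        };

        if result.is_err() {
            recovery_failure_cnt += 1;
        }
        println!("failures: {recovery_failure_cnt}, result: {result:?}");
    }

This keeps the error path observable without changing the retry semantics: the counter is bumped on every failed attempt, while the latency histogram is only observed once the whole loop succeeds.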
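
The hunks above reference `self.metrics.recovery_latency` (a histogram, which the dashboard queries as `recovery_latency_bucket`, `recovery_latency_sum`, and `recovery_latency_count`) and `self.metrics.recovery_failure_cnt` (a counter), but their registration is outside the files shown in this view. The following is a rough sketch of how such collectors could be registered with the Rust `prometheus` crate; only the metric names come from the PR, while the struct, help strings, and registry wiring are assumptions for illustration:

    use prometheus::{
        register_histogram_with_registry, register_int_counter_with_registry, Histogram,
        IntCounter, Registry,
    };

    /// Hypothetical container for the two recovery metrics referenced above.
    /// Only the metric names are taken from the PR; everything else is illustrative.
    pub struct RecoveryMetrics {
        /// Observed once per successful recovery, covering all retries (seconds).
        pub recovery_latency: Histogram,
        /// Incremented once per failed recovery attempt.
        pub recovery_failure_cnt: IntCounter,
    }

    impl RecoveryMetrics {
        pub fn new(registry: &Registry) -> Self {
            let recovery_latency = register_histogram_with_registry!(
                "recovery_latency",
                "Latency of the recovery process",
                registry
            )
            .unwrap();
            let recovery_failure_cnt = register_int_counter_with_registry!(
                "recovery_failure_cnt",
                "Number of failed recovery attempts",
                registry
            )
            .unwrap();
            Self {
                recovery_latency,
                recovery_failure_cnt,
            }
        }
    }

    // Usage mirroring the recovery loop above: start the timer before retrying, bump the
    // counter on each failed attempt, and observe the duration only once recovery succeeds.
    fn example_usage(metrics: &RecoveryMetrics) {
        let timer = metrics.recovery_latency.start_timer();
        // ... retry loop: on each failed attempt, call metrics.recovery_failure_cnt.inc() ...
        timer.observe_duration();
    }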