Skip to content

Commit

Permalink
feat(metrics): monitor s3 sdk retry (#9790)
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu authored Jun 13, 2023
1 parent 16a0efc commit 9177034
Show file tree
Hide file tree
Showing 28 changed files with 175 additions and 43 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,21 @@ def section_object_storage(outer_panels):
)
],
),
panels.timeseries_ops(
"Operation Retry Rate",
"",
[
panels.target(
f"sum(irate({metric('aws_sdk_retry_counts')}[$__rate_interval])) by (instance, job, type)",
"{{type}} - {{job}} @ {{instance}}",
),

panels.target(
f"sum(irate({metric('s3_read_request_retry_count')}[$__rate_interval])) by (instance, job, type)",
"{{type}} - {{job}} @ {{instance}}",
)
],
),
panels.timeseries_dollar(
"Estimated S3 Cost (Realtime)",
"There are two types of operations: 1. GET, SELECT, and DELETE, they cost 0.0004 USD per 1000 "
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ normal = ["workspace-hack", "workspace-config", "task_stats_alloc"]
[dependencies]
anyhow = "1"
clap = { version = "4", features = ["derive"] }
prometheus = { version = "0.13" }
risingwave_common = { path = "../common" }
risingwave_compactor = { path = "../storage/compactor" }
risingwave_compute = { path = "../compute" }
Expand Down
24 changes: 13 additions & 11 deletions src/cmd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,37 +31,39 @@ macro_rules! main {
#[cfg_attr(coverage, no_coverage)]
fn main() {
let opts = clap::Parser::parse();
$crate::$component(opts);
let registry = prometheus::Registry::new();
$crate::$component(opts, registry);
}
};
}

// Entry point functions.

pub fn compute(opts: ComputeNodeOpts) {
pub fn compute(opts: ComputeNodeOpts, registry: prometheus::Registry) {
risingwave_rt::init_risingwave_logger(
risingwave_rt::LoggerSettings::new().enable_tokio_console(false),
registry.clone(),
);
risingwave_rt::main_okk(risingwave_compute::start(opts));
risingwave_rt::main_okk(risingwave_compute::start(opts, registry));
}

pub fn meta(opts: MetaNodeOpts) {
risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new());
pub fn meta(opts: MetaNodeOpts, registry: prometheus::Registry) {
risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new(), registry);
risingwave_rt::main_okk(risingwave_meta::start(opts));
}

pub fn frontend(opts: FrontendOpts) {
risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new());
pub fn frontend(opts: FrontendOpts, registry: prometheus::Registry) {
risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new(), registry);
risingwave_rt::main_okk(risingwave_frontend::start(opts));
}

pub fn compactor(opts: CompactorOpts) {
risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new());
pub fn compactor(opts: CompactorOpts, registry: prometheus::Registry) {
risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new(), registry);
risingwave_rt::main_okk(risingwave_compactor::start(opts));
}

pub fn ctl(opts: CtlOpts) {
risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new());
pub fn ctl(opts: CtlOpts, registry: prometheus::Registry) {
risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new(), registry);

// Note: Use a simple current thread runtime for ctl.
// When there's a heavy workload, multiple thread runtime seems to respond slowly. May need
Expand Down
1 change: 1 addition & 0 deletions src/cmd_all/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ anyhow = "1"
clap = { version = "4", features = ["cargo", "derive"] }
console = "0.15"
const-str = "0.5"
prometheus = { version = "0.13" }
risingwave_cmd = { path = "../cmd" }
risingwave_common = { path = "../common" }
risingwave_compactor = { path = "../storage/compactor" }
Expand Down
18 changes: 9 additions & 9 deletions src/cmd_all/src/bin/risingwave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,14 @@ impl Component {
fn parse_opts<T: FromArgMatches>(matches: &ArgMatches) -> T {
T::from_arg_matches(matches).map_err(|e| e.exit()).unwrap()
}

let registry = prometheus::Registry::new();
match self {
Self::Compute => compute(parse_opts(matches)),
Self::Meta => meta(parse_opts(matches)),
Self::Frontend => frontend(parse_opts(matches)),
Self::Compactor => compactor(parse_opts(matches)),
Self::Ctl => ctl(parse_opts(matches)),
Self::Playground => playground(parse_opts(matches)),
Self::Compute => compute(parse_opts(matches), registry),
Self::Meta => meta(parse_opts(matches), registry),
Self::Frontend => frontend(parse_opts(matches), registry),
Self::Compactor => compactor(parse_opts(matches), registry),
Self::Ctl => ctl(parse_opts(matches), registry),
Self::Playground => playground(parse_opts(matches), registry),
}
}

Expand Down Expand Up @@ -188,10 +188,10 @@ fn main() -> Result<()> {
Ok(())
}

fn playground(opts: PlaygroundOpts) {
fn playground(opts: PlaygroundOpts, registry: prometheus::Registry) {
let settings = risingwave_rt::LoggerSettings::new()
.enable_tokio_console(false)
.with_target("risingwave_storage", Level::WARN);
risingwave_rt::init_risingwave_logger(settings);
risingwave_rt::init_risingwave_logger(settings, registry);
risingwave_rt::main_okk(risingwave_cmd_all::playground(opts)).unwrap();
}
5 changes: 3 additions & 2 deletions src/cmd_all/src/playground.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,9 @@ pub async fn playground(opts: PlaygroundOpts) -> Result<()> {
opts.insert(0, "compute-node".into());
tracing::info!("starting compute-node thread with cli args: {:?}", opts);
let opts = risingwave_compute::ComputeNodeOpts::parse_from(opts);
let _compute_handle =
tokio::spawn(async move { risingwave_compute::start(opts).await });
let _compute_handle = tokio::spawn(async move {
risingwave_compute::start(opts, prometheus::Registry::new()).await
});
}
RisingWaveService::Frontend(mut opts) => {
opts.insert(0, "frontend-node".into());
Expand Down
1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
toml = "0.7"
tonic = { version = "0.2", package = "madsim-tonic" }
tracing = "0.1"
tracing-subscriber = "0.3.16"
twox-hash = "1"
url = "2"
uuid = "1.2.2"
Expand Down
42 changes: 40 additions & 2 deletions src/common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@
// limitations under the License.

use hytra::TrAdder;
use prometheus::core::{Atomic, GenericGauge};

use prometheus::core::{Atomic, AtomicU64, GenericCounter, GenericGauge};
use prometheus::{register_int_counter_with_registry, Registry};
use tracing::Subscriber;
use tracing_subscriber::layer::Context;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;
pub struct TrAdderAtomic(TrAdder<i64>);

impl Atomic for TrAdderAtomic {
Expand Down Expand Up @@ -44,3 +48,37 @@ impl Atomic for TrAdderAtomic {
}

pub type TrAdderGauge = GenericGauge<TrAdderAtomic>;

/// [`MetricsLayer`] is a struct used for monitoring the frequency of certain specific logs and
/// counting them using Prometheus metrics. Currently, it is used to monitor the frequency of retry
/// occurrences of aws sdk.
pub struct MetricsLayer {
pub aws_sdk_retry_counts: GenericCounter<AtomicU64>,
}

impl<S> Layer<S> for MetricsLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, _event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
// Currently one retry will only generate one debug log,
// so we can monitor the number of retry only through the metadata target.
// Refer to <https://docs.rs/aws-smithy-client/0.55.3/src/aws_smithy_client/retry.rs.html>
self.aws_sdk_retry_counts.inc();
}
}

impl MetricsLayer {
pub fn new(registry: Registry) -> Self {
let aws_sdk_retry_counts = register_int_counter_with_registry!(
"aws_sdk_retry_counts",
"Total number of aws sdk retry happens",
registry
)
.unwrap();

Self {
aws_sdk_retry_counts,
}
}
}
7 changes: 5 additions & 2 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,10 @@ use std::pin::Pin;
use crate::server::compute_node_serve;

/// Start compute node
pub fn start(opts: ComputeNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
pub fn start(
opts: ComputeNodeOpts,
registry: prometheus::Registry,
) -> Pin<Box<dyn Future<Output = ()> + Send>> {
// WARNING: don't change the function signature. Making it `async fn` will cause
// slow compile in release mode.
Box::pin(async move {
Expand All @@ -200,7 +203,7 @@ pub fn start(opts: ComputeNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>>
tracing::info!("advertise addr is {}", advertise_addr);

let (join_handle_vec, _shutdown_send) =
compute_node_serve(listen_addr, advertise_addr, opts).await;
compute_node_serve(listen_addr, advertise_addr, opts, registry).await;

for join_handle in join_handle_vec {
join_handle.await.unwrap();
Expand Down
3 changes: 2 additions & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub async fn compute_node_serve(
listen_addr: SocketAddr,
advertise_addr: HostAddr,
opts: ComputeNodeOpts,
registry: prometheus::Registry,
) -> (Vec<JoinHandle<()>>, Sender<()>) {
// Load the configuration.
let config = load_config(&opts.config_path, Some(opts.override_config.clone()));
Expand Down Expand Up @@ -161,7 +162,7 @@ pub async fn compute_node_serve(

let mut sub_tasks: Vec<(JoinHandle<()>, Sender<()>)> = vec![];
// Initialize the metrics subsystem.
let registry = prometheus::Registry::new();

monitor_process(&registry).unwrap();
let source_metrics = Arc::new(SourceMetrics::new(registry.clone()));
let hummock_metrics = Arc::new(HummockMetrics::new(registry.clone()));
Expand Down
10 changes: 10 additions & 0 deletions src/object_store/src/object/object_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct ObjectStoreMetrics {
pub operation_latency: HistogramVec,
pub operation_size: HistogramVec,
pub failure_count: GenericCounterVec<AtomicU64>,
pub request_retry_count: GenericCounterVec<AtomicU64>,
}

impl ObjectStoreMetrics {
Expand Down Expand Up @@ -81,12 +82,21 @@ impl ObjectStoreMetrics {
)
.unwrap();

let request_retry_count = register_int_counter_vec_with_registry!(
"s3_read_request_retry_count",
"The number of retry times of object store request",
&["type"],
registry
)
.unwrap();

Self {
write_bytes,
read_bytes,
operation_latency,
operation_size,
failure_count,
request_retry_count,
}
}

Expand Down
38 changes: 35 additions & 3 deletions src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,25 @@ impl ObjectStore for S3ObjectStore {
let resp = tokio_retry::RetryIf::spawn(
Self::get_retry_strategy(),
|| async {
self.obj_store_request(path, start_pos, end_pos)
match self
.obj_store_request(path, start_pos, end_pos)
.send()
.await
{
Ok(resp) => Ok(resp),
Err(err) => {
if let SdkError::DispatchFailure(e) = &err
&& e.is_timeout()
{
self.metrics
.request_retry_count
.with_label_values(&["read"])
.inc();
}

Err(err)
}
}
},
Self::should_retry,
)
Expand Down Expand Up @@ -426,7 +442,23 @@ impl ObjectStore for S3ObjectStore {
// retry if occurs AWS EC2 HTTP timeout error.
let resp = tokio_retry::RetryIf::spawn(
Self::get_retry_strategy(),
|| async { self.obj_store_request(path, start_pos, None).send().await },
|| async {
match self.obj_store_request(path, start_pos, None).send().await {
Ok(resp) => Ok(resp),
Err(err) => {
if let SdkError::DispatchFailure(e) = &err
&& e.is_timeout()
{
self.metrics
.request_retry_count
.with_label_values(&["streaming_read"])
.inc();
}

Err(err)
}
}
},
Self::should_retry,
)
.await?;
Expand Down Expand Up @@ -731,7 +763,7 @@ impl S3ObjectStore {
fn should_retry(err: &SdkError<GetObjectError>) -> bool {
if let SdkError::DispatchFailure(e) = err {
if e.is_timeout() {
tracing::warn!("{:?} occurs, trying to retry S3 get_object request.", e);
tracing::warn!(target: "http_timeout_retry", "{:?} occurs, trying to retry S3 get_object request.", e);
return true;
}
}
Expand Down
1 change: 1 addition & 0 deletions src/storage/backup/cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ normal = ["workspace-hack"]

[dependencies]
clap = { version = "4", features = ["derive"] }
prometheus = { version = "0.13" }
risingwave_backup = { path = "../../backup" }
risingwave_meta = { path = "../../../meta" }
risingwave_rt = { path = "../../../utils/runtime" }
Expand Down
Loading

0 comments on commit 9177034

Please sign in to comment.