Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): monitor s3 sdk retry #9790

Merged
merged 33 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,21 @@ def section_object_storage(outer_panels):
)
],
),
panels.timeseries_ops(
"Operation Retry Rate",
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
"",
[
panels.target(
f"sum(irate({metric('aws_sdk_retry_counts')}[$__rate_interval])) by (instance, job, type)",
"{{type}} - {{job}} @ {{instance}}",
),

panels.target(
f"sum(irate({metric('s3_read_request_retry_count')}[$__rate_interval])) by (instance, job, type)",
"{{type}} - {{job}} @ {{instance}}",
)
],
),
panels.timeseries_dollar(
"Estimated S3 Cost (Realtime)",
"There are two types of operations: 1. GET, SELECT, and DELETE, they cost 0.0004 USD per 1000 "
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ normal = ["workspace-hack", "workspace-config", "task_stats_alloc"]
[dependencies]
anyhow = "1"
clap = { version = "4", features = ["derive"] }
prometheus = { version = "0.13" }
risingwave_common = { path = "../common" }
risingwave_compactor = { path = "../storage/compactor" }
risingwave_compute = { path = "../compute" }
Expand Down
24 changes: 13 additions & 11 deletions src/cmd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,37 +31,39 @@ macro_rules! main {
#[cfg_attr(coverage, no_coverage)]
fn main() {
let opts = clap::Parser::parse();
$crate::$component(opts);
let registry = prometheus::Registry::new();
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
$crate::$component(opts, registry);
}
};
}

// Entry point functions.

pub fn compute(opts: ComputeNodeOpts) {
pub fn compute(opts: ComputeNodeOpts, registry: prometheus::Registry) {
risingwave_rt::init_risingwave_logger(
risingwave_rt::LoggerSettings::new().enable_tokio_console(false),
registry.clone(),
);
risingwave_rt::main_okk(risingwave_compute::start(opts));
risingwave_rt::main_okk(risingwave_compute::start(opts, registry));
}

pub fn meta(opts: MetaNodeOpts) {
risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new());
pub fn meta(opts: MetaNodeOpts, registry: prometheus::Registry) {
risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new(), registry);
risingwave_rt::main_okk(risingwave_meta::start(opts));
}

pub fn frontend(opts: FrontendOpts) {
risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new());
pub fn frontend(opts: FrontendOpts, registry: prometheus::Registry) {
risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new(), registry);
risingwave_rt::main_okk(risingwave_frontend::start(opts));
}

pub fn compactor(opts: CompactorOpts) {
risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new());
pub fn compactor(opts: CompactorOpts, registry: prometheus::Registry) {
risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new(), registry);
risingwave_rt::main_okk(risingwave_compactor::start(opts));
}

pub fn ctl(opts: CtlOpts) {
risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new());
pub fn ctl(opts: CtlOpts, registry: prometheus::Registry) {
risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new(), registry);

// Note: Use a simple current thread runtime for ctl.
// When there's a heavy workload, multiple thread runtime seems to respond slowly. May need
Expand Down
1 change: 1 addition & 0 deletions src/cmd_all/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ anyhow = "1"
clap = { version = "4", features = ["cargo", "derive"] }
console = "0.15"
const-str = "0.5"
prometheus = { version = "0.13" }
risingwave_cmd = { path = "../cmd" }
risingwave_common = { path = "../common" }
risingwave_compactor = { path = "../storage/compactor" }
Expand Down
18 changes: 9 additions & 9 deletions src/cmd_all/src/bin/risingwave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ impl Component {
fn parse_opts<T: FromArgMatches>(matches: &ArgMatches) -> T {
T::from_arg_matches(matches).map_err(|e| e.exit()).unwrap()
}

let registry = prometheus::Registry::new();
match self {
Self::Compute => compute(parse_opts(matches)),
Self::Meta => meta(parse_opts(matches)),
Self::Frontend => frontend(parse_opts(matches)),
Self::Compactor => compactor(parse_opts(matches)),
Self::Ctl => ctl(parse_opts(matches)),
Self::Playground => playground(parse_opts(matches)),
Self::Compute => compute(parse_opts(matches), registry),
Self::Meta => meta(parse_opts(matches), registry),
Self::Frontend => frontend(parse_opts(matches), registry),
Self::Compactor => compactor(parse_opts(matches), registry),
Self::Ctl => ctl(parse_opts(matches), registry),
Self::Playground => playground(parse_opts(matches), registry),
}
}

Expand Down Expand Up @@ -141,11 +141,11 @@ fn main() -> Result<()> {
Ok(())
}

fn playground(opts: PlaygroundOpts) {
fn playground(opts: PlaygroundOpts, registry: prometheus::Registry) {
let settings = risingwave_rt::LoggerSettings::new()
.enable_tokio_console(false)
.with_target("risingwave_storage", Level::WARN);
risingwave_rt::init_risingwave_logger(settings);
risingwave_rt::init_risingwave_logger(settings, registry);
risingwave_rt::main_okk(risingwave_cmd_all::playground(opts)).unwrap();
}

Expand Down
5 changes: 3 additions & 2 deletions src/cmd_all/src/playground.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,9 @@ pub async fn playground(opts: PlaygroundOpts) -> Result<()> {
opts.insert(0, "compute-node".into());
tracing::info!("starting compute-node thread with cli args: {:?}", opts);
let opts = risingwave_compute::ComputeNodeOpts::parse_from(opts);
let _compute_handle =
tokio::spawn(async move { risingwave_compute::start(opts).await });
let _compute_handle = tokio::spawn(async move {
risingwave_compute::start(opts, prometheus::Registry::new()).await
});
}
RisingWaveService::Frontend(mut opts) => {
opts.insert(0, "frontend-node".into());
Expand Down
1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
toml = "0.7"
tonic = { version = "0.2", package = "madsim-tonic" }
tracing = "0.1"
tracing-subscriber = "0.3.16"
twox-hash = "1"
url = "2"
uuid = "1.2.2"
Expand Down
54 changes: 52 additions & 2 deletions src/common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@
// limitations under the License.

use hytra::TrAdder;
use prometheus::core::{Atomic, GenericGauge};

use prometheus::core::{Atomic, AtomicU64, GenericCounter, GenericGauge};
use prometheus::{register_int_counter_with_registry, Registry};
use tracing::Subscriber;
use tracing_subscriber::layer::Context;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;
pub struct TrAdderAtomic(TrAdder<i64>);

impl Atomic for TrAdderAtomic {
Expand Down Expand Up @@ -44,3 +48,49 @@ impl Atomic for TrAdderAtomic {
}

pub type TrAdderGauge = GenericGauge<TrAdderAtomic>;

pub struct CustomLayer {
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
pub aws_sdk_retry_counts: GenericCounter<AtomicU64>,
pub aws_http_timeout_retry_counts: GenericCounter<AtomicU64>,
}

impl<S> Layer<S> for CustomLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
if event.metadata().target() == "aws_smithy_client::retry"
&& event.metadata().level() == &tracing::Level::DEBUG
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
{
self.aws_sdk_retry_counts.inc();
}

if event.metadata().target() == "http_timeout_retry"
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
&& event.metadata().level() == &tracing::Level::DEBUG
{
self.aws_http_timeout_retry_counts.inc();
}
}
}

impl CustomLayer {
pub fn new(registry: Registry) -> Self {
let aws_sdk_retry_counts = register_int_counter_with_registry!(
"aws_sdk_retry_counts",
"Total number of aws sdk retry happens",
registry
)
.unwrap();

let aws_http_timeout_retry_counts = register_int_counter_with_registry!(
"aws_http_timeout_retry_counts",
"Total number of s3 http timeout retry happens",
registry
)
.unwrap();
Self {
aws_sdk_retry_counts,
aws_http_timeout_retry_counts,
}
}
}
7 changes: 5 additions & 2 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,10 @@ use std::pin::Pin;
use crate::server::compute_node_serve;

/// Start compute node
pub fn start(opts: ComputeNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
pub fn start(
opts: ComputeNodeOpts,
registry: prometheus::Registry,
) -> Pin<Box<dyn Future<Output = ()> + Send>> {
// WARNING: don't change the function signature. Making it `async fn` will cause
// slow compile in release mode.
Box::pin(async move {
Expand All @@ -200,7 +203,7 @@ pub fn start(opts: ComputeNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>>
tracing::info!("advertise addr is {}", advertise_addr);

let (join_handle_vec, _shutdown_send) =
compute_node_serve(listen_addr, advertise_addr, opts).await;
compute_node_serve(listen_addr, advertise_addr, opts, registry).await;

for join_handle in join_handle_vec {
join_handle.await.unwrap();
Expand Down
3 changes: 2 additions & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub async fn compute_node_serve(
listen_addr: SocketAddr,
advertise_addr: HostAddr,
opts: ComputeNodeOpts,
registry: prometheus::Registry,
) -> (Vec<JoinHandle<()>>, Sender<()>) {
// Load the configuration.
let config = load_config(&opts.config_path, Some(opts.override_config.clone()));
Expand Down Expand Up @@ -161,7 +162,7 @@ pub async fn compute_node_serve(

let mut sub_tasks: Vec<(JoinHandle<()>, Sender<()>)> = vec![];
// Initialize the metrics subsystem.
let registry = prometheus::Registry::new();

monitor_process(&registry).unwrap();
let source_metrics = Arc::new(SourceMetrics::new(registry.clone()));
let hummock_metrics = Arc::new(HummockMetrics::new(registry.clone()));
Expand Down
1 change: 1 addition & 0 deletions src/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
] }
tokio-retry = "0.3"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
# This crate is excluded from hakari (see hakari.toml) after hdfs is introduced...
#
# [target.'cfg(not(madsim))'.dependencies]
Expand Down
10 changes: 10 additions & 0 deletions src/object_store/src/object/object_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct ObjectStoreMetrics {
pub operation_latency: HistogramVec,
pub operation_size: HistogramVec,
pub failure_count: GenericCounterVec<AtomicU64>,
pub request_retry_count: GenericCounterVec<AtomicU64>,
}

impl ObjectStoreMetrics {
Expand Down Expand Up @@ -81,12 +82,21 @@ impl ObjectStoreMetrics {
)
.unwrap();

let request_retry_count = register_int_counter_vec_with_registry!(
"s3_read_request_retry_count",
"The number of retry times of object store request",
&["type"],
registry
)
.unwrap();

Self {
write_bytes,
read_bytes,
operation_latency,
operation_size,
failure_count,
request_retry_count,
}
}

Expand Down
Loading