Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[indexer grpc] update the metrics to better understand the usage. #10383

Merged
merged 1 commit into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub static LATEST_PROCESSED_VERSION: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"indexer_grpc_data_service_with_user_latest_processed_version",
"Latest processed transaction version",
&["request_token", "email"],
&["request_token", "email", "processor"],
)
.unwrap()
});
Expand All @@ -22,7 +22,7 @@ pub static PROCESSED_VERSIONS_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"indexer_grpc_data_service_with_user_processed_versions",
"Number of transactions that have been processed by data service",
&["request_token", "email"],
&["request_token", "email", "processor"],
)
.unwrap()
});
Expand All @@ -42,7 +42,7 @@ pub static PROCESSED_LATENCY_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
"indexer_grpc_data_service_with_user_latest_data_latency_in_secs",
"Latency of data service based on latest processed transaction",
&["request_token", "email"],
&["request_token", "email", "processor"],
)
.unwrap()
});
Expand All @@ -62,7 +62,7 @@ pub static PROCESSED_BATCH_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"indexer_grpc_data_service_with_user_processed_batch_size",
"Size of latest processed batch by data service",
&["request_token", "email"],
&["request_token", "email", "processor"],
)
.unwrap()
});
Expand All @@ -77,10 +77,11 @@ pub static CONNECTION_COUNT: Lazy<IntCounter> = Lazy::new(|| {
});

/// Count of the short connections; i.e., < 10 seconds.
pub static SHORT_CONNECTION_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"indexer_grpc_data_service_short_connection_count",
pub static SHORT_CONNECTION_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"indexer_grpc_data_service_short_connection_by_user_count",
"Count of the short connections; i.e., < 10 seconds",
&["request_token", "email"],
)
.unwrap()
});
Expand All @@ -91,7 +92,7 @@ pub static BYTES_READY_TO_TRANSFER_FROM_SERVER: Lazy<IntCounterVec> = Lazy::new(
register_int_counter_vec!(
"indexer_grpc_data_service_bytes_ready_to_transfer_from_server",
"Count of bytes ready to transfer to the client",
&["request_token", "email"],
&["request_token", "email", "processor"],
)
.unwrap()
});
48 changes: 35 additions & 13 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ type ResponseStream = Pin<Box<dyn Stream<Item = Result<TransactionsResponse, Sta

#[derive(Clone, Serialize, Deserialize, Debug)]
struct RequestMetadata {
pub request_token: String,
pub request_name: String,
pub processor_name: String,
pub request_email: String,
pub request_user_classification: String,
pub request_api_key_name: String,
// Token is no longer needed behind api gateway.
#[deprecated]
pub request_token: String,
}

const MOVING_AVERAGE_WINDOW_SIZE: u64 = 10_000;
Expand Down Expand Up @@ -150,10 +152,10 @@ impl RawData for RawDataServerWrapper {
let serving_span = tracing::span!(
tracing::Level::INFO,
"Data Serving",
request_name = request_metadata.request_name.as_str(),
request_name = request_metadata.processor_name.as_str(),
request_email = request_metadata.request_email.as_str(),
request_api_key_name = request_metadata.request_api_key_name.as_str(),
request_token = request_metadata.request_token.as_str(),
processor_name = request_metadata.processor_name.as_str(),
request_user_classification = request_metadata.request_user_classification.as_str(),
);

Expand All @@ -168,7 +170,12 @@ impl RawData for RawDataServerWrapper {
ERROR_COUNT
.with_label_values(&["redis_connection_failed"])
.inc();
SHORT_CONNECTION_COUNT.inc();
SHORT_CONNECTION_COUNT
.with_label_values(&[
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
])
.inc();
// Connection will be dropped anyway, so we ignore the error here.
let _result = tx
.send_timeout(
Expand All @@ -194,7 +201,12 @@ impl RawData for RawDataServerWrapper {
ERROR_COUNT
.with_label_values(&["redis_get_chain_id_failed"])
.inc();
SHORT_CONNECTION_COUNT.inc();
SHORT_CONNECTION_COUNT
.with_label_values(&[
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
])
.inc();
// Connection will be dropped anyway, so we ignore the error here.
let _result = tx
.send_timeout(
Expand Down Expand Up @@ -270,8 +282,9 @@ impl RawData for RawDataServerWrapper {
.sum::<usize>();
BYTES_READY_TO_TRANSFER_FROM_SERVER
.with_label_values(&[
request_metadata.request_token.as_str(),
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
request_metadata.processor_name.as_str(),
])
.inc_by(bytes_ready_to_transfer as u64);
// 2. Push the data to the response channel, i.e. stream the data to the client.
Expand All @@ -293,20 +306,23 @@ impl RawData for RawDataServerWrapper {
Ok(_) => {
PROCESSED_BATCH_SIZE
.with_label_values(&[
request_metadata.request_token.as_str(),
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
request_metadata.processor_name.as_str(),
])
.set(current_batch_size as i64);
LATEST_PROCESSED_VERSION
.with_label_values(&[
request_metadata.request_token.as_str(),
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
request_metadata.processor_name.as_str(),
])
.set(end_of_batch_version as i64);
PROCESSED_VERSIONS_COUNT
.with_label_values(&[
request_metadata.request_token.as_str(),
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
request_metadata.processor_name.as_str(),
])
.inc_by(current_batch_size as u64);
if let Some(data_latency_in_secs) = data_latency_in_secs {
Expand All @@ -315,8 +331,9 @@ impl RawData for RawDataServerWrapper {
if current_batch_size % BLOB_STORAGE_SIZE != 0 {
PROCESSED_LATENCY_IN_SECS
.with_label_values(&[
request_metadata.request_token.as_str(),
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
request_metadata.processor_name.as_str(),
])
.set(data_latency_in_secs);
PROCESSED_LATENCY_IN_SECS_ALL
Expand Down Expand Up @@ -353,7 +370,12 @@ impl RawData for RawDataServerWrapper {
info!("[Indexer Data] Client disconnected.");
if let Some(start_time) = connection_start_time {
if start_time.elapsed().as_secs() < SHORT_CONNECTION_DURATION_IN_SECS {
SHORT_CONNECTION_COUNT.inc();
SHORT_CONNECTION_COUNT
.with_label_values(&[
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
])
.inc();
}
}
}
Expand Down Expand Up @@ -471,7 +493,7 @@ fn get_request_metadata(req: &Request<GetTransactionsRequest>) -> tonic::Result<
REQUEST_HEADER_APTOS_USER_CLASSIFICATION_HEADER,
),
("request_token", GRPC_AUTH_TOKEN_HEADER),
("request_name", GRPC_REQUEST_NAME_HEADER),
("processor_name", GRPC_REQUEST_NAME_HEADER),
];
let request_metadata_map: HashMap<String, String> = request_metadata_pairs
.into_iter()
Expand Down
Loading