Skip to content

Commit

Permalink
[indexer grpc] update the metrics for usage analysis. (#10383)
Browse files Browse the repository at this point in the history
  • Loading branch information
larry-aptos committed Oct 23, 2023
1 parent 85be081 commit d5528d2
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 21 deletions.
17 changes: 9 additions & 8 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub static LATEST_PROCESSED_VERSION: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"indexer_grpc_data_service_with_user_latest_processed_version",
"Latest processed transaction version",
&["request_token", "email"],
&["request_token", "email", "processor"],
)
.unwrap()
});
Expand All @@ -22,7 +22,7 @@ pub static PROCESSED_VERSIONS_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"indexer_grpc_data_service_with_user_processed_versions",
"Number of transactions that have been processed by data service",
&["request_token", "email"],
&["request_token", "email", "processor"],
)
.unwrap()
});
Expand All @@ -42,7 +42,7 @@ pub static PROCESSED_LATENCY_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
"indexer_grpc_data_service_with_user_latest_data_latency_in_secs",
"Latency of data service based on latest processed transaction",
&["request_token", "email"],
&["request_token", "email", "processor"],
)
.unwrap()
});
Expand All @@ -62,7 +62,7 @@ pub static PROCESSED_BATCH_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"indexer_grpc_data_service_with_user_processed_batch_size",
"Size of latest processed batch by data service",
&["request_token", "email"],
&["request_token", "email", "processor"],
)
.unwrap()
});
Expand All @@ -77,10 +77,11 @@ pub static CONNECTION_COUNT: Lazy<IntCounter> = Lazy::new(|| {
});

/// Count of the short connections; i.e., < 10 seconds.
pub static SHORT_CONNECTION_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"indexer_grpc_data_service_short_connection_count",
pub static SHORT_CONNECTION_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"indexer_grpc_data_service_short_connection_by_user_count",
"Count of the short connections; i.e., < 10 seconds",
&["request_token", "email"],
)
.unwrap()
});
Expand All @@ -91,7 +92,7 @@ pub static BYTES_READY_TO_TRANSFER_FROM_SERVER: Lazy<IntCounterVec> = Lazy::new(
register_int_counter_vec!(
"indexer_grpc_data_service_bytes_ready_to_transfer_from_server",
"Count of bytes ready to transfer to the client",
&["request_token", "email"],
&["request_token", "email", "processor"],
)
.unwrap()
});
48 changes: 35 additions & 13 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ type ResponseStream = Pin<Box<dyn Stream<Item = Result<TransactionsResponse, Sta

#[derive(Clone, Serialize, Deserialize, Debug)]
struct RequestMetadata {
    // Per-request identity extracted from gRPC headers (see `get_request_metadata`).
    // `request_name` was renamed to `processor_name`; it is still populated from
    // the same `GRPC_REQUEST_NAME_HEADER`.
    pub processor_name: String,
    pub request_email: String,
    pub request_user_classification: String,
    pub request_api_key_name: String,
    // Token is no longer needed behind api gateway.
    #[deprecated]
    pub request_token: String,
}

const MOVING_AVERAGE_WINDOW_SIZE: u64 = 10_000;
Expand Down Expand Up @@ -150,10 +152,10 @@ impl RawData for RawDataServerWrapper {
let serving_span = tracing::span!(
tracing::Level::INFO,
"Data Serving",
request_name = request_metadata.request_name.as_str(),
request_name = request_metadata.processor_name.as_str(),
request_email = request_metadata.request_email.as_str(),
request_api_key_name = request_metadata.request_api_key_name.as_str(),
request_token = request_metadata.request_token.as_str(),
processor_name = request_metadata.processor_name.as_str(),
request_user_classification = request_metadata.request_user_classification.as_str(),
);

Expand All @@ -168,7 +170,12 @@ impl RawData for RawDataServerWrapper {
ERROR_COUNT
.with_label_values(&["redis_connection_failed"])
.inc();
SHORT_CONNECTION_COUNT.inc();
SHORT_CONNECTION_COUNT
.with_label_values(&[
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
])
.inc();
// Connection will be dropped anyway, so we ignore the error here.
let _result = tx
.send_timeout(
Expand All @@ -194,7 +201,12 @@ impl RawData for RawDataServerWrapper {
ERROR_COUNT
.with_label_values(&["redis_get_chain_id_failed"])
.inc();
SHORT_CONNECTION_COUNT.inc();
SHORT_CONNECTION_COUNT
.with_label_values(&[
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
])
.inc();
// Connection will be dropped anyway, so we ignore the error here.
let _result = tx
.send_timeout(
Expand Down Expand Up @@ -270,8 +282,9 @@ impl RawData for RawDataServerWrapper {
.sum::<usize>();
BYTES_READY_TO_TRANSFER_FROM_SERVER
.with_label_values(&[
request_metadata.request_token.as_str(),
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
request_metadata.processor_name.as_str(),
])
.inc_by(bytes_ready_to_transfer as u64);
// 2. Push the data to the response channel, i.e. stream the data to the client.
Expand All @@ -293,20 +306,23 @@ impl RawData for RawDataServerWrapper {
Ok(_) => {
PROCESSED_BATCH_SIZE
.with_label_values(&[
request_metadata.request_token.as_str(),
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
request_metadata.processor_name.as_str(),
])
.set(current_batch_size as i64);
LATEST_PROCESSED_VERSION
.with_label_values(&[
request_metadata.request_token.as_str(),
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
request_metadata.processor_name.as_str(),
])
.set(end_of_batch_version as i64);
PROCESSED_VERSIONS_COUNT
.with_label_values(&[
request_metadata.request_token.as_str(),
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
request_metadata.processor_name.as_str(),
])
.inc_by(current_batch_size as u64);
if let Some(data_latency_in_secs) = data_latency_in_secs {
Expand All @@ -315,8 +331,9 @@ impl RawData for RawDataServerWrapper {
if current_batch_size % BLOB_STORAGE_SIZE != 0 {
PROCESSED_LATENCY_IN_SECS
.with_label_values(&[
request_metadata.request_token.as_str(),
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
request_metadata.processor_name.as_str(),
])
.set(data_latency_in_secs);
PROCESSED_LATENCY_IN_SECS_ALL
Expand Down Expand Up @@ -353,7 +370,12 @@ impl RawData for RawDataServerWrapper {
info!("[Indexer Data] Client disconnected.");
if let Some(start_time) = connection_start_time {
if start_time.elapsed().as_secs() < SHORT_CONNECTION_DURATION_IN_SECS {
SHORT_CONNECTION_COUNT.inc();
SHORT_CONNECTION_COUNT
.with_label_values(&[
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
])
.inc();
}
}
}
Expand Down Expand Up @@ -471,7 +493,7 @@ fn get_request_metadata(req: &Request<GetTransactionsRequest>) -> tonic::Result<
REQUEST_HEADER_APTOS_USER_CLASSIFICATION_HEADER,
),
("request_token", GRPC_AUTH_TOKEN_HEADER),
("request_name", GRPC_REQUEST_NAME_HEADER),
("processor_name", GRPC_REQUEST_NAME_HEADER),
];
let request_metadata_map: HashMap<String, String> = request_metadata_pairs
.into_iter()
Expand Down

0 comments on commit d5528d2

Please sign in to comment.