Skip to content

Commit

Permalink
chore: add metrics for log ingestion (#4411)
Browse files Browse the repository at this point in the history
* chore: add metrics for log ingestion

* chore: record result as well
  • Loading branch information
shuiyisong authored Jul 23, 2024
1 parent 49f22f0 commit 547730a
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 3 deletions.
41 changes: 38 additions & 3 deletions src/servers/src/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use axum::http::header::CONTENT_TYPE;
use axum::http::{Request, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{async_trait, BoxError, Extension, TypedHeader};
use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
use pipeline::error::{CastTypeSnafu, PipelineTransformSnafu};
use pipeline::util::to_pipeline_version;
Expand All @@ -40,6 +41,10 @@ use crate::error::{
use crate::http::greptime_manage_resp::GreptimedbManageResponse;
use crate::http::greptime_result_v1::GreptimedbV1Response;
use crate::http::HttpResponse;
use crate::metrics::{
METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED,
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
};
use crate::query_handler::LogHandlerRef;

#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
Expand Down Expand Up @@ -298,14 +303,27 @@ async fn ingest_logs_inner(
pipeline_data: PipelineValue,
query_ctx: QueryContextRef,
) -> Result<HttpResponse> {
let start = std::time::Instant::now();
let db = query_ctx.get_db_string();
let exec_timer = std::time::Instant::now();

let pipeline = state
.get_pipeline(&pipeline_name, version, query_ctx.clone())
.await?;

let transform_timer = std::time::Instant::now();
let transformed_data: Rows = pipeline
.exec(pipeline_data)
.map_err(|reason| PipelineTransformSnafu { reason }.build())
.inspect(|_| {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
})
.map_err(|reason| {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
PipelineTransformSnafu { reason }.build()
})
.context(PipelineSnafu)?;

let insert_request = RowInsertRequest {
Expand All @@ -317,9 +335,26 @@ async fn ingest_logs_inner(
};
let output = state.insert_logs(insert_requests, query_ctx).await;

if let Ok(Output {
data: OutputData::AffectedRows(rows),
meta: _,
}) = &output
{
METRIC_HTTP_LOGS_INGESTION_COUNTER
.with_label_values(&[db.as_str()])
.inc_by(*rows as u64);
METRIC_HTTP_LOGS_INGESTION_ELAPSED
.with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
.observe(exec_timer.elapsed().as_secs_f64());
} else {
METRIC_HTTP_LOGS_INGESTION_ELAPSED
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
.observe(exec_timer.elapsed().as_secs_f64());
}

let response = GreptimedbV1Response::from_output(vec![output])
.await
.with_execution_time(start.elapsed().as_millis() as u64);
.with_execution_time(exec_timer.elapsed().as_millis() as u64);
Ok(response)
}

Expand Down
24 changes: 24 additions & 0 deletions src/servers/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ pub(crate) const METRIC_POSTGRES_SIMPLE_QUERY: &str = "simple";
pub(crate) const METRIC_POSTGRES_EXTENDED_QUERY: &str = "extended";
pub(crate) const METRIC_METHOD_LABEL: &str = "method";
pub(crate) const METRIC_PATH_LABEL: &str = "path";
pub(crate) const METRIC_RESULT_LABEL: &str = "result";

pub(crate) const METRIC_SUCCESS_VALUE: &str = "success";
pub(crate) const METRIC_FAILURE_VALUE: &str = "failure";

lazy_static! {
pub static ref METRIC_ERROR_COUNTER: IntCounterVec = register_int_counter_vec!(
Expand Down Expand Up @@ -130,6 +134,26 @@ lazy_static! {
&[METRIC_DB_LABEL]
)
.unwrap();
pub static ref METRIC_HTTP_LOGS_INGESTION_COUNTER: IntCounterVec = register_int_counter_vec!(
"greptime_servers_http_logs_ingestion_counter",
"servers http logs ingestion counter",
&[METRIC_DB_LABEL]
)
.unwrap();
pub static ref METRIC_HTTP_LOGS_INGESTION_ELAPSED: HistogramVec =
register_histogram_vec!(
"greptime_servers_http_logs_ingestion_elapsed",
"servers http logs ingestion elapsed",
&[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
)
.unwrap();
pub static ref METRIC_HTTP_LOGS_TRANSFORM_ELAPSED: HistogramVec =
register_histogram_vec!(
"greptime_servers_http_logs_transform_elapsed",
"servers http logs transform elapsed",
&[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
)
.unwrap();
pub static ref METRIC_MYSQL_CONNECTIONS: IntGauge = register_int_gauge!(
"greptime_servers_mysql_connection_count",
"servers mysql connection count"
Expand Down

0 comments on commit 547730a

Please sign in to comment.