enhancement(sources, sinks): add telemetry to http and grpc servers (vectordotdev#18887)

* enhancement(sources, sinks): add telemetry to http and grpc servers

* fix syntax

* clippy

* respond to feedback and improve docs

* fmt

* fmt again

* fix cue

* remove feature flags from http internal events
dsmith3197 authored Oct 27, 2023
1 parent a477d72 commit e779019
Showing 25 changed files with 490 additions and 119 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -196,7 +196,7 @@ opendal = {version = "0.38", default-features = false, features = ["native-tls",

# Tower
tower = { version = "0.4.13", default-features = false, features = ["buffer", "limit", "retry", "timeout", "util", "balance", "discover"] }
-tower-http = { version = "0.4.4", default-features = false, features = ["decompression-gzip"]}
+tower-http = { version = "0.4.4", default-features = false, features = ["decompression-gzip", "trace"]}
# Serde
serde = { version = "1.0.190", default-features = false, features = ["derive"] }
serde-toml-merge = { version = "0.3.3", default-features = false }
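
Not part of this commit: a minimal, self-contained sketch of what the newly enabled `trace` feature provides, namely `tower_http::trace::TraceLayer`, which the rest of the commit wires into Vector's HTTP and gRPC servers. The `tokio`, `hyper`, and `http` dependencies here are assumed only for the example.

```rust
// Hypothetical example, not part of the diff: exercise a TraceLayer-wrapped service once.
use http::{Request, Response};
use hyper::Body;
use tower::{service_fn, ServiceBuilder, ServiceExt};
use tower_http::trace::TraceLayer;

#[tokio::main]
async fn main() -> Result<(), hyper::Error> {
    // `TraceLayer::new_for_http()` is only available with the `trace` feature enabled above.
    let svc = ServiceBuilder::new()
        .layer(TraceLayer::new_for_http())
        .service(service_fn(|_req: Request<Body>| async {
            Ok::<_, hyper::Error>(Response::new(Body::from("ok")))
        }));

    // Drive a single request through the layered service.
    let response = svc.oneshot(Request::new(Body::empty())).await?;
    assert!(response.status().is_success());
    Ok(())
}
```
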
25 changes: 19 additions & 6 deletions lib/vector-core/src/metrics/label_filter.rs
@@ -5,11 +5,24 @@ use metrics_tracing_context::LabelFilter;
pub(crate) struct VectorLabelFilter;

impl LabelFilter for VectorLabelFilter {
-    fn should_include_label(&self, _key: &KeyName, label: &Label) -> bool {
-        let key = label.key();
-        key == "component_id"
-            || key == "component_type"
-            || key == "component_kind"
-            || key == "buffer_type"
+    fn should_include_label(&self, metric_key: &KeyName, label: &Label) -> bool {
+        let label_key = label.key();
+        // HTTP Server-specific labels
+        if metric_key.as_str().starts_with("http_server_")
+            && (label_key == "method" || label_key == "path")
+        {
+            return true;
+        }
+        // gRPC Server-specific labels
+        if metric_key.as_str().starts_with("grpc_server_")
+            && (label_key == "grpc_method" || label_key == "grpc_service")
+        {
+            return true;
+        }
+        // Global labels
+        label_key == "component_id"
+            || label_key == "component_type"
+            || label_key == "component_kind"
+            || label_key == "buffer_type"
    }
}
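
A hypothetical unit test (not in the commit) spelling out the intended behavior: request-scoped labels only pass for matching server metrics, while the global component labels pass for any metric name. It assumes the test would live next to `VectorLabelFilter` in `label_filter.rs`.

```rust
// Hypothetical test sketch; `VectorLabelFilter` is the struct defined above.
#[cfg(test)]
mod filter_tests {
    use metrics::{KeyName, Label};
    use metrics_tracing_context::LabelFilter;

    use super::VectorLabelFilter;

    #[test]
    fn http_server_labels_are_scoped_to_http_server_metrics() {
        let filter = VectorLabelFilter;
        let http_metric = KeyName::from("http_server_requests_received_total");
        let other_metric = KeyName::from("component_sent_events_total");

        // `method` is admitted only for `http_server_*` series.
        assert!(filter.should_include_label(&http_metric, &Label::new("method", "GET")));
        assert!(!filter.should_include_label(&other_metric, &Label::new("method", "GET")));
        // Global component labels pass regardless of the metric name.
        assert!(filter.should_include_label(&other_metric, &Label::new("component_id", "api")));
    }
}
```
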
45 changes: 32 additions & 13 deletions src/api/server.rs
@@ -9,13 +9,17 @@ use async_graphql::{
    Data, Request, Schema,
};
use async_graphql_warp::{graphql_protocol, GraphQLResponse, GraphQLWebSocket};
+use hyper::{server::conn::AddrIncoming, service::make_service_fn, Server as HyperServer};
use tokio::runtime::Handle;
use tokio::sync::oneshot;
+use tower::ServiceBuilder;
+use tracing::Span;
use warp::{filters::BoxedFilter, http::Response, ws::Ws, Filter, Reply};

use super::{handler, schema, ShutdownTx};
use crate::{
    config,
+    http::build_http_trace_layer,
    internal_events::{SocketBindError, SocketMode},
    topology,
};
@@ -39,20 +43,35 @@ impl Server {
        let (_shutdown, rx) = oneshot::channel();
        // warp uses `tokio::spawn` and so needs us to enter the runtime context.
        let _guard = handle.enter();
-        let (addr, server) = warp::serve(routes)
-            .try_bind_with_graceful_shutdown(
-                config.api.address.expect("No socket address"),
-                async {
+
+        let addr = config.api.address.expect("No socket address");
+        let incoming = AddrIncoming::bind(&addr).map_err(|error| {
+            emit!(SocketBindError {
+                mode: SocketMode::Tcp,
+                error: &error,
+            });
+            error
+        })?;
+
+        let span = Span::current();
+        let make_svc = make_service_fn(move |_conn| {
+            let svc = ServiceBuilder::new()
+                .layer(build_http_trace_layer(span.clone()))
+                .service(warp::service(routes.clone()));
+            futures_util::future::ok::<_, Infallible>(svc)
+        });
+
+        let server = async move {
+            HyperServer::builder(incoming)
+                .serve(make_svc)
+                .with_graceful_shutdown(async {
                    rx.await.ok();
-                },
-            )
-            .map_err(|error| {
-                emit!(SocketBindError {
-                    mode: SocketMode::Tcp,
-                    error: &error,
-                });
-                error
-            })?;
+                })
+                .await
+                .map_err(|err| {
+                    error!("An error occurred: {:?}.", err);
+                })
+        };

        // Update component schema with the config before starting the server.
        schema::components::update_config(config);
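
The change above swaps warp's built-in server for hyper's `Server` so that a tower layer can be applied per connection. A stripped-down sketch of that pattern (using a plain `TraceLayer` instead of Vector's internal `build_http_trace_layer`, with TLS and graceful shutdown omitted; the address and handler are placeholders) might look like this:

```rust
// Hypothetical standalone sketch of the hyper-plus-tower pattern used above.
use std::{convert::Infallible, net::SocketAddr};

use http::{Request, Response};
use hyper::{server::conn::AddrIncoming, service::make_service_fn, Body, Server};
use tower::{service_fn, ServiceBuilder};
use tower_http::trace::TraceLayer;

#[tokio::main]
async fn main() -> Result<(), hyper::Error> {
    let addr: SocketAddr = "127.0.0.1:8686".parse().expect("valid address");
    let incoming = AddrIncoming::bind(&addr)?;

    // A fresh tower stack is built per connection, mirroring `make_svc` above.
    let make_svc = make_service_fn(|_conn| async {
        let svc = ServiceBuilder::new()
            .layer(TraceLayer::new_for_http())
            .service(service_fn(|_req: Request<Body>| async {
                Ok::<_, Infallible>(Response::new(Body::from("hello")))
            }));
        Ok::<_, Infallible>(svc)
    });

    // Serves until the process is stopped; graceful shutdown is omitted here.
    Server::builder(incoming).serve(make_svc).await
}
```
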
50 changes: 47 additions & 3 deletions src/http.rs
@@ -2,11 +2,14 @@
use std::{
    fmt,
    task::{Context, Poll},
+    time::Duration,
};

use futures::future::BoxFuture;
use headers::{Authorization, HeaderMapExt};
-use http::{header::HeaderValue, request::Builder, uri::InvalidUri, HeaderMap, Request, Uri};
+use http::{
+    header::HeaderValue, request::Builder, uri::InvalidUri, HeaderMap, Request, Response, Uri,
+};
use hyper::{
    body::{Body, HttpBody},
    client,
@@ -16,13 +19,17 @@ use hyper_openssl::HttpsConnector;
use hyper_proxy::ProxyConnector;
use snafu::{ResultExt, Snafu};
use tower::Service;
-use tracing::Instrument;
+use tower_http::{
+    classify::{ServerErrorsAsFailures, SharedClassifier},
+    trace::TraceLayer,
+};
+use tracing::{Instrument, Span};
use vector_lib::configurable::configurable_component;
use vector_lib::sensitive_string::SensitiveString;

use crate::{
    config::ProxyConfig,
-    internal_events::http_client,
+    internal_events::{http_client, HttpServerRequestReceived, HttpServerResponseSent},
    tls::{tls_connector_builder, MaybeTlsSettings, TlsError},
};

@@ -338,6 +345,43 @@ pub fn get_http_scheme_from_uri(uri: &Uri) -> &'static str {
    })
}

+/// Builds a [TraceLayer] configured for a HTTP server.
+///
+/// This layer emits HTTP specific telemetry for requests received, responses sent, and handler duration.
+pub fn build_http_trace_layer(
+    span: Span,
+) -> TraceLayer<
+    SharedClassifier<ServerErrorsAsFailures>,
+    impl Fn(&Request<Body>) -> Span + Clone,
+    impl Fn(&Request<Body>, &Span) + Clone,
+    impl Fn(&Response<Body>, Duration, &Span) + Clone,
+    (),
+    (),
+    (),
+> {
+    TraceLayer::new_for_http()
+        .make_span_with(move |request: &Request<Body>| {
+            // This is an error span so that the labels are always present for metrics.
+            error_span!(
+                parent: &span,
+                "http-request",
+                method = %request.method(),
+                path = %request.uri().path(),
+            )
+        })
+        .on_request(Box::new(|_request: &Request<Body>, _span: &Span| {
+            emit!(HttpServerRequestReceived);
+        }))
+        .on_response(
+            |response: &Response<Body>, latency: Duration, _span: &Span| {
+                emit!(HttpServerResponseSent { response, latency });
+            },
+        )
+        .on_failure(())
+        .on_body_chunk(())
+        .on_eos(())
+}
+
#[cfg(test)]
mod tests {
    use super::*;
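
Why the span carries `method` and `path` fields: with a `metrics-tracing-context` layer installed on the metrics recorder, fields of the active span become candidate labels for any metric emitted inside it, and the `VectorLabelFilter` change above then admits them only for `http_server_*` series. A hedged sketch of that interaction (the function name is illustrative only):

```rust
// Hypothetical illustration, not part of the diff.
use metrics::counter;
use tracing::error_span;

fn record_request_received() {
    // An error-level span so the fields are present regardless of the log filter.
    let span = error_span!("http-request", method = "GET", path = "/health");
    let _enter = span.enter();

    // Emitted while the span is entered, so `method` and `path` are offered as
    // labels for this series (assuming a TracingContextLayer-wrapped recorder;
    // without one installed this is a no-op).
    counter!("http_server_requests_received_total", 1);
}
```
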
61 changes: 60 additions & 1 deletion src/internal_events/grpc.rs
@@ -1,7 +1,44 @@
-use metrics::counter;
+use std::time::Duration;
+
+use http::response::Response;
+use metrics::{counter, histogram};
+use tonic::Code;
use vector_lib::internal_event::InternalEvent;
use vector_lib::internal_event::{error_stage, error_type};

+const GRPC_STATUS_LABEL: &str = "grpc_status";
+
+#[derive(Debug)]
+pub struct GrpcServerRequestReceived;
+
+impl InternalEvent for GrpcServerRequestReceived {
+    fn emit(self) {
+        counter!("grpc_server_messages_received_total", 1);
+    }
+}
+
+#[derive(Debug)]
+pub struct GrpcServerResponseSent<'a, B> {
+    pub response: &'a Response<B>,
+    pub latency: Duration,
+}
+
+impl<'a, B> InternalEvent for GrpcServerResponseSent<'a, B> {
+    fn emit(self) {
+        let grpc_code = self
+            .response
+            .headers()
+            .get("grpc-status")
+            // The header value is missing on success.
+            .map_or(tonic::Code::Ok, |v| tonic::Code::from_bytes(v.as_bytes()));
+        let grpc_code = grpc_code_to_name(grpc_code);
+
+        let labels = &[(GRPC_STATUS_LABEL, grpc_code)];
+        counter!("grpc_server_messages_sent_total", 1, labels);
+        histogram!("grpc_server_handler_duration_seconds", self.latency, labels);
+    }
+}
+
#[derive(Debug)]
pub struct GrpcInvalidCompressionSchemeError<'a> {
    pub status: &'a tonic::Status,
@@ -48,3 +85,25 @@ where
        );
    }
}
+
+const fn grpc_code_to_name(code: Code) -> &'static str {
+    match code {
+        Code::Ok => "Ok",
+        Code::Cancelled => "Cancelled",
+        Code::Unknown => "Unknown",
+        Code::InvalidArgument => "InvalidArgument",
+        Code::DeadlineExceeded => "DeadlineExceeded",
+        Code::NotFound => "NotFound",
+        Code::AlreadyExists => "AlreadyExists",
+        Code::PermissionDenied => "PermissionDenied",
+        Code::ResourceExhausted => "ResourceExhausted",
+        Code::FailedPrecondition => "FailedPrecondition",
+        Code::Aborted => "Aborted",
+        Code::OutOfRange => "OutOfRange",
+        Code::Unimplemented => "Unimplemented",
+        Code::Internal => "Internal",
+        Code::Unavailable => "Unavailable",
+        Code::DataLoss => "DataLoss",
+        Code::Unauthenticated => "Unauthenticated",
+    }
+}
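
A hypothetical illustration (not in the diff) of the `grpc-status` mapping used by `GrpcServerResponseSent`: a missing header means `Ok`, otherwise the ASCII numeric code is parsed with `tonic::Code::from_bytes`, then rendered via the name table above.

```rust
// Hypothetical example; `status_label` mirrors the lookup in GrpcServerResponseSent::emit.
use http::Response;
use tonic::Code;

fn status_label<B>(response: &Response<B>) -> Code {
    response
        .headers()
        .get("grpc-status")
        // Absent on success, per the gRPC over HTTP/2 convention.
        .map_or(Code::Ok, |value| Code::from_bytes(value.as_bytes()))
}

fn main() {
    let ok: Response<()> = Response::new(());
    assert_eq!(status_label(&ok), Code::Ok);

    let not_found = Response::builder()
        .header("grpc-status", "5") // 5 is NotFound in the gRPC status numbering.
        .body(())
        .expect("valid response");
    assert_eq!(status_label(&not_found), Code::NotFound);
}
```
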
31 changes: 30 additions & 1 deletion src/internal_events/http.rs
@@ -1,5 +1,6 @@
-use std::error::Error;
+use std::{error::Error, time::Duration};

+use http::Response;
use metrics::{counter, histogram};
use vector_lib::internal_event::InternalEvent;

@@ -8,6 +9,34 @@ use vector_lib::{
    json_size::JsonSize,
};

+const HTTP_STATUS_LABEL: &str = "status";
+
+#[derive(Debug)]
+pub struct HttpServerRequestReceived;
+
+impl InternalEvent for HttpServerRequestReceived {
+    fn emit(self) {
+        counter!("http_server_requests_received_total", 1);
+    }
+}
+
+#[derive(Debug)]
+pub struct HttpServerResponseSent<'a, B> {
+    pub response: &'a Response<B>,
+    pub latency: Duration,
+}
+
+impl<'a, B> InternalEvent for HttpServerResponseSent<'a, B> {
+    fn emit(self) {
+        let labels = &[(
+            HTTP_STATUS_LABEL,
+            self.response.status().as_u16().to_string(),
+        )];
+        counter!("http_server_responses_sent_total", 1, labels);
+        histogram!("http_server_handler_duration_seconds", self.latency, labels);
+    }
+}
+
#[derive(Debug)]
pub struct HttpBytesReceived<'a> {
    pub byte_size: usize,
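
A hypothetical test-style sketch (not in the diff) of what emitting `HttpServerResponseSent` produces: a counter keyed by the numeric status code plus a latency histogram with the same label. It assumes placement inside `src/internal_events/http.rs`, where the `vector_lib` import path above is already valid.

```rust
// Hypothetical sketch of an emit call site for the event defined above.
#[cfg(test)]
mod emit_shape {
    use std::time::Duration;

    use http::Response;
    use vector_lib::internal_event::InternalEvent;

    use super::HttpServerResponseSent;

    #[test]
    fn emits_status_labelled_series() {
        let response: Response<()> = Response::builder()
            .status(204)
            .body(())
            .expect("valid response");

        // Increments `http_server_responses_sent_total{status="204"}` and records a
        // `http_server_handler_duration_seconds{status="204"}` observation on the
        // installed metrics recorder (a no-op if none is installed).
        HttpServerResponseSent {
            response: &response,
            latency: Duration::from_millis(12),
        }
        .emit();
    }
}
```
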
9 changes: 1 addition & 8 deletions src/internal_events/mod.rs
@@ -197,13 +197,6 @@ pub(crate) use self::gcp_pubsub::*;
pub(crate) use self::grpc::*;
#[cfg(feature = "sources-host_metrics")]
pub(crate) use self::host_metrics::*;
-#[cfg(any(
-    feature = "sources-utils-http",
-    feature = "sources-utils-http-encoding",
-    feature = "sources-datadog_agent",
-    feature = "sources-splunk_hec",
-))]
-pub(crate) use self::http::*;
#[cfg(feature = "sources-utils-http-client")]
pub(crate) use self::http_client_source::*;
#[cfg(feature = "sinks-influxdb")]
@@ -268,7 +261,7 @@ pub(crate) use self::websocket::*;
pub(crate) use self::windows::*;
pub use self::{
    adaptive_concurrency::*, batch::*, common::*, conditions::*, encoding_transcode::*,
-    heartbeat::*, open::*, process::*, socket::*, tcp::*, template::*, udp::*,
+    heartbeat::*, http::*, open::*, process::*, socket::*, tcp::*, template::*, udp::*,
};

// this version won't be needed once all `InternalEvent`s implement `name()`
27 changes: 15 additions & 12 deletions src/sinks/prometheus/exporter.rs
@@ -19,6 +19,7 @@ use indexmap::{map::Entry, IndexMap};
use serde_with::serde_as;
use snafu::Snafu;
use stream_cancel::{Trigger, Tripwire};
+use tower::ServiceBuilder;
use tracing::{Instrument, Span};
use vector_lib::configurable::configurable_component;
use vector_lib::{
@@ -36,7 +37,7 @@ use crate::{
        metric::{Metric, MetricData, MetricKind, MetricSeries, MetricValue},
        Event, EventStatus, Finalizable,
    },
-    http::Auth,
+    http::{build_http_trace_layer, Auth},
    internal_events::{PrometheusNormalizationError, PrometheusServerRequestComplete},
    sinks::{
        util::{
@@ -485,19 +486,21 @@ impl PrometheusExporter {
            let metrics = Arc::clone(&metrics);
            let handler = handler.clone();

-            async move {
-                Ok::<_, Infallible>(service_fn(move |req| {
-                    span.in_scope(|| {
-                        let response = handler.handle(req, &metrics);
+            let inner = service_fn(move |req| {
+                let response = handler.handle(req, &metrics);

-                        emit!(PrometheusServerRequestComplete {
-                            status_code: response.status(),
-                        });
+                emit!(PrometheusServerRequestComplete {
+                    status_code: response.status(),
+                });

-                        future::ok::<_, Infallible>(response)
-                    })
-                }))
-            }
+                future::ok::<_, Infallible>(response)
+            });
+
+            let service = ServiceBuilder::new()
+                .layer(build_http_trace_layer(span.clone()))
+                .service(inner);
+
+            async move { Ok::<_, Infallible>(service) }
        });

        let (trigger, tripwire) = Tripwire::new();