Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for HTTP proxy: replace the HTTP client with reqwest #3424

Merged
merged 12 commits into from
Oct 28, 2024
Prev Previous commit
Next Next commit
Restore HTTP request metrics
sandhose committed Oct 25, 2024
commit fdf388179c799a951e3512ccf87682fcaba4e830
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/http/Cargo.toml
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ opentelemetry-semantic-conventions.workspace = true
opentelemetry.workspace = true
reqwest.workspace = true
rustls-platform-verifier.workspace = true
tokio.workspace = true
tower.workspace = true
tower-http.workspace = true
tracing.workspace = true
11 changes: 11 additions & 0 deletions crates/http/src/lib.rs
Original file line number Diff line number Diff line change
@@ -9,10 +9,21 @@
#![deny(rustdoc::missing_crate_level_docs)]
#![allow(clippy::module_name_repetitions)]

use std::sync::LazyLock;

mod ext;
mod reqwest;

pub use self::{
ext::{set_propagator, CorsLayerExt},
reqwest::{client as reqwest_client, RequestBuilderExt},
};

static METER: LazyLock<opentelemetry::metrics::Meter> = LazyLock::new(|| {
opentelemetry::global::meter_with_version(
env!("CARGO_PKG_NAME"),
Some(env!("CARGO_PKG_VERSION")),
Some(opentelemetry_semantic_conventions::SCHEMA_URL),
None,
)
});
108 changes: 87 additions & 21 deletions crates/http/src/reqwest.rs
Original file line number Diff line number Diff line change
@@ -3,29 +3,58 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.

use std::{future::Future, str::FromStr, sync::Arc, time::Duration};
use std::{
future::Future,
str::FromStr,
sync::{Arc, LazyLock},
time::Duration,
};

use futures_util::FutureExt as _;
use headers::{ContentLength, HeaderMapExt as _, Host, UserAgent};
use headers::{ContentLength, HeaderMapExt as _, UserAgent};
use hyper_util::client::legacy::connect::{
dns::{GaiResolver, Name},
HttpInfo,
};
use opentelemetry::{
metrics::{Histogram, UpDownCounter},
KeyValue,
};
use opentelemetry_http::HeaderInjector;
use opentelemetry_semantic_conventions::{
attribute::{HTTP_REQUEST_BODY_SIZE, HTTP_RESPONSE_BODY_SIZE},
metric::{HTTP_CLIENT_ACTIVE_REQUESTS, HTTP_CLIENT_REQUEST_DURATION},
trace::{
CLIENT_ADDRESS, CLIENT_PORT, HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE,
NETWORK_TRANSPORT, NETWORK_TYPE, SERVER_ADDRESS, SERVER_PORT, URL_FULL,
USER_AGENT_ORIGINAL,
ERROR_TYPE, HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE, NETWORK_LOCAL_ADDRESS,
NETWORK_LOCAL_PORT, NETWORK_PEER_ADDRESS, NETWORK_PEER_PORT, NETWORK_TRANSPORT,
NETWORK_TYPE, SERVER_ADDRESS, SERVER_PORT, URL_FULL, URL_SCHEME, USER_AGENT_ORIGINAL,
},
};
use tokio::time::Instant;
use tower::{BoxError, Service as _};
use tracing::Instrument;
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::METER;

static USER_AGENT: &str = concat!("matrix-authentication-service/", env!("CARGO_PKG_VERSION"));

static HTTP_REQUESTS_DURATION_HISTOGRAM: LazyLock<Histogram<u64>> = LazyLock::new(|| {
METER
.u64_histogram(HTTP_CLIENT_REQUEST_DURATION)
.with_unit("ms")
.with_description("Duration of HTTP client requests")
.init()
});

static HTTP_REQUESTS_IN_FLIGHT: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
METER
.i64_up_down_counter(HTTP_CLIENT_ACTIVE_REQUESTS)
.with_unit("{requests}")
.with_description("Number of HTTP client requests in flight")
.init()
});

struct TracingResolver {
inner: GaiResolver,
}
@@ -76,34 +105,38 @@ pub fn client() -> reqwest::Client {
async fn send_traced(
request: reqwest::RequestBuilder,
) -> Result<reqwest::Response, reqwest::Error> {
// TODO: have in-flight and request metrics
let start = Instant::now();
let (client, request) = request.build_split();
let mut request = request?;

let headers = request.headers();
let host = headers.typed_get::<Host>().map(tracing::field::display);
let server_address = request.url().host_str().map(ToOwned::to_owned);
let server_port = request.url().port_or_known_default();
let scheme = request.url().scheme().to_owned();
let user_agent = headers
.typed_get::<UserAgent>()
.map(tracing::field::display);
let content_length = headers.typed_get().map(|ContentLength(len)| len);
let method = request.method().to_string();

// Create a new span for the request
let span = tracing::info_span!(
"http.client.request",
"otel.kind" = "client",
"otel.status_code" = tracing::field::Empty,
{ HTTP_REQUEST_METHOD } = %request.method(),
{ HTTP_REQUEST_METHOD } = method,
{ URL_FULL } = %request.url(),
{ HTTP_RESPONSE_STATUS_CODE } = tracing::field::Empty,
{ SERVER_ADDRESS } = host,
{ SERVER_ADDRESS } = server_address,
{ SERVER_PORT } = server_port,
{ HTTP_REQUEST_BODY_SIZE } = content_length,
{ HTTP_RESPONSE_BODY_SIZE } = tracing::field::Empty,
{ NETWORK_TRANSPORT } = "tcp",
{ NETWORK_TYPE } = tracing::field::Empty,
{ SERVER_ADDRESS } = tracing::field::Empty,
{ SERVER_PORT } = tracing::field::Empty,
{ CLIENT_ADDRESS } = tracing::field::Empty,
{ CLIENT_PORT } = tracing::field::Empty,
{ NETWORK_LOCAL_ADDRESS } = tracing::field::Empty,
{ NETWORK_LOCAL_PORT } = tracing::field::Empty,
{ NETWORK_PEER_ADDRESS } = tracing::field::Empty,
{ NETWORK_PEER_PORT } = tracing::field::Empty,
{ USER_AGENT_ORIGINAL } = user_agent,
"rust.error" = tracing::field::Empty,
);
@@ -115,9 +148,31 @@ async fn send_traced(
propagator.inject_context(&context, &mut injector);
});

let mut metrics_labels = vec![
KeyValue::new(HTTP_REQUEST_METHOD, method.clone()),
KeyValue::new(URL_SCHEME, scheme),
];

if let Some(server_address) = server_address {
metrics_labels.push(KeyValue::new(SERVER_ADDRESS, server_address));
}

if let Some(server_port) = server_port {
metrics_labels.push(KeyValue::new(SERVER_PORT, i64::from(server_port)));
}

HTTP_REQUESTS_IN_FLIGHT.add(1, &metrics_labels);
async move {
let span = tracing::Span::current();
match client.execute(request).await {
let result = client.execute(request).await;

// XXX: We *could* loose this if the future is dropped before this, but let's
// not worry about it for now. Ideally we would use a `Drop` guard to decrement
// the counter
HTTP_REQUESTS_IN_FLIGHT.add(-1, &metrics_labels);

let duration = start.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
let result = match result {
Ok(response) => {
span.record("otel.status_code", "OK");
span.record(HTTP_RESPONSE_STATUS_CODE, response.status().as_u16());
@@ -128,26 +183,37 @@ async fn send_traced(

if let Some(http_info) = response.extensions().get::<HttpInfo>() {
let local = http_info.local_addr();
let remote = http_info.remote_addr();

let peer = http_info.remote_addr();
let family = if local.is_ipv4() { "ipv4" } else { "ipv6" };
span.record(NETWORK_TYPE, family);
span.record(CLIENT_ADDRESS, remote.ip().to_string());
span.record(CLIENT_PORT, remote.port());
span.record(SERVER_ADDRESS, local.ip().to_string());
span.record(SERVER_PORT, local.port());
span.record(NETWORK_LOCAL_ADDRESS, local.ip().to_string());
span.record(NETWORK_LOCAL_PORT, local.port());
span.record(NETWORK_PEER_ADDRESS, peer.ip().to_string());
span.record(NETWORK_PEER_PORT, peer.port());
} else {
tracing::warn!("No HttpInfo injected in response extensions");
}

metrics_labels.push(KeyValue::new(
HTTP_RESPONSE_STATUS_CODE,
i64::from(response.status().as_u16()),
));

Ok(response)
}
Err(err) => {
span.record("otel.status_code", "ERROR");
span.record("rust.error", &err as &dyn std::error::Error);

metrics_labels.push(KeyValue::new(ERROR_TYPE, "NO_RESPONSE"));

Err(err)
}
}
};

HTTP_REQUESTS_DURATION_HISTOGRAM.record(duration, &metrics_labels);

result
}
.instrument(span)
.await