From 0df629f6c4b896ee4d8218b3c4abf2722da1b4e0 Mon Sep 17 00:00:00 2001 From: Bryn Cooke Date: Tue, 2 Apr 2024 14:12:39 +0100 Subject: [PATCH] Add OTLP http metrics export (#4842) Users can now export metrics via OTLP HTTP in addition to the existing OTLP gRPC. Activate this by setting the `protocol` to `http` in your `router.yaml`: ``` telemetry: exporters: metrics: otlp: enabled: true protocol: http ``` Fixes #4559 --- **Checklist** Complete the checklist (and note appropriate exceptions) before the PR is marked ready-for-review. - [ ] Changes are compatible[^1] - [ ] Documentation[^2] completed - [ ] Performance impact assessed and acceptable - Tests added and passing[^3] - [ ] Unit Tests - [ ] Integration Tests - [ ] Manual Tests **Exceptions** *Note any exceptions here* **Notes** [^1]: It may be appropriate to bring upcoming changes to the attention of other (impacted) groups. Please endeavour to do this before seeking PR approval. The mechanism for doing this will vary considerably, so use your judgement as to how and when to do this. [^2]: Configuration is an important part of many changes. Where applicable please try to document configuration examples. [^3]: Tick whichever testing boxes are applicable. If you are adding Manual Tests, please document the manual testing (extensively) in the Exceptions. 
Co-authored-by: bryn --- .../feat_bryn_enable_otelp_http_metrics.md | 16 + Cargo.lock | 112 ++++- apollo-router/Cargo.toml | 1 + .../src/plugins/telemetry/metrics/otlp.rs | 69 +--- apollo-router/tests/common.rs | 389 +++++++++++------- apollo-router/tests/integration/lifecycle.rs | 2 +- .../telemetry/fixtures/otlp.router.yaml | 18 +- .../telemetry/fixtures/zipkin.router.yaml | 2 + .../tests/integration/telemetry/jaeger.rs | 22 +- .../tests/integration/telemetry/mod.rs | 1 + .../tests/integration/telemetry/otlp.rs | 226 ++++++++++ .../tests/integration/telemetry/zipkin.rs | 32 +- 12 files changed, 664 insertions(+), 226 deletions(-) create mode 100644 .changesets/feat_bryn_enable_otelp_http_metrics.md create mode 100644 apollo-router/tests/integration/telemetry/otlp.rs diff --git a/.changesets/feat_bryn_enable_otelp_http_metrics.md b/.changesets/feat_bryn_enable_otelp_http_metrics.md new file mode 100644 index 0000000000..41561e9a23 --- /dev/null +++ b/.changesets/feat_bryn_enable_otelp_http_metrics.md @@ -0,0 +1,16 @@ +### Add OTLP http metrics export ([Issue #4559](https://github.com/apollographql/router/issues/4559)) + +Users can now export metrics via OTLP Http in addition to the existing OTLP Grpc + +Activate this by setting the `protocol` to `http` in your your `router.yaml`: + +``` +telemetry: + exporters: + metrics: + otlp: + enabled: true + protocol: http +``` + +By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographql/router/pull/4842 diff --git a/Cargo.lock b/Cargo.lock index 9ac0870357..4ce3c4a0bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -325,13 +325,14 @@ dependencies = [ "nu-ansi-term 0.49.0", "num-traits", "once_cell", - "opentelemetry", + "opentelemetry 0.20.0", "opentelemetry-aws", "opentelemetry-datadog", "opentelemetry-http", "opentelemetry-jaeger", "opentelemetry-otlp", "opentelemetry-prometheus", + "opentelemetry-proto 0.5.0", "opentelemetry-semantic-conventions", "opentelemetry-stdout", "opentelemetry-zipkin", @@ 
-3011,6 +3012,12 @@ dependencies = [ "url", ] +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "globset" version = "0.4.13" @@ -4427,7 +4434,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9591d937bc0e6d2feb6f71a559540ab300ea49955229c347a517a28d27784c54" dependencies = [ "opentelemetry_api", - "opentelemetry_sdk", + "opentelemetry_sdk 0.20.0", +] + +[[package]] +name = "opentelemetry" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900d57987be3f2aeb70d385fff9b27fb74c5723cc9a52d904d4f9c807a0667bf" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", ] [[package]] @@ -4451,7 +4473,7 @@ dependencies = [ "indexmap 1.9.3", "itertools 0.10.5", "once_cell", - "opentelemetry", + "opentelemetry 0.20.0", "opentelemetry-http", "opentelemetry-semantic-conventions", "reqwest", @@ -4484,7 +4506,7 @@ dependencies = [ "futures-util", "headers", "http 0.2.11", - "opentelemetry", + "opentelemetry 0.20.0", "opentelemetry-http", "opentelemetry-semantic-conventions", "reqwest", @@ -4502,10 +4524,10 @@ dependencies = [ "futures-core", "http 0.2.11", "opentelemetry-http", - "opentelemetry-proto", + "opentelemetry-proto 0.3.0", "opentelemetry-semantic-conventions", "opentelemetry_api", - "opentelemetry_sdk", + "opentelemetry_sdk 0.20.0", "prost 0.11.9", "reqwest", "thiserror", @@ -4521,7 +4543,7 @@ checksum = "c7d81bc254e2d572120363a2b16cdb0d715d301b5789be0cfc26ad87e4e10e53" dependencies = [ "once_cell", "opentelemetry_api", - "opentelemetry_sdk", + "opentelemetry_sdk 0.20.0", "prometheus", "protobuf", ] @@ -4533,18 +4555,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"b1e3f814aa9f8c905d0ee4bde026afd3b2577a97c10e1699912e3e44f0c4cbeb" dependencies = [ "opentelemetry_api", - "opentelemetry_sdk", + "opentelemetry_sdk 0.20.0", "prost 0.11.9", "tonic 0.9.2", ] +[[package]] +name = "opentelemetry-proto" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a8fddc9b68f5b80dae9d6f510b88e02396f006ad48cac349411fbecc80caae4" +dependencies = [ + "hex", + "opentelemetry 0.22.0", + "opentelemetry_sdk 0.22.1", + "prost 0.12.3", + "serde", + "tonic 0.11.0", +] + [[package]] name = "opentelemetry-semantic-conventions" version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73c9f9340ad135068800e7f1b24e9e09ed9e7143f5bf8518ded3d3ec69789269" dependencies = [ - "opentelemetry", + "opentelemetry 0.20.0", ] [[package]] @@ -4555,7 +4591,7 @@ checksum = "8bd550321bc0f9d3f6dcbfe5c75262789de5b3e2776da2cbcfd2392aa05db0c6" dependencies = [ "futures-util", "opentelemetry_api", - "opentelemetry_sdk", + "opentelemetry_sdk 0.20.0", "ordered-float 3.9.2", "serde", "serde_json", @@ -4571,7 +4607,7 @@ dependencies = [ "futures-core", "http 0.2.11", "once_cell", - "opentelemetry", + "opentelemetry 0.20.0", "opentelemetry-http", "opentelemetry-semantic-conventions", "reqwest", @@ -4621,6 +4657,26 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "opentelemetry_sdk" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e90c7113be649e31e9a0f8b5ee24ed7a16923b322c3c5ab6367469c049d6b7e" +dependencies = [ + "async-trait", + "crossbeam-channel", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry 0.22.0", + "ordered-float 4.2.0", + "percent-encoding", + "rand 0.8.5", + "thiserror", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -4645,6 +4701,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-float" +version = "4.2.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "a76df7075c7d4d01fdcb46c912dd17fba5b60c78ea480b475f2b6ab6f666584e" +dependencies = [ + "num-traits", +] + [[package]] name = "outref" version = "0.5.1" @@ -7062,6 +7127,27 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +dependencies = [ + "async-trait", + "base64 0.21.7", + "bytes", + "http 0.2.11", + "http-body", + "percent-encoding", + "pin-project", + "prost 0.12.3", + "tokio", + "tokio-stream", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tonic-build" version = "0.9.2" @@ -7219,8 +7305,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75327c6b667828ddc28f5e3f169036cb793c3f588d83bf0f262a7f062ffed3c8" dependencies = [ "once_cell", - "opentelemetry", - "opentelemetry_sdk", + "opentelemetry 0.20.0", + "opentelemetry_sdk 0.20.0", "smallvec", "tracing", "tracing-core", diff --git a/apollo-router/Cargo.toml b/apollo-router/Cargo.toml index 97f241bdf9..e02ebba2f4 100644 --- a/apollo-router/Cargo.toml +++ b/apollo-router/Cargo.toml @@ -295,6 +295,7 @@ num-traits = "0.2.18" once_cell = "1.19.0" opentelemetry-stdout = { version = "0.1.0", features = ["trace"] } opentelemetry = { version = "0.20.0", features = ["testing"] } +opentelemetry-proto = { version="0.5.0", features = ["metrics", "trace", "gen-tonic-messages", "with-serde"] } p256 = "0.13.2" rand_core = "0.6.4" reqwest = { version = "0.11.24", default-features = false, features = [ diff --git a/apollo-router/src/plugins/telemetry/metrics/otlp.rs b/apollo-router/src/plugins/telemetry/metrics/otlp.rs index 3bd7c109f4..2ee503a53d 100644 --- a/apollo-router/src/plugins/telemetry/metrics/otlp.rs +++ b/apollo-router/src/plugins/telemetry/metrics/otlp.rs @@ -1,9 +1,7 @@ use opentelemetry::runtime; use 
opentelemetry::sdk::metrics::PeriodicReader; use opentelemetry::sdk::metrics::View; -use opentelemetry_otlp::HttpExporterBuilder; use opentelemetry_otlp::MetricsExporterBuilder; -use opentelemetry_otlp::TonicExporterBuilder; use tower::BoxError; use crate::plugins::telemetry::config::MetricsCommon; @@ -12,27 +10,6 @@ use crate::plugins::telemetry::metrics::MetricsBuilder; use crate::plugins::telemetry::metrics::MetricsConfigurator; use crate::plugins::telemetry::otlp::TelemetryDataKind; -// TODO Remove MetricExporterBuilder once upstream issue is fixed -// This has to exist because Http is not currently supported for metrics export -// https://github.com/open-telemetry/opentelemetry-rust/issues/772 -struct MetricExporterBuilder { - exporter: Option, -} - -impl From for MetricExporterBuilder { - fn from(exporter: TonicExporterBuilder) -> Self { - Self { - exporter: Some(exporter), - } - } -} - -impl From for MetricExporterBuilder { - fn from(_exporter: HttpExporterBuilder) -> Self { - Self { exporter: None } - } -} - impl MetricsConfigurator for super::super::otlp::Config { fn enabled(&self) -> bool { self.enabled @@ -43,36 +20,30 @@ impl MetricsConfigurator for super::super::otlp::Config { mut builder: MetricsBuilder, metrics_config: &MetricsCommon, ) -> Result { - let exporter: MetricExporterBuilder = self.exporter(TelemetryDataKind::Metrics)?; if !self.enabled { return Ok(builder); } - match exporter.exporter { - Some(exporter) => { - let exporter = MetricsExporterBuilder::Tonic(exporter).build_metrics_exporter( - (&self.temporality).into(), - Box::new( - CustomAggregationSelector::builder() - .boundaries(metrics_config.buckets.clone()) - .build(), - ), - )?; + let exporter_builder: MetricsExporterBuilder = self.exporter(TelemetryDataKind::Metrics)?; + let exporter = exporter_builder.build_metrics_exporter( + (&self.temporality).into(), + Box::new( + CustomAggregationSelector::builder() + .boundaries(metrics_config.buckets.clone()) + .build(), + ), + )?; - 
builder.public_meter_provider_builder = - builder.public_meter_provider_builder.with_reader( - PeriodicReader::builder(exporter, runtime::Tokio) - .with_interval(self.batch_processor.scheduled_delay) - .with_timeout(self.batch_processor.max_export_timeout) - .build(), - ); - for metric_view in metrics_config.views.clone() { - let view: Box = metric_view.try_into()?; - builder.public_meter_provider_builder = - builder.public_meter_provider_builder.with_view(view); - } - Ok(builder) - } - None => Err("otlp metric export does not support http yet".into()), + builder.public_meter_provider_builder = builder.public_meter_provider_builder.with_reader( + PeriodicReader::builder(exporter, runtime::Tokio) + .with_interval(self.batch_processor.scheduled_delay) + .with_timeout(self.batch_processor.max_export_timeout) + .build(), + ); + for metric_view in metrics_config.views.clone() { + let view: Box = metric_view.try_into()?; + builder.public_meter_provider_builder = + builder.public_meter_provider_builder.with_view(view); } + Ok(builder) } } diff --git a/apollo-router/tests/common.rs b/apollo-router/tests/common.rs index 32d1ad4a1e..526b5f8cfd 100644 --- a/apollo-router/tests/common.rs +++ b/apollo-router/tests/common.rs @@ -4,9 +4,7 @@ use std::net::SocketAddr; use std::net::TcpListener; use std::path::PathBuf; use std::process::Stdio; -use std::sync::Arc; use std::time::Duration; -use std::time::SystemTime; use buildstructor::buildstructor; use http::header::ACCEPT; @@ -19,13 +17,23 @@ use mediatype::names::MULTIPART; use mediatype::MediaType; use mediatype::WriteParams; use mime::APPLICATION_JSON; -use once_cell::sync::OnceCell; use opentelemetry::global; use opentelemetry::propagation::TextMapPropagator; -use opentelemetry::trace::Span; +use opentelemetry::sdk::trace::config; +use opentelemetry::sdk::trace::BatchSpanProcessor; +use opentelemetry::sdk::trace::TracerProvider; +use opentelemetry::sdk::Resource; +use opentelemetry::testing::trace::NoopSpanExporter; use 
opentelemetry::trace::TraceContextExt; -use opentelemetry::trace::Tracer; -use opentelemetry::trace::TracerProvider; +use opentelemetry_api::trace::TracerProvider as OtherTracerProvider; +use opentelemetry_api::Context; +use opentelemetry_api::KeyValue; +use opentelemetry_otlp::HttpExporterBuilder; +use opentelemetry_otlp::Protocol; +use opentelemetry_otlp::SpanExporterBuilder; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_semantic_conventions::resource::SERVICE_NAME; +use reqwest::Request; use serde_json::json; use serde_json::Value; use tokio::io::AsyncBufReadExt; @@ -33,7 +41,6 @@ use tokio::io::AsyncWriteExt; use tokio::io::BufReader; use tokio::process::Child; use tokio::process::Command; -use tokio::sync::Mutex; use tokio::task; use tokio::time::Instant; use tower::BoxError; @@ -53,73 +60,210 @@ use wiremock::Mock; use wiremock::Respond; use wiremock::ResponseTemplate; -static LOCK: OnceCell>> = OnceCell::new(); - pub struct IntegrationTest { router: Option, test_config_location: PathBuf, router_location: PathBuf, - _lock: tokio::sync::OwnedMutexGuard, stdio_tx: tokio::sync::mpsc::Sender, stdio_rx: tokio::sync::mpsc::Receiver, collect_stdio: Option<(tokio::sync::oneshot::Sender, regex::Regex)>, supergraph: PathBuf, _subgraphs: wiremock::MockServer, - subscriber: Option, + telemetry: Telemetry, + + // Don't remove these, there is a weak reference to the tracer provider from a tracer and if the provider is dropped then no export will happen. 
+ pub _tracer_provider_client: TracerProvider, + pub _tracer_provider_subgraph: TracerProvider, + subscriber_client: Dispatch, _subgraph_overrides: HashMap, - pub bind_addr: SocketAddr, + pub bind_address: SocketAddr, } -struct TracedResponder(pub(crate) ResponseTemplate); +struct TracedResponder { + response_template: ResponseTemplate, + telemetry: Telemetry, + subscriber_subgraph: Dispatch, +} impl Respond for TracedResponder { fn respond(&self, request: &wiremock::Request) -> ResponseTemplate { - let tracer_provider = opentelemetry_jaeger::new_agent_pipeline() - .with_service_name("products") - .build_simple() - .unwrap(); - let tracer = tracer_provider.tracer("products"); - let headers: HashMap = request - .headers - .iter() - .map(|(name, value)| (name.as_str().to_string(), value.as_str().to_string())) - .collect(); - let context = opentelemetry_jaeger::Propagator::new().extract(&headers); - let mut span = tracer.start_with_context("HTTP POST", &context); - span.end_with_timestamp(SystemTime::now()); - tracer_provider.force_flush(); - self.0.clone() + let context = self.telemetry.extract_context(request); + tracing_core::dispatcher::with_default(&self.subscriber_subgraph, || { + let _context_guard = context.attach(); + let span = info_span!("subgraph server"); + let _span_guard = span.enter(); + self.response_template.clone() + }) } } +#[derive(Debug, Clone, Default)] #[allow(dead_code)] pub enum Telemetry { Jaeger, - Otlp, + Otlp { + endpoint: String, + }, Datadog, Zipkin, + #[default] + None, +} + +impl Telemetry { + fn tracer_provider(&self, service_name: &str) -> TracerProvider { + let config = config().with_resource(Resource::new(vec![KeyValue::new( + SERVICE_NAME, + service_name.to_string(), + )])); + + match self { + Telemetry::Jaeger => TracerProvider::builder() + .with_config(config) + .with_span_processor( + BatchSpanProcessor::builder( + opentelemetry_jaeger::new_agent_pipeline() + .with_service_name(service_name) + .build_sync_agent_exporter() + 
.expect("jaeger pipeline failed"), + opentelemetry::runtime::Tokio, + ) + .with_scheduled_delay(Duration::from_millis(10)) + .build(), + ) + .build(), + Telemetry::Otlp { endpoint } => TracerProvider::builder() + .with_config(config) + .with_span_processor( + BatchSpanProcessor::builder( + SpanExporterBuilder::Http( + HttpExporterBuilder::default() + .with_endpoint(endpoint) + .with_protocol(Protocol::HttpBinary), + ) + .build_span_exporter() + .expect("otlp pipeline failed"), + opentelemetry::runtime::Tokio, + ) + .with_scheduled_delay(Duration::from_millis(10)) + .build(), + ) + .build(), + Telemetry::Datadog => TracerProvider::builder() + .with_config(config) + .with_span_processor( + BatchSpanProcessor::builder( + opentelemetry_datadog::new_pipeline() + .build_exporter() + .expect("datadog pipeline failed"), + opentelemetry::runtime::Tokio, + ) + .with_scheduled_delay(Duration::from_millis(10)) + .build(), + ) + .build(), + Telemetry::Zipkin => TracerProvider::builder() + .with_config(config) + .with_span_processor( + BatchSpanProcessor::builder( + opentelemetry_zipkin::new_pipeline() + .with_service_name(service_name) + .init_exporter() + .expect("zipkin pipeline failed"), + opentelemetry::runtime::Tokio, + ) + .with_scheduled_delay(Duration::from_millis(10)) + .build(), + ) + .build(), + Telemetry::None => TracerProvider::builder() + .with_config(config) + .with_simple_exporter(NoopSpanExporter::default()) + .build(), + } + } + + fn inject_context(&self, request: &mut Request) { + let ctx = tracing::span::Span::current().context(); + + match self { + Telemetry::Jaeger => { + let propagator = opentelemetry_jaeger::Propagator::new(); + propagator.inject_context( + &ctx, + &mut opentelemetry_http::HeaderInjector(request.headers_mut()), + ) + } + Telemetry::Datadog => { + let propagator = opentelemetry_datadog::DatadogPropagator::new(); + propagator.inject_context( + &ctx, + &mut opentelemetry_http::HeaderInjector(request.headers_mut()), + ) + } + Telemetry::Otlp 
{ .. } => { + let propagator = opentelemetry::sdk::propagation::TraceContextPropagator::default(); + propagator.inject_context( + &ctx, + &mut opentelemetry_http::HeaderInjector(request.headers_mut()), + ) + } + Telemetry::Zipkin => { + let propagator = opentelemetry_zipkin::Propagator::new(); + propagator.inject_context( + &ctx, + &mut opentelemetry_http::HeaderInjector(request.headers_mut()), + ) + } + _ => {} + } + } + + pub(crate) fn extract_context(&self, request: &wiremock::Request) -> Context { + let headers: HashMap = request + .headers + .iter() + .map(|(name, value)| (name.as_str().to_string(), value.as_str().to_string())) + .collect(); + + match self { + Telemetry::Jaeger => { + let propagator = opentelemetry_jaeger::Propagator::new(); + propagator.extract(&headers) + } + Telemetry::Datadog => { + let propagator = opentelemetry_datadog::DatadogPropagator::new(); + propagator.extract(&headers) + } + Telemetry::Otlp { .. } => { + let propagator = opentelemetry::sdk::propagation::TraceContextPropagator::default(); + propagator.extract(&headers) + } + Telemetry::Zipkin => { + let propagator = opentelemetry_zipkin::Propagator::new(); + propagator.extract(&headers) + } + _ => Context::current(), + } + } } #[buildstructor] impl IntegrationTest { #[builder] pub async fn new( - config: &'static str, + config: String, telemetry: Option, responder: Option, collect_stdio: Option>, supergraph: Option, mut subgraph_overrides: HashMap, ) -> Self { - // Prevent multiple integration tests from running at the same time - let lock = LOCK - .get_or_init(Default::default) - .clone() - .lock_owned() - .await; - - let subscriber = Self::init_telemetry(telemetry); + let telemetry = telemetry.unwrap_or_default(); + let tracer_provider_client = telemetry.tracer_provider("client"); + let subscriber_client = Self::dispatch(&tracer_provider_client); + let tracer_provider_subgraph = telemetry.tracer_provider("subgraph"); let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 
1], 0))).unwrap(); let address = listener.local_addr().unwrap(); @@ -133,13 +277,13 @@ impl IntegrationTest { // before the router is started. // Note: We need the nested scope so that the listener gets dropped once its address // is resolved. - let addr = { + let bind_address = { let bound = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))).unwrap(); bound.local_addr().unwrap() }; // Insert the overrides into the config - let config_str = merge_overrides(config, &subgraph_overrides, &addr); + let config_str = merge_overrides(&config, &subgraph_overrides, &bind_address); let supergraph = supergraph.unwrap_or(PathBuf::from_iter([ "..", @@ -153,13 +297,17 @@ impl IntegrationTest { .await; Mock::given(method("POST")) - .respond_with(TracedResponder(responder.unwrap_or_else(|| - ResponseTemplate::new(200).set_body_json(json!({"data":{"topProducts":[{"name":"Table"},{"name":"Couch"},{"name":"Chair"}]}}))))) + .respond_with(TracedResponder{response_template:responder.unwrap_or_else(|| + ResponseTemplate::new(200).set_body_json(json!({"data":{"topProducts":[{"name":"Table"},{"name":"Couch"},{"name":"Chair"}]}}))), + telemetry: telemetry.clone(), + subscriber_subgraph: Self::dispatch(&tracer_provider_subgraph), + }) .mount(&subgraphs) .await; let mut test_config_location = std::env::temp_dir(); - test_config_location.push("test_config.yaml"); + let location = format!("apollo-router-test-{}.yaml", Uuid::new_v4()); + test_config_location.push(location); fs::write(&test_config_location, &config_str).expect("could not write config"); @@ -168,22 +316,39 @@ impl IntegrationTest { let version_line_re = regex::Regex::new("Apollo Router v[^ ]+ ").unwrap(); (sender, version_line_re) }); + Self { router: None, router_location: Self::router_location(), test_config_location, - _lock: lock, stdio_tx, stdio_rx, collect_stdio, supergraph, _subgraphs: subgraphs, - subscriber, _subgraph_overrides: subgraph_overrides, - bind_addr: addr, + bind_address, + _tracer_provider_client: 
tracer_provider_client, + subscriber_client, + _tracer_provider_subgraph: tracer_provider_subgraph, + telemetry, } } + fn dispatch(tracer_provider: &TracerProvider) -> Dispatch { + let tracer = tracer_provider.tracer("tracer"); + let tracing_layer = tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_filter(LevelFilter::INFO); + + let subscriber = Registry::default().with(tracing_layer).with( + tracing_subscriber::fmt::Layer::default() + .compact() + .with_filter(EnvFilter::from_default_env()), + ); + Dispatch::new(subscriber) + } + pub fn router_location() -> PathBuf { PathBuf::from(env!("CARGO_BIN_EXE_router")) } @@ -252,82 +417,6 @@ impl IntegrationTest { self.router = Some(router); } - fn init_telemetry(telemetry: Option) -> Option { - match telemetry { - Some(Telemetry::Jaeger) => { - let tracer = opentelemetry_jaeger::new_agent_pipeline() - .with_service_name("my_app") - .install_simple() - .expect("jaeger pipeline failed"); - let telemetry = tracing_opentelemetry::layer() - .with_tracer(tracer) - .with_filter(LevelFilter::INFO); - let subscriber = Registry::default().with(telemetry).with( - tracing_subscriber::fmt::Layer::default() - .compact() - .with_filter(EnvFilter::from_default_env()), - ); - - global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); - Some(Dispatch::new(subscriber)) - } - Some(Telemetry::Datadog) => { - let tracer = opentelemetry_datadog::new_pipeline() - .with_service_name("my_app") - .install_simple() - .expect("datadog pipeline failed"); - let telemetry = tracing_opentelemetry::layer() - .with_tracer(tracer) - .with_filter(LevelFilter::INFO); - let subscriber = Registry::default().with(telemetry).with( - tracing_subscriber::fmt::Layer::default() - .compact() - .with_filter(EnvFilter::from_default_env()), - ); - - global::set_text_map_propagator(opentelemetry_datadog::DatadogPropagator::new()); - Some(Dispatch::new(subscriber)) - } - Some(Telemetry::Otlp) => { - let tracer = 
opentelemetry_otlp::new_pipeline() - .tracing() - .install_simple() - .expect("otlp pipeline failed"); - let telemetry = tracing_opentelemetry::layer() - .with_tracer(tracer) - .with_filter(LevelFilter::INFO); - let subscriber = Registry::default().with(telemetry).with( - tracing_subscriber::fmt::Layer::default() - .compact() - .with_filter(EnvFilter::from_default_env()), - ); - - global::set_text_map_propagator( - opentelemetry::sdk::propagation::TraceContextPropagator::new(), - ); - Some(Dispatch::new(subscriber)) - } - Some(Telemetry::Zipkin) => { - let tracer = opentelemetry_zipkin::new_pipeline() - .with_service_name("my_app") - .install_simple() - .expect("zipkin pipeline failed"); - let telemetry = tracing_opentelemetry::layer() - .with_tracer(tracer) - .with_filter(LevelFilter::INFO); - let subscriber = Registry::default().with(telemetry).with( - tracing_subscriber::fmt::Layer::default() - .compact() - .with_filter(EnvFilter::from_default_env()), - ); - - global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new()); - Some(Dispatch::new(subscriber)) - } - _ => None, - } - } - #[allow(dead_code)] pub async fn assert_started(&mut self) { self.assert_log_contains("GraphQL endpoint exposed").await; @@ -354,7 +443,7 @@ impl IntegrationTest { pub async fn update_config(&self, yaml: &str) { tokio::fs::write( &self.test_config_location, - &merge_overrides(yaml, &self._subgraph_overrides, &self.bind_addr), + &merge_overrides(yaml, &self._subgraph_overrides, &self.bind_address), ) .await .expect("must be able to write config"); @@ -408,9 +497,10 @@ impl IntegrationTest { self.router.is_some(), "router was not started, call `router.start().await; router.assert_started().await`" ); - let dispatch = self.subscriber.clone(); + let telemetry = self.telemetry.clone(); + let query = query.clone(); - let url = format!("http://{}", self.bind_addr); + let url = format!("http://{}", self.bind_address); async move { let span = info_span!("client_request"); @@ -431,12 
+521,7 @@ impl IntegrationTest { .json(&query) .build() .unwrap(); - global::get_text_map_propagator(|propagator| { - propagator.inject_context( - &tracing::span::Span::current().context(), - &mut opentelemetry_http::HeaderInjector(request.headers_mut()), - ); - }); + telemetry.inject_context(&mut request); request.headers_mut().remove(ACCEPT); match client.execute(request).await { Ok(response) => (span_id, response), @@ -448,7 +533,7 @@ impl IntegrationTest { .instrument(span) .await } - .with_subscriber(dispatch.unwrap_or_default()) + .with_subscriber(self.subscriber_client.clone()) } #[allow(dead_code)] @@ -461,8 +546,7 @@ impl IntegrationTest { "router was not started, call `router.start().await; router.assert_started().await`" ); let query = query.clone(); - let dispatch = self.subscriber.clone(); - let url = format!("http://{}", self.bind_addr); + let url = format!("http://{}", self.bind_address); async move { let client = reqwest::Client::new(); @@ -494,7 +578,7 @@ impl IntegrationTest { } } } - .with_subscriber(dispatch.unwrap_or_default()) + .with_subscriber(self.subscriber_client.clone()) } /// Make a raw multipart request to the router. 
@@ -509,8 +593,7 @@ impl IntegrationTest { "router was not started, call `router.start().await; router.assert_started().await`" ); - let dispatch = self.subscriber.clone(); - let url = format!("http://{}", self.bind_addr); + let url = format!("http://{}", self.bind_address); async move { let span = info_span!("client_raw_request"); let span_id = span.context().span().span_context().trace_id().to_string(); @@ -554,7 +637,7 @@ impl IntegrationTest { .instrument(span) .await } - .with_subscriber(dispatch.unwrap_or_default()) + .with_subscriber(self.subscriber_client.clone()) } #[allow(dead_code)] @@ -569,7 +652,7 @@ impl IntegrationTest { let _span_guard = span.enter(); let mut request = client - .post(format!("http://{}", self.bind_addr)) + .post(format!("http://{}", self.bind_address)) .header(CONTENT_TYPE, APPLICATION_JSON.essence_str()) .header(ACCEPT, "multipart/mixed;subscriptionSpec=1.0") .header("apollographql-client-name", "custom_name") @@ -598,7 +681,7 @@ impl IntegrationTest { let client = reqwest::Client::new(); let request = client - .get(format!("http://{}/metrics", self.bind_addr)) + .get(format!("http://{}/metrics", self.bind_address)) .header("apollographql-client-name", "custom_name") .header("apollographql-client-version", "1.0") .build() @@ -780,6 +863,23 @@ impl IntegrationTest { } #[cfg(not(target_os = "linux"))] pub fn dump_stack_traces(&mut self) {} + + #[allow(dead_code)] + pub(crate) fn force_flush(&self) { + let tracer_provider_client = self._tracer_provider_client.clone(); + let tracer_provider_subgraph = self._tracer_provider_subgraph.clone(); + for r in tracer_provider_subgraph.force_flush() { + if let Err(e) = r { + eprintln!("failed to flush subgraph tracer: {e}"); + } + } + + for r in tracer_provider_client.force_flush() { + if let Err(e) = r { + eprintln!("failed to flush client tracer: {e}"); + } + } + } } impl Drop for IntegrationTest { @@ -879,5 +979,14 @@ fn merge_overrides( ); } + // Set health check listen address to avoid port 
conflicts + config + .as_object_mut() + .expect("config should be an object") + .insert( + "health_check".to_string(), + json!({"listen": bind_addr.to_string()}), + ); + serde_yaml::to_string(&config).unwrap() } diff --git a/apollo-router/tests/integration/lifecycle.rs b/apollo-router/tests/integration/lifecycle.rs index c5805ca642..f2dc5689e2 100644 --- a/apollo-router/tests/integration/lifecycle.rs +++ b/apollo-router/tests/integration/lifecycle.rs @@ -177,7 +177,7 @@ async fn test_shutdown_with_idle_connection() -> Result<(), BoxError> { .await; router.start().await; router.assert_started().await; - let _conn = std::net::TcpStream::connect(router.bind_addr).unwrap(); + let _conn = std::net::TcpStream::connect(router.bind_address).unwrap(); router.execute_default_query().await; tokio::time::timeout(Duration::from_secs(1), router.graceful_shutdown()) .await diff --git a/apollo-router/tests/integration/telemetry/fixtures/otlp.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/otlp.router.yaml index d30bb5e44d..f4484786f4 100644 --- a/apollo-router/tests/integration/telemetry/fixtures/otlp.router.yaml +++ b/apollo-router/tests/integration/telemetry/fixtures/otlp.router.yaml @@ -1,8 +1,24 @@ telemetry: exporters: tracing: + experimental_response_trace_id: + enabled: true + header_name: apollo-custom-trace-id + common: + service_name: router + otlp: + enabled: true + protocol: http + endpoint: /traces + batch_processor: + scheduled_delay: 10ms + metrics: common: service_name: router otlp: enabled: true - endpoint: default + endpoint: /metrics + protocol: http + batch_processor: + scheduled_delay: 10ms + diff --git a/apollo-router/tests/integration/telemetry/fixtures/zipkin.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/zipkin.router.yaml index 97ca9035a8..1705113f18 100644 --- a/apollo-router/tests/integration/telemetry/fixtures/zipkin.router.yaml +++ b/apollo-router/tests/integration/telemetry/fixtures/zipkin.router.yaml @@ -9,3 +9,5 
@@ telemetry: zipkin: enabled: true endpoint: default + batch_processor: + scheduled_delay: 10ms diff --git a/apollo-router/tests/integration/telemetry/jaeger.rs b/apollo-router/tests/integration/telemetry/jaeger.rs index 5404ff0461..86b1fc71e9 100644 --- a/apollo-router/tests/integration/telemetry/jaeger.rs +++ b/apollo-router/tests/integration/telemetry/jaeger.rs @@ -36,7 +36,7 @@ async fn test_reload() -> Result<(), BoxError> { id, &query, Some("ExampleQuery"), - &["my_app", "router", "products"], + &["client", "router", "subgraph"], false, ) .await?; @@ -69,7 +69,7 @@ async fn test_remote_root() -> Result<(), BoxError> { id, &query, Some("ExampleQuery"), - &["my_app", "router", "products"], + &["client", "router", "subgraph"], false, ) .await?; @@ -100,7 +100,7 @@ async fn test_local_root() -> Result<(), BoxError> { id, &query, Some("ExampleQuery"), - &["router", "products"], + &["router", "subgraph"], false, ) .await?; @@ -148,7 +148,7 @@ async fn test_local_root_50_percent_sample() -> Result<(), BoxError> { id, &query, Some("ExampleQuery"), - &["router", "products"], + &["router", "subgraph"], false, ) .await @@ -206,7 +206,7 @@ async fn test_default_operation() -> Result<(), BoxError> { id, &query, Some("ExampleQuery1"), - &["my_app", "router", "products"], + &["client", "router", "subgraph"], false, ) .await?; @@ -233,7 +233,7 @@ async fn test_anonymous_operation() -> Result<(), BoxError> { .get("apollo-custom-trace-id") .unwrap() .is_empty()); - validate_trace(id, &query, None, &["my_app", "router", "products"], false).await?; + validate_trace(id, &query, None, &["client", "router", "subgraph"], false).await?; router.graceful_shutdown().await; Ok(()) } @@ -260,7 +260,7 @@ async fn test_selected_operation() -> Result<(), BoxError> { id, &query, Some("ExampleQuery2"), - &["my_app", "router", "products"], + &["client", "router", "subgraph"], false, ) .await?; @@ -286,7 +286,7 @@ async fn test_span_customization() -> Result<(), BoxError> { id, &query, 
Some("ExampleQuery"), - &["my_app", "router", "products"], + &["client", "router", "subgraph"], true, ) .await?; @@ -527,7 +527,7 @@ fn verify_spans_present( let mut expected_operation_names: HashSet = HashSet::from( [ "execution", - "HTTP POST", + "subgraph server", operation_name .map(|name| format!("query {name}")) .unwrap_or("query".to_string()) @@ -540,7 +540,7 @@ fn verify_spans_present( ] .map(|s| s.into()), ); - if services.contains(&"my_app") { + if services.contains(&"client") { expected_operation_names.insert("client_request".into()); } tracing::debug!("found spans {:?}", operation_names); @@ -557,7 +557,7 @@ fn verify_spans_present( } fn verify_span_parenting(trace: &Value, services: &[&'static str]) -> Result<(), BoxError> { - let root_span = if services.contains(&"my_app") { + let root_span = if services.contains(&"client") { trace.select_path("$..spans[?(@.operationName == 'client_request')]")?[0] } else { trace.select_path("$..spans[?(@.operationName == 'query ExampleQuery')]")?[0] diff --git a/apollo-router/tests/integration/telemetry/mod.rs b/apollo-router/tests/integration/telemetry/mod.rs index 292f2d4038..8ef8d071f2 100644 --- a/apollo-router/tests/integration/telemetry/mod.rs +++ b/apollo-router/tests/integration/telemetry/mod.rs @@ -1,4 +1,5 @@ mod jaeger; mod logging; mod metrics; +mod otlp; mod zipkin; diff --git a/apollo-router/tests/integration/telemetry/otlp.rs b/apollo-router/tests/integration/telemetry/otlp.rs new file mode 100644 index 0000000000..698ef4a798 --- /dev/null +++ b/apollo-router/tests/integration/telemetry/otlp.rs @@ -0,0 +1,226 @@ +#![cfg(all(target_os = "linux", target_arch = "x86_64"))] +extern crate core; + +use std::collections::HashSet; +use std::time::Duration; + +use anyhow::anyhow; +use itertools::Itertools; +use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceResponse; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceResponse; +use prost::Message; +use 
serde_json::json; +use serde_json::Value; +use tower::BoxError; +use wiremock::matchers::method; +use wiremock::matchers::path; +use wiremock::Mock; +use wiremock::MockServer; +use wiremock::ResponseTemplate; + +use crate::integration::IntegrationTest; +use crate::integration::Telemetry; +use crate::integration::ValueExt; + +#[tokio::test(flavor = "multi_thread")] +async fn test_basic() -> Result<(), BoxError> { + let mock_server = wiremock::MockServer::start().await; + Mock::given(method("POST")) + .and(path("/traces")) + .respond_with(ResponseTemplate::new(200).set_body_raw( + ExportTraceServiceResponse::default().encode_to_vec(), + "application/x-protobuf", + )) + .expect(1..) + .mount(&mock_server) + .await; + Mock::given(method("POST")) + .and(path("/metrics")) + .respond_with(ResponseTemplate::new(200).set_body_raw( + ExportMetricsServiceResponse::default().encode_to_vec(), + "application/x-protobuf", + )) + .expect(1..) + .mount(&mock_server) + .await; + + let config = include_str!("fixtures/otlp.router.yaml") + .replace("", &mock_server.uri()); + let mut router = IntegrationTest::builder() + .telemetry(Telemetry::Otlp { + endpoint: format!("{}/traces", mock_server.uri()), + }) + .config(&config) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}}); + for _ in 0..2 { + let (id, result) = router.execute_query(&query).await; + assert!(!result + .headers() + .get("apollo-custom-trace-id") + .unwrap() + .is_empty()); + validate_telemetry( + &mock_server, + id, + &query, + Some("ExampleQuery"), + &["client", "router", "subgraph"], + false, + ) + .await?; + router.touch_config().await; + router.assert_reloaded().await; + } + router.graceful_shutdown().await; + Ok(()) +} + +async fn validate_telemetry( + mock_server: &MockServer, + _id: String, + query: &Value, + operation_name: Option<&str>, + services: &[&'static str], + custom_span_instrumentation: 
bool, +) -> Result<(), BoxError> { + for _ in 0..10 { + let trace_valid = find_valid_trace( + mock_server, + query, + operation_name, + services, + custom_span_instrumentation, + ) + .await; + + let metrics_valid = find_valid_metrics(mock_server, query, operation_name, services).await; + + if metrics_valid.is_ok() && trace_valid.is_ok() { + return Ok(()); + } + + tokio::time::sleep(Duration::from_millis(100)).await; + } + find_valid_trace( + mock_server, + query, + operation_name, + services, + custom_span_instrumentation, + ) + .await?; + find_valid_metrics(mock_server, query, operation_name, services).await?; + + Ok(()) +} + +async fn find_valid_trace( + mock_server: &MockServer, + _query: &Value, + _operation_name: Option<&str>, + services: &[&'static str], + _custom_span_instrumentation: bool, +) -> Result<(), BoxError> { + let requests = mock_server + .received_requests() + .await + .expect("Could not get otlp requests"); + + // A valid trace has: + // * A valid service name + // * All three services + // * The correct spans + // * All spans are parented + // * Required attributes of 'router' span has been set + let traces: Vec<_>= requests + .iter() + .filter_map(|r| { + if r.url.path().ends_with("/traces") { + match opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest::decode( + bytes::Bytes::copy_from_slice(&r.body), + ) { + Ok(trace) => { + match serde_json::to_value(trace) { + Ok(trace) => { Some(Ok(trace)) } + Err(e) => { + Some(Err(BoxError::from(format!("failed to decode trace: {}", e)))) + } + } + } + Err(e) => { + Some(Err(BoxError::from(format!("failed to decode trace: {}", e)))) + } + } + } + else { + None + } + }) + .try_collect()?; + if !traces.is_empty() { + let json_trace = serde_json::Value::Array(traces); + verify_trace_participants(&json_trace, services)?; + + Ok(()) + } else { + Err(anyhow!("No traces received").into()) + } +} + +fn verify_trace_participants(trace: &Value, services: &[&'static str]) -> Result<(), 
BoxError> { + let actual_services: HashSet = trace + .select_path("$..resource.attributes[?(@.key=='service.name')].value.stringValue")? + .into_iter() + .filter_map(|service| service.as_string()) + .collect(); + tracing::debug!("found services {:?}", actual_services); + + let expected_services = services + .iter() + .map(|s| s.to_string()) + .collect::>(); + if actual_services != expected_services { + return Err(BoxError::from(format!( + "incomplete traces, got {actual_services:?} expected {expected_services:?}" + ))); + } + Ok(()) +} + +fn validate_service_name(trace: Value) -> Result<(), BoxError> { + let service_name = + trace.select_path("$..resource.attributes[?(@.key=='service.name')].value.stringValue")?; + assert_eq!( + service_name.first(), + Some(&&Value::String("router".to_string())) + ); + Ok(()) +} + +async fn find_valid_metrics( + mock_server: &MockServer, + _query: &Value, + _operation_name: Option<&str>, + _services: &[&'static str], +) -> Result<(), BoxError> { + let requests = mock_server + .received_requests() + .await + .expect("Could not get otlp requests"); + if let Some(metrics) = requests.iter().find(|r| r.url.path().ends_with("/metrics")) { + let metrics = opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest::decode(bytes::Bytes::copy_from_slice(&metrics.body))?; + let json_trace = serde_json::to_value(metrics)?; + // For now just validate service name. 
+ validate_service_name(json_trace)?; + + Ok(()) + } else { + Err(anyhow!("No metrics received").into()) + } +} diff --git a/apollo-router/tests/integration/telemetry/zipkin.rs b/apollo-router/tests/integration/telemetry/zipkin.rs index b5a79d0211..f9995ef22e 100644 --- a/apollo-router/tests/integration/telemetry/zipkin.rs +++ b/apollo-router/tests/integration/telemetry/zipkin.rs @@ -1,6 +1,7 @@ #![cfg(all(target_os = "linux", target_arch = "x86_64"))] extern crate core; +use std::collections::HashSet; use std::time::Duration; use anyhow::anyhow; @@ -35,7 +36,7 @@ async fn test_basic() -> Result<(), BoxError> { id, &query, Some("ExampleQuery"), - &["my_app", "router", "products"], + &["client", "router", "subgraph"], false, ) .await?; @@ -71,7 +72,7 @@ async fn validate_trace( { return Ok(()); } - tokio::time::sleep(Duration::from_millis(1000)).await; + tokio::time::sleep(Duration::from_millis(100)).await; } find_valid_trace( &url, @@ -88,11 +89,10 @@ async fn find_valid_trace( url: &str, _query: &Value, _operation_name: Option<&str>, - _services: &[&'static str], + services: &[&'static str], _custom_span_instrumentation: bool, ) -> Result<(), BoxError> { // A valid trace has: - // * A valid service name // * All three services // * The correct spans // * All spans are parented @@ -105,17 +105,27 @@ async fn find_valid_trace( .json() .await?; tracing::debug!("{}", serde_json::to_string_pretty(&trace)?); - validate_service_name(trace)?; + verify_trace_participants(&trace, services)?; Ok(()) } -fn validate_service_name(trace: Value) -> Result<(), BoxError> { - let service_name = trace.select_path("$..localEndpoint.serviceName")?; +fn verify_trace_participants(trace: &Value, services: &[&'static str]) -> Result<(), BoxError> { + let actual_services: HashSet = trace + .select_path("$..serviceName")? 
+ .into_iter() + .filter_map(|service| service.as_string()) + .collect(); + tracing::debug!("found services {:?}", actual_services); - assert_eq!( - service_name.first(), - Some(&&Value::String("router".to_string())) - ); + let expected_services = services + .iter() + .map(|s| s.to_string()) + .collect::>(); + if actual_services != expected_services { + return Err(BoxError::from(format!( + "incomplete traces, got {actual_services:?} expected {expected_services:?}" + ))); + } Ok(()) }