From 81cf6a7afa8455de878fb6ba6bcc1e547549c9e7 Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Wed, 2 Aug 2023 14:14:16 +0200 Subject: [PATCH 1/3] chore(appsignal sink): Refactor to use StreamSink Previously, the AppSignal sink was written in what was already a bit of an older style in PR https://github.com/vectordotdev/vector/pull/16650. We want to change some functionality in the future for how metrics are sent. To do this, it looks like we'll need to use the newer sink style, or at least it will be easier. With this change, the AppSignal sink's functionality has remained the same. We have updated the sink to the new StreamSink style, using a HttpBatchService wrapper to send the requests to the AppSignal public endpoint API. We followed the [sink guides][2] initially and looked at other sinks already rewritten linked in [issue #9261][1] to see how to implement it further. Updated the integration_tests to test if the sink is a HTTP sink with the `HTTP_SINK_TAGS`. Previously, it didn't test yet if the `EndpointBytesSent` event was sent. We're unsure if `AppsignalResponse`'s `bytes_sent` needs to be implemented or not. If it returns `None` the tests also pass, but we thought we might as well implement it properly. Part of [tracking issue #9261][1] [1]: https://github.com/vectordotdev/vector/issues/9261 [2]: https://github.com/vectordotdev/vector/blob/600f8191a8fe169eb38c429958dd59714349acb4/docs/tutorials/sinks/1_basic_sink.md Co-authored-by: Jeff Kreeftmeijer --- src/sinks/appsignal/integration_tests.rs | 197 +++++++- src/sinks/appsignal/mod.rs | 430 ++++++++++++++---- .../components/sinks/base/appsignal.cue | 12 +- 3 files changed, 519 insertions(+), 120 deletions(-) diff --git a/src/sinks/appsignal/integration_tests.rs b/src/sinks/appsignal/integration_tests.rs index bcbee6a8052fc..eff40c7e7923d 100644 --- a/src/sinks/appsignal/integration_tests.rs +++ b/src/sinks/appsignal/integration_tests.rs @@ -1,52 +1,78 @@ -use futures::stream; +use bytes::Bytes; +use futures::{channel::mpsc::Receiver, stream, StreamExt}; +use http::header::AUTHORIZATION; +use hyper::StatusCode; use indoc::indoc; -use vector_core::event::{BatchNotifier, BatchStatus, Event, Metric, MetricKind, MetricValue}; +use vector_core::event::{ + BatchNotifier, BatchStatus, Event, LogEvent, Metric, MetricKind, MetricValue, +}; use crate::{ config::SinkConfig, - sinks::appsignal::AppsignalSinkConfig, - sinks::util::test::load_sink, + sinks::appsignal::AppsignalConfig, + sinks::util::test::{build_test_server_status, load_sink}, test_util::{ components::{ assert_sink_compliance, assert_sink_error, run_and_assert_sink_compliance, - COMPONENT_ERROR_TAGS, SINK_TAGS, + COMPONENT_ERROR_TAGS, HTTP_SINK_TAGS, }, - generate_lines_with_stream, map_event_batch_stream, + generate_lines_with_stream, map_event_batch_stream, next_addr, }, }; +async fn start_test(events: Vec) -> (Vec, Receiver<(http::request::Parts, Bytes)>) { + let config = indoc! {r#" + push_api_key = "${TEST_APPSIGNAL_PUSH_API_KEY}" + compression = "none" + "#}; + let config = config.replace("${TEST_APPSIGNAL_PUSH_API_KEY}", &push_api_key()); + let (mut config, cx) = load_sink::(config.as_str()).unwrap(); + let addr = next_addr(); + // Set the endpoint to a local server so we can fetch the sent events later + config.endpoint = format!("http://{}", addr); + + let (sink, _) = config.build(cx).await.unwrap(); + + // Always return OK from server. We're not testing responses. + let (rx, _trigger, server) = build_test_server_status(addr, StatusCode::OK); + tokio::spawn(server); + + let (batch, receiver) = BatchNotifier::new_with_receiver(); + + let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch)); + + sink.run(stream).await.unwrap(); + assert_eq!(receiver.await, BatchStatus::Delivered); + + (events, rx) +} + #[tokio::test] async fn logs_real_endpoint() { let config = indoc! {r#" push_api_key = "${TEST_APPSIGNAL_PUSH_API_KEY}" "#}; - let api_key = std::env::var("TEST_APPSIGNAL_PUSH_API_KEY") - .expect("couldn't find the AppSignal push API key in environment variables"); - assert!(!api_key.is_empty(), "$TEST_APPSIGNAL_PUSH_API_KEY required"); - let config = config.replace("${TEST_APPSIGNAL_PUSH_API_KEY}", &api_key); - let (config, cx) = load_sink::(config.as_str()).unwrap(); + let config = config.replace("${TEST_APPSIGNAL_PUSH_API_KEY}", &push_api_key()); + let (config, cx) = load_sink::(config.as_str()).unwrap(); let (sink, _) = config.build(cx).await.unwrap(); let (batch, receiver) = BatchNotifier::new_with_receiver(); let generator = |index| format!("this is a log with index {}", index); let (_, events) = generate_lines_with_stream(generator, 10, Some(batch)); - run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await; + run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await; assert_eq!(receiver.await, BatchStatus::Delivered); } #[tokio::test] async fn metrics_real_endpoint() { - assert_sink_compliance(&SINK_TAGS, async { + assert_sink_compliance(&HTTP_SINK_TAGS, async { let config = indoc! {r#" push_api_key = "${TEST_APPSIGNAL_PUSH_API_KEY}" "#}; - let api_key = std::env::var("TEST_APPSIGNAL_PUSH_API_KEY") - .expect("couldn't find the AppSignal push API key in environment variables"); - assert!(!api_key.is_empty(), "$TEST_APPSIGNAL_PUSH_API_KEY required"); - let config = config.replace("${TEST_APPSIGNAL_PUSH_API_KEY}", &api_key); - let (config, cx) = load_sink::(config.as_str()).unwrap(); + let config = config.replace("${TEST_APPSIGNAL_PUSH_API_KEY}", &push_api_key()); + let (config, cx) = load_sink::(config.as_str()).unwrap(); let (sink, _) = config.build(cx).await.unwrap(); let (batch, receiver) = BatchNotifier::new_with_receiver(); @@ -69,13 +95,139 @@ async fn metrics_real_endpoint() { .await; } +#[tokio::test] +async fn metrics_shape() { + let events: Vec<_> = (0..5) + .map(|index| { + Event::Metric(Metric::new( + format!("counter_{}", index), + MetricKind::Absolute, + MetricValue::Counter { + value: index as f64, + }, + )) + }) + .collect(); + let api_key = push_api_key(); + let (expected, rx) = start_test(events).await; + let output = rx.take(expected.len()).collect::>().await; + + for val in output.iter() { + assert_eq!( + val.0.headers.get("Content-Type").unwrap(), + "application/json" + ); + assert_eq!( + val.0.headers.get(AUTHORIZATION).unwrap(), + &format!("Bearer {api_key}") + ); + + let payload = std::str::from_utf8(&val.1).unwrap(); + let payload: serde_json::Value = serde_json::from_str(payload).unwrap(); + let events = payload.as_array().unwrap(); + assert_eq!(events.len(), 5); + + let metrics: Vec<(&str, &str, f64)> = events + .iter() + .map(|json_value| { + let metric = json_value + .as_object() + .unwrap() + .get("metric") + .unwrap() + .as_object() + .unwrap(); + let name = metric.get("name").unwrap().as_str().unwrap(); + let kind = metric.get("kind").unwrap().as_str().unwrap(); + let counter = metric.get("counter").unwrap().as_object().unwrap(); + let value = counter.get("value").unwrap().as_f64().unwrap(); + (name, kind, value) + }) + .collect(); + assert_eq!( + vec![ + ("counter_0", "absolute", 0.0), + ("counter_1", "absolute", 1.0), + ("counter_2", "absolute", 2.0), + ("counter_3", "absolute", 3.0), + ("counter_4", "absolute", 4.0), + ], + metrics + ); + } +} + +#[tokio::test] +async fn logs_shape() { + let events: Vec<_> = (0..5) + .map(|index| Event::Log(LogEvent::from(format!("Log message {index}")))) + .collect(); + let api_key = push_api_key(); + let (expected, rx) = start_test(events).await; + let output = rx.take(expected.len()).collect::>().await; + + for val in output.iter() { + assert_eq!( + val.0.headers.get("Content-Type").unwrap(), + "application/json" + ); + assert_eq!( + val.0.headers.get(AUTHORIZATION).unwrap(), + &format!("Bearer {api_key}") + ); + + let payload = std::str::from_utf8(&val.1).unwrap(); + let payload: serde_json::Value = serde_json::from_str(payload).unwrap(); + let events = payload.as_array().unwrap(); + assert_eq!(events.len(), 5); + + let log_messages: Vec<&str> = events + .iter() + .map(|value| { + value + .as_object() + .unwrap() + .get("log") + .unwrap() + .as_object() + .unwrap() + .get("message") + .unwrap() + .as_str() + .unwrap() + }) + .collect(); + assert_eq!( + vec![ + "Log message 0", + "Log message 1", + "Log message 2", + "Log message 3", + "Log message 4", + ], + log_messages + ); + + let event = events + .last() + .unwrap() + .as_object() + .unwrap() + .get("log") + .unwrap() + .as_object() + .unwrap(); + assert!(!event.get("timestamp").unwrap().as_str().unwrap().is_empty()); + } +} + #[tokio::test] async fn error_scenario_real_endpoint() { assert_sink_error(&COMPONENT_ERROR_TAGS, async { let config = indoc! {r#" push_api_key = "invalid key" "#}; - let (config, cx) = load_sink::(config).unwrap(); + let (config, cx) = load_sink::(config).unwrap(); let (sink, _) = config.build(cx).await.unwrap(); let (batch, receiver) = BatchNotifier::new_with_receiver(); @@ -91,3 +243,10 @@ async fn error_scenario_real_endpoint() { }) .await; } + +fn push_api_key() -> String { + let api_key = std::env::var("TEST_APPSIGNAL_PUSH_API_KEY") + .expect("couldn't find the AppSignal push API key in environment variables"); + assert!(!api_key.is_empty(), "$TEST_APPSIGNAL_PUSH_API_KEY required"); + api_key +} diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index 31cb7a8e2ecc2..5b8b01edbff25 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -9,45 +9,43 @@ #[cfg(all(test, feature = "appsignal-integration-tests"))] mod integration_tests; -use bytes::Bytes; -use futures::{FutureExt, SinkExt}; -use http::{header::AUTHORIZATION, Request, Uri}; +use std::task::Poll; + +use futures::{ + future, + future::{BoxFuture, Ready}, +}; +use http::{header::AUTHORIZATION, Request, StatusCode, Uri}; use hyper::Body; -use serde_json::json; -use snafu::{ResultExt, Snafu}; -use vector_common::sensitive_string::SensitiveString; -use vector_config::configurable_component; +use serde_json::{json, Value}; +use tower::{Service, ServiceBuilder, ServiceExt}; use crate::{ - codecs::Transformer, - config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext}, - event::Event, http::HttpClient, + internal_events::SinkRequestBuildError, sinks::{ + prelude::*, util::{ - encoding::write_all, - http::{BatchedHttpSink, HttpEventEncoder, HttpSink}, - BatchConfig, BoxedRawValue, Compression, Compressor, JsonArrayBuffer, - SinkBatchSettings, TowerRequestConfig, + encoding::{as_tracked_write, Encoder}, + http::HttpBatchService, + http::HttpStatusRetryLogic, + sink::Response, + Compression, }, BuildError, }, - tls::{TlsConfig, TlsSettings}, }; - -#[derive(Debug, Snafu)] -enum FinishError { - #[snafu(display( - "Failure occurred during writing to or finalizing the compressor: {}", - source - ))] - CompressionFailed { source: std::io::Error }, -} +use bytes::Bytes; +use vector_common::sensitive_string::SensitiveString; +use vector_core::{ + config::{proxy::ProxyConfig, telemetry}, + tls::{MaybeTlsSettings, TlsEnableableConfig}, +}; /// Configuration for the `appsignal` sink. -#[configurable_component(sink("appsignal", "Send events to AppSignal."))] +#[configurable_component(sink("appsignal", "AppSignal sink."))] #[derive(Clone, Debug, Default)] -pub struct AppsignalSinkConfig { +pub struct AppsignalConfig { /// The URI for the AppSignal API to send data to. #[configurable(validation(format = "uri"))] #[configurable(metadata(docs::examples = "https://appsignal-endpoint.net"))] @@ -72,7 +70,7 @@ pub struct AppsignalSinkConfig { request: TowerRequestConfig, #[configurable(derived)] - tls: Option, + tls: Option, #[configurable(derived)] #[serde( @@ -103,42 +101,57 @@ impl SinkBatchSettings for AppsignalDefaultBatchSettings { const TIMEOUT_SECS: f64 = 1.0; } -impl_generate_config_from_default!(AppsignalSinkConfig); +impl AppsignalConfig { + fn build_client(&self, proxy: &ProxyConfig) -> crate::Result { + let tls = MaybeTlsSettings::from_config(&self.tls, false)?; + let client = HttpClient::new(tls, proxy)?; + Ok(client) + } + + fn build_sink(&self, http_client: HttpClient) -> crate::Result { + let batch_settings = self.batch.into_batcher_settings()?; + + let endpoint = endpoint_uri(&self.endpoint, "vector/events")?; + let push_api_key = self.push_api_key.clone(); + let compression = self.compression; + let service = AppsignalService::new(http_client, endpoint, push_api_key, compression); + + let request_opts = self.request; + let request_settings = request_opts.unwrap_with(&TowerRequestConfig::default()); + let retry_logic = HttpStatusRetryLogic::new(|req: &AppsignalResponse| req.http_status); + + let service = ServiceBuilder::new() + .settings(request_settings, retry_logic) + .service(service); + + let transformer = self.encoding.clone(); + let sink = AppsignalSink { + service, + compression, + transformer, + batch_settings, + }; + + Ok(VectorSink::from_event_streamsink(sink)) + } +} + +impl_generate_config_from_default!(AppsignalConfig); #[async_trait::async_trait] #[typetag::serde(name = "appsignal")] -impl SinkConfig for AppsignalSinkConfig { - async fn build( - &self, - cx: SinkContext, - ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - let push_api_key = self.push_api_key.inner().to_string(); - let request_settings = self.request.unwrap_with(&TowerRequestConfig::default()); - let batch_settings = self.batch.into_batch_settings()?; - - let buffer = JsonArrayBuffer::new(batch_settings.size); - - let tls_settings = TlsSettings::from_options(&self.tls)?; - let client = HttpClient::new(tls_settings, cx.proxy())?; - - let sink = BatchedHttpSink::new( - self.clone(), - buffer, - request_settings, - batch_settings.timeout, - client.clone(), - ) - .sink_map_err(|error| error!(message = "Fatal AppSignal sink error.", %error)); - +impl SinkConfig for AppsignalConfig { + async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let client = self.build_client(cx.proxy())?; let healthcheck = healthcheck( endpoint_uri(&self.endpoint, "vector/healthcheck")?, - push_api_key, - client, + self.push_api_key.inner().to_string(), + client.clone(), ) .boxed(); + let sink = self.build_sink(client)?; - #[allow(deprecated)] - Ok((super::VectorSink::from_event_sink(sink), healthcheck)) + Ok((sink, healthcheck)) } fn input(&self) -> Input { @@ -150,61 +163,279 @@ impl SinkConfig for AppsignalSinkConfig { } } -/// Encode logs and metrics for requests to the AppSignal API. -/// It will use a JSON format wrapping events in either "log" or "metric", based on the type of event. -pub struct AppsignalEventEncoder { - transformer: Transformer, +async fn healthcheck(uri: Uri, push_api_key: String, client: HttpClient) -> crate::Result<()> { + let request = Request::get(uri).header(AUTHORIZATION, format!("Bearer {}", push_api_key)); + let response = client.send(request.body(Body::empty()).unwrap()).await?; + + match response.status() { + status if status.is_success() => Ok(()), + other => Err(super::HealthcheckError::UnexpectedStatus { status: other }.into()), + } } -impl HttpEventEncoder for AppsignalEventEncoder { - fn encode_event(&mut self, mut event: Event) -> Option { - self.transformer.transform(&mut event); +struct AppsignalSink { + service: S, + compression: Compression, + transformer: Transformer, + batch_settings: BatcherSettings, +} - match event { - Event::Log(log) => Some(json!({ "log": log })), - Event::Metric(metric) => Some(json!({ "metric": metric })), - _ => panic!("The AppSignal sink does not support this type of event: {event:?}"), - } +impl AppsignalSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + let service = ServiceBuilder::new().service(self.service); + + input + .batched(self.batch_settings.into_byte_size_config()) + .request_builder( + None, + AppsignalRequestBuilder { + compression: self.compression, + encoder: AppsignalEncoder { + transformer: self.transformer.clone(), + }, + }, + ) + .filter_map(|request| async move { + match request { + Err(error) => { + emit!(SinkRequestBuildError { error }); + None + } + Ok(req) => Some(req), + } + }) + .into_driver(service) + .run() + .await } } #[async_trait::async_trait] -impl HttpSink for AppsignalSinkConfig { - type Input = serde_json::Value; - type Output = Vec; - type Encoder = AppsignalEventEncoder; - - fn build_encoder(&self) -> Self::Encoder { - AppsignalEventEncoder { - transformer: self.encoding.clone(), +impl StreamSink for AppsignalSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ + async fn run( + self: Box, + input: futures_util::stream::BoxStream<'_, Event>, + ) -> Result<(), ()> { + self.run_inner(input).await + } +} + +#[derive(Clone)] +struct AppsignalEncoder { + pub transformer: crate::codecs::Transformer, +} + +impl Encoder> for AppsignalEncoder { + fn encode_input( + &self, + events: Vec, + writer: &mut dyn std::io::Write, + ) -> std::io::Result<(usize, GroupedCountByteSize)> { + let mut result = Value::Array(Vec::new()); + let mut byte_size = telemetry().create_request_count_byte_size(); + for mut event in events { + self.transformer.transform(&mut event); + + byte_size.add_event(&event, event.estimated_json_encoded_size_of()); + + let json = match event { + Event::Log(log) => json!({ "log": log }), + Event::Metric(metric) => json!({ "metric": metric }), + _ => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "The AppSignal sink does not support this type of event: {event:?}" + ), + )) + } + }; + if let Value::Array(ref mut array) = result { + array.push(json); + } } + let written_bytes = + as_tracked_write::<_, _, std::io::Error>(writer, &result, |writer, value| { + serde_json::to_writer(writer, value)?; + Ok(()) + })?; + + Ok((written_bytes, byte_size)) } +} - async fn build_request(&self, events: Self::Output) -> crate::Result> { - let uri = endpoint_uri(&self.endpoint, "vector/events")?; - let mut request = Request::post(uri).header( - AUTHORIZATION, - format!("Bearer {}", self.push_api_key.inner()), - ); +#[derive(Clone)] +struct AppsignalRequest { + payload: Bytes, + finalizers: EventFinalizers, + metadata: RequestMetadata, +} - let mut body = crate::serde::json::to_bytes(&events)?.freeze(); - if let Some(ce) = self.compression.content_encoding() { - request = request.header("Content-Encoding", ce); +impl MetaDescriptive for AppsignalRequest { + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata + } +} + +impl Finalizable for AppsignalRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + self.finalizers.take_finalizers() + } +} + +impl ByteSizeOf for AppsignalRequest { + fn allocated_bytes(&self) -> usize { + self.payload.allocated_bytes() + self.finalizers.allocated_bytes() + } +} + +struct AppsignalRequestBuilder { + encoder: AppsignalEncoder, + compression: Compression, +} + +impl RequestBuilder> for AppsignalRequestBuilder { + type Metadata = EventFinalizers; + type Events = Vec; + type Encoder = AppsignalEncoder; + type Payload = Bytes; + type Request = AppsignalRequest; + type Error = std::io::Error; + + fn compression(&self) -> Compression { + self.compression + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoder + } + + fn split_input( + &self, + mut input: Vec, + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let finalizers = input.take_finalizers(); + let metadata_builder = RequestMetadataBuilder::from_events(&input); + + (finalizers, metadata_builder, input) + } + + fn build_request( + &self, + metadata: Self::Metadata, + request_metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + AppsignalRequest { + finalizers: metadata, + payload: payload.into_payload(), + metadata: request_metadata, } - let mut compressor = Compressor::from(self.compression); - write_all(&mut compressor, 0, &body)?; - body = compressor.finish().context(CompressionFailedSnafu)?.into(); - request.body(body).map_err(Into::into) } } -async fn healthcheck(uri: Uri, push_api_key: String, client: HttpClient) -> crate::Result<()> { - let request = Request::get(uri).header(AUTHORIZATION, format!("Bearer {}", push_api_key)); - let response = client.send(request.body(Body::empty()).unwrap()).await?; +#[derive(Clone)] +struct AppsignalService { + batch_service: + HttpBatchService, crate::Error>>, AppsignalRequest>, +} - match response.status() { - status if status.is_success() => Ok(()), - other => Err(super::HealthcheckError::UnexpectedStatus { status: other }.into()), +impl AppsignalService { + pub fn new( + http_client: HttpClient, + endpoint: Uri, + push_api_key: SensitiveString, + compression: Compression, + ) -> Self { + let batch_service = HttpBatchService::new(http_client, move |req| { + let req: AppsignalRequest = req; + + let mut request = Request::post(&endpoint) + .header("Content-Type", "application/json") + .header(AUTHORIZATION, format!("Bearer {}", push_api_key.inner())) + .header("Content-Length", req.payload.len()); + if let Some(ce) = compression.content_encoding() { + request = request.header("Content-Encoding", ce) + } + let result = request.body(req.payload).map_err(|x| x.into()); + future::ready(result) + }); + Self { batch_service } + } +} + +impl Service for AppsignalService { + type Response = AppsignalResponse; + type Error = crate::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, mut request: AppsignalRequest) -> Self::Future { + let mut http_service = self.batch_service.clone(); + + Box::pin(async move { + let metadata = std::mem::take(request.metadata_mut()); + http_service.ready().await?; + let bytes_sent = metadata.request_wire_size(); + let event_byte_size = metadata.into_events_estimated_json_encoded_byte_size(); + let http_response = http_service.call(request).await?; + let event_status = if http_response.is_successful() { + EventStatus::Delivered + } else if http_response.is_transient() { + EventStatus::Errored + } else { + EventStatus::Rejected + }; + Ok(AppsignalResponse { + event_status, + http_status: http_response.status(), + event_byte_size, + bytes_sent, + }) + }) + } +} + +struct AppsignalResponse { + event_status: EventStatus, + http_status: StatusCode, + event_byte_size: GroupedCountByteSize, + bytes_sent: usize, +} + +impl DriverResponse for AppsignalResponse { + fn event_status(&self) -> EventStatus { + self.event_status + } + + fn events_sent(&self) -> &GroupedCountByteSize { + &self.event_byte_size + } + + fn bytes_sent(&self) -> Option { + Some(self.bytes_sent) } } @@ -234,21 +465,20 @@ mod test { }, }; - use super::{endpoint_uri, AppsignalSinkConfig}; + use super::{endpoint_uri, AppsignalConfig}; #[test] fn generate_config() { - crate::test_util::test_generate_config::(); + crate::test_util::test_generate_config::(); } #[tokio::test] async fn component_spec_compliance() { let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; - let config = AppsignalSinkConfig::generate_config().to_string(); - let mut config = - AppsignalSinkConfig::deserialize(toml::de::ValueDeserializer::new(&config)) - .expect("config should be valid"); + let config = AppsignalConfig::generate_config().to_string(); + let mut config = AppsignalConfig::deserialize(toml::de::ValueDeserializer::new(&config)) + .expect("config should be valid"); config.endpoint = mock_endpoint.to_string(); let context = SinkContext::default(); diff --git a/website/cue/reference/components/sinks/base/appsignal.cue b/website/cue/reference/components/sinks/base/appsignal.cue index f3d81555a59d5..3bddd96936173 100644 --- a/website/cue/reference/components/sinks/base/appsignal.cue +++ b/website/cue/reference/components/sinks/base/appsignal.cue @@ -283,7 +283,7 @@ base: components: sinks: appsignal: configuration: { } } tls: { - description: "TLS configuration." + description: "Configures the TLS options for incoming/outgoing connections." required: false type: object: options: { alpn_protocols: { @@ -317,6 +317,16 @@ base: components: sinks: appsignal: configuration: { required: false type: string: examples: ["/path/to/host_certificate.crt"] } + enabled: { + description: """ + Whether or not to require TLS for incoming or outgoing connections. + + When enabled and used for incoming connections, an identity certificate is also required. See `tls.crt_file` for + more information. + """ + required: false + type: bool: {} + } key_file: { description: """ Absolute path to a private key file used to identify this server. From 230f0054a9eb39493669c330c645521d35ec1f64 Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Tue, 15 Aug 2023 09:57:49 +0200 Subject: [PATCH 2/3] Split AppSignal sink into separate modules As per review feedback: split the new sink style into separate module files. --- src/sinks/appsignal/config.rs | 199 +++++++++ src/sinks/appsignal/encoder.rs | 52 +++ src/sinks/appsignal/integration_tests.rs | 2 +- src/sinks/appsignal/mod.rs | 504 +---------------------- src/sinks/appsignal/request_builder.rs | 88 ++++ src/sinks/appsignal/service.rs | 112 +++++ src/sinks/appsignal/sink.rs | 76 ++++ src/sinks/appsignal/tests.rs | 30 ++ 8 files changed, 565 insertions(+), 498 deletions(-) create mode 100644 src/sinks/appsignal/config.rs create mode 100644 src/sinks/appsignal/encoder.rs create mode 100644 src/sinks/appsignal/request_builder.rs create mode 100644 src/sinks/appsignal/service.rs create mode 100644 src/sinks/appsignal/sink.rs create mode 100644 src/sinks/appsignal/tests.rs diff --git a/src/sinks/appsignal/config.rs b/src/sinks/appsignal/config.rs new file mode 100644 index 0000000000000..16ba89c8b6668 --- /dev/null +++ b/src/sinks/appsignal/config.rs @@ -0,0 +1,199 @@ +use futures::FutureExt; +use http::{header::AUTHORIZATION, Request, Uri}; +use hyper::Body; +use tower::ServiceBuilder; +use vector_common::sensitive_string::SensitiveString; +use vector_config::configurable_component; +use vector_core::{ + config::{proxy::ProxyConfig, AcknowledgementsConfig, DataType, Input}, + tls::{MaybeTlsSettings, TlsEnableableConfig}, +}; + +use crate::{ + codecs::Transformer, + http::HttpClient, + sinks::{ + prelude::{SinkConfig, SinkContext}, + util::{ + http::HttpStatusRetryLogic, BatchConfig, Compression, ServiceBuilderExt, + SinkBatchSettings, TowerRequestConfig, + }, + BuildError, Healthcheck, HealthcheckError, VectorSink, + }, +}; + +use super::{ + service::{AppsignalResponse, AppsignalService}, + sink::AppsignalSink, +}; + +/// Configuration for the `appsignal` sink. +#[configurable_component(sink("appsignal", "Deliver log and metric event data to AppSignal."))] +#[derive(Clone, Debug, Default)] +pub struct AppsignalConfig { + /// The URI for the AppSignal API to send data to. + #[configurable(validation(format = "uri"))] + #[configurable(metadata(docs::examples = "https://appsignal-endpoint.net"))] + #[serde(default = "default_endpoint")] + pub endpoint: String, + + /// A valid app-level AppSignal Push API key. + #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] + #[configurable(metadata(docs::examples = "${APPSIGNAL_PUSH_API_KEY}"))] + push_api_key: SensitiveString, + + #[configurable(derived)] + #[serde(default = "Compression::gzip_default")] + compression: Compression, + + #[configurable(derived)] + #[serde(default)] + batch: BatchConfig, + + #[configurable(derived)] + #[serde(default)] + request: TowerRequestConfig, + + #[configurable(derived)] + tls: Option, + + #[configurable(derived)] + #[serde( + default, + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + encoding: Transformer, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + acknowledgements: AcknowledgementsConfig, +} + +pub(crate) fn default_endpoint() -> String { + "https://appsignal-endpoint.net".to_string() +} + +#[derive(Clone, Copy, Debug, Default)] +pub(crate) struct AppsignalDefaultBatchSettings; + +impl SinkBatchSettings for AppsignalDefaultBatchSettings { + const MAX_EVENTS: Option = Some(100); + const MAX_BYTES: Option = Some(450_000); + const TIMEOUT_SECS: f64 = 1.0; +} + +impl AppsignalConfig { + pub(crate) fn build_client(&self, proxy: &ProxyConfig) -> crate::Result { + let tls = MaybeTlsSettings::from_config(&self.tls, false)?; + let client = HttpClient::new(tls, proxy)?; + Ok(client) + } + + pub(crate) fn build_sink(&self, http_client: HttpClient) -> crate::Result { + let batch_settings = self.batch.into_batcher_settings()?; + + let endpoint = endpoint_uri(&self.endpoint, "vector/events")?; + let push_api_key = self.push_api_key.clone(); + let compression = self.compression; + let service = AppsignalService::new(http_client, endpoint, push_api_key, compression); + + let request_opts = self.request; + let request_settings = request_opts.unwrap_with(&TowerRequestConfig::default()); + let retry_logic = HttpStatusRetryLogic::new(|req: &AppsignalResponse| req.http_status); + + let service = ServiceBuilder::new() + .settings(request_settings, retry_logic) + .service(service); + + let transformer = self.encoding.clone(); + let sink = AppsignalSink { + service, + compression, + transformer, + batch_settings, + }; + + Ok(VectorSink::from_event_streamsink(sink)) + } +} + +impl_generate_config_from_default!(AppsignalConfig); + +#[async_trait::async_trait] +#[typetag::serde(name = "appsignal")] +impl SinkConfig for AppsignalConfig { + async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let client = self.build_client(cx.proxy())?; + let healthcheck = healthcheck( + endpoint_uri(&self.endpoint, "vector/healthcheck")?, + self.push_api_key.inner().to_string(), + client.clone(), + ) + .boxed(); + let sink = self.build_sink(client)?; + + Ok((sink, healthcheck)) + } + + fn input(&self) -> Input { + Input::new(DataType::Metric | DataType::Log) + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +async fn healthcheck(uri: Uri, push_api_key: String, client: HttpClient) -> crate::Result<()> { + let request = Request::get(uri).header(AUTHORIZATION, format!("Bearer {}", push_api_key)); + let response = client.send(request.body(Body::empty()).unwrap()).await?; + + match response.status() { + status if status.is_success() => Ok(()), + other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()), + } +} + +pub fn endpoint_uri(endpoint: &str, path: &str) -> crate::Result { + let uri = if endpoint.ends_with('/') { + format!("{endpoint}{path}") + } else { + format!("{endpoint}/{path}") + }; + match uri.parse::() { + Ok(u) => Ok(u), + Err(e) => Err(Box::new(BuildError::UriParseError { source: e })), + } +} + +#[cfg(test)] +mod test { + use super::{endpoint_uri, AppsignalConfig}; + + #[test] + fn generate_config() { + crate::test_util::test_generate_config::(); + } + + #[test] + fn endpoint_uri_with_path() { + let uri = endpoint_uri("https://appsignal-endpoint.net", "vector/events"); + assert_eq!( + uri.expect("Not a valid URI").to_string(), + "https://appsignal-endpoint.net/vector/events" + ); + } + + #[test] + fn endpoint_uri_with_trailing_slash() { + let uri = endpoint_uri("https://appsignal-endpoint.net/", "vector/events"); + assert_eq!( + uri.expect("Not a valid URI").to_string(), + "https://appsignal-endpoint.net/vector/events" + ); + } +} diff --git a/src/sinks/appsignal/encoder.rs b/src/sinks/appsignal/encoder.rs new file mode 100644 index 0000000000000..dc782ae0171c6 --- /dev/null +++ b/src/sinks/appsignal/encoder.rs @@ -0,0 +1,52 @@ +use serde_json::{json, Value}; +use vector_common::request_metadata::GroupedCountByteSize; +use vector_core::{config::telemetry, event::Event, EstimatedJsonEncodedSizeOf}; + +use crate::{ + codecs::Transformer, + sinks::util::encoding::{as_tracked_write, Encoder}, +}; + +#[derive(Clone)] +pub(crate) struct AppsignalEncoder { + pub transformer: Transformer, +} + +impl Encoder> for AppsignalEncoder { + fn encode_input( + &self, + events: Vec, + writer: &mut dyn std::io::Write, + ) -> std::io::Result<(usize, GroupedCountByteSize)> { + let mut result = Value::Array(Vec::new()); + let mut byte_size = telemetry().create_request_count_byte_size(); + for mut event in events { + self.transformer.transform(&mut event); + + byte_size.add_event(&event, event.estimated_json_encoded_size_of()); + + let json = match event { + Event::Log(log) => json!({ "log": log }), + Event::Metric(metric) => json!({ "metric": metric }), + _ => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "The AppSignal sink does not support this type of event: {event:?}" + ), + )) + } + }; + if let Value::Array(ref mut array) = result { + array.push(json); + } + } + let written_bytes = + as_tracked_write::<_, _, std::io::Error>(writer, &result, |writer, value| { + serde_json::to_writer(writer, value)?; + Ok(()) + })?; + + Ok((written_bytes, byte_size)) + } +} diff --git a/src/sinks/appsignal/integration_tests.rs b/src/sinks/appsignal/integration_tests.rs index eff40c7e7923d..1c28032caf40d 100644 --- a/src/sinks/appsignal/integration_tests.rs +++ b/src/sinks/appsignal/integration_tests.rs @@ -9,7 +9,7 @@ use vector_core::event::{ use crate::{ config::SinkConfig, - sinks::appsignal::AppsignalConfig, + sinks::appsignal::config::AppsignalConfig, sinks::util::test::{build_test_server_status, load_sink}, test_util::{ components::{ diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index 5b8b01edbff25..175c456bf6cbc 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -6,503 +6,13 @@ //! //! Logs and metrics are stored on an per app basis and require an app-level Push API key. +mod config; +mod encoder; +mod request_builder; +mod service; +mod sink; + #[cfg(all(test, feature = "appsignal-integration-tests"))] mod integration_tests; - -use std::task::Poll; - -use futures::{ - future, - future::{BoxFuture, Ready}, -}; -use http::{header::AUTHORIZATION, Request, StatusCode, Uri}; -use hyper::Body; -use serde_json::{json, Value}; -use tower::{Service, ServiceBuilder, ServiceExt}; - -use crate::{ - http::HttpClient, - internal_events::SinkRequestBuildError, - sinks::{ - prelude::*, - util::{ - encoding::{as_tracked_write, Encoder}, - http::HttpBatchService, - http::HttpStatusRetryLogic, - sink::Response, - Compression, - }, - BuildError, - }, -}; -use bytes::Bytes; -use vector_common::sensitive_string::SensitiveString; -use vector_core::{ - config::{proxy::ProxyConfig, telemetry}, - tls::{MaybeTlsSettings, TlsEnableableConfig}, -}; - -/// Configuration for the `appsignal` sink. -#[configurable_component(sink("appsignal", "AppSignal sink."))] -#[derive(Clone, Debug, Default)] -pub struct AppsignalConfig { - /// The URI for the AppSignal API to send data to. - #[configurable(validation(format = "uri"))] - #[configurable(metadata(docs::examples = "https://appsignal-endpoint.net"))] - #[serde(default = "default_endpoint")] - endpoint: String, - - /// A valid app-level AppSignal Push API key. - #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] - #[configurable(metadata(docs::examples = "${APPSIGNAL_PUSH_API_KEY}"))] - push_api_key: SensitiveString, - - #[configurable(derived)] - #[serde(default = "Compression::gzip_default")] - compression: Compression, - - #[configurable(derived)] - #[serde(default)] - batch: BatchConfig, - - #[configurable(derived)] - #[serde(default)] - request: TowerRequestConfig, - - #[configurable(derived)] - tls: Option, - - #[configurable(derived)] - #[serde( - default, - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - encoding: Transformer, - - #[configurable(derived)] - #[serde( - default, - deserialize_with = "crate::serde::bool_or_struct", - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - acknowledgements: AcknowledgementsConfig, -} - -fn default_endpoint() -> String { - "https://appsignal-endpoint.net".to_string() -} - -#[derive(Clone, Copy, Debug, Default)] -struct AppsignalDefaultBatchSettings; - -impl SinkBatchSettings for AppsignalDefaultBatchSettings { - const MAX_EVENTS: Option = Some(100); - const MAX_BYTES: Option = Some(450_000); - const TIMEOUT_SECS: f64 = 1.0; -} - -impl AppsignalConfig { - fn build_client(&self, proxy: &ProxyConfig) -> crate::Result { - let tls = MaybeTlsSettings::from_config(&self.tls, false)?; - let client = HttpClient::new(tls, proxy)?; - Ok(client) - } - - fn build_sink(&self, http_client: HttpClient) -> crate::Result { - let batch_settings = self.batch.into_batcher_settings()?; - - let endpoint = endpoint_uri(&self.endpoint, "vector/events")?; - let push_api_key = self.push_api_key.clone(); - let compression = self.compression; - let service = AppsignalService::new(http_client, endpoint, push_api_key, compression); - - let request_opts = self.request; - let request_settings = request_opts.unwrap_with(&TowerRequestConfig::default()); - let retry_logic = HttpStatusRetryLogic::new(|req: &AppsignalResponse| req.http_status); - - let service = ServiceBuilder::new() - .settings(request_settings, retry_logic) - .service(service); - - let transformer = self.encoding.clone(); - let sink = AppsignalSink { - service, - compression, - transformer, - batch_settings, - }; - - Ok(VectorSink::from_event_streamsink(sink)) - } -} - -impl_generate_config_from_default!(AppsignalConfig); - -#[async_trait::async_trait] -#[typetag::serde(name = "appsignal")] -impl SinkConfig for AppsignalConfig { - async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - let client = self.build_client(cx.proxy())?; - let healthcheck = healthcheck( - endpoint_uri(&self.endpoint, "vector/healthcheck")?, - self.push_api_key.inner().to_string(), - client.clone(), - ) - .boxed(); - let sink = self.build_sink(client)?; - - Ok((sink, healthcheck)) - } - - fn input(&self) -> Input { - Input::new(DataType::Metric | DataType::Log) - } - - fn acknowledgements(&self) -> &AcknowledgementsConfig { - &self.acknowledgements - } -} - -async fn healthcheck(uri: Uri, push_api_key: String, client: HttpClient) -> crate::Result<()> { - let request = Request::get(uri).header(AUTHORIZATION, format!("Bearer {}", push_api_key)); - let response = client.send(request.body(Body::empty()).unwrap()).await?; - - match response.status() { - status if status.is_success() => Ok(()), - other => Err(super::HealthcheckError::UnexpectedStatus { status: other }.into()), - } -} - -struct AppsignalSink { - service: S, - compression: Compression, - transformer: Transformer, - batch_settings: BatcherSettings, -} - -impl AppsignalSink -where - S: Service + Send + 'static, - S::Future: Send + 'static, - S::Response: DriverResponse + Send + 'static, - S::Error: std::fmt::Debug + Into + Send, -{ - async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - let service = ServiceBuilder::new().service(self.service); - - input - .batched(self.batch_settings.into_byte_size_config()) - .request_builder( - None, - AppsignalRequestBuilder { - compression: self.compression, - encoder: AppsignalEncoder { - transformer: self.transformer.clone(), - }, - }, - ) - .filter_map(|request| async move { - match request { - Err(error) => { - emit!(SinkRequestBuildError { error }); - None - } - Ok(req) => Some(req), - } - }) - .into_driver(service) - .run() - .await - } -} - -#[async_trait::async_trait] -impl StreamSink for AppsignalSink -where - S: Service + Send + 'static, - S::Future: Send + 'static, - S::Response: DriverResponse + Send + 'static, - S::Error: std::fmt::Debug + Into + Send, -{ - async fn run( - self: Box, - input: futures_util::stream::BoxStream<'_, Event>, - ) -> Result<(), ()> { - self.run_inner(input).await - } -} - -#[derive(Clone)] -struct AppsignalEncoder { - pub transformer: crate::codecs::Transformer, -} - -impl Encoder> for AppsignalEncoder { - fn encode_input( - &self, - events: Vec, - writer: &mut dyn std::io::Write, - ) -> std::io::Result<(usize, GroupedCountByteSize)> { - let mut result = Value::Array(Vec::new()); - let mut byte_size = telemetry().create_request_count_byte_size(); - for mut event in events { - self.transformer.transform(&mut event); - - byte_size.add_event(&event, event.estimated_json_encoded_size_of()); - - let json = match event { - Event::Log(log) => json!({ "log": log }), - Event::Metric(metric) => json!({ "metric": metric }), - _ => { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - format!( - "The AppSignal sink does not support this type of event: {event:?}" - ), - )) - } - }; - if let Value::Array(ref mut array) = result { - array.push(json); - } - } - let written_bytes = - as_tracked_write::<_, _, std::io::Error>(writer, &result, |writer, value| { - serde_json::to_writer(writer, value)?; - Ok(()) - })?; - - Ok((written_bytes, byte_size)) - } -} - -#[derive(Clone)] -struct AppsignalRequest { - payload: Bytes, - finalizers: EventFinalizers, - metadata: RequestMetadata, -} - -impl MetaDescriptive for AppsignalRequest { - fn get_metadata(&self) -> &RequestMetadata { - &self.metadata - } - - fn metadata_mut(&mut self) -> &mut RequestMetadata { - &mut self.metadata - } -} - -impl Finalizable for AppsignalRequest { - fn take_finalizers(&mut self) -> EventFinalizers { - self.finalizers.take_finalizers() - } -} - -impl ByteSizeOf for AppsignalRequest { - fn allocated_bytes(&self) -> usize { - self.payload.allocated_bytes() + self.finalizers.allocated_bytes() - } -} - -struct AppsignalRequestBuilder { - encoder: AppsignalEncoder, - compression: Compression, -} - -impl RequestBuilder> for AppsignalRequestBuilder { - type Metadata = EventFinalizers; - type Events = Vec; - type Encoder = AppsignalEncoder; - type Payload = Bytes; - type Request = AppsignalRequest; - type Error = std::io::Error; - - fn compression(&self) -> Compression { - self.compression - } - - fn encoder(&self) -> &Self::Encoder { - &self.encoder - } - - fn split_input( - &self, - mut input: Vec, - ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { - let finalizers = input.take_finalizers(); - let metadata_builder = RequestMetadataBuilder::from_events(&input); - - (finalizers, metadata_builder, input) - } - - fn build_request( - &self, - metadata: Self::Metadata, - request_metadata: RequestMetadata, - payload: EncodeResult, - ) -> Self::Request { - AppsignalRequest { - finalizers: metadata, - payload: payload.into_payload(), - metadata: request_metadata, - } - } -} - -#[derive(Clone)] -struct AppsignalService { - batch_service: - HttpBatchService, crate::Error>>, AppsignalRequest>, -} - -impl AppsignalService { - pub fn new( - http_client: HttpClient, - endpoint: Uri, - push_api_key: SensitiveString, - compression: Compression, - ) -> Self { - let batch_service = HttpBatchService::new(http_client, move |req| { - let req: AppsignalRequest = req; - - let mut request = Request::post(&endpoint) - .header("Content-Type", "application/json") - .header(AUTHORIZATION, format!("Bearer {}", push_api_key.inner())) - .header("Content-Length", req.payload.len()); - if let Some(ce) = compression.content_encoding() { - request = request.header("Content-Encoding", ce) - } - let result = request.body(req.payload).map_err(|x| x.into()); - future::ready(result) - }); - Self { batch_service } - } -} - -impl Service for AppsignalService { - type Response = AppsignalResponse; - type Error = crate::Error; - type Future = BoxFuture<'static, Result>; - - fn poll_ready( - &mut self, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, mut request: AppsignalRequest) -> Self::Future { - let mut http_service = self.batch_service.clone(); - - Box::pin(async move { - let metadata = std::mem::take(request.metadata_mut()); - http_service.ready().await?; - let bytes_sent = metadata.request_wire_size(); - let event_byte_size = metadata.into_events_estimated_json_encoded_byte_size(); - let http_response = http_service.call(request).await?; - let event_status = if http_response.is_successful() { - EventStatus::Delivered - } else if http_response.is_transient() { - EventStatus::Errored - } else { - EventStatus::Rejected - }; - Ok(AppsignalResponse { - event_status, - http_status: http_response.status(), - event_byte_size, - bytes_sent, - }) - }) - } -} - -struct AppsignalResponse { - event_status: EventStatus, - http_status: StatusCode, - event_byte_size: GroupedCountByteSize, - bytes_sent: usize, -} - -impl DriverResponse for AppsignalResponse { - fn event_status(&self) -> EventStatus { - self.event_status - } - - fn events_sent(&self) -> &GroupedCountByteSize { - &self.event_byte_size - } - - fn bytes_sent(&self) -> Option { - Some(self.bytes_sent) - } -} - -fn endpoint_uri(endpoint: &str, path: &str) -> crate::Result { - let uri = if endpoint.ends_with('/') { - format!("{endpoint}{path}") - } else { - format!("{endpoint}/{path}") - }; - match uri.parse::() { - Ok(u) => Ok(u), - Err(e) => Err(Box::new(BuildError::UriParseError { source: e })), - } -} - #[cfg(test)] -mod test { - use futures::{future::ready, stream}; - use serde::Deserialize; - use vector_core::event::{Event, LogEvent}; - - use crate::{ - config::{GenerateConfig, SinkConfig, SinkContext}, - test_util::{ - components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, - http::{always_200_response, spawn_blackhole_http_server}, - }, - }; - - use super::{endpoint_uri, AppsignalConfig}; - - #[test] - fn generate_config() { - crate::test_util::test_generate_config::(); - } - - #[tokio::test] - async fn component_spec_compliance() { - let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; - - let config = AppsignalConfig::generate_config().to_string(); - let mut config = AppsignalConfig::deserialize(toml::de::ValueDeserializer::new(&config)) - .expect("config should be valid"); - config.endpoint = mock_endpoint.to_string(); - - let context = SinkContext::default(); - let (sink, _healthcheck) = config.build(context).await.unwrap(); - - let event = Event::Log(LogEvent::from("simple message")); - run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await; - } - - #[test] - fn endpoint_uri_with_path() { - let uri = endpoint_uri("https://appsignal-endpoint.net", "vector/events"); - assert_eq!( - uri.expect("Not a valid URI").to_string(), - "https://appsignal-endpoint.net/vector/events" - ); - } - - #[test] - fn endpoint_uri_with_trailing_slash() { - let uri = endpoint_uri("https://appsignal-endpoint.net/", "vector/events"); - assert_eq!( - uri.expect("Not a valid URI").to_string(), - "https://appsignal-endpoint.net/vector/events" - ); - } -} +mod tests; diff --git a/src/sinks/appsignal/request_builder.rs b/src/sinks/appsignal/request_builder.rs new file mode 100644 index 0000000000000..8e8cb334143b5 --- /dev/null +++ b/src/sinks/appsignal/request_builder.rs @@ -0,0 +1,88 @@ +use bytes::Bytes; + +use vector_common::{ + byte_size_of::ByteSizeOf, + finalization::{EventFinalizers, Finalizable}, + request_metadata::{MetaDescriptive, RequestMetadata}, +}; +use vector_core::event::Event; + +use crate::sinks::util::{ + metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression, RequestBuilder, +}; + +use super::encoder::AppsignalEncoder; + +#[derive(Clone)] +pub(crate) struct AppsignalRequest { + pub(crate) payload: Bytes, + pub(crate) finalizers: EventFinalizers, + pub(crate) metadata: RequestMetadata, +} + +impl MetaDescriptive for AppsignalRequest { + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata + } +} + +impl Finalizable for AppsignalRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + self.finalizers.take_finalizers() + } +} + +impl ByteSizeOf for AppsignalRequest { + fn allocated_bytes(&self) -> usize { + self.payload.allocated_bytes() + self.finalizers.allocated_bytes() + } +} + +pub(crate) struct AppsignalRequestBuilder { + pub(crate) encoder: AppsignalEncoder, + pub(crate) compression: Compression, +} + +impl RequestBuilder> for AppsignalRequestBuilder { + type Metadata = EventFinalizers; + type Events = Vec; + type Encoder = AppsignalEncoder; + type Payload = Bytes; + type Request = AppsignalRequest; + type Error = std::io::Error; + + fn compression(&self) -> Compression { + self.compression + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoder + } + + fn split_input( + &self, + mut input: Vec, + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let finalizers = input.take_finalizers(); + let metadata_builder = RequestMetadataBuilder::from_events(&input); + + (finalizers, metadata_builder, input) + } + + fn build_request( + &self, + metadata: Self::Metadata, + request_metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + AppsignalRequest { + finalizers: metadata, + payload: payload.into_payload(), + metadata: request_metadata, + } + } +} diff --git a/src/sinks/appsignal/service.rs b/src/sinks/appsignal/service.rs new file mode 100644 index 0000000000000..48ff2f3ab49bf --- /dev/null +++ b/src/sinks/appsignal/service.rs @@ -0,0 +1,112 @@ +use std::task::Poll; + +use bytes::Bytes; +use futures::{ + future, + future::{BoxFuture, Ready}, +}; +use http::{header::AUTHORIZATION, Request, StatusCode, Uri}; +use hyper::Body; +use tower::{Service, ServiceExt}; + +use vector_common::{ + finalization::EventStatus, request_metadata::GroupedCountByteSize, + request_metadata::MetaDescriptive, sensitive_string::SensitiveString, +}; +use vector_core::stream::DriverResponse; + +use crate::{ + http::HttpClient, + sinks::util::{http::HttpBatchService, sink::Response, Compression}, +}; + +use super::request_builder::AppsignalRequest; + +#[derive(Clone)] +pub(crate) struct AppsignalService { + pub(crate) batch_service: + HttpBatchService, crate::Error>>, AppsignalRequest>, +} + +impl AppsignalService { + pub fn new( + http_client: HttpClient, + endpoint: Uri, + push_api_key: SensitiveString, + compression: Compression, + ) -> Self { + let batch_service = HttpBatchService::new(http_client, move |req| { + let req: AppsignalRequest = req; + + let mut request = Request::post(&endpoint) + .header("Content-Type", "application/json") + .header(AUTHORIZATION, format!("Bearer {}", push_api_key.inner())) + .header("Content-Length", req.payload.len()); + if let Some(ce) = compression.content_encoding() { + request = request.header("Content-Encoding", ce) + } + let result = request.body(req.payload).map_err(|x| x.into()); + future::ready(result) + }); + Self { batch_service } + } +} + +impl Service for AppsignalService { + type Response = AppsignalResponse; + type Error = crate::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, mut request: AppsignalRequest) -> Self::Future { + let mut http_service = self.batch_service.clone(); + + Box::pin(async move { + let metadata = std::mem::take(request.metadata_mut()); + http_service.ready().await?; + let bytes_sent = metadata.request_wire_size(); + let event_byte_size = metadata.into_events_estimated_json_encoded_byte_size(); + let http_response = http_service.call(request).await?; + let event_status = if http_response.is_successful() { + EventStatus::Delivered + } else if http_response.is_transient() { + EventStatus::Errored + } else { + EventStatus::Rejected + }; + Ok(AppsignalResponse { + event_status, + http_status: http_response.status(), + event_byte_size, + bytes_sent, + }) + }) + } +} + +pub struct AppsignalResponse { + pub(crate) event_status: EventStatus, + pub(crate) http_status: StatusCode, + pub(crate) event_byte_size: GroupedCountByteSize, + pub(crate) bytes_sent: usize, +} + +impl DriverResponse for AppsignalResponse { + fn event_status(&self) -> EventStatus { + self.event_status + } + + fn events_sent(&self) -> &GroupedCountByteSize { + &self.event_byte_size + } + + fn bytes_sent(&self) -> Option { + Some(self.bytes_sent) + } +} diff --git a/src/sinks/appsignal/sink.rs b/src/sinks/appsignal/sink.rs new file mode 100644 index 0000000000000..1d004f051f123 --- /dev/null +++ b/src/sinks/appsignal/sink.rs @@ -0,0 +1,76 @@ +use futures::{stream::BoxStream, StreamExt}; +use tower::{Service, ServiceBuilder}; +use vector_core::{ + event::Event, + sink::StreamSink, + stream::{BatcherSettings, DriverResponse}, +}; + +use crate::{ + codecs::Transformer, internal_events::SinkRequestBuildError, + sinks::util::builder::SinkBuilderExt, sinks::util::Compression, +}; + +use super::{ + encoder::AppsignalEncoder, + request_builder::{AppsignalRequest, AppsignalRequestBuilder}, +}; + +pub(crate) struct AppsignalSink { + pub(crate) service: S, + pub(crate) compression: Compression, + pub(crate) transformer: Transformer, + pub(crate) batch_settings: BatcherSettings, +} + +impl AppsignalSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ + pub(crate) async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + let service = ServiceBuilder::new().service(self.service); + + input + .batched(self.batch_settings.into_byte_size_config()) + .request_builder( + None, + AppsignalRequestBuilder { + compression: self.compression, + encoder: AppsignalEncoder { + transformer: self.transformer.clone(), + }, + }, + ) + .filter_map(|request| async move { + match request { + Err(error) => { + emit!(SinkRequestBuildError { error }); + None + } + Ok(req) => Some(req), + } + }) + .into_driver(service) + .run() + .await + } +} + +#[async_trait::async_trait] +impl StreamSink for AppsignalSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ + async fn run( + self: Box, + input: futures_util::stream::BoxStream<'_, Event>, + ) -> Result<(), ()> { + self.run_inner(input).await + } +} diff --git a/src/sinks/appsignal/tests.rs b/src/sinks/appsignal/tests.rs new file mode 100644 index 0000000000000..8bc5fd94e0143 --- /dev/null +++ b/src/sinks/appsignal/tests.rs @@ -0,0 +1,30 @@ +use futures::{future::ready, stream}; +use serde::Deserialize; +use vector_config::component::GenerateConfig; +use vector_core::event::{Event, LogEvent}; + +use crate::{ + config::{SinkConfig, SinkContext}, + test_util::{ + components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, + http::{always_200_response, spawn_blackhole_http_server}, + }, +}; + +use super::config::AppsignalConfig; + +#[tokio::test] +async fn component_spec_compliance() { + let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; + + let config = AppsignalConfig::generate_config().to_string(); + let mut config = AppsignalConfig::deserialize(toml::de::ValueDeserializer::new(&config)) + .expect("config should be valid"); + config.endpoint = mock_endpoint.to_string(); + + let context = SinkContext::default(); + let (sink, _healthcheck) = config.build(context).await.unwrap(); + + let event = Event::Log(LogEvent::from("simple message")); + run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await; +} From dc85a1dadd628da358ddc6e2700d6d543d8f7706 Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Tue, 15 Aug 2023 19:41:24 +0200 Subject: [PATCH 3/3] Fix visibility of things in AppSignal sink It doesn't need to be visible for the entire crate, only the AppSignal sink scope. --- src/sinks/appsignal/config.rs | 12 ++++++------ src/sinks/appsignal/encoder.rs | 2 +- src/sinks/appsignal/request_builder.rs | 14 +++++++------- src/sinks/appsignal/service.rs | 12 ++++++------ src/sinks/appsignal/sink.rs | 12 ++++++------ 5 files changed, 26 insertions(+), 26 deletions(-) diff --git a/src/sinks/appsignal/config.rs b/src/sinks/appsignal/config.rs index 16ba89c8b6668..7d85422fbccc2 100644 --- a/src/sinks/appsignal/config.rs +++ b/src/sinks/appsignal/config.rs @@ -30,12 +30,12 @@ use super::{ /// Configuration for the `appsignal` sink. #[configurable_component(sink("appsignal", "Deliver log and metric event data to AppSignal."))] #[derive(Clone, Debug, Default)] -pub struct AppsignalConfig { +pub(super) struct AppsignalConfig { /// The URI for the AppSignal API to send data to. #[configurable(validation(format = "uri"))] #[configurable(metadata(docs::examples = "https://appsignal-endpoint.net"))] #[serde(default = "default_endpoint")] - pub endpoint: String, + pub(super) endpoint: String, /// A valid app-level AppSignal Push API key. #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] @@ -73,12 +73,12 @@ pub struct AppsignalConfig { acknowledgements: AcknowledgementsConfig, } -pub(crate) fn default_endpoint() -> String { +pub(super) fn default_endpoint() -> String { "https://appsignal-endpoint.net".to_string() } #[derive(Clone, Copy, Debug, Default)] -pub(crate) struct AppsignalDefaultBatchSettings; +pub(super) struct AppsignalDefaultBatchSettings; impl SinkBatchSettings for AppsignalDefaultBatchSettings { const MAX_EVENTS: Option = Some(100); @@ -87,13 +87,13 @@ impl SinkBatchSettings for AppsignalDefaultBatchSettings { } impl AppsignalConfig { - pub(crate) fn build_client(&self, proxy: &ProxyConfig) -> crate::Result { + pub(super) fn build_client(&self, proxy: &ProxyConfig) -> crate::Result { let tls = MaybeTlsSettings::from_config(&self.tls, false)?; let client = HttpClient::new(tls, proxy)?; Ok(client) } - pub(crate) fn build_sink(&self, http_client: HttpClient) -> crate::Result { + pub(super) fn build_sink(&self, http_client: HttpClient) -> crate::Result { let batch_settings = self.batch.into_batcher_settings()?; let endpoint = endpoint_uri(&self.endpoint, "vector/events")?; diff --git a/src/sinks/appsignal/encoder.rs b/src/sinks/appsignal/encoder.rs index dc782ae0171c6..f56edd04c2f68 100644 --- a/src/sinks/appsignal/encoder.rs +++ b/src/sinks/appsignal/encoder.rs @@ -8,7 +8,7 @@ use crate::{ }; #[derive(Clone)] -pub(crate) struct AppsignalEncoder { +pub(super) struct AppsignalEncoder { pub transformer: Transformer, } diff --git a/src/sinks/appsignal/request_builder.rs b/src/sinks/appsignal/request_builder.rs index 8e8cb334143b5..96c0cece7e6f4 100644 --- a/src/sinks/appsignal/request_builder.rs +++ b/src/sinks/appsignal/request_builder.rs @@ -14,10 +14,10 @@ use crate::sinks::util::{ use super::encoder::AppsignalEncoder; #[derive(Clone)] -pub(crate) struct AppsignalRequest { - pub(crate) payload: Bytes, - pub(crate) finalizers: EventFinalizers, - pub(crate) metadata: RequestMetadata, +pub(super) struct AppsignalRequest { + pub(super) payload: Bytes, + pub(super) finalizers: EventFinalizers, + pub(super) metadata: RequestMetadata, } impl MetaDescriptive for AppsignalRequest { @@ -42,9 +42,9 @@ impl ByteSizeOf for AppsignalRequest { } } -pub(crate) struct AppsignalRequestBuilder { - pub(crate) encoder: AppsignalEncoder, - pub(crate) compression: Compression, +pub(super) struct AppsignalRequestBuilder { + pub(super) encoder: AppsignalEncoder, + pub(super) compression: Compression, } impl RequestBuilder> for AppsignalRequestBuilder { diff --git a/src/sinks/appsignal/service.rs b/src/sinks/appsignal/service.rs index 48ff2f3ab49bf..c2fb558d6ac9d 100644 --- a/src/sinks/appsignal/service.rs +++ b/src/sinks/appsignal/service.rs @@ -23,8 +23,8 @@ use crate::{ use super::request_builder::AppsignalRequest; #[derive(Clone)] -pub(crate) struct AppsignalService { - pub(crate) batch_service: +pub(super) struct AppsignalService { + pub(super) batch_service: HttpBatchService, crate::Error>>, AppsignalRequest>, } @@ -91,10 +91,10 @@ impl Service for AppsignalService { } pub struct AppsignalResponse { - pub(crate) event_status: EventStatus, - pub(crate) http_status: StatusCode, - pub(crate) event_byte_size: GroupedCountByteSize, - pub(crate) bytes_sent: usize, + pub(super) event_status: EventStatus, + pub(super) http_status: StatusCode, + pub(super) event_byte_size: GroupedCountByteSize, + pub(super) bytes_sent: usize, } impl DriverResponse for AppsignalResponse { diff --git a/src/sinks/appsignal/sink.rs b/src/sinks/appsignal/sink.rs index 1d004f051f123..ab9b135829abb 100644 --- a/src/sinks/appsignal/sink.rs +++ b/src/sinks/appsignal/sink.rs @@ -16,11 +16,11 @@ use super::{ request_builder::{AppsignalRequest, AppsignalRequestBuilder}, }; -pub(crate) struct AppsignalSink { - pub(crate) service: S, - pub(crate) compression: Compression, - pub(crate) transformer: Transformer, - pub(crate) batch_settings: BatcherSettings, +pub(super) struct AppsignalSink { + pub(super) service: S, + pub(super) compression: Compression, + pub(super) transformer: Transformer, + pub(super) batch_settings: BatcherSettings, } impl AppsignalSink @@ -30,7 +30,7 @@ where S::Response: DriverResponse + Send + 'static, S::Error: std::fmt::Debug + Into + Send, { - pub(crate) async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + pub(super) async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { let service = ServiceBuilder::new().service(self.service); input