diff --git a/src/sinks/appsignal/config.rs b/src/sinks/appsignal/config.rs new file mode 100644 index 0000000000000..7d85422fbccc2 --- /dev/null +++ b/src/sinks/appsignal/config.rs @@ -0,0 +1,199 @@ +use futures::FutureExt; +use http::{header::AUTHORIZATION, Request, Uri}; +use hyper::Body; +use tower::ServiceBuilder; +use vector_common::sensitive_string::SensitiveString; +use vector_config::configurable_component; +use vector_core::{ + config::{proxy::ProxyConfig, AcknowledgementsConfig, DataType, Input}, + tls::{MaybeTlsSettings, TlsEnableableConfig}, +}; + +use crate::{ + codecs::Transformer, + http::HttpClient, + sinks::{ + prelude::{SinkConfig, SinkContext}, + util::{ + http::HttpStatusRetryLogic, BatchConfig, Compression, ServiceBuilderExt, + SinkBatchSettings, TowerRequestConfig, + }, + BuildError, Healthcheck, HealthcheckError, VectorSink, + }, +}; + +use super::{ + service::{AppsignalResponse, AppsignalService}, + sink::AppsignalSink, +}; + +/// Configuration for the `appsignal` sink. +#[configurable_component(sink("appsignal", "Deliver log and metric event data to AppSignal."))] +#[derive(Clone, Debug, Default)] +pub(super) struct AppsignalConfig { + /// The URI for the AppSignal API to send data to. + #[configurable(validation(format = "uri"))] + #[configurable(metadata(docs::examples = "https://appsignal-endpoint.net"))] + #[serde(default = "default_endpoint")] + pub(super) endpoint: String, + + /// A valid app-level AppSignal Push API key. + #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] + #[configurable(metadata(docs::examples = "${APPSIGNAL_PUSH_API_KEY}"))] + push_api_key: SensitiveString, + + #[configurable(derived)] + #[serde(default = "Compression::gzip_default")] + compression: Compression, + + #[configurable(derived)] + #[serde(default)] + batch: BatchConfig, + + #[configurable(derived)] + #[serde(default)] + request: TowerRequestConfig, + + #[configurable(derived)] + tls: Option, + + #[configurable(derived)] + #[serde( + default, + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + encoding: Transformer, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + acknowledgements: AcknowledgementsConfig, +} + +pub(super) fn default_endpoint() -> String { + "https://appsignal-endpoint.net".to_string() +} + +#[derive(Clone, Copy, Debug, Default)] +pub(super) struct AppsignalDefaultBatchSettings; + +impl SinkBatchSettings for AppsignalDefaultBatchSettings { + const MAX_EVENTS: Option = Some(100); + const MAX_BYTES: Option = Some(450_000); + const TIMEOUT_SECS: f64 = 1.0; +} + +impl AppsignalConfig { + pub(super) fn build_client(&self, proxy: &ProxyConfig) -> crate::Result { + let tls = MaybeTlsSettings::from_config(&self.tls, false)?; + let client = HttpClient::new(tls, proxy)?; + Ok(client) + } + + pub(super) fn build_sink(&self, http_client: HttpClient) -> crate::Result { + let batch_settings = self.batch.into_batcher_settings()?; + + let endpoint = endpoint_uri(&self.endpoint, "vector/events")?; + let push_api_key = self.push_api_key.clone(); + let compression = self.compression; + let service = AppsignalService::new(http_client, endpoint, push_api_key, compression); + + let request_opts = self.request; + let request_settings = request_opts.unwrap_with(&TowerRequestConfig::default()); + let retry_logic = HttpStatusRetryLogic::new(|req: &AppsignalResponse| req.http_status); + + let service = ServiceBuilder::new() + .settings(request_settings, retry_logic) + .service(service); + + let transformer = self.encoding.clone(); + let sink = AppsignalSink { + service, + compression, + transformer, + batch_settings, + }; + + Ok(VectorSink::from_event_streamsink(sink)) + } +} + +impl_generate_config_from_default!(AppsignalConfig); + +#[async_trait::async_trait] +#[typetag::serde(name = "appsignal")] +impl SinkConfig for AppsignalConfig { + async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let client = self.build_client(cx.proxy())?; + let healthcheck = healthcheck( + endpoint_uri(&self.endpoint, "vector/healthcheck")?, + self.push_api_key.inner().to_string(), + client.clone(), + ) + .boxed(); + let sink = self.build_sink(client)?; + + Ok((sink, healthcheck)) + } + + fn input(&self) -> Input { + Input::new(DataType::Metric | DataType::Log) + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +async fn healthcheck(uri: Uri, push_api_key: String, client: HttpClient) -> crate::Result<()> { + let request = Request::get(uri).header(AUTHORIZATION, format!("Bearer {}", push_api_key)); + let response = client.send(request.body(Body::empty()).unwrap()).await?; + + match response.status() { + status if status.is_success() => Ok(()), + other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()), + } +} + +pub fn endpoint_uri(endpoint: &str, path: &str) -> crate::Result { + let uri = if endpoint.ends_with('/') { + format!("{endpoint}{path}") + } else { + format!("{endpoint}/{path}") + }; + match uri.parse::() { + Ok(u) => Ok(u), + Err(e) => Err(Box::new(BuildError::UriParseError { source: e })), + } +} + +#[cfg(test)] +mod test { + use super::{endpoint_uri, AppsignalConfig}; + + #[test] + fn generate_config() { + crate::test_util::test_generate_config::(); + } + + #[test] + fn endpoint_uri_with_path() { + let uri = endpoint_uri("https://appsignal-endpoint.net", "vector/events"); + assert_eq!( + uri.expect("Not a valid URI").to_string(), + "https://appsignal-endpoint.net/vector/events" + ); + } + + #[test] + fn endpoint_uri_with_trailing_slash() { + let uri = endpoint_uri("https://appsignal-endpoint.net/", "vector/events"); + assert_eq!( + uri.expect("Not a valid URI").to_string(), + "https://appsignal-endpoint.net/vector/events" + ); + } +} diff --git a/src/sinks/appsignal/encoder.rs b/src/sinks/appsignal/encoder.rs new file mode 100644 index 0000000000000..f56edd04c2f68 --- /dev/null +++ b/src/sinks/appsignal/encoder.rs @@ -0,0 +1,52 @@ +use serde_json::{json, Value}; +use vector_common::request_metadata::GroupedCountByteSize; +use vector_core::{config::telemetry, event::Event, EstimatedJsonEncodedSizeOf}; + +use crate::{ + codecs::Transformer, + sinks::util::encoding::{as_tracked_write, Encoder}, +}; + +#[derive(Clone)] +pub(super) struct AppsignalEncoder { + pub transformer: Transformer, +} + +impl Encoder> for AppsignalEncoder { + fn encode_input( + &self, + events: Vec, + writer: &mut dyn std::io::Write, + ) -> std::io::Result<(usize, GroupedCountByteSize)> { + let mut result = Value::Array(Vec::new()); + let mut byte_size = telemetry().create_request_count_byte_size(); + for mut event in events { + self.transformer.transform(&mut event); + + byte_size.add_event(&event, event.estimated_json_encoded_size_of()); + + let json = match event { + Event::Log(log) => json!({ "log": log }), + Event::Metric(metric) => json!({ "metric": metric }), + _ => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "The AppSignal sink does not support this type of event: {event:?}" + ), + )) + } + }; + if let Value::Array(ref mut array) = result { + array.push(json); + } + } + let written_bytes = + as_tracked_write::<_, _, std::io::Error>(writer, &result, |writer, value| { + serde_json::to_writer(writer, value)?; + Ok(()) + })?; + + Ok((written_bytes, byte_size)) + } +} diff --git a/src/sinks/appsignal/integration_tests.rs b/src/sinks/appsignal/integration_tests.rs index bcbee6a8052fc..1c28032caf40d 100644 --- a/src/sinks/appsignal/integration_tests.rs +++ b/src/sinks/appsignal/integration_tests.rs @@ -1,52 +1,78 @@ -use futures::stream; +use bytes::Bytes; +use futures::{channel::mpsc::Receiver, stream, StreamExt}; +use http::header::AUTHORIZATION; +use hyper::StatusCode; use indoc::indoc; -use vector_core::event::{BatchNotifier, BatchStatus, Event, Metric, MetricKind, MetricValue}; +use vector_core::event::{ + BatchNotifier, BatchStatus, Event, LogEvent, Metric, MetricKind, MetricValue, +}; use crate::{ config::SinkConfig, - sinks::appsignal::AppsignalSinkConfig, - sinks::util::test::load_sink, + sinks::appsignal::config::AppsignalConfig, + sinks::util::test::{build_test_server_status, load_sink}, test_util::{ components::{ assert_sink_compliance, assert_sink_error, run_and_assert_sink_compliance, - COMPONENT_ERROR_TAGS, SINK_TAGS, + COMPONENT_ERROR_TAGS, HTTP_SINK_TAGS, }, - generate_lines_with_stream, map_event_batch_stream, + generate_lines_with_stream, map_event_batch_stream, next_addr, }, }; +async fn start_test(events: Vec) -> (Vec, Receiver<(http::request::Parts, Bytes)>) { + let config = indoc! {r#" + push_api_key = "${TEST_APPSIGNAL_PUSH_API_KEY}" + compression = "none" + "#}; + let config = config.replace("${TEST_APPSIGNAL_PUSH_API_KEY}", &push_api_key()); + let (mut config, cx) = load_sink::(config.as_str()).unwrap(); + let addr = next_addr(); + // Set the endpoint to a local server so we can fetch the sent events later + config.endpoint = format!("http://{}", addr); + + let (sink, _) = config.build(cx).await.unwrap(); + + // Always return OK from server. We're not testing responses. + let (rx, _trigger, server) = build_test_server_status(addr, StatusCode::OK); + tokio::spawn(server); + + let (batch, receiver) = BatchNotifier::new_with_receiver(); + + let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch)); + + sink.run(stream).await.unwrap(); + assert_eq!(receiver.await, BatchStatus::Delivered); + + (events, rx) +} + #[tokio::test] async fn logs_real_endpoint() { let config = indoc! {r#" push_api_key = "${TEST_APPSIGNAL_PUSH_API_KEY}" "#}; - let api_key = std::env::var("TEST_APPSIGNAL_PUSH_API_KEY") - .expect("couldn't find the AppSignal push API key in environment variables"); - assert!(!api_key.is_empty(), "$TEST_APPSIGNAL_PUSH_API_KEY required"); - let config = config.replace("${TEST_APPSIGNAL_PUSH_API_KEY}", &api_key); - let (config, cx) = load_sink::(config.as_str()).unwrap(); + let config = config.replace("${TEST_APPSIGNAL_PUSH_API_KEY}", &push_api_key()); + let (config, cx) = load_sink::(config.as_str()).unwrap(); let (sink, _) = config.build(cx).await.unwrap(); let (batch, receiver) = BatchNotifier::new_with_receiver(); let generator = |index| format!("this is a log with index {}", index); let (_, events) = generate_lines_with_stream(generator, 10, Some(batch)); - run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await; + run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await; assert_eq!(receiver.await, BatchStatus::Delivered); } #[tokio::test] async fn metrics_real_endpoint() { - assert_sink_compliance(&SINK_TAGS, async { + assert_sink_compliance(&HTTP_SINK_TAGS, async { let config = indoc! {r#" push_api_key = "${TEST_APPSIGNAL_PUSH_API_KEY}" "#}; - let api_key = std::env::var("TEST_APPSIGNAL_PUSH_API_KEY") - .expect("couldn't find the AppSignal push API key in environment variables"); - assert!(!api_key.is_empty(), "$TEST_APPSIGNAL_PUSH_API_KEY required"); - let config = config.replace("${TEST_APPSIGNAL_PUSH_API_KEY}", &api_key); - let (config, cx) = load_sink::(config.as_str()).unwrap(); + let config = config.replace("${TEST_APPSIGNAL_PUSH_API_KEY}", &push_api_key()); + let (config, cx) = load_sink::(config.as_str()).unwrap(); let (sink, _) = config.build(cx).await.unwrap(); let (batch, receiver) = BatchNotifier::new_with_receiver(); @@ -69,13 +95,139 @@ async fn metrics_real_endpoint() { .await; } +#[tokio::test] +async fn metrics_shape() { + let events: Vec<_> = (0..5) + .map(|index| { + Event::Metric(Metric::new( + format!("counter_{}", index), + MetricKind::Absolute, + MetricValue::Counter { + value: index as f64, + }, + )) + }) + .collect(); + let api_key = push_api_key(); + let (expected, rx) = start_test(events).await; + let output = rx.take(expected.len()).collect::>().await; + + for val in output.iter() { + assert_eq!( + val.0.headers.get("Content-Type").unwrap(), + "application/json" + ); + assert_eq!( + val.0.headers.get(AUTHORIZATION).unwrap(), + &format!("Bearer {api_key}") + ); + + let payload = std::str::from_utf8(&val.1).unwrap(); + let payload: serde_json::Value = serde_json::from_str(payload).unwrap(); + let events = payload.as_array().unwrap(); + assert_eq!(events.len(), 5); + + let metrics: Vec<(&str, &str, f64)> = events + .iter() + .map(|json_value| { + let metric = json_value + .as_object() + .unwrap() + .get("metric") + .unwrap() + .as_object() + .unwrap(); + let name = metric.get("name").unwrap().as_str().unwrap(); + let kind = metric.get("kind").unwrap().as_str().unwrap(); + let counter = metric.get("counter").unwrap().as_object().unwrap(); + let value = counter.get("value").unwrap().as_f64().unwrap(); + (name, kind, value) + }) + .collect(); + assert_eq!( + vec![ + ("counter_0", "absolute", 0.0), + ("counter_1", "absolute", 1.0), + ("counter_2", "absolute", 2.0), + ("counter_3", "absolute", 3.0), + ("counter_4", "absolute", 4.0), + ], + metrics + ); + } +} + +#[tokio::test] +async fn logs_shape() { + let events: Vec<_> = (0..5) + .map(|index| Event::Log(LogEvent::from(format!("Log message {index}")))) + .collect(); + let api_key = push_api_key(); + let (expected, rx) = start_test(events).await; + let output = rx.take(expected.len()).collect::>().await; + + for val in output.iter() { + assert_eq!( + val.0.headers.get("Content-Type").unwrap(), + "application/json" + ); + assert_eq!( + val.0.headers.get(AUTHORIZATION).unwrap(), + &format!("Bearer {api_key}") + ); + + let payload = std::str::from_utf8(&val.1).unwrap(); + let payload: serde_json::Value = serde_json::from_str(payload).unwrap(); + let events = payload.as_array().unwrap(); + assert_eq!(events.len(), 5); + + let log_messages: Vec<&str> = events + .iter() + .map(|value| { + value + .as_object() + .unwrap() + .get("log") + .unwrap() + .as_object() + .unwrap() + .get("message") + .unwrap() + .as_str() + .unwrap() + }) + .collect(); + assert_eq!( + vec![ + "Log message 0", + "Log message 1", + "Log message 2", + "Log message 3", + "Log message 4", + ], + log_messages + ); + + let event = events + .last() + .unwrap() + .as_object() + .unwrap() + .get("log") + .unwrap() + .as_object() + .unwrap(); + assert!(!event.get("timestamp").unwrap().as_str().unwrap().is_empty()); + } +} + #[tokio::test] async fn error_scenario_real_endpoint() { assert_sink_error(&COMPONENT_ERROR_TAGS, async { let config = indoc! {r#" push_api_key = "invalid key" "#}; - let (config, cx) = load_sink::(config).unwrap(); + let (config, cx) = load_sink::(config).unwrap(); let (sink, _) = config.build(cx).await.unwrap(); let (batch, receiver) = BatchNotifier::new_with_receiver(); @@ -91,3 +243,10 @@ async fn error_scenario_real_endpoint() { }) .await; } + +fn push_api_key() -> String { + let api_key = std::env::var("TEST_APPSIGNAL_PUSH_API_KEY") + .expect("couldn't find the AppSignal push API key in environment variables"); + assert!(!api_key.is_empty(), "$TEST_APPSIGNAL_PUSH_API_KEY required"); + api_key +} diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index 31cb7a8e2ecc2..175c456bf6cbc 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -6,273 +6,13 @@ //! //! Logs and metrics are stored on an per app basis and require an app-level Push API key. +mod config; +mod encoder; +mod request_builder; +mod service; +mod sink; + #[cfg(all(test, feature = "appsignal-integration-tests"))] mod integration_tests; - -use bytes::Bytes; -use futures::{FutureExt, SinkExt}; -use http::{header::AUTHORIZATION, Request, Uri}; -use hyper::Body; -use serde_json::json; -use snafu::{ResultExt, Snafu}; -use vector_common::sensitive_string::SensitiveString; -use vector_config::configurable_component; - -use crate::{ - codecs::Transformer, - config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext}, - event::Event, - http::HttpClient, - sinks::{ - util::{ - encoding::write_all, - http::{BatchedHttpSink, HttpEventEncoder, HttpSink}, - BatchConfig, BoxedRawValue, Compression, Compressor, JsonArrayBuffer, - SinkBatchSettings, TowerRequestConfig, - }, - BuildError, - }, - tls::{TlsConfig, TlsSettings}, -}; - -#[derive(Debug, Snafu)] -enum FinishError { - #[snafu(display( - "Failure occurred during writing to or finalizing the compressor: {}", - source - ))] - CompressionFailed { source: std::io::Error }, -} - -/// Configuration for the `appsignal` sink. -#[configurable_component(sink("appsignal", "Send events to AppSignal."))] -#[derive(Clone, Debug, Default)] -pub struct AppsignalSinkConfig { - /// The URI for the AppSignal API to send data to. - #[configurable(validation(format = "uri"))] - #[configurable(metadata(docs::examples = "https://appsignal-endpoint.net"))] - #[serde(default = "default_endpoint")] - endpoint: String, - - /// A valid app-level AppSignal Push API key. - #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] - #[configurable(metadata(docs::examples = "${APPSIGNAL_PUSH_API_KEY}"))] - push_api_key: SensitiveString, - - #[configurable(derived)] - #[serde(default = "Compression::gzip_default")] - compression: Compression, - - #[configurable(derived)] - #[serde(default)] - batch: BatchConfig, - - #[configurable(derived)] - #[serde(default)] - request: TowerRequestConfig, - - #[configurable(derived)] - tls: Option, - - #[configurable(derived)] - #[serde( - default, - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - encoding: Transformer, - - #[configurable(derived)] - #[serde( - default, - deserialize_with = "crate::serde::bool_or_struct", - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - acknowledgements: AcknowledgementsConfig, -} - -fn default_endpoint() -> String { - "https://appsignal-endpoint.net".to_string() -} - -#[derive(Clone, Copy, Debug, Default)] -struct AppsignalDefaultBatchSettings; - -impl SinkBatchSettings for AppsignalDefaultBatchSettings { - const MAX_EVENTS: Option = Some(100); - const MAX_BYTES: Option = Some(450_000); - const TIMEOUT_SECS: f64 = 1.0; -} - -impl_generate_config_from_default!(AppsignalSinkConfig); - -#[async_trait::async_trait] -#[typetag::serde(name = "appsignal")] -impl SinkConfig for AppsignalSinkConfig { - async fn build( - &self, - cx: SinkContext, - ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - let push_api_key = self.push_api_key.inner().to_string(); - let request_settings = self.request.unwrap_with(&TowerRequestConfig::default()); - let batch_settings = self.batch.into_batch_settings()?; - - let buffer = JsonArrayBuffer::new(batch_settings.size); - - let tls_settings = TlsSettings::from_options(&self.tls)?; - let client = HttpClient::new(tls_settings, cx.proxy())?; - - let sink = BatchedHttpSink::new( - self.clone(), - buffer, - request_settings, - batch_settings.timeout, - client.clone(), - ) - .sink_map_err(|error| error!(message = "Fatal AppSignal sink error.", %error)); - - let healthcheck = healthcheck( - endpoint_uri(&self.endpoint, "vector/healthcheck")?, - push_api_key, - client, - ) - .boxed(); - - #[allow(deprecated)] - Ok((super::VectorSink::from_event_sink(sink), healthcheck)) - } - - fn input(&self) -> Input { - Input::new(DataType::Metric | DataType::Log) - } - - fn acknowledgements(&self) -> &AcknowledgementsConfig { - &self.acknowledgements - } -} - -/// Encode logs and metrics for requests to the AppSignal API. -/// It will use a JSON format wrapping events in either "log" or "metric", based on the type of event. -pub struct AppsignalEventEncoder { - transformer: Transformer, -} - -impl HttpEventEncoder for AppsignalEventEncoder { - fn encode_event(&mut self, mut event: Event) -> Option { - self.transformer.transform(&mut event); - - match event { - Event::Log(log) => Some(json!({ "log": log })), - Event::Metric(metric) => Some(json!({ "metric": metric })), - _ => panic!("The AppSignal sink does not support this type of event: {event:?}"), - } - } -} - -#[async_trait::async_trait] -impl HttpSink for AppsignalSinkConfig { - type Input = serde_json::Value; - type Output = Vec; - type Encoder = AppsignalEventEncoder; - - fn build_encoder(&self) -> Self::Encoder { - AppsignalEventEncoder { - transformer: self.encoding.clone(), - } - } - - async fn build_request(&self, events: Self::Output) -> crate::Result> { - let uri = endpoint_uri(&self.endpoint, "vector/events")?; - let mut request = Request::post(uri).header( - AUTHORIZATION, - format!("Bearer {}", self.push_api_key.inner()), - ); - - let mut body = crate::serde::json::to_bytes(&events)?.freeze(); - if let Some(ce) = self.compression.content_encoding() { - request = request.header("Content-Encoding", ce); - } - let mut compressor = Compressor::from(self.compression); - write_all(&mut compressor, 0, &body)?; - body = compressor.finish().context(CompressionFailedSnafu)?.into(); - request.body(body).map_err(Into::into) - } -} - -async fn healthcheck(uri: Uri, push_api_key: String, client: HttpClient) -> crate::Result<()> { - let request = Request::get(uri).header(AUTHORIZATION, format!("Bearer {}", push_api_key)); - let response = client.send(request.body(Body::empty()).unwrap()).await?; - - match response.status() { - status if status.is_success() => Ok(()), - other => Err(super::HealthcheckError::UnexpectedStatus { status: other }.into()), - } -} - -fn endpoint_uri(endpoint: &str, path: &str) -> crate::Result { - let uri = if endpoint.ends_with('/') { - format!("{endpoint}{path}") - } else { - format!("{endpoint}/{path}") - }; - match uri.parse::() { - Ok(u) => Ok(u), - Err(e) => Err(Box::new(BuildError::UriParseError { source: e })), - } -} - #[cfg(test)] -mod test { - use futures::{future::ready, stream}; - use serde::Deserialize; - use vector_core::event::{Event, LogEvent}; - - use crate::{ - config::{GenerateConfig, SinkConfig, SinkContext}, - test_util::{ - components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, - http::{always_200_response, spawn_blackhole_http_server}, - }, - }; - - use super::{endpoint_uri, AppsignalSinkConfig}; - - #[test] - fn generate_config() { - crate::test_util::test_generate_config::(); - } - - #[tokio::test] - async fn component_spec_compliance() { - let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; - - let config = AppsignalSinkConfig::generate_config().to_string(); - let mut config = - AppsignalSinkConfig::deserialize(toml::de::ValueDeserializer::new(&config)) - .expect("config should be valid"); - config.endpoint = mock_endpoint.to_string(); - - let context = SinkContext::default(); - let (sink, _healthcheck) = config.build(context).await.unwrap(); - - let event = Event::Log(LogEvent::from("simple message")); - run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await; - } - - #[test] - fn endpoint_uri_with_path() { - let uri = endpoint_uri("https://appsignal-endpoint.net", "vector/events"); - assert_eq!( - uri.expect("Not a valid URI").to_string(), - "https://appsignal-endpoint.net/vector/events" - ); - } - - #[test] - fn endpoint_uri_with_trailing_slash() { - let uri = endpoint_uri("https://appsignal-endpoint.net/", "vector/events"); - assert_eq!( - uri.expect("Not a valid URI").to_string(), - "https://appsignal-endpoint.net/vector/events" - ); - } -} +mod tests; diff --git a/src/sinks/appsignal/request_builder.rs b/src/sinks/appsignal/request_builder.rs new file mode 100644 index 0000000000000..96c0cece7e6f4 --- /dev/null +++ b/src/sinks/appsignal/request_builder.rs @@ -0,0 +1,88 @@ +use bytes::Bytes; + +use vector_common::{ + byte_size_of::ByteSizeOf, + finalization::{EventFinalizers, Finalizable}, + request_metadata::{MetaDescriptive, RequestMetadata}, +}; +use vector_core::event::Event; + +use crate::sinks::util::{ + metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression, RequestBuilder, +}; + +use super::encoder::AppsignalEncoder; + +#[derive(Clone)] +pub(super) struct AppsignalRequest { + pub(super) payload: Bytes, + pub(super) finalizers: EventFinalizers, + pub(super) metadata: RequestMetadata, +} + +impl MetaDescriptive for AppsignalRequest { + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata + } +} + +impl Finalizable for AppsignalRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + self.finalizers.take_finalizers() + } +} + +impl ByteSizeOf for AppsignalRequest { + fn allocated_bytes(&self) -> usize { + self.payload.allocated_bytes() + self.finalizers.allocated_bytes() + } +} + +pub(super) struct AppsignalRequestBuilder { + pub(super) encoder: AppsignalEncoder, + pub(super) compression: Compression, +} + +impl RequestBuilder> for AppsignalRequestBuilder { + type Metadata = EventFinalizers; + type Events = Vec; + type Encoder = AppsignalEncoder; + type Payload = Bytes; + type Request = AppsignalRequest; + type Error = std::io::Error; + + fn compression(&self) -> Compression { + self.compression + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoder + } + + fn split_input( + &self, + mut input: Vec, + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let finalizers = input.take_finalizers(); + let metadata_builder = RequestMetadataBuilder::from_events(&input); + + (finalizers, metadata_builder, input) + } + + fn build_request( + &self, + metadata: Self::Metadata, + request_metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + AppsignalRequest { + finalizers: metadata, + payload: payload.into_payload(), + metadata: request_metadata, + } + } +} diff --git a/src/sinks/appsignal/service.rs b/src/sinks/appsignal/service.rs new file mode 100644 index 0000000000000..c2fb558d6ac9d --- /dev/null +++ b/src/sinks/appsignal/service.rs @@ -0,0 +1,112 @@ +use std::task::Poll; + +use bytes::Bytes; +use futures::{ + future, + future::{BoxFuture, Ready}, +}; +use http::{header::AUTHORIZATION, Request, StatusCode, Uri}; +use hyper::Body; +use tower::{Service, ServiceExt}; + +use vector_common::{ + finalization::EventStatus, request_metadata::GroupedCountByteSize, + request_metadata::MetaDescriptive, sensitive_string::SensitiveString, +}; +use vector_core::stream::DriverResponse; + +use crate::{ + http::HttpClient, + sinks::util::{http::HttpBatchService, sink::Response, Compression}, +}; + +use super::request_builder::AppsignalRequest; + +#[derive(Clone)] +pub(super) struct AppsignalService { + pub(super) batch_service: + HttpBatchService, crate::Error>>, AppsignalRequest>, +} + +impl AppsignalService { + pub fn new( + http_client: HttpClient, + endpoint: Uri, + push_api_key: SensitiveString, + compression: Compression, + ) -> Self { + let batch_service = HttpBatchService::new(http_client, move |req| { + let req: AppsignalRequest = req; + + let mut request = Request::post(&endpoint) + .header("Content-Type", "application/json") + .header(AUTHORIZATION, format!("Bearer {}", push_api_key.inner())) + .header("Content-Length", req.payload.len()); + if let Some(ce) = compression.content_encoding() { + request = request.header("Content-Encoding", ce) + } + let result = request.body(req.payload).map_err(|x| x.into()); + future::ready(result) + }); + Self { batch_service } + } +} + +impl Service for AppsignalService { + type Response = AppsignalResponse; + type Error = crate::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, mut request: AppsignalRequest) -> Self::Future { + let mut http_service = self.batch_service.clone(); + + Box::pin(async move { + let metadata = std::mem::take(request.metadata_mut()); + http_service.ready().await?; + let bytes_sent = metadata.request_wire_size(); + let event_byte_size = metadata.into_events_estimated_json_encoded_byte_size(); + let http_response = http_service.call(request).await?; + let event_status = if http_response.is_successful() { + EventStatus::Delivered + } else if http_response.is_transient() { + EventStatus::Errored + } else { + EventStatus::Rejected + }; + Ok(AppsignalResponse { + event_status, + http_status: http_response.status(), + event_byte_size, + bytes_sent, + }) + }) + } +} + +pub struct AppsignalResponse { + pub(super) event_status: EventStatus, + pub(super) http_status: StatusCode, + pub(super) event_byte_size: GroupedCountByteSize, + pub(super) bytes_sent: usize, +} + +impl DriverResponse for AppsignalResponse { + fn event_status(&self) -> EventStatus { + self.event_status + } + + fn events_sent(&self) -> &GroupedCountByteSize { + &self.event_byte_size + } + + fn bytes_sent(&self) -> Option { + Some(self.bytes_sent) + } +} diff --git a/src/sinks/appsignal/sink.rs b/src/sinks/appsignal/sink.rs new file mode 100644 index 0000000000000..ab9b135829abb --- /dev/null +++ b/src/sinks/appsignal/sink.rs @@ -0,0 +1,76 @@ +use futures::{stream::BoxStream, StreamExt}; +use tower::{Service, ServiceBuilder}; +use vector_core::{ + event::Event, + sink::StreamSink, + stream::{BatcherSettings, DriverResponse}, +}; + +use crate::{ + codecs::Transformer, internal_events::SinkRequestBuildError, + sinks::util::builder::SinkBuilderExt, sinks::util::Compression, +}; + +use super::{ + encoder::AppsignalEncoder, + request_builder::{AppsignalRequest, AppsignalRequestBuilder}, +}; + +pub(super) struct AppsignalSink { + pub(super) service: S, + pub(super) compression: Compression, + pub(super) transformer: Transformer, + pub(super) batch_settings: BatcherSettings, +} + +impl AppsignalSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ + pub(super) async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + let service = ServiceBuilder::new().service(self.service); + + input + .batched(self.batch_settings.into_byte_size_config()) + .request_builder( + None, + AppsignalRequestBuilder { + compression: self.compression, + encoder: AppsignalEncoder { + transformer: self.transformer.clone(), + }, + }, + ) + .filter_map(|request| async move { + match request { + Err(error) => { + emit!(SinkRequestBuildError { error }); + None + } + Ok(req) => Some(req), + } + }) + .into_driver(service) + .run() + .await + } +} + +#[async_trait::async_trait] +impl StreamSink for AppsignalSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ + async fn run( + self: Box, + input: futures_util::stream::BoxStream<'_, Event>, + ) -> Result<(), ()> { + self.run_inner(input).await + } +} diff --git a/src/sinks/appsignal/tests.rs b/src/sinks/appsignal/tests.rs new file mode 100644 index 0000000000000..8bc5fd94e0143 --- /dev/null +++ b/src/sinks/appsignal/tests.rs @@ -0,0 +1,30 @@ +use futures::{future::ready, stream}; +use serde::Deserialize; +use vector_config::component::GenerateConfig; +use vector_core::event::{Event, LogEvent}; + +use crate::{ + config::{SinkConfig, SinkContext}, + test_util::{ + components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, + http::{always_200_response, spawn_blackhole_http_server}, + }, +}; + +use super::config::AppsignalConfig; + +#[tokio::test] +async fn component_spec_compliance() { + let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; + + let config = AppsignalConfig::generate_config().to_string(); + let mut config = AppsignalConfig::deserialize(toml::de::ValueDeserializer::new(&config)) + .expect("config should be valid"); + config.endpoint = mock_endpoint.to_string(); + + let context = SinkContext::default(); + let (sink, _healthcheck) = config.build(context).await.unwrap(); + + let event = Event::Log(LogEvent::from("simple message")); + run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await; +} diff --git a/website/cue/reference/components/sinks/base/appsignal.cue b/website/cue/reference/components/sinks/base/appsignal.cue index f3d81555a59d5..3bddd96936173 100644 --- a/website/cue/reference/components/sinks/base/appsignal.cue +++ b/website/cue/reference/components/sinks/base/appsignal.cue @@ -283,7 +283,7 @@ base: components: sinks: appsignal: configuration: { } } tls: { - description: "TLS configuration." + description: "Configures the TLS options for incoming/outgoing connections." required: false type: object: options: { alpn_protocols: { @@ -317,6 +317,16 @@ base: components: sinks: appsignal: configuration: { required: false type: string: examples: ["/path/to/host_certificate.crt"] } + enabled: { + description: """ + Whether or not to require TLS for incoming or outgoing connections. + + When enabled and used for incoming connections, an identity certificate is also required. See `tls.crt_file` for + more information. + """ + required: false + type: bool: {} + } key_file: { description: """ Absolute path to a private key file used to identify this server.