From ab780252460105111727f6697bc986348cc8c2ec Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Wed, 22 Feb 2023 12:15:43 +0100 Subject: [PATCH 01/21] feat(new sink): Add AppSignal sink Add a new sink for the [AppSignal.com](https://www.appsignal.com/) service. It sends a JSON payload based to the public appsignal-endpoint.net, using the HttpClient. --- .github/actions/spelling/allow.txt | 3 + .github/semantic.yml | 1 + Cargo.toml | 3 + src/sinks/appsignal/mod.rs | 268 ++++++++++++++++++ src/sinks/mod.rs | 9 + .../configuration/sinks/appsignal.md | 15 + .../reference/components/sinks/appsignal.cue | 86 ++++++ .../components/sinks/base/appsignal.cue | 242 ++++++++++++++++ website/cue/reference/services/appsignal.cue | 10 + website/cue/reference/urls.cue | 1 + 10 files changed, 638 insertions(+) create mode 100644 src/sinks/appsignal/mod.rs create mode 100644 website/content/en/docs/reference/configuration/sinks/appsignal.md create mode 100644 website/cue/reference/components/sinks/appsignal.cue create mode 100644 website/cue/reference/components/sinks/base/appsignal.cue create mode 100644 website/cue/reference/services/appsignal.cue diff --git a/.github/actions/spelling/allow.txt b/.github/actions/spelling/allow.txt index e003e4a875f42..5bf6e95941e01 100644 --- a/.github/actions/spelling/allow.txt +++ b/.github/actions/spelling/allow.txt @@ -28,6 +28,9 @@ Apanda apikey apimachinery apiserver +appsignal +Appsignal +APPSIGNAL archlinux Archos Arival diff --git a/.github/semantic.yml b/.github/semantic.yml index a03f00b43108f..fe0d3fd8d2626 100644 --- a/.github/semantic.yml +++ b/.github/semantic.yml @@ -185,6 +185,7 @@ scopes: # sinks - amqp sink # Anything `amqp` sink related + - appsignal sink # Anything `appsignal` sink related - aws_cloudwatch_logs sink # Anything `aws_cloudwatch_logs` sink related - aws_cloudwatch_metrics sink # Anything `aws_cloudwatch_metrics` sink related - aws_kinesis_firehose sink # Anything `aws_kinesis_firehose` sink related diff --git a/Cargo.toml b/Cargo.toml index c96b495549b4f..9360d349aff36 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -606,6 +606,7 @@ transforms-throttle = ["dep:governor"] sinks = ["sinks-logs", "sinks-metrics"] sinks-logs = [ "sinks-amqp", + "sinks-appsignal", "sinks-aws_cloudwatch_logs", "sinks-aws_kinesis_firehose", "sinks-aws_kinesis_streams", @@ -647,6 +648,7 @@ sinks-logs = [ "sinks-websocket", ] sinks-metrics = [ + "sinks-appsignal", "sinks-aws_cloudwatch_metrics", "sinks-blackhole", "sinks-console", @@ -662,6 +664,7 @@ sinks-metrics = [ ] sinks-amqp = ["lapin"] +sinks-appsignal = [] sinks-aws_cloudwatch_logs = ["aws-core", "dep:aws-sdk-cloudwatchlogs"] sinks-aws_cloudwatch_metrics = ["aws-core", "dep:aws-sdk-cloudwatch"] sinks-aws_kinesis_firehose = ["aws-core", "dep:aws-sdk-firehose"] diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs new file mode 100644 index 0000000000000..844ccacd67ef6 --- /dev/null +++ b/src/sinks/appsignal/mod.rs @@ -0,0 +1,268 @@ +use std::io::Write; + +use bytes::{BufMut, Bytes, BytesMut}; +use flate2::write::GzEncoder; +use futures::{FutureExt, SinkExt}; +use http::{header::AUTHORIZATION, Request, Uri}; +use hyper::Body; +use serde_json::json; +use vector_common::sensitive_string::SensitiveString; +use vector_config::configurable_component; + +use crate::{ + codecs::Transformer, + config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext}, + event::Event, + http::HttpClient, + sinks::{ + util::{ + http::{BatchedHttpSink, HttpEventEncoder, HttpSink}, + BatchConfig, BoxedRawValue, JsonArrayBuffer, SinkBatchSettings, TowerRequestConfig, + }, + BuildError, + }, +}; + +/// Configuration for the `appsignal` sink. +#[configurable_component(sink("appsignal"))] +#[derive(Clone, Debug, Default)] +pub struct AppsignalSinkConfig { + /// The AppSignal API endpoint to report to. This is configured by default and doesn't need to be changed. + #[configurable(validation(format = "uri"))] + #[configurable(metadata(docs::examples = "https://appsignal-endpoint.net"))] + endpoint: Option, + + /// A valid app-level AppSignal Push API key. + #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] + #[configurable(metadata(docs::examples = "${APPSIGNAL_PUSH_API_KEY}"))] + push_api_key: SensitiveString, + + #[configurable(derived)] + #[serde(default)] + batch: BatchConfig, + + #[configurable(derived)] + #[serde(default)] + request: TowerRequestConfig, + + #[configurable(derived)] + #[serde( + default, + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + encoding: Transformer, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + acknowledgements: AcknowledgementsConfig, +} + +#[derive(Clone, Copy, Debug, Default)] +struct AppsignalDefaultBatchSettings; + +impl SinkBatchSettings for AppsignalDefaultBatchSettings { + const MAX_EVENTS: Option = Some(100); + const MAX_BYTES: Option = Some(450_000); + const TIMEOUT_SECS: f64 = 1.0; +} + +impl_generate_config_from_default!(AppsignalSinkConfig); + +#[async_trait::async_trait] +impl SinkConfig for AppsignalSinkConfig { + async fn build( + &self, + cx: SinkContext, + ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { + let push_api_key = self.push_api_key.inner().to_string(); + let request_settings = self.request.unwrap_with(&TowerRequestConfig::default()); + let batch_settings = self.batch.into_batch_settings()?; + + let buffer = JsonArrayBuffer::new(batch_settings.size); + + let client = HttpClient::new(None, cx.proxy())?; + + let sink = BatchedHttpSink::new( + self.clone(), + buffer, + request_settings, + batch_settings.timeout, + client.clone(), + ) + .sink_map_err(|error| error!(message = "Fatal AppSignal sink error.", %error)); + + let healthcheck = healthcheck( + endpoint_uri(self.endpoint.as_ref(), "vector/healthcheck")?, + push_api_key, + client, + ) + .boxed(); + + Ok((super::VectorSink::from_event_sink(sink), healthcheck)) + } + + fn input(&self) -> Input { + Input::new(DataType::Metric | DataType::Log) + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +pub struct AppsignalEventEncoder { + transformer: Transformer, +} + +impl HttpEventEncoder for AppsignalEventEncoder { + fn encode_event(&mut self, mut event: Event) -> Option { + self.transformer.transform(&mut event); + + match event { + Event::Log(log) => Some(json!({ "log": log })), + Event::Metric(metric) => Some(json!({ "metric": metric })), + _ => panic!("The AppSignal sink does not support this event: {event:?}"), + } + } +} + +#[async_trait::async_trait] +impl HttpSink for AppsignalSinkConfig { + type Input = serde_json::Value; + type Output = Vec; + type Encoder = AppsignalEventEncoder; + + fn build_encoder(&self) -> Self::Encoder { + AppsignalEventEncoder { + transformer: self.encoding.clone(), + } + } + + async fn build_request(&self, events: Self::Output) -> crate::Result> { + let uri = endpoint_uri(self.endpoint.as_ref(), "vector/events")?; + let request = Request::post(uri) + .header( + AUTHORIZATION, + format!("Bearer {}", self.push_api_key.inner()), + ) + .header("Content-Encoding", "gzip"); + + let mut body = crate::serde::json::to_bytes(&events).unwrap().freeze(); + + let buffer = BytesMut::new(); + let mut writer = GzEncoder::new(buffer.writer(), flate2::Compression::new(6)); + writer.write_all(&body).expect("Writing to Vec can't fail"); + body = writer + .finish() + .expect("Writing to Vec can't fail") + .into_inner() + .into(); + + request.body(body).map_err(Into::into) + } +} + +pub(crate) async fn healthcheck( + uri: Uri, + push_api_key: String, + client: HttpClient, +) -> crate::Result<()> { + let request = Request::get(uri).header(AUTHORIZATION, format!("Bearer {}", push_api_key)); + let response = client.send(request.body(Body::empty()).unwrap()).await?; + + match response.status() { + status if status.is_success() => Ok(()), + other => Err(super::HealthcheckError::UnexpectedStatus { status: other }.into()), + } +} + +fn endpoint_uri(endpoint: Option<&String>, path: &str) -> crate::Result { + let mut uri = if let Some(endpoint) = endpoint { + endpoint.to_string() + } else { + "https://appsignal-endpoint.net".to_string() + }; + if !uri.ends_with('/') { + uri.push('/'); + } + uri.push_str(path); + match uri.parse::() { + Ok(u) => Ok(u), + Err(e) => Err(Box::new(BuildError::UriParseError { source: e })), + } +} +#[cfg(test)] +mod test { + use futures::{future::ready, stream}; + use serde::Deserialize; + use vector_core::event::{Event, LogEvent}; + + use crate::{ + config::{GenerateConfig, SinkConfig, SinkContext}, + test_util::{ + components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, + http::{always_200_response, spawn_blackhole_http_server}, + }, + }; + + use super::{endpoint_uri, AppsignalSinkConfig}; + + #[test] + fn generate_config() { + crate::test_util::test_generate_config::(); + } + + #[tokio::test] + async fn component_spec_compliance() { + let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; + + let config = AppsignalSinkConfig::generate_config().to_string(); + let mut config = + AppsignalSinkConfig::deserialize(toml::de::ValueDeserializer::new(&config)) + .expect("config should be valid"); + config.endpoint = Some(mock_endpoint.to_string()); + + let context = SinkContext::new_test(); + let (sink, _healthcheck) = config.build(context).await.unwrap(); + + let event = Event::Log(LogEvent::from("simple message")); + run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await; + } + + #[test] + fn endpoint_uri_with_default_endpoint() { + let uri = endpoint_uri(None, "vector/events"); + assert_eq!( + uri.expect("Not a valid URI").to_string(), + "https://appsignal-endpoint.net/vector/events" + ); + } + + #[test] + fn endpoint_uri_with_endpoint_override() { + let uri = endpoint_uri( + Some(&"https://appsignal-endpoint.net".to_string()), + "vector/events", + ); + assert_eq!( + uri.expect("Not a valid URI").to_string(), + "https://appsignal-endpoint.net/vector/events" + ); + } + + #[test] + fn endpoint_uri_with_endpoint_override_and_trailing_slash() { + let uri = endpoint_uri( + Some(&"https://appsignal-endpoint.net/".to_string()), + "vector/events", + ); + assert_eq!( + uri.expect("Not a valid URI").to_string(), + "https://appsignal-endpoint.net/vector/events" + ); + } +} diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index d78b6ade94fda..b9d6955642e85 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -7,6 +7,8 @@ pub mod util; #[cfg(feature = "sinks-amqp")] pub mod amqp; +#[cfg(feature = "sinks-appsignal")] +pub mod appsignal; #[cfg(feature = "sinks-aws_cloudwatch_logs")] pub mod aws_cloudwatch_logs; #[cfg(feature = "sinks-aws_cloudwatch_metrics")] @@ -143,6 +145,11 @@ pub enum Sinks { #[configurable(metadata(docs::label = "AMQP"))] Amqp(amqp::AmqpSinkConfig), + /// Send events to AppSignal. + #[cfg(feature = "sinks-appsignal")] + #[configurable(metadata(docs::label = "AppSignal"))] + Appsignal(appsignal::AppsignalSinkConfig), + /// Publish log events to AWS CloudWatch Logs. #[cfg(feature = "sinks-aws_cloudwatch_logs")] #[configurable(metadata(docs::label = "AWS CloudWatch Logs"))] @@ -434,6 +441,8 @@ impl NamedComponent for Sinks { match self { #[cfg(feature = "sinks-amqp")] Self::Amqp(config) => config.get_component_name(), + #[cfg(feature = "sinks-appsignal")] + Self::Appsignal(config) => config.get_component_name(), #[cfg(feature = "sinks-aws_cloudwatch_logs")] Self::AwsCloudwatchLogs(config) => config.get_component_name(), #[cfg(feature = "sinks-aws_cloudwatch_metrics")] diff --git a/website/content/en/docs/reference/configuration/sinks/appsignal.md b/website/content/en/docs/reference/configuration/sinks/appsignal.md new file mode 100644 index 0000000000000..b6167a2de4f0f --- /dev/null +++ b/website/content/en/docs/reference/configuration/sinks/appsignal.md @@ -0,0 +1,15 @@ +--- +title: AppSignal +description: Deliver events to [AppSignal](https://www.appsignal.com/) +kind: sink +layout: component +tags: ["appsignal", "component", "sink", "logs", "metrics"] +aliases: ["/docs/reference/configuration/sinks/appsignal"] +--- + +{{/* +This doc is generated using: + +1. The template in layouts/docs/component.html +2. The relevant CUE data in cue/reference/components/... + */}} diff --git a/website/cue/reference/components/sinks/appsignal.cue b/website/cue/reference/components/sinks/appsignal.cue new file mode 100644 index 0000000000000..3ec9e998df8db --- /dev/null +++ b/website/cue/reference/components/sinks/appsignal.cue @@ -0,0 +1,86 @@ +package metadata + +components: sinks: appsignal: { + title: "AppSignal" + + classes: { + commonly_used: false + delivery: "at_least_once" + development: "beta" + egress_method: "batch" + service_providers: ["AppSignal"] + stateful: false + } + + features: { + auto_generated: true + acknowledgements: true + healthcheck: enabled: true + send: { + batch: { + enabled: true + common: false + max_events: 100 + max_bytes: 450_000_000 + timeout_secs: 1.0 + } + compression: { + enabled: true + default: "gzip" + algorithms: ["gzip"] + levels: [6] + } + encoding: { + enabled: true + codec: enabled: false + } + proxy: enabled: true + request: { + enabled: true + concurrency: 100 + headers: false + } + tls: { + enabled: true + can_verify_certificate: true + can_verify_hostname: true + enabled_default: true + enabled_by_scheme: true + } + to: { + service: services.appsignal + + interface: { + socket: { + direction: "outgoing" + protocols: ["http"] + ssl: "required" + } + } + } + } + } + + support: { + requirements: [] + warnings: [] + notices: [] + } + + configuration: base.components.sinks.appsignal.configuration + + input: { + logs: true + metrics: { + counter: true + distribution: true + gauge: true + histogram: false + set: false + summary: false + } + traces: false + } + + telemetry: components.sinks.http.telemetry +} diff --git a/website/cue/reference/components/sinks/base/appsignal.cue b/website/cue/reference/components/sinks/base/appsignal.cue new file mode 100644 index 0000000000000..7b542943107b7 --- /dev/null +++ b/website/cue/reference/components/sinks/base/appsignal.cue @@ -0,0 +1,242 @@ +package metadata + +base: components: sinks: appsignal: configuration: { + acknowledgements: { + description: """ + Controls how acknowledgements are handled for this sink. + + See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled. + + [e2e_acks]: https://vector.dev/docs/about/under-the-hood/architecture/end-to-end-acknowledgements/ + """ + required: false + type: object: options: enabled: { + description: """ + Whether or not end-to-end acknowledgements are enabled. + + When enabled for a sink, any source connected to that sink, where the source supports + end-to-end acknowledgements as well, will wait for events to be acknowledged by the sink + before acknowledging them at the source. + + Enabling or disabling acknowledgements at the sink level takes precedence over any global + [`acknowledgements`][global_acks] configuration. + + [global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements + """ + required: false + type: bool: {} + } + } + batch: { + description: "Event batching behavior." + required: false + type: object: options: { + max_bytes: { + description: """ + The maximum size of a batch that will be processed by a sink. + + This is based on the uncompressed size of the batched events, before they are + serialized / compressed. + """ + required: false + type: uint: { + default: 450000 + unit: "bytes" + } + } + max_events: { + description: "The maximum size of a batch before it is flushed." + required: false + type: uint: { + default: 100 + unit: "events" + } + } + timeout_secs: { + description: "The maximum age of a batch before it is flushed." + required: false + type: float: { + default: 1.0 + unit: "seconds" + } + } + } + } + encoding: { + description: "Transformations to prepare an event for serialization." + required: false + type: object: options: { + except_fields: { + description: "List of fields that will be excluded from the encoded event." + required: false + type: array: items: type: string: {} + } + only_fields: { + description: "List of fields that will be included in the encoded event." + required: false + type: array: items: type: string: {} + } + timestamp_format: { + description: "Format used for timestamp fields." + required: false + type: string: enum: { + rfc3339: "Represent the timestamp as a RFC 3339 timestamp." + unix: "Represent the timestamp as a Unix timestamp." + } + } + } + } + endpoint: { + description: "The AppSignal API endpoint to report to. This is configured by default and doesn't need to be changed." + required: false + type: string: examples: ["https://appsignal-endpoint.net"] + } + push_api_key: { + description: "A valid app-level AppSignal Push API key." + required: true + type: string: examples: ["00000000-0000-0000-0000-000000000000", "${APPSIGNAL_PUSH_API_KEY}"] + } + request: { + description: """ + Middleware settings for outbound requests. + + Various settings can be configured, such as concurrency and rate limits, timeouts, etc. + """ + required: false + type: object: options: { + adaptive_concurrency: { + description: """ + Configuration of adaptive concurrency parameters. + + These parameters typically do not require changes from the default, and incorrect values can lead to meta-stable or + unstable performance and sink behavior. Proceed with caution. + """ + required: false + type: object: options: { + decrease_ratio: { + description: """ + The fraction of the current value to set the new concurrency limit when decreasing the limit. + + Valid values are greater than `0` and less than `1`. Smaller values cause the algorithm to scale back rapidly + when latency increases. + + Note that the new limit is rounded down after applying this ratio. + """ + required: false + type: float: default: 0.9 + } + ewma_alpha: { + description: """ + The weighting of new measurements compared to older measurements. + + Valid values are greater than `0` and less than `1`. + + ARC uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with + the current RTT. Smaller values cause this reference to adjust more slowly, which may be useful if a service has + unusually high response variability. + """ + required: false + type: float: default: 0.4 + } + rtt_deviation_scale: { + description: """ + Scale of RTT deviations which are not considered anomalous. + + Valid values are greater than or equal to `0`, and we expect reasonable values to range from `1.0` to `3.0`. + + When calculating the past RTT average, we also compute a secondary “deviation” value that indicates how variable + those values are. We use that deviation when comparing the past RTT average to the current measurements, so we + can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to + an appropriate range. Larger values cause the algorithm to ignore larger increases in the RTT. + """ + required: false + type: float: default: 2.5 + } + } + } + concurrency: { + description: "Configuration for outbound request concurrency." + required: false + type: { + string: { + default: "none" + enum: { + adaptive: """ + Concurrency will be managed by Vector's [Adaptive Request Concurrency][arc] feature. + + [arc]: https://vector.dev/docs/about/under-the-hood/networking/arc/ + """ + none: """ + A fixed concurrency of 1. + + Only one request can be outstanding at any given time. + """ + } + } + uint: {} + } + } + rate_limit_duration_secs: { + description: "The time window used for the `rate_limit_num` option." + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + rate_limit_num: { + description: "The maximum number of requests allowed within the `rate_limit_duration_secs` time window." + required: false + type: uint: { + default: 9223372036854775807 + unit: "requests" + } + } + retry_attempts: { + description: """ + The maximum number of retries to make for failed requests. + + The default, for all intents and purposes, represents an infinite number of retries. + """ + required: false + type: uint: { + default: 9223372036854775807 + unit: "retries" + } + } + retry_initial_backoff_secs: { + description: """ + The amount of time to wait before attempting the first retry for a failed request. + + After the first retry has failed, the fibonacci sequence will be used to select future backoffs. + """ + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + retry_max_duration_secs: { + description: "The maximum amount of time to wait between retries." + required: false + type: uint: { + default: 3600 + unit: "seconds" + } + } + timeout_secs: { + description: """ + The time a request can take before being aborted. + + It is highly recommended that you do not lower this value below the service’s internal timeout, as this could + create orphaned requests, pile on retries, and result in duplicate data downstream. + """ + required: false + type: uint: { + default: 60 + unit: "seconds" + } + } + } + } +} diff --git a/website/cue/reference/services/appsignal.cue b/website/cue/reference/services/appsignal.cue new file mode 100644 index 0000000000000..9d37b3a9a0c0a --- /dev/null +++ b/website/cue/reference/services/appsignal.cue @@ -0,0 +1,10 @@ +package metadata + +services: appsignal: { + name: "AppSignal" + thing: "a \(name) account" + url: urls.appsignal + versions: null + + description: "[AppSignal](\(urls.appsignal)) is an all-in-one application monitoring tool. We help developers monitor their applications from A-Z with ease." +} diff --git a/website/cue/reference/urls.cue b/website/cue/reference/urls.cue index 96b559cc820c4..7c17ec0c165b6 100644 --- a/website/cue/reference/urls.cue +++ b/website/cue/reference/urls.cue @@ -1,6 +1,7 @@ package metadata urls: { + appsignal: "https://www.appsignal.com/" azure_blob_storage: "https://azure.microsoft.com/en-us/services/storage/blobs/" azure_event_hubs: "https://learn.microsoft.com/en-us/azure/event-hubs/" azure_event_hubs_kafka: "https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-for-kafka-ecosystem-overview" From 864238fdcb001ed131920b9f3bc86f1833fc6d59 Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Fri, 17 Mar 2023 09:42:59 +0100 Subject: [PATCH 02/21] Auto configure TLS settings for AppSignal sink We auto configure the TLS settings with the defaults. Remove the documentation option. --- src/sinks/appsignal/mod.rs | 4 +++- website/cue/reference/components/sinks/appsignal.cue | 8 +------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index 844ccacd67ef6..23dc676083228 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -21,6 +21,7 @@ use crate::{ }, BuildError, }, + tls::TlsSettings, }; /// Configuration for the `appsignal` sink. @@ -84,7 +85,8 @@ impl SinkConfig for AppsignalSinkConfig { let buffer = JsonArrayBuffer::new(batch_settings.size); - let client = HttpClient::new(None, cx.proxy())?; + let tls_settings = TlsSettings::from_options(&None)?; + let client = HttpClient::new(tls_settings, cx.proxy())?; let sink = BatchedHttpSink::new( self.clone(), diff --git a/website/cue/reference/components/sinks/appsignal.cue b/website/cue/reference/components/sinks/appsignal.cue index 3ec9e998df8db..4a6c77728a0e2 100644 --- a/website/cue/reference/components/sinks/appsignal.cue +++ b/website/cue/reference/components/sinks/appsignal.cue @@ -40,13 +40,7 @@ components: sinks: appsignal: { concurrency: 100 headers: false } - tls: { - enabled: true - can_verify_certificate: true - can_verify_hostname: true - enabled_default: true - enabled_by_scheme: true - } + tls: enabled: false to: { service: services.appsignal From d2fca6473437fc3b32da9983a69889ac5aa63600 Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Fri, 17 Mar 2023 10:39:21 +0100 Subject: [PATCH 03/21] Set default AppSignal endpoint with Serde This should improve how the docs for this config option are displayed. I couldn't directly add the String value as the default value, but had to use a function that returns the default value as per the Serde docs: https://serde.rs/attr-default.html --- src/sinks/appsignal/mod.rs | 38 +++++++++++++++----------------------- 1 file changed, 15 insertions(+), 23 deletions(-) diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index 23dc676083228..e6c7d70279556 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -31,7 +31,8 @@ pub struct AppsignalSinkConfig { /// The AppSignal API endpoint to report to. This is configured by default and doesn't need to be changed. #[configurable(validation(format = "uri"))] #[configurable(metadata(docs::examples = "https://appsignal-endpoint.net"))] - endpoint: Option, + #[serde(default = "default_endpoint")] + endpoint: String, /// A valid app-level AppSignal Push API key. #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] @@ -62,6 +63,10 @@ pub struct AppsignalSinkConfig { acknowledgements: AcknowledgementsConfig, } +fn default_endpoint() -> String { + "https://appsignal-endpoint.net".to_string() +} + #[derive(Clone, Copy, Debug, Default)] struct AppsignalDefaultBatchSettings; @@ -98,7 +103,7 @@ impl SinkConfig for AppsignalSinkConfig { .sink_map_err(|error| error!(message = "Fatal AppSignal sink error.", %error)); let healthcheck = healthcheck( - endpoint_uri(self.endpoint.as_ref(), "vector/healthcheck")?, + endpoint_uri(self.endpoint.clone(), "vector/healthcheck")?, push_api_key, client, ) @@ -145,7 +150,7 @@ impl HttpSink for AppsignalSinkConfig { } async fn build_request(&self, events: Self::Output) -> crate::Result> { - let uri = endpoint_uri(self.endpoint.as_ref(), "vector/events")?; + let uri = endpoint_uri(self.endpoint.clone(), "vector/events")?; let request = Request::post(uri) .header( AUTHORIZATION, @@ -182,12 +187,8 @@ pub(crate) async fn healthcheck( } } -fn endpoint_uri(endpoint: Option<&String>, path: &str) -> crate::Result { - let mut uri = if let Some(endpoint) = endpoint { - endpoint.to_string() - } else { - "https://appsignal-endpoint.net".to_string() - }; +fn endpoint_uri(endpoint: String, path: &str) -> crate::Result { + let mut uri = endpoint.clone(); if !uri.ends_with('/') { uri.push('/'); } @@ -226,7 +227,7 @@ mod test { let mut config = AppsignalSinkConfig::deserialize(toml::de::ValueDeserializer::new(&config)) .expect("config should be valid"); - config.endpoint = Some(mock_endpoint.to_string()); + config.endpoint = mock_endpoint.to_string(); let context = SinkContext::new_test(); let (sink, _healthcheck) = config.build(context).await.unwrap(); @@ -236,18 +237,9 @@ mod test { } #[test] - fn endpoint_uri_with_default_endpoint() { - let uri = endpoint_uri(None, "vector/events"); - assert_eq!( - uri.expect("Not a valid URI").to_string(), - "https://appsignal-endpoint.net/vector/events" - ); - } - - #[test] - fn endpoint_uri_with_endpoint_override() { + fn endpoint_uri_with_path() { let uri = endpoint_uri( - Some(&"https://appsignal-endpoint.net".to_string()), + "https://appsignal-endpoint.net".to_string(), "vector/events", ); assert_eq!( @@ -257,9 +249,9 @@ mod test { } #[test] - fn endpoint_uri_with_endpoint_override_and_trailing_slash() { + fn endpoint_uri_with_trailing_slash() { let uri = endpoint_uri( - Some(&"https://appsignal-endpoint.net/".to_string()), + "https://appsignal-endpoint.net/".to_string(), "vector/events", ); assert_eq!( From 3fb4f542cfcb67d205f99f99ea63cd618d1d3dd5 Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Fri, 17 Mar 2023 11:38:41 +0100 Subject: [PATCH 04/21] Fix clippy warning by removing unnecessary clone --- src/sinks/appsignal/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index e6c7d70279556..c276005e40a18 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -188,7 +188,7 @@ pub(crate) async fn healthcheck( } fn endpoint_uri(endpoint: String, path: &str) -> crate::Result { - let mut uri = endpoint.clone(); + let mut uri = endpoint; if !uri.ends_with('/') { uri.push('/'); } From d1deaecea4e6190bdd9434901d25c61636e9edee Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Fri, 17 Mar 2023 12:58:56 +0100 Subject: [PATCH 05/21] Update generated AppSignal sink docs --- website/cue/reference/components/sinks/base/appsignal.cue | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/website/cue/reference/components/sinks/base/appsignal.cue b/website/cue/reference/components/sinks/base/appsignal.cue index 7b542943107b7..3ccc980f147aa 100644 --- a/website/cue/reference/components/sinks/base/appsignal.cue +++ b/website/cue/reference/components/sinks/base/appsignal.cue @@ -89,7 +89,10 @@ base: components: sinks: appsignal: configuration: { endpoint: { description: "The AppSignal API endpoint to report to. This is configured by default and doesn't need to be changed." required: false - type: string: examples: ["https://appsignal-endpoint.net"] + type: string: { + default: "https://appsignal-endpoint.net" + examples: ["https://appsignal-endpoint.net"] + } } push_api_key: { description: "A valid app-level AppSignal Push API key." From 9b9e85f5827694bdf7a2955c98f6e9fcd6472bfe Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Mon, 20 Mar 2023 13:43:41 +0100 Subject: [PATCH 06/21] Add AppSignal sink integration test This requires a TEST_APPSIGNAL_PUSH_API_KEY to be set for the test to pass. --- .github/workflows/integration-test.yml | 1 + Cargo.toml | 2 + Makefile | 2 +- scripts/integration/appsignal/test.yaml | 11 ++++ src/sinks/appsignal/integration_tests.rs | 67 ++++++++++++++++++++++++ src/sinks/appsignal/mod.rs | 3 ++ 6 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 scripts/integration/appsignal/test.yaml create mode 100644 src/sinks/appsignal/integration_tests.rs diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index 02e573f1d62d2..958adb45a3871 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -57,6 +57,7 @@ jobs: matrix: include: - test: 'amqp' + - test: 'appsignal' - test: 'aws' - test: 'axiom' - test: 'azure' diff --git a/Cargo.toml b/Cargo.toml index 9360d349aff36..af881aaee7262 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -729,6 +729,7 @@ nightly = [] # Testing-related features all-integration-tests = [ "amqp-integration-tests", + "appsignal-integration-tests", "aws-integration-tests", "axiom-integration-tests", "azure-integration-tests", @@ -766,6 +767,7 @@ all-integration-tests = [ ] amqp-integration-tests = ["sources-amqp", "sinks-amqp"] +appsignal-integration-tests = ["sinks-appsignal"] aws-integration-tests = [ "aws-cloudwatch-logs-integration-tests", diff --git a/Makefile b/Makefile index beb2defdb8276..bd9d69b23bb02 100644 --- a/Makefile +++ b/Makefile @@ -330,7 +330,7 @@ test-behavior: test-behavior-transforms test-behavior-formats test-behavior-conf .PHONY: test-integration test-integration: ## Runs all integration tests -test-integration: test-integration-amqp test-integration-aws test-integration-axiom test-integration-azure test-integration-chronicle test-integration-clickhouse +test-integration: test-integration-amqp test-integration-appsignal test-integration-aws test-integration-axiom test-integration-azure test-integration-chronicle test-integration-clickhouse test-integration: test-integration-databend test-integration-docker-logs test-integration-elasticsearch test-integration: test-integration-eventstoredb test-integration-fluent test-integration-gcp test-integration-humio test-integration-http-client test-integration-influxdb test-integration: test-integration-kafka test-integration-logstash test-integration-loki test-integration-mongodb test-integration-nats diff --git a/scripts/integration/appsignal/test.yaml b/scripts/integration/appsignal/test.yaml new file mode 100644 index 0000000000000..ce8b12687b2a8 --- /dev/null +++ b/scripts/integration/appsignal/test.yaml @@ -0,0 +1,11 @@ +features: +- appsignal-integration-tests + +test_filter: '::appsignal::' + +runner: + env: + TEST_APPSIGNAL_PUSH_API_KEY: + +matrix: + version: [latest] diff --git a/src/sinks/appsignal/integration_tests.rs b/src/sinks/appsignal/integration_tests.rs new file mode 100644 index 0000000000000..cd50914789087 --- /dev/null +++ b/src/sinks/appsignal/integration_tests.rs @@ -0,0 +1,67 @@ +use futures::stream; +use indoc::indoc; +use vector_core::event::{BatchNotifier, BatchStatus, Event, Metric, MetricKind, MetricValue}; + +use crate::{ + config::SinkConfig, + sinks::appsignal::AppsignalSinkConfig, + sinks::util::test::load_sink, + test_util::{ + components::{assert_sink_compliance, run_and_assert_sink_compliance, SINK_TAGS}, + generate_lines_with_stream, map_event_batch_stream, + }, +}; + +#[tokio::test] +async fn logs_real_endpoint() { + let config = indoc! {r#" + push_api_key = "${TEST_APPSIGNAL_PUSH_API_KEY}" + "#}; + let api_key = std::env::var("TEST_APPSIGNAL_PUSH_API_KEY") + .expect("couldn't find the AppSignal push API key in environment variables"); + assert!(!api_key.is_empty(), "$TEST_APPSIGNAL_PUSH_API_KEY required"); + let config = config.replace("${TEST_APPSIGNAL_PUSH_API_KEY}", &api_key); + let (config, cx) = load_sink::(config.as_str()).unwrap(); + + let (sink, _) = config.build(cx).await.unwrap(); + let (batch, receiver) = BatchNotifier::new_with_receiver(); + let generator = |index| format!("this is a log with index {}", index); + let (_, events) = generate_lines_with_stream(generator, 10, Some(batch)); + + run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await; + + assert_eq!(receiver.await, BatchStatus::Delivered); +} + +#[tokio::test] +async fn metrics_real_endpoint() { + assert_sink_compliance(&SINK_TAGS, async { + let config = indoc! {r#" + push_api_key = "${TEST_APPSIGNAL_PUSH_API_KEY}" + "#}; + let api_key = std::env::var("TEST_APPSIGNAL_PUSH_API_KEY") + .expect("couldn't find the AppSignal push API key in environment variables"); + assert!(!api_key.is_empty(), "$TEST_APPSIGNAL_PUSH_API_KEY required"); + let config = config.replace("${TEST_APPSIGNAL_PUSH_API_KEY}", &api_key); + let (config, cx) = load_sink::(config.as_str()).unwrap(); + + let (sink, _) = config.build(cx).await.unwrap(); + let (batch, receiver) = BatchNotifier::new_with_receiver(); + let events: Vec<_> = (0..10) + .map(|index| { + Event::Metric(Metric::new( + "counter", + MetricKind::Absolute, + MetricValue::Counter { + value: index as f64, + }, + )) + }) + .collect(); + let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch)); + + sink.run(stream).await.unwrap(); + assert_eq!(receiver.await, BatchStatus::Delivered); + }) + .await; +} diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index c276005e40a18..5d846361dd198 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -1,3 +1,6 @@ +#[cfg(all(test, feature = "appsignal-integration-tests"))] +mod integration_tests; + use std::io::Write; use bytes::{BufMut, Bytes, BytesMut}; From b732764b4fcdb12593da1d725432ad65669b28bf Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Mon, 27 Mar 2023 14:05:25 +0200 Subject: [PATCH 07/21] Simplify the AppSignal endpoint_uri function Remove the need to clone the endpoint String. --- src/sinks/appsignal/mod.rs | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index 5d846361dd198..abbab3b8f2e48 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -106,7 +106,7 @@ impl SinkConfig for AppsignalSinkConfig { .sink_map_err(|error| error!(message = "Fatal AppSignal sink error.", %error)); let healthcheck = healthcheck( - endpoint_uri(self.endpoint.clone(), "vector/healthcheck")?, + endpoint_uri(&self.endpoint, "vector/healthcheck")?, push_api_key, client, ) @@ -153,7 +153,7 @@ impl HttpSink for AppsignalSinkConfig { } async fn build_request(&self, events: Self::Output) -> crate::Result> { - let uri = endpoint_uri(self.endpoint.clone(), "vector/events")?; + let uri = endpoint_uri(&self.endpoint, "vector/events")?; let request = Request::post(uri) .header( AUTHORIZATION, @@ -190,17 +190,18 @@ pub(crate) async fn healthcheck( } } -fn endpoint_uri(endpoint: String, path: &str) -> crate::Result { - let mut uri = endpoint; - if !uri.ends_with('/') { - uri.push('/'); - } - uri.push_str(path); +fn endpoint_uri(endpoint: &str, path: &str) -> crate::Result { + let uri = if endpoint.ends_with('/') { + format!("{endpoint}{path}") + } else { + format!("{endpoint}/{path}") + }; match uri.parse::() { Ok(u) => Ok(u), Err(e) => Err(Box::new(BuildError::UriParseError { source: e })), } } + #[cfg(test)] mod test { use futures::{future::ready, stream}; @@ -241,10 +242,7 @@ mod test { #[test] fn endpoint_uri_with_path() { - let uri = endpoint_uri( - "https://appsignal-endpoint.net".to_string(), - "vector/events", - ); + let uri = endpoint_uri("https://appsignal-endpoint.net", "vector/events"); assert_eq!( uri.expect("Not a valid URI").to_string(), "https://appsignal-endpoint.net/vector/events" @@ -253,10 +251,7 @@ mod test { #[test] fn endpoint_uri_with_trailing_slash() { - let uri = endpoint_uri( - "https://appsignal-endpoint.net/".to_string(), - "vector/events", - ); + let uri = endpoint_uri("https://appsignal-endpoint.net/", "vector/events"); assert_eq!( uri.expect("Not a valid URI").to_string(), "https://appsignal-endpoint.net/vector/events" From 28c5babeef2eb1bcc7dee30f6a59c2b7f0221526 Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Mon, 27 Mar 2023 14:21:54 +0200 Subject: [PATCH 08/21] Add AppSignal error case integration scenario Test if the AppSignal sink fails as expected when the config is incorrect. --- scripts/integration/appsignal/test.yaml | 2 +- src/sinks/appsignal/integration_tests.rs | 28 +++++++++++++++++++++++- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/scripts/integration/appsignal/test.yaml b/scripts/integration/appsignal/test.yaml index ce8b12687b2a8..0f0970f75e4ac 100644 --- a/scripts/integration/appsignal/test.yaml +++ b/scripts/integration/appsignal/test.yaml @@ -1,7 +1,7 @@ features: - appsignal-integration-tests -test_filter: '::appsignal::' +test_filter: '::appsignal::integration_tests::' runner: env: diff --git a/src/sinks/appsignal/integration_tests.rs b/src/sinks/appsignal/integration_tests.rs index cd50914789087..bcbee6a8052fc 100644 --- a/src/sinks/appsignal/integration_tests.rs +++ b/src/sinks/appsignal/integration_tests.rs @@ -7,7 +7,10 @@ use crate::{ sinks::appsignal::AppsignalSinkConfig, sinks::util::test::load_sink, test_util::{ - components::{assert_sink_compliance, run_and_assert_sink_compliance, SINK_TAGS}, + components::{ + assert_sink_compliance, assert_sink_error, run_and_assert_sink_compliance, + COMPONENT_ERROR_TAGS, SINK_TAGS, + }, generate_lines_with_stream, map_event_batch_stream, }, }; @@ -65,3 +68,26 @@ async fn metrics_real_endpoint() { }) .await; } + +#[tokio::test] +async fn error_scenario_real_endpoint() { + assert_sink_error(&COMPONENT_ERROR_TAGS, async { + let config = indoc! {r#" + push_api_key = "invalid key" + "#}; + let (config, cx) = load_sink::(config).unwrap(); + + let (sink, _) = config.build(cx).await.unwrap(); + let (batch, receiver) = BatchNotifier::new_with_receiver(); + let events = vec![Event::Metric(Metric::new( + "counter", + MetricKind::Absolute, + MetricValue::Counter { value: 1.0 }, + ))]; + let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch)); + + sink.run(stream).await.unwrap(); + assert_eq!(receiver.await, BatchStatus::Rejected); + }) + .await; +} From 351a0f0cb942b99e08ba7c5c92ee24c526b60683 Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Mon, 27 Mar 2023 14:31:20 +0200 Subject: [PATCH 09/21] Use write_all helper function for payload encoding Use the write_all helper function to emit an error event rather than panicking. --- src/sinks/appsignal/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index abbab3b8f2e48..6e09d15014547 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -1,8 +1,6 @@ #[cfg(all(test, feature = "appsignal-integration-tests"))] mod integration_tests; -use std::io::Write; - use bytes::{BufMut, Bytes, BytesMut}; use flate2::write::GzEncoder; use futures::{FutureExt, SinkExt}; @@ -19,6 +17,7 @@ use crate::{ http::HttpClient, sinks::{ util::{ + encoding::write_all, http::{BatchedHttpSink, HttpEventEncoder, HttpSink}, BatchConfig, BoxedRawValue, JsonArrayBuffer, SinkBatchSettings, TowerRequestConfig, }, @@ -165,7 +164,7 @@ impl HttpSink for AppsignalSinkConfig { let buffer = BytesMut::new(); let mut writer = GzEncoder::new(buffer.writer(), flate2::Compression::new(6)); - writer.write_all(&body).expect("Writing to Vec can't fail"); + write_all(&mut writer, 0, &body)?; body = writer .finish() .expect("Writing to Vec can't fail") From 8bc473a20e50a6b0523319eb24c4b834084bed16 Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Mon, 27 Mar 2023 14:49:37 +0200 Subject: [PATCH 10/21] Handle compression error without panic Add a snafu error for context when writing to the gzip compressor fails. --- src/sinks/appsignal/mod.rs | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index 6e09d15014547..916a241f3674d 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -7,6 +7,7 @@ use futures::{FutureExt, SinkExt}; use http::{header::AUTHORIZATION, Request, Uri}; use hyper::Body; use serde_json::json; +use snafu::{ResultExt, Snafu}; use vector_common::sensitive_string::SensitiveString; use vector_config::configurable_component; @@ -26,6 +27,26 @@ use crate::{ tls::TlsSettings, }; +#[derive(Debug, Snafu)] +pub enum FinishError { + #[snafu(display( + "Failure occurred during writing to or finalizing the compressor: {}", + source + ))] + CompressionFailed { source: std::io::Error }, +} + +impl FinishError { + /// Gets the telemetry-friendly string version of this error. + /// + /// The value will be a short string with only lowercase letters and underscores. + pub const fn as_error_type(&self) -> &'static str { + match self { + Self::CompressionFailed { .. } => "compression_failed", + } + } +} + /// Configuration for the `appsignal` sink. #[configurable_component(sink("appsignal"))] #[derive(Clone, Debug, Default)] @@ -167,7 +188,7 @@ impl HttpSink for AppsignalSinkConfig { write_all(&mut writer, 0, &body)?; body = writer .finish() - .expect("Writing to Vec can't fail") + .context(CompressionFailedSnafu)? .into_inner() .into(); From f07ae12b7a9bc5950e8fe3387bd671ea2acbab65 Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Mon, 27 Mar 2023 15:32:46 +0200 Subject: [PATCH 11/21] Apply fixes from a couple review feedback items Small changes to fix issues that were reported in the PR review. --- src/sinks/appsignal/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index 916a241f3674d..85be4d59c103d 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -51,7 +51,7 @@ impl FinishError { #[configurable_component(sink("appsignal"))] #[derive(Clone, Debug, Default)] pub struct AppsignalSinkConfig { - /// The AppSignal API endpoint to report to. This is configured by default and doesn't need to be changed. + /// The URI for the AppSignal API to send data to. #[configurable(validation(format = "uri"))] #[configurable(metadata(docs::examples = "https://appsignal-endpoint.net"))] #[serde(default = "default_endpoint")] @@ -155,7 +155,7 @@ impl HttpEventEncoder for AppsignalEventEncoder { match event { Event::Log(log) => Some(json!({ "log": log })), Event::Metric(metric) => Some(json!({ "metric": metric })), - _ => panic!("The AppSignal sink does not support this event: {event:?}"), + _ => panic!("The AppSignal sink does not support this type of event: {event:?}"), } } } @@ -181,7 +181,7 @@ impl HttpSink for AppsignalSinkConfig { ) .header("Content-Encoding", "gzip"); - let mut body = crate::serde::json::to_bytes(&events).unwrap().freeze(); + let mut body = crate::serde::json::to_bytes(&events)?.freeze(); let buffer = BytesMut::new(); let mut writer = GzEncoder::new(buffer.writer(), flate2::Compression::new(6)); From af88729c2adbe3788cd5cb54249799ec58d12878 Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Mon, 27 Mar 2023 15:43:23 +0200 Subject: [PATCH 12/21] Add basic AppSignal module documentation --- src/sinks/appsignal/mod.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index 85be4d59c103d..3db6afc5f66c6 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -1,3 +1,11 @@ +//! The AppSignal sink +//! +//! This sink provides downstream support for `AppSignal` to collect logs and a subset of Vector +//! metric types. These events are sent to the `appsignal-endpoint.net` domain, which is part of +//! the `appsignal.com` infrastructure. +//! +//! Logs and metrics are stored on an per app basis are require an app-level Push API key. + #[cfg(all(test, feature = "appsignal-integration-tests"))] mod integration_tests; From 09d88d07115d80f05d406e68365e1dda237459c0 Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Wed, 29 Mar 2023 11:02:54 +0200 Subject: [PATCH 13/21] Add zlib compression support to AppSignal sink By request, because the AppSignal API supports zlib compression, add it to the sink as well as a configuration option. We don't want people to send it without any compression, so a custom compression struct has been added for the configuration. The compression code was copied from the http sink implementation. --- src/sinks/appsignal/mod.rs | 69 +++++++++++++++++++++++++++++--------- 1 file changed, 53 insertions(+), 16 deletions(-) diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index 3db6afc5f66c6..246be774ee6fe 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -10,7 +10,7 @@ mod integration_tests; use bytes::{BufMut, Bytes, BytesMut}; -use flate2::write::GzEncoder; +use flate2::write::{GzEncoder, ZlibEncoder}; use futures::{FutureExt, SinkExt}; use http::{header::AUTHORIZATION, Request, Uri}; use hyper::Body; @@ -55,6 +55,24 @@ impl FinishError { } } +/// Supported compression types for the AppSignal sink. +#[configurable_component] +#[derive(Copy, Clone, Debug, Derivative, Eq, PartialEq)] +#[serde(rename_all = "lowercase")] +#[derivative(Default)] +enum AppsignalCompression { + /// [Gzip][gzip] compression. + /// + /// [gzip]: https://www.gzip.org/ + #[derivative(Default)] + Gzip, + + /// [Zlib][zlib] compression. + /// + /// [zlib]: https://zlib.net/ + Zlib, +} + /// Configuration for the `appsignal` sink. #[configurable_component(sink("appsignal"))] #[derive(Clone, Debug, Default)] @@ -70,6 +88,10 @@ pub struct AppsignalSinkConfig { #[configurable(metadata(docs::examples = "${APPSIGNAL_PUSH_API_KEY}"))] push_api_key: SensitiveString, + #[configurable(derived)] + #[serde(default)] + compression: AppsignalCompression, + #[configurable(derived)] #[serde(default)] batch: BatchConfig, @@ -182,24 +204,39 @@ impl HttpSink for AppsignalSinkConfig { async fn build_request(&self, events: Self::Output) -> crate::Result> { let uri = endpoint_uri(&self.endpoint, "vector/events")?; - let request = Request::post(uri) - .header( - AUTHORIZATION, - format!("Bearer {}", self.push_api_key.inner()), - ) - .header("Content-Encoding", "gzip"); + let mut request = Request::post(uri).header( + AUTHORIZATION, + format!("Bearer {}", self.push_api_key.inner()), + ); let mut body = crate::serde::json::to_bytes(&events)?.freeze(); - let buffer = BytesMut::new(); - let mut writer = GzEncoder::new(buffer.writer(), flate2::Compression::new(6)); - write_all(&mut writer, 0, &body)?; - body = writer - .finish() - .context(CompressionFailedSnafu)? - .into_inner() - .into(); - + match self.compression { + AppsignalCompression::Gzip => { + request = request.header("Content-Encoding", "gzip"); + + let buffer = BytesMut::new(); + let mut writer = GzEncoder::new(buffer.writer(), flate2::Compression::new(6)); + write_all(&mut writer, 0, &body)?; + body = writer + .finish() + .context(CompressionFailedSnafu)? + .into_inner() + .into(); + } + AppsignalCompression::Zlib => { + request = request.header("Content-Encoding", "deflate"); + + let buffer = BytesMut::new(); + let mut writer = ZlibEncoder::new(buffer.writer(), flate2::Compression::new(6)); + write_all(&mut writer, 0, &body)?; + body = writer + .finish() + .context(CompressionFailedSnafu)? + .into_inner() + .into(); + } + } request.body(body).map_err(Into::into) } } From 01c615e7cdb7fca3a2bb0608d52860b071696eba Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Wed, 29 Mar 2023 11:05:44 +0200 Subject: [PATCH 14/21] Update AppSignal sink copy Fix typo and update the "thing" label to refer to "an AppSignal app", the "thing" where the data is reported to. --- src/sinks/appsignal/mod.rs | 2 +- website/cue/reference/services/appsignal.cue | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index 246be774ee6fe..09412fe75d0ac 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -4,7 +4,7 @@ //! metric types. These events are sent to the `appsignal-endpoint.net` domain, which is part of //! the `appsignal.com` infrastructure. //! -//! Logs and metrics are stored on an per app basis are require an app-level Push API key. +//! Logs and metrics are stored on an per app basis and require an app-level Push API key. #[cfg(all(test, feature = "appsignal-integration-tests"))] mod integration_tests; diff --git a/website/cue/reference/services/appsignal.cue b/website/cue/reference/services/appsignal.cue index 9d37b3a9a0c0a..f9d8543c97a15 100644 --- a/website/cue/reference/services/appsignal.cue +++ b/website/cue/reference/services/appsignal.cue @@ -2,7 +2,7 @@ package metadata services: appsignal: { name: "AppSignal" - thing: "a \(name) account" + thing: "an \(name) app" url: urls.appsignal versions: null From efa76319bc718df0333df74c9bc64328849c0255 Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Thu, 30 Mar 2023 09:21:58 +0200 Subject: [PATCH 15/21] Simplify AppSignal sink compression step Reduce the amount code needed in the sink itself by using the Compression and Compressor structs to do all the compression. --- src/sinks/appsignal/mod.rs | 33 ++++++++++----------------------- 1 file changed, 10 insertions(+), 23 deletions(-) diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index 09412fe75d0ac..aeedb24640175 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -9,8 +9,7 @@ #[cfg(all(test, feature = "appsignal-integration-tests"))] mod integration_tests; -use bytes::{BufMut, Bytes, BytesMut}; -use flate2::write::{GzEncoder, ZlibEncoder}; +use bytes::Bytes; use futures::{FutureExt, SinkExt}; use http::{header::AUTHORIZATION, Request, Uri}; use hyper::Body; @@ -28,7 +27,8 @@ use crate::{ util::{ encoding::write_all, http::{BatchedHttpSink, HttpEventEncoder, HttpSink}, - BatchConfig, BoxedRawValue, JsonArrayBuffer, SinkBatchSettings, TowerRequestConfig, + BatchConfig, BoxedRawValue, Compression, Compressor, JsonArrayBuffer, + SinkBatchSettings, TowerRequestConfig, }, BuildError, }, @@ -211,32 +211,19 @@ impl HttpSink for AppsignalSinkConfig { let mut body = crate::serde::json::to_bytes(&events)?.freeze(); - match self.compression { + let compression = match self.compression { AppsignalCompression::Gzip => { request = request.header("Content-Encoding", "gzip"); - - let buffer = BytesMut::new(); - let mut writer = GzEncoder::new(buffer.writer(), flate2::Compression::new(6)); - write_all(&mut writer, 0, &body)?; - body = writer - .finish() - .context(CompressionFailedSnafu)? - .into_inner() - .into(); + Compression::gzip_default() } AppsignalCompression::Zlib => { request = request.header("Content-Encoding", "deflate"); - - let buffer = BytesMut::new(); - let mut writer = ZlibEncoder::new(buffer.writer(), flate2::Compression::new(6)); - write_all(&mut writer, 0, &body)?; - body = writer - .finish() - .context(CompressionFailedSnafu)? - .into_inner() - .into(); + Compression::zlib_default() } - } + }; + let mut compressor = Compressor::from(compression); + write_all(&mut compressor, 0, &body)?; + body = compressor.finish().context(CompressionFailedSnafu)?.into(); request.body(body).map_err(Into::into) } } From f287fc3c3fbb7109156fdcfd9924964230c31904 Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Thu, 30 Mar 2023 10:13:35 +0200 Subject: [PATCH 16/21] Update appsignal.cue file Run `make generate-component-docs` after changes made to the AppSignal sink configuration. --- .../components/sinks/base/appsignal.cue | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/website/cue/reference/components/sinks/base/appsignal.cue b/website/cue/reference/components/sinks/base/appsignal.cue index 3ccc980f147aa..5f0c29ea5cf49 100644 --- a/website/cue/reference/components/sinks/base/appsignal.cue +++ b/website/cue/reference/components/sinks/base/appsignal.cue @@ -62,6 +62,25 @@ base: components: sinks: appsignal: configuration: { } } } + compression: { + description: "Supported compression types for the AppSignal sink." + required: false + type: string: { + default: "gzip" + enum: { + gzip: """ + [Gzip][gzip] compression. + + [gzip]: https://www.gzip.org/ + """ + zlib: """ + [Zlib][zlib] compression. + + [zlib]: https://zlib.net/ + """ + } + } + } encoding: { description: "Transformations to prepare an event for serialization." required: false @@ -87,7 +106,7 @@ base: components: sinks: appsignal: configuration: { } } endpoint: { - description: "The AppSignal API endpoint to report to. This is configured by default and doesn't need to be changed." + description: "The URI for the AppSignal API to send data to." required: false type: string: { default: "https://appsignal-endpoint.net" From ffe28312eb8b982e919be747b3a7e516581d1eda Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Mon, 3 Apr 2023 11:54:04 +0200 Subject: [PATCH 17/21] Fix review feedback - Add documentation for the EventEncoder - Remove unused `as_error_type` function. - Make things not public if they should not be. --- src/sinks/appsignal/mod.rs | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index aeedb24640175..e9056625b7213 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -36,7 +36,7 @@ use crate::{ }; #[derive(Debug, Snafu)] -pub enum FinishError { +enum FinishError { #[snafu(display( "Failure occurred during writing to or finalizing the compressor: {}", source @@ -44,17 +44,6 @@ pub enum FinishError { CompressionFailed { source: std::io::Error }, } -impl FinishError { - /// Gets the telemetry-friendly string version of this error. - /// - /// The value will be a short string with only lowercase letters and underscores. - pub const fn as_error_type(&self) -> &'static str { - match self { - Self::CompressionFailed { .. } => "compression_failed", - } - } -} - /// Supported compression types for the AppSignal sink. #[configurable_component] #[derive(Copy, Clone, Debug, Derivative, Eq, PartialEq)] @@ -174,6 +163,8 @@ impl SinkConfig for AppsignalSinkConfig { } } +// Encode logs and metrics for requests to the AppSignal API. +// It will use a JSON format wrapping events in either "log" or "metric", based on the type of event. pub struct AppsignalEventEncoder { transformer: Transformer, } @@ -228,11 +219,7 @@ impl HttpSink for AppsignalSinkConfig { } } -pub(crate) async fn healthcheck( - uri: Uri, - push_api_key: String, - client: HttpClient, -) -> crate::Result<()> { +async fn healthcheck(uri: Uri, push_api_key: String, client: HttpClient) -> crate::Result<()> { let request = Request::get(uri).header(AUTHORIZATION, format!("Bearer {}", push_api_key)); let response = client.send(request.body(Body::empty()).unwrap()).await?; From bcae1f7df20460ddf768ee30064659d1abb7099b Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Tue, 4 Apr 2023 10:29:01 +0200 Subject: [PATCH 18/21] Change encoder comments to be doc comments --- src/sinks/appsignal/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index e9056625b7213..bd999a8236d32 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -163,8 +163,8 @@ impl SinkConfig for AppsignalSinkConfig { } } -// Encode logs and metrics for requests to the AppSignal API. -// It will use a JSON format wrapping events in either "log" or "metric", based on the type of event. +/// Encode logs and metrics for requests to the AppSignal API. +/// It will use a JSON format wrapping events in either "log" or "metric", based on the type of event. pub struct AppsignalEventEncoder { transformer: Transformer, } From c042735405e249ed60f8ce6a64098c4b7857401c Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Tue, 4 Apr 2023 10:30:45 +0200 Subject: [PATCH 19/21] Revert custom compression struct Use the built-in Compression struct instead. As mentioned on the PR, add the no-compression option again, and use it by default. https://github.com/vectordotdev/vector/pull/16650#discussion_r1156145954 --- src/sinks/appsignal/mod.rs | 33 ++++--------------- .../components/sinks/base/appsignal.cue | 11 +++++-- 2 files changed, 15 insertions(+), 29 deletions(-) diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index bd999a8236d32..4116dbe2322ec 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -44,24 +44,6 @@ enum FinishError { CompressionFailed { source: std::io::Error }, } -/// Supported compression types for the AppSignal sink. -#[configurable_component] -#[derive(Copy, Clone, Debug, Derivative, Eq, PartialEq)] -#[serde(rename_all = "lowercase")] -#[derivative(Default)] -enum AppsignalCompression { - /// [Gzip][gzip] compression. - /// - /// [gzip]: https://www.gzip.org/ - #[derivative(Default)] - Gzip, - - /// [Zlib][zlib] compression. - /// - /// [zlib]: https://zlib.net/ - Zlib, -} - /// Configuration for the `appsignal` sink. #[configurable_component(sink("appsignal"))] #[derive(Clone, Debug, Default)] @@ -79,7 +61,7 @@ pub struct AppsignalSinkConfig { #[configurable(derived)] #[serde(default)] - compression: AppsignalCompression, + compression: Compression, #[configurable(derived)] #[serde(default)] @@ -201,17 +183,16 @@ impl HttpSink for AppsignalSinkConfig { ); let mut body = crate::serde::json::to_bytes(&events)?.freeze(); - - let compression = match self.compression { - AppsignalCompression::Gzip => { + let compression = self.compression; + match compression { + Compression::Gzip(_level) => { request = request.header("Content-Encoding", "gzip"); - Compression::gzip_default() } - AppsignalCompression::Zlib => { + Compression::Zlib(_level) => { request = request.header("Content-Encoding", "deflate"); - Compression::zlib_default() } - }; + Compression::None => {} + } let mut compressor = Compressor::from(compression); write_all(&mut compressor, 0, &body)?; body = compressor.finish().context(CompressionFailedSnafu)?.into(); diff --git a/website/cue/reference/components/sinks/base/appsignal.cue b/website/cue/reference/components/sinks/base/appsignal.cue index 5f0c29ea5cf49..197f556b2b40b 100644 --- a/website/cue/reference/components/sinks/base/appsignal.cue +++ b/website/cue/reference/components/sinks/base/appsignal.cue @@ -63,16 +63,21 @@ base: components: sinks: appsignal: configuration: { } } compression: { - description: "Supported compression types for the AppSignal sink." - required: false + description: """ + Compression configuration. + + All compression algorithms use the default compression level unless otherwise specified. + """ + required: false type: string: { - default: "gzip" + default: "none" enum: { gzip: """ [Gzip][gzip] compression. [gzip]: https://www.gzip.org/ """ + none: "No compression." zlib: """ [Zlib][zlib] compression. From 1c4e18956299106e0bd77570df19e538dd47c5cd Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Wed, 5 Apr 2023 08:23:40 +0200 Subject: [PATCH 20/21] Use gzip compression as default for AppSignal sink As discussed in the PR, use gzip as the default compression for the AppSignal sink for now. https://github.com/vectordotdev/vector/pull/16650#discussion_r1157710362 --- src/sinks/appsignal/mod.rs | 2 +- website/cue/reference/components/sinks/base/appsignal.cue | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index 4116dbe2322ec..c0b4ffaf94611 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -60,7 +60,7 @@ pub struct AppsignalSinkConfig { push_api_key: SensitiveString, #[configurable(derived)] - #[serde(default)] + #[serde(default = "Compression::gzip_default")] compression: Compression, #[configurable(derived)] diff --git a/website/cue/reference/components/sinks/base/appsignal.cue b/website/cue/reference/components/sinks/base/appsignal.cue index 197f556b2b40b..61bf184f1cae8 100644 --- a/website/cue/reference/components/sinks/base/appsignal.cue +++ b/website/cue/reference/components/sinks/base/appsignal.cue @@ -70,7 +70,7 @@ base: components: sinks: appsignal: configuration: { """ required: false type: string: { - default: "none" + default: "gzip" enum: { gzip: """ [Gzip][gzip] compression. From 4b7b8600c1b7798e58e6f2a2262aaa5a61d15028 Mon Sep 17 00:00:00 2001 From: Tom de Bruijn Date: Wed, 5 Apr 2023 08:29:52 +0200 Subject: [PATCH 21/21] Simplify compression header on AppSignal sink Use the `content_encoding` function to set the Content-Encoding header value. Removes the match-statement with a simpler if-statement. --- src/sinks/appsignal/mod.rs | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index c0b4ffaf94611..aff1fdb5093e3 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -183,17 +183,10 @@ impl HttpSink for AppsignalSinkConfig { ); let mut body = crate::serde::json::to_bytes(&events)?.freeze(); - let compression = self.compression; - match compression { - Compression::Gzip(_level) => { - request = request.header("Content-Encoding", "gzip"); - } - Compression::Zlib(_level) => { - request = request.header("Content-Encoding", "deflate"); - } - Compression::None => {} + if let Some(ce) = self.compression.content_encoding() { + request = request.header("Content-Encoding", ce); } - let mut compressor = Compressor::from(compression); + let mut compressor = Compressor::from(self.compression); write_all(&mut compressor, 0, &body)?; body = compressor.finish().context(CompressionFailedSnafu)?.into(); request.body(body).map_err(Into::into)