diff --git a/src/sinks/honeycomb.rs b/src/sinks/honeycomb.rs deleted file mode 100644 index 6cba7079a84e0..0000000000000 --- a/src/sinks/honeycomb.rs +++ /dev/null @@ -1,254 +0,0 @@ -use bytes::Bytes; -use futures::{FutureExt, SinkExt}; -use http::{Request, StatusCode, Uri}; -use serde_json::json; -use vector_common::sensitive_string::SensitiveString; -use vector_config::configurable_component; -use vrl::value::Kind; - -use crate::{ - codecs::Transformer, - config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, - event::{Event, Value}, - http::HttpClient, - schema, - sinks::util::{ - http::{BatchedHttpSink, HttpEventEncoder, HttpSink}, - BatchConfig, BoxedRawValue, JsonArrayBuffer, SinkBatchSettings, TowerRequestConfig, - }, -}; - -/// Configuration for the `honeycomb` sink. -#[configurable_component(sink("honeycomb", "Deliver log events to Honeycomb."))] -#[derive(Clone, Debug)] -pub struct HoneycombConfig { - // This endpoint is not user-configurable and only exists for testing purposes - #[serde(skip, default = "default_endpoint")] - endpoint: String, - - /// The API key that is used to authenticate against Honeycomb. - #[configurable(metadata(docs::examples = "${HONEYCOMB_API_KEY}"))] - #[configurable(metadata(docs::examples = "some-api-key"))] - api_key: SensitiveString, - - /// The dataset to which logs are sent. - #[configurable(metadata(docs::examples = "my-honeycomb-dataset"))] - // TODO: we probably want to make this a template - // but this limits us in how we can do our healthcheck. - dataset: String, - - #[configurable(derived)] - #[serde(default)] - batch: BatchConfig, - - #[configurable(derived)] - #[serde(default)] - request: TowerRequestConfig, - - #[configurable(derived)] - #[serde( - default, - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - encoding: Transformer, - - #[configurable(derived)] - #[serde( - default, - deserialize_with = "crate::serde::bool_or_struct", - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - acknowledgements: AcknowledgementsConfig, -} - -fn default_endpoint() -> String { - "https://api.honeycomb.io/1/batch".to_string() -} - -#[derive(Clone, Copy, Debug, Default)] -struct HoneycombDefaultBatchSettings; - -impl SinkBatchSettings for HoneycombDefaultBatchSettings { - const MAX_EVENTS: Option = None; - const MAX_BYTES: Option = Some(100_000); - const TIMEOUT_SECS: f64 = 1.0; -} - -impl GenerateConfig for HoneycombConfig { - fn generate_config() -> toml::Value { - toml::from_str( - r#"api_key = "${HONEYCOMB_API_KEY}" - dataset = "my-honeycomb-dataset""#, - ) - .unwrap() - } -} - -#[async_trait::async_trait] -#[typetag::serde(name = "honeycomb")] -impl SinkConfig for HoneycombConfig { - async fn build( - &self, - cx: SinkContext, - ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - let request_settings = self.request.unwrap_with(&TowerRequestConfig::default()); - let batch_settings = self.batch.into_batch_settings()?; - - let buffer = JsonArrayBuffer::new(batch_settings.size); - - let client = HttpClient::new(None, cx.proxy())?; - - let sink = BatchedHttpSink::new( - self.clone(), - buffer, - request_settings, - batch_settings.timeout, - client.clone(), - ) - .sink_map_err(|error| error!(message = "Fatal honeycomb sink error.", %error)); - - let healthcheck = healthcheck(self.clone(), client).boxed(); - - #[allow(deprecated)] - Ok((super::VectorSink::from_event_sink(sink), healthcheck)) - } - - fn input(&self) -> Input { - let requirement = - schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp()); - - Input::log().with_schema_requirement(requirement) - } - - fn acknowledgements(&self) -> &AcknowledgementsConfig { - &self.acknowledgements - } -} - -pub struct HoneycombEventEncoder { - transformer: Transformer, -} - -impl HttpEventEncoder for HoneycombEventEncoder { - fn encode_event(&mut self, mut event: Event) -> Option { - self.transformer.transform(&mut event); - let mut log = event.into_log(); - - let timestamp = if let Some(Value::Timestamp(ts)) = log.remove_timestamp() { - ts - } else { - chrono::Utc::now() - }; - - let data = json!({ - "time": timestamp.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true), - "data": log.convert_to_fields(), - }); - - Some(data) - } -} - -#[async_trait::async_trait] -impl HttpSink for HoneycombConfig { - type Input = serde_json::Value; - type Output = Vec; - type Encoder = HoneycombEventEncoder; - - fn build_encoder(&self) -> Self::Encoder { - HoneycombEventEncoder { - transformer: self.encoding.clone(), - } - } - - async fn build_request(&self, events: Self::Output) -> crate::Result> { - let uri = self.build_uri(); - let request = Request::post(uri).header("X-Honeycomb-Team", self.api_key.inner()); - let body = crate::serde::json::to_bytes(&events).unwrap().freeze(); - - request.body(body).map_err(Into::into) - } -} - -impl HoneycombConfig { - fn build_uri(&self) -> Uri { - let uri = format!("{}/{}", self.endpoint, self.dataset); - - uri.parse::().expect("This should be a valid uri") - } -} - -async fn healthcheck(config: HoneycombConfig, client: HttpClient) -> crate::Result<()> { - let req = config - .build_request(Vec::new()) - .await? - .map(hyper::Body::from); - - let res = client.send(req).await?; - - let status = res.status(); - let body = hyper::body::to_bytes(res.into_body()).await?; - - if status == StatusCode::BAD_REQUEST { - Ok(()) - } else if status == StatusCode::UNAUTHORIZED { - let json: serde_json::Value = serde_json::from_slice(&body[..])?; - - let message = if let Some(s) = json - .as_object() - .and_then(|o| o.get("error")) - .and_then(|s| s.as_str()) - { - s.to_string() - } else { - "Token is not valid, 401 returned.".to_string() - }; - - Err(message.into()) - } else { - let body = String::from_utf8_lossy(&body[..]); - - Err(format!( - "Server returned unexpected error status: {} body: {}", - status, body - ) - .into()) - } -} -#[cfg(test)] -mod test { - use futures::{future::ready, stream}; - use serde::Deserialize; - use vector_core::event::{Event, LogEvent}; - - use crate::{ - config::{GenerateConfig, SinkConfig, SinkContext}, - test_util::{ - components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, - http::{always_200_response, spawn_blackhole_http_server}, - }, - }; - - use super::HoneycombConfig; - - #[test] - fn generate_config() { - crate::test_util::test_generate_config::(); - } - - #[tokio::test] - async fn component_spec_compliance() { - let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; - - let config = HoneycombConfig::generate_config().to_string(); - let mut config = HoneycombConfig::deserialize(toml::de::ValueDeserializer::new(&config)) - .expect("config should be valid"); - config.endpoint = mock_endpoint.to_string(); - - let context = SinkContext::default(); - let (sink, _healthcheck) = config.build(context).await.unwrap(); - - let event = Event::Log(LogEvent::from("simple message")); - run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await; - } -} diff --git a/src/sinks/honeycomb/config.rs b/src/sinks/honeycomb/config.rs new file mode 100644 index 0000000000000..0a53bf16369cc --- /dev/null +++ b/src/sinks/honeycomb/config.rs @@ -0,0 +1,185 @@ +//! Configuration for the `honeycomb` sink. + +use bytes::Bytes; +use futures::FutureExt; +use http::{Request, StatusCode, Uri}; +use vector_common::sensitive_string::SensitiveString; +use vrl::value::Kind; + +use crate::{ + http::HttpClient, + sinks::{ + prelude::*, + util::{ + http::{http_response_retry_logic, HttpService}, + BatchConfig, BoxedRawValue, + }, + }, +}; + +use super::{ + encoder::HoneycombEncoder, request_builder::HoneycombRequestBuilder, + service::HoneycombSvcRequestBuilder, sink::HoneycombSink, +}; + +pub(super) const HTTP_HEADER_HONEYCOMB: &str = "X-Honeycomb-Team"; + +/// Configuration for the `honeycomb` sink. +#[configurable_component(sink("honeycomb", "Deliver log events to Honeycomb."))] +#[derive(Clone, Debug)] +pub struct HoneycombConfig { + // This endpoint is not user-configurable and only exists for testing purposes + #[serde(skip, default = "default_endpoint")] + pub(super) endpoint: String, + + /// The API key that is used to authenticate against Honeycomb. + #[configurable(metadata(docs::examples = "${HONEYCOMB_API_KEY}"))] + #[configurable(metadata(docs::examples = "some-api-key"))] + api_key: SensitiveString, + + /// The dataset to which logs are sent. + #[configurable(metadata(docs::examples = "my-honeycomb-dataset"))] + // TODO: we probably want to make this a template + // but this limits us in how we can do our healthcheck. + dataset: String, + + #[configurable(derived)] + #[serde(default)] + batch: BatchConfig, + + #[configurable(derived)] + #[serde(default)] + request: TowerRequestConfig, + + #[configurable(derived)] + #[serde( + default, + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + encoding: Transformer, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + acknowledgements: AcknowledgementsConfig, +} + +fn default_endpoint() -> String { + "https://api.honeycomb.io/1/batch".to_string() +} + +#[derive(Clone, Copy, Debug, Default)] +struct HoneycombDefaultBatchSettings; + +impl SinkBatchSettings for HoneycombDefaultBatchSettings { + const MAX_EVENTS: Option = None; + const MAX_BYTES: Option = Some(100_000); + const TIMEOUT_SECS: f64 = 1.0; +} + +impl GenerateConfig for HoneycombConfig { + fn generate_config() -> toml::Value { + toml::from_str( + r#"api_key = "${HONEYCOMB_API_KEY}" + dataset = "my-honeycomb-dataset""#, + ) + .unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "honeycomb")] +impl SinkConfig for HoneycombConfig { + async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let batch_settings = self.batch.validate()?.into_batcher_settings()?; + + let request_builder = HoneycombRequestBuilder { + encoder: HoneycombEncoder { + transformer: self.encoding.clone(), + }, + }; + + let uri = self.build_uri()?; + + let honeycomb_service_request_builder = HoneycombSvcRequestBuilder { + uri: uri.clone(), + api_key: self.api_key.clone(), + }; + + let client = HttpClient::new(None, cx.proxy())?; + + let service = HttpService::new(client.clone(), honeycomb_service_request_builder); + + let request_limits = self.request.unwrap_with(&TowerRequestConfig::default()); + + let service = ServiceBuilder::new() + .settings(request_limits, http_response_retry_logic()) + .service(service); + + let sink = HoneycombSink::new(service, batch_settings, request_builder); + + let healthcheck = healthcheck(uri, self.api_key.clone(), client).boxed(); + + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) + } + + fn input(&self) -> Input { + let requirement = Requirement::empty().optional_meaning("timestamp", Kind::timestamp()); + + Input::log().with_schema_requirement(requirement) + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +impl HoneycombConfig { + fn build_uri(&self) -> crate::Result { + let uri = format!("{}/{}", self.endpoint, self.dataset); + uri.parse::().map_err(Into::into) + } +} + +async fn healthcheck(uri: Uri, api_key: SensitiveString, client: HttpClient) -> crate::Result<()> { + let request = Request::post(uri).header(HTTP_HEADER_HONEYCOMB, api_key.inner()); + let body = crate::serde::json::to_bytes(&Vec::::new()) + .unwrap() + .freeze(); + let req: Request = request.body(body)?; + let req = req.map(hyper::Body::from); + + let res = client.send(req).await?; + + let status = res.status(); + let body = hyper::body::to_bytes(res.into_body()).await?; + + if status == StatusCode::BAD_REQUEST { + Ok(()) + } else if status == StatusCode::UNAUTHORIZED { + let json: serde_json::Value = serde_json::from_slice(&body[..])?; + + let message = if let Some(s) = json + .as_object() + .and_then(|o| o.get("error")) + .and_then(|s| s.as_str()) + { + s.to_string() + } else { + "Token is not valid, 401 returned.".to_string() + }; + + Err(message.into()) + } else { + let body = String::from_utf8_lossy(&body[..]); + + Err(format!( + "Server returned unexpected error status: {} body: {}", + status, body + ) + .into()) + } +} diff --git a/src/sinks/honeycomb/encoder.rs b/src/sinks/honeycomb/encoder.rs new file mode 100644 index 0000000000000..0b9ccd429063d --- /dev/null +++ b/src/sinks/honeycomb/encoder.rs @@ -0,0 +1,52 @@ +//! Encoding for the `honeycomb` sink. + +use bytes::BytesMut; +use chrono::{SecondsFormat, Utc}; +use serde_json::{json, to_vec}; +use std::io; + +use crate::sinks::{ + prelude::*, + util::encoding::{write_all, Encoder as SinkEncoder}, +}; + +pub(super) struct HoneycombEncoder { + pub(super) transformer: Transformer, +} + +impl SinkEncoder> for HoneycombEncoder { + fn encode_input( + &self, + events: Vec, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { + let mut byte_size = telemetry().create_request_count_byte_size(); + let mut body = BytesMut::new(); + let n_events = events.len(); + + for mut event in events { + self.transformer.transform(&mut event); + + byte_size.add_event(&event, event.estimated_json_encoded_size_of()); + + let log = event.as_mut_log(); + + let timestamp = if let Some(Value::Timestamp(ts)) = log.remove_timestamp() { + ts + } else { + Utc::now() + }; + + let data = to_vec(&json!({ + "time": timestamp.to_rfc3339_opts(SecondsFormat::Nanos, true), + "data": log.convert_to_fields(), + }))?; + + body.extend(&data); + } + + let body = body.freeze(); + + write_all(writer, n_events, body.as_ref()).map(|()| (body.len(), byte_size)) + } +} diff --git a/src/sinks/honeycomb/mod.rs b/src/sinks/honeycomb/mod.rs new file mode 100644 index 0000000000000..e93745194e92d --- /dev/null +++ b/src/sinks/honeycomb/mod.rs @@ -0,0 +1,13 @@ +//! The Honeycomb [`vector_core::sink::VectorSink`]. +//! +//! This module contains the [`vector_core::sink::VectorSink`] instance that is responsible for +//! taking a stream of [`vector_core::event::Event`]s and forwarding them to the Honeycomb service. + +mod config; +mod encoder; +mod request_builder; +mod service; +mod sink; + +#[cfg(test)] +mod tests; diff --git a/src/sinks/honeycomb/request_builder.rs b/src/sinks/honeycomb/request_builder.rs new file mode 100644 index 0000000000000..a84c9dec2ba4b --- /dev/null +++ b/src/sinks/honeycomb/request_builder.rs @@ -0,0 +1,47 @@ +//! `RequestBuilder` implementation for the `honeycomb` sink. + +use bytes::Bytes; +use std::io; + +use crate::sinks::{prelude::*, util::http::HttpRequest}; + +use super::encoder::HoneycombEncoder; + +pub(super) struct HoneycombRequestBuilder { + pub(super) encoder: HoneycombEncoder, +} + +impl RequestBuilder> for HoneycombRequestBuilder { + type Metadata = EventFinalizers; + type Events = Vec; + type Encoder = HoneycombEncoder; + type Payload = Bytes; + type Request = HttpRequest; + type Error = io::Error; + + fn compression(&self) -> Compression { + Compression::None + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoder + } + + fn split_input( + &self, + mut events: Vec, + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let finalizers = events.take_finalizers(); + let builder = RequestMetadataBuilder::from_events(&events); + (finalizers, builder, events) + } + + fn build_request( + &self, + metadata: Self::Metadata, + request_metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + HttpRequest::new(payload.into_payload(), metadata, request_metadata) + } +} diff --git a/src/sinks/honeycomb/service.rs b/src/sinks/honeycomb/service.rs new file mode 100644 index 0000000000000..57eda27501558 --- /dev/null +++ b/src/sinks/honeycomb/service.rs @@ -0,0 +1,25 @@ +//! Service implementation for the `honeycomb` sink. + +use bytes::Bytes; +use http::{Request, Uri}; +use vector_common::sensitive_string::SensitiveString; + +use crate::sinks::util::http::HttpServiceRequestBuilder; + +use super::config::HTTP_HEADER_HONEYCOMB; + +#[derive(Debug, Clone)] +pub(super) struct HoneycombSvcRequestBuilder { + pub(super) uri: Uri, + pub(super) api_key: SensitiveString, +} + +impl HttpServiceRequestBuilder for HoneycombSvcRequestBuilder { + fn build(&self, body: Bytes) -> Request { + let request = Request::post(&self.uri).header(HTTP_HEADER_HONEYCOMB, self.api_key.inner()); + + request + .body(body) + .expect("Failed to assign body to request- builder has errors") + } +} diff --git a/src/sinks/honeycomb/sink.rs b/src/sinks/honeycomb/sink.rs new file mode 100644 index 0000000000000..9575577b91e11 --- /dev/null +++ b/src/sinks/honeycomb/sink.rs @@ -0,0 +1,77 @@ +//! Implementation of the `honeycomb` sink. + +use crate::sinks::{ + prelude::*, + util::http::{HttpJsonBatchSizer, HttpRequest}, +}; + +use super::request_builder::HoneycombRequestBuilder; + +pub(super) struct HoneycombSink { + service: S, + batch_settings: BatcherSettings, + request_builder: HoneycombRequestBuilder, +} + +impl HoneycombSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ + /// Creates a new `HoneycombSink`. + pub(super) const fn new( + service: S, + batch_settings: BatcherSettings, + request_builder: HoneycombRequestBuilder, + ) -> Self { + Self { + service, + batch_settings, + request_builder, + } + } + + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + input + // Batch the input stream with size calculation based on the estimated encoded json size + .batched( + self.batch_settings + .into_item_size_config(HttpJsonBatchSizer), + ) + // Build requests with no concurrency limit. + .request_builder(None, self.request_builder) + // Filter out any errors that occurred in the request building. + .filter_map(|request| async move { + match request { + Err(error) => { + emit!(SinkRequestBuildError { error }); + None + } + Ok(req) => Some(req), + } + }) + // Generate the driver that will send requests and handle retries, + // event finalization, and logging/internal metric reporting. + .into_driver(self.service) + .run() + .await + } +} + +#[async_trait::async_trait] +impl StreamSink for HoneycombSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ + async fn run( + self: Box, + input: futures_util::stream::BoxStream<'_, Event>, + ) -> Result<(), ()> { + self.run_inner(input).await + } +} diff --git a/src/sinks/honeycomb/tests.rs b/src/sinks/honeycomb/tests.rs new file mode 100644 index 0000000000000..29186f47f8012 --- /dev/null +++ b/src/sinks/honeycomb/tests.rs @@ -0,0 +1,35 @@ +//! Unit tests for the `honeycomb` sink. + +use futures::{future::ready, stream}; +use serde::Deserialize; + +use crate::{ + sinks::prelude::*, + test_util::{ + components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, + http::{always_200_response, spawn_blackhole_http_server}, + }, +}; + +use super::config::HoneycombConfig; + +#[test] +fn generate_config() { + crate::test_util::test_generate_config::(); +} + +#[tokio::test] +async fn component_spec_compliance() { + let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; + + let config = HoneycombConfig::generate_config().to_string(); + let mut config = HoneycombConfig::deserialize(toml::de::ValueDeserializer::new(&config)) + .expect("config should be valid"); + config.endpoint = mock_endpoint.to_string(); + + let context = SinkContext::default(); + let (sink, _healthcheck) = config.build(context).await.unwrap(); + + let event = Event::Log(LogEvent::from("simple message")); + run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await; +} diff --git a/src/sinks/http/config.rs b/src/sinks/http/config.rs index 5c933c1265eae..c5782cae0cabc 100644 --- a/src/sinks/http/config.rs +++ b/src/sinks/http/config.rs @@ -14,7 +14,7 @@ use crate::{ sinks::{ prelude::*, util::{ - http::{HttpResponse, HttpService, HttpStatusRetryLogic, RequestConfig}, + http::{http_response_retry_logic, HttpService, RequestConfig}, RealtimeSizeBasedDefaultBatchSettings, UriSerde, }, }, @@ -288,11 +288,8 @@ impl SinkConfig for HttpSinkConfig { let request_limits = self.request.tower.unwrap_with(&Default::default()); - let retry_logic = - HttpStatusRetryLogic::new(|req: &HttpResponse| req.http_response.status()); - let service = ServiceBuilder::new() - .settings(request_limits, retry_logic) + .settings(request_limits, http_response_retry_logic()) .service(service); let sink = HttpSink::new(service, batch_settings, request_builder); diff --git a/src/sinks/util/http.rs b/src/sinks/util/http.rs index 4dbfcab0ab4f8..93a56f5bf6097 100644 --- a/src/sinks/util/http.rs +++ b/src/sinks/util/http.rs @@ -20,7 +20,9 @@ use snafu::{ResultExt, Snafu}; use tower::{Service, ServiceBuilder}; use tower_http::decompression::DecompressionLayer; use vector_config::configurable_component; -use vector_core::{ByteSizeOf, EstimatedJsonEncodedSizeOf}; +use vector_core::{ + stream::batcher::limiter::ItemBatchSize, ByteSizeOf, EstimatedJsonEncodedSizeOf, +}; use super::{ retries::{RetryAction, RetryLogic}, @@ -678,6 +680,24 @@ impl DriverResponse for HttpResponse { } } +/// Creates a `RetryLogic` for use with `HttpResponse`. +pub fn http_response_retry_logic() -> HttpStatusRetryLogic< + impl Fn(&HttpResponse) -> StatusCode + Clone + Send + Sync + 'static, + HttpResponse, +> { + HttpStatusRetryLogic::new(|req: &HttpResponse| req.http_response.status()) +} + +/// Uses the estimated json encoded size to determine batch sizing. +#[derive(Default)] +pub struct HttpJsonBatchSizer; + +impl ItemBatchSize for HttpJsonBatchSizer { + fn size(&self, item: &Event) -> usize { + item.estimated_json_encoded_size_of().get() + } +} + /// HTTP request builder for HTTP stream sinks using the generic `HttpService` pub trait HttpServiceRequestBuilder { fn build(&self, body: Bytes) -> Request;