From 68ff83c5fa2a572057a476b68e26e869e6bca623 Mon Sep 17 00:00:00 2001 From: neuronull Date: Tue, 15 Aug 2023 14:34:33 -0600 Subject: [PATCH] Refactor a bit --- src/sinks/http/config.rs | 53 ++++-- src/sinks/http/encoder.rs | 48 +++++- src/sinks/http/request_builder.rs | 8 +- src/sinks/http/service.rs | 213 +++++++++++++++++-------- src/sinks/http/sink.rs | 35 ++-- src/sinks/http/tests.rs | 4 +- src/sinks/util/http_service/mod.rs | 15 -- src/sinks/util/http_service/request.rs | 45 ------ src/sinks/util/http_service/retry.rs | 74 --------- src/sinks/util/http_service/service.rs | 123 -------------- src/sinks/util/mod.rs | 1 - 11 files changed, 260 insertions(+), 359 deletions(-) delete mode 100644 src/sinks/util/http_service/mod.rs delete mode 100644 src/sinks/util/http_service/request.rs delete mode 100644 src/sinks/util/http_service/retry.rs delete mode 100644 src/sinks/util/http_service/service.rs diff --git a/src/sinks/http/config.rs b/src/sinks/http/config.rs index 99788605e72db..e7de875bf73d9 100644 --- a/src/sinks/http/config.rs +++ b/src/sinks/http/config.rs @@ -10,22 +10,27 @@ use indexmap::IndexMap; use crate::{ codecs::{EncodingConfigWithFraming, SinkType}, - http::{get_http_scheme_from_uri, Auth, HttpClient, MaybeAuth}, + http::{Auth, HttpClient, MaybeAuth}, sinks::{ prelude::*, util::{ - http::RequestConfig, - http_service::{HttpRetryLogic, HttpService}, + http::{HttpStatusRetryLogic, RequestConfig}, RealtimeSizeBasedDefaultBatchSettings, UriSerde, }, }, }; use super::{ - encoder::HttpEncoder, request_builder::HttpRequestBuilder, service::HttpSinkRequestBuilder, + encoder::HttpEncoder, + request_builder::HttpRequestBuilder, + service::{HttpResponse, HttpService, HttpSinkRequestBuilder}, sink::HttpSink, }; +const CONTENT_TYPE_TEXT: &str = "text/plain"; +const CONTENT_TYPE_NDJSON: &str = "application/x-ndjson"; +const CONTENT_TYPE_JSON: &str = "application/json"; + /// Configuration for the `http` sink. #[configurable_component(sink("http", "Deliver observability event data to an HTTP server."))] #[derive(Clone, Debug)] @@ -238,10 +243,6 @@ impl SinkConfig for HttpSinkConfig { let (payload_prefix, payload_suffix) = validate_payload_wrapper(&self.payload_prefix, &self.payload_suffix, &encoder)?; - let endpoint = self.uri.with_default_parts(); - - let protocol = get_http_scheme_from_uri(&endpoint.uri); - let client = self.build_http_client(&cx)?; let healthcheck = match cx.healthcheck.uri { @@ -251,27 +252,49 @@ impl SinkConfig for HttpSinkConfig { None => future::ok(()).boxed(), }; + let content_type = { + use Framer::*; + use Serializer::*; + match (encoder.serializer(), encoder.framer()) { + (RawMessage(_) | Text(_), _) => Some(CONTENT_TYPE_TEXT.to_owned()), + (Json(_), NewlineDelimited(_)) => Some(CONTENT_TYPE_NDJSON.to_owned()), + (Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) => { + Some(CONTENT_TYPE_JSON.to_owned()) + } + _ => None, + } + }; + let request_builder = HttpRequestBuilder { - encoder: HttpEncoder::new(encoder.clone(), transformer), + encoder: HttpEncoder::new(encoder, transformer, payload_prefix, payload_suffix), + compression: self.compression, }; + let content_encoding = self.compression.is_compressed().then(|| { + self.compression + .content_encoding() + .expect("Encoding should be specified for compression.") + .to_string() + }); + let http_service_request_builder = HttpSinkRequestBuilder { uri: self.uri.with_default_parts(), method: self.method, auth: self.auth.choose_one(&self.uri.auth)?, headers, - payload_prefix, - payload_suffix, - compression: self.compression, - encoder, + content_type, + content_encoding, }; - let service = HttpService::new(http_service_request_builder, client, protocol.to_string()); + let service = HttpService::new(client, http_service_request_builder); let request_limits = self.request.tower.unwrap_with(&Default::default()); + let retry_logic = + HttpStatusRetryLogic::new(|req: &HttpResponse| req.http_response.status()); + let service = ServiceBuilder::new() - .settings(request_limits, HttpRetryLogic) + .settings(request_limits, retry_logic) .service(service); let sink = HttpSink::new(service, batch_settings, request_builder); diff --git a/src/sinks/http/encoder.rs b/src/sinks/http/encoder.rs index 5c483528498f2..15c4c10bd6af8 100644 --- a/src/sinks/http/encoder.rs +++ b/src/sinks/http/encoder.rs @@ -4,8 +4,15 @@ use crate::{ event::Event, sinks::util::encoding::{write_all, Encoder as SinkEncoder}, }; -use bytes::BytesMut; -use codecs::encoding::Framer; +use bytes::{BufMut, BytesMut}; +use codecs::{ + encoding::{ + Framer, + Framer::{CharacterDelimited, NewlineDelimited}, + Serializer::Json, + }, + CharacterDelimitedEncoder, +}; use std::io; use tokio_util::codec::Encoder as _; @@ -15,14 +22,23 @@ use crate::sinks::prelude::*; pub(super) struct HttpEncoder { pub(super) encoder: Encoder, pub(super) transformer: Transformer, + pub(super) payload_prefix: String, + pub(super) payload_suffix: String, } impl HttpEncoder { /// Creates a new `HttpEncoder`. - pub(super) const fn new(encoder: Encoder, transformer: Transformer) -> Self { + pub(super) const fn new( + encoder: Encoder, + transformer: Transformer, + payload_prefix: String, + payload_suffix: String, + ) -> Self { Self { encoder, transformer, + payload_prefix, + payload_suffix, } } } @@ -47,6 +63,32 @@ impl SinkEncoder> for HttpEncoder { .map_err(|_| io::Error::new(io::ErrorKind::Other, "unable to encode event"))?; } + match (self.encoder.serializer(), self.encoder.framer()) { + (Json(_), NewlineDelimited(_)) => { + if !body.is_empty() { + // Remove trailing newline for backwards-compatibility + // with Vector `0.20.x`. + body.truncate(body.len() - 1); + } + } + (Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) => { + // TODO(https://github.com/vectordotdev/vector/issues/11253): + // Prepend before building a request body to eliminate the + // additional copy here. + let message = body.split(); + body.put(self.payload_prefix.as_bytes()); + body.put_u8(b'['); + if !message.is_empty() { + body.unsplit(message); + // remove trailing comma from last record + body.truncate(body.len() - 1); + } + body.put_u8(b']'); + body.put(self.payload_suffix.as_bytes()); + } + _ => {} + } + let body = body.freeze(); write_all(writer, 1, body.as_ref()).map(|()| (body.len(), byte_size)) diff --git a/src/sinks/http/request_builder.rs b/src/sinks/http/request_builder.rs index 9ffae11fc4e13..55a3a723d5ac7 100644 --- a/src/sinks/http/request_builder.rs +++ b/src/sinks/http/request_builder.rs @@ -3,12 +3,13 @@ use bytes::Bytes; use std::io; -use crate::sinks::{prelude::*, util::http_service::HttpRequest}; +use crate::sinks::prelude::*; -use super::encoder::HttpEncoder; +use super::{encoder::HttpEncoder, service::HttpRequest}; pub(super) struct HttpRequestBuilder { pub(super) encoder: HttpEncoder, + pub(super) compression: Compression, } impl RequestBuilder> for HttpRequestBuilder { @@ -20,8 +21,9 @@ impl RequestBuilder> for HttpRequestBuilder { type Error = io::Error; fn compression(&self) -> Compression { + self.compression // Compression is handled in the Service - Compression::None + //Compression::None } fn encoder(&self) -> &Self::Encoder { diff --git a/src/sinks/http/service.rs b/src/sinks/http/service.rs index 87c67f0415895..97cb62f595543 100644 --- a/src/sinks/http/service.rs +++ b/src/sinks/http/service.rs @@ -3,108 +3,134 @@ //! As this sink leverages a common HTTP implementation of the `Service` itself, //! this module only contains the `http` sink specific logic. -use std::io::Write; - -use bytes::{BufMut, Bytes, BytesMut}; -use codecs::{ - encoding::{Framer, Serializer}, - CharacterDelimitedEncoder, +use std::{ + sync::Arc, + task::{Context, Poll}, }; -use http::{HeaderName, HeaderValue, Method, Request, Uri}; + +use bytes::Bytes; +use http::{HeaderName, HeaderValue, Method, Request, Response, Uri}; +use hyper::Body; use indexmap::IndexMap; use crate::{ - http::Auth, + http::{Auth, HttpClient}, sinks::{ prelude::*, - util::{http_service::HttpServiceRequestBuilder, Compressor, UriSerde}, + util::{http::HttpBatchService, UriSerde}, }, }; use super::config::HttpMethod; +/// Request type for use in `RequestBuilder` implementations of HTTP stream sinks. +#[derive(Clone)] +pub struct HttpRequest { + pub payload: Bytes, + pub finalizers: EventFinalizers, + pub request_metadata: RequestMetadata, +} + +impl HttpRequest { + /// Creates a new `HttpRequest`. + pub fn new( + payload: Bytes, + finalizers: EventFinalizers, + request_metadata: RequestMetadata, + ) -> Self { + Self { + payload, + finalizers, + request_metadata, + } + } +} + +impl Finalizable for HttpRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + self.finalizers.take_finalizers() + } +} + +impl MetaDescriptive for HttpRequest { + fn get_metadata(&self) -> &RequestMetadata { + &self.request_metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.request_metadata + } +} + +impl ByteSizeOf for HttpRequest { + fn allocated_bytes(&self) -> usize { + self.payload.allocated_bytes() + self.finalizers.allocated_bytes() + } +} + +/// Response type for use in the `Service` implementation of HTTP stream sinks. +pub struct HttpResponse { + pub(super) http_response: Response, + pub(super) events_byte_size: GroupedCountByteSize, + pub(super) raw_byte_size: usize, +} + +impl DriverResponse for HttpResponse { + fn event_status(&self) -> EventStatus { + if self.http_response.status().is_success() { + EventStatus::Delivered + } else { + EventStatus::Rejected + } + } + + fn events_sent(&self) -> &GroupedCountByteSize { + &self.events_byte_size + } + + fn bytes_sent(&self) -> Option { + Some(self.raw_byte_size) + } +} + #[derive(Debug, Clone)] pub(super) struct HttpSinkRequestBuilder { pub(super) uri: UriSerde, pub(super) method: HttpMethod, pub(super) auth: Option, pub(super) headers: IndexMap, - pub(super) payload_prefix: String, - pub(super) payload_suffix: String, - pub(super) compression: Compression, - pub(super) encoder: Encoder, + pub(super) content_type: Option, + pub(super) content_encoding: Option, } -impl HttpServiceRequestBuilder for HttpSinkRequestBuilder { - fn build(&self, mut body: BytesMut) -> Request { +impl HttpSinkRequestBuilder { + fn build(&self, body: Bytes) -> Request { let method: Method = self.method.into(); let uri: Uri = self.uri.uri.clone(); - let content_type = { - use Framer::*; - use Serializer::*; - match (self.encoder.serializer(), self.encoder.framer()) { - (RawMessage(_) | Text(_), _) => Some("text/plain"), - (Json(_), NewlineDelimited(_)) => { - if !body.is_empty() { - // Remove trailing newline for backwards-compatibility - // with Vector `0.20.x`. - body.truncate(body.len() - 1); - } - Some("application/x-ndjson") - } - (Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) => { - // TODO(https://github.com/vectordotdev/vector/issues/11253): - // Prepend before building a request body to eliminate the - // additional copy here. - let message = body.split(); - body.put(self.payload_prefix.as_bytes()); - body.put_u8(b'['); - if !message.is_empty() { - body.unsplit(message); - // remove trailing comma from last record - body.truncate(body.len() - 1); - } - body.put_u8(b']'); - body.put(self.payload_suffix.as_bytes()); - Some("application/json") - } - _ => None, - } - }; - let mut builder = Request::builder().method(method).uri(uri); - if let Some(content_type) = content_type { + if let Some(content_type) = &self.content_type { builder = builder.header("Content-Type", content_type); } - let compression = self.compression; - - if compression.is_compressed() { - builder = builder.header( - "Content-Encoding", - compression - .content_encoding() - .expect("Encoding should be specified."), - ); - - let mut compressor = Compressor::from(compression); - compressor - .write_all(&body) - .expect("Writing to Vec can't fail."); - body = compressor.finish().expect("Writing to Vec can't fail."); + if let Some(content_encoding) = &self.content_encoding { + builder = builder.header("Content-Encoding", content_encoding); } let headers = builder .headers_mut() // The request building should not have errors at this point, and if it did it would fail in the call to `body()` also. .expect("Failed to access headers in http::Request builder- builder has errors."); + for (header, value) in self.headers.iter() { headers.insert(header, value.clone()); } - let mut request = builder.body(body.freeze()).unwrap(); + // The request building should not have errors at this point + let mut request = builder + .body(body) + .expect("Failed to assign body to request- builder has errors"); if let Some(auth) = &self.auth { auth.apply(&mut request); @@ -113,3 +139,60 @@ impl HttpServiceRequestBuilder for HttpSinkRequestBuilder { request } } + +#[derive(Clone)] +pub(super) struct HttpService { + // pub(crate) batch_service: + // HttpBatchService, crate::Error>>, HttpRequest>, + batch_service: + HttpBatchService, crate::Error>>, HttpRequest>, +} + +impl HttpService { + pub fn new( + http_client: HttpClient, + http_request_builder: HttpSinkRequestBuilder, + ) -> Self { + let http_request_builder = Arc::new(http_request_builder); + + let batch_service = HttpBatchService::new(http_client, move |req| { + let req: HttpRequest = req; + + let request_builder = Arc::clone(&http_request_builder); + + let fut: BoxFuture<'static, Result, crate::Error>> = + Box::pin(async move { Ok(request_builder.build(req.payload)) }); + + fut + }); + Self { batch_service } + } +} + +impl Service for HttpService { + type Response = HttpResponse; + type Error = crate::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, mut request: HttpRequest) -> Self::Future { + let mut http_service = self.batch_service.clone(); + + let raw_byte_size = request.payload.len(); + let metadata = std::mem::take(request.metadata_mut()); + let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size(); + + Box::pin(async move { + let http_response = http_service.call(request).await?; + + Ok(HttpResponse { + http_response, + events_byte_size, + raw_byte_size, + }) + }) + } +} diff --git a/src/sinks/http/sink.rs b/src/sinks/http/sink.rs index 0dc213846cf6a..0a4b2da5c6674 100644 --- a/src/sinks/http/sink.rs +++ b/src/sinks/http/sink.rs @@ -1,24 +1,25 @@ //! Implementation of the `http` sink. -use crate::sinks::{ - prelude::*, - util::http_service::{HttpRetryLogic, HttpService}, -}; +use crate::sinks::prelude::*; -use super::{ - batch::HttpBatchSizer, request_builder::HttpRequestBuilder, service::HttpSinkRequestBuilder, -}; +use super::{batch::HttpBatchSizer, request_builder::HttpRequestBuilder, service::HttpRequest}; -pub(super) struct HttpSink { - service: Svc, HttpRetryLogic>, +pub(super) struct HttpSink { + service: S, batch_settings: BatcherSettings, request_builder: HttpRequestBuilder, } -impl HttpSink { +impl HttpSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ /// Creates a new `HttpSink`. pub(super) const fn new( - service: Svc, HttpRetryLogic>, + service: S, batch_settings: BatcherSettings, request_builder: HttpRequestBuilder, ) -> Self { @@ -30,7 +31,9 @@ impl HttpSink { } async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + let service = ServiceBuilder::new().service(self.service); input + // .batched(self.batch_settings.into_byte_size_config()) // Batch the input stream with size calculation dependent on the configured codec .batched(self.batch_settings.into_item_size_config(HttpBatchSizer { encoder: self.request_builder.encoder.encoder.clone(), @@ -49,14 +52,20 @@ impl HttpSink { }) // Generate the driver that will send requests and handle retries, // event finalization, and logging/internal metric reporting. - .into_driver(self.service) + .into_driver(service) .run() .await } } #[async_trait::async_trait] -impl StreamSink for HttpSink { +impl StreamSink for HttpSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ async fn run( self: Box, input: futures_util::stream::BoxStream<'_, Event>, diff --git a/src/sinks/http/tests.rs b/src/sinks/http/tests.rs index 039bc9d016af3..4cda846156d64 100644 --- a/src/sinks/http/tests.rs +++ b/src/sinks/http/tests.rs @@ -73,7 +73,7 @@ fn http_encode_event_text() { let encoder = cfg.build_encoder().unwrap(); let transformer = cfg.encoding.transformer(); - let encoder = HttpEncoder::new(encoder, transformer); + let encoder = HttpEncoder::new(encoder, transformer, "".to_owned(), "".to_owned()); let mut encoded = vec![]; let (encoded_size, _byte_size) = encoder.encode_input(vec![event], &mut encoded).unwrap(); @@ -96,7 +96,7 @@ fn http_encode_event_ndjson() { let encoder = cfg.build_encoder().unwrap(); let transformer = cfg.encoding.transformer(); - let encoder = HttpEncoder::new(encoder, transformer); + let encoder = HttpEncoder::new(encoder, transformer, "".to_owned(), "".to_owned()); let mut encoded = vec![]; encoder.encode_input(vec![event], &mut encoded).unwrap(); diff --git a/src/sinks/util/http_service/mod.rs b/src/sinks/util/http_service/mod.rs deleted file mode 100644 index 2f71930b5f72b..0000000000000 --- a/src/sinks/util/http_service/mod.rs +++ /dev/null @@ -1,15 +0,0 @@ -//! This module contains a generic implementation of `Service` and related infrastructure -//! for HTTP based stream sinks. -//! -//! In particular, HTTP based stream sinks can use the `HttpService` and only need to define -//! a struct that implements the `HttpServiceRequestBuilder` trait. -//! -//! The `HttpRequest` is used in the `RequestBuilder` implementation. - -mod request; -mod retry; -mod service; - -pub use request::HttpRequest; -pub use retry::HttpRetryLogic; -pub use service::{HttpService, HttpServiceRequestBuilder}; diff --git a/src/sinks/util/http_service/request.rs b/src/sinks/util/http_service/request.rs deleted file mode 100644 index a0adebcd48f6f..0000000000000 --- a/src/sinks/util/http_service/request.rs +++ /dev/null @@ -1,45 +0,0 @@ -use bytes::Bytes; - -use vector_common::{ - finalization::{EventFinalizers, Finalizable}, - request_metadata::{MetaDescriptive, RequestMetadata}, -}; - -/// Request type for use in `RequestBuilder` implementations of HTTP stream sinks. -#[derive(Clone)] -pub struct HttpRequest { - pub payload: Bytes, - pub finalizers: EventFinalizers, - pub request_metadata: RequestMetadata, -} - -impl HttpRequest { - /// Creates a new `HttpRequest`. - pub fn new( - payload: Bytes, - finalizers: EventFinalizers, - request_metadata: RequestMetadata, - ) -> Self { - Self { - payload, - finalizers, - request_metadata, - } - } -} - -impl Finalizable for HttpRequest { - fn take_finalizers(&mut self) -> EventFinalizers { - self.finalizers.take_finalizers() - } -} - -impl MetaDescriptive for HttpRequest { - fn get_metadata(&self) -> &RequestMetadata { - &self.request_metadata - } - - fn metadata_mut(&mut self) -> &mut RequestMetadata { - &mut self.request_metadata - } -} diff --git a/src/sinks/util/http_service/retry.rs b/src/sinks/util/http_service/retry.rs deleted file mode 100644 index 27d409ecf2edd..0000000000000 --- a/src/sinks/util/http_service/retry.rs +++ /dev/null @@ -1,74 +0,0 @@ -use http::StatusCode; - -use crate::{ - http::HttpError, - sinks::util::retries::{RetryAction, RetryLogic}, -}; - -use super::service::HttpResponse; - -/// `RetryLogic` implementation for use in HTTP based stream sinks. -#[derive(Debug, Default, Clone)] -pub struct HttpRetryLogic; - -impl RetryLogic for HttpRetryLogic { - type Error = HttpError; - type Response = HttpResponse; - - fn is_retriable_error(&self, _error: &Self::Error) -> bool { - true - } - - fn should_retry_response(&self, response: &Self::Response) -> RetryAction { - let response = &response.http_response; - let status = response.status(); - - match status { - StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()), - StatusCode::NOT_IMPLEMENTED => { - RetryAction::DontRetry("endpoint not implemented".into()) - } - _ if status.is_server_error() => RetryAction::Retry( - format!("{}: {}", status, String::from_utf8_lossy(response.body())).into(), - ), - _ if status.is_success() => RetryAction::Successful, - _ => RetryAction::DontRetry(format!("response status: {}", status).into()), - } - } -} - -#[cfg(test)] -mod tests { - use bytes::Bytes; - use hyper::Response; - use vector_common::request_metadata::GroupedCountByteSize; - - use super::{HttpResponse, HttpRetryLogic}; - use crate::sinks::util::retries::RetryLogic; - - #[test] - fn validate_retry_logic() { - let logic = HttpRetryLogic; - - fn generate_response(code: u16) -> HttpResponse { - HttpResponse { - http_response: Response::builder().status(code).body(Bytes::new()).unwrap(), - events_byte_size: GroupedCountByteSize::new_untagged(), - raw_byte_size: 0, - } - } - - assert!(logic - .should_retry_response(&generate_response(429)) - .is_retryable()); - assert!(logic - .should_retry_response(&generate_response(500)) - .is_retryable()); - assert!(logic - .should_retry_response(&generate_response(400)) - .is_not_retryable()); - assert!(logic - .should_retry_response(&generate_response(501)) - .is_not_retryable()); - } -} diff --git a/src/sinks/util/http_service/service.rs b/src/sinks/util/http_service/service.rs deleted file mode 100644 index dad050319ef9d..0000000000000 --- a/src/sinks/util/http_service/service.rs +++ /dev/null @@ -1,123 +0,0 @@ -use std::task::{Context, Poll}; - -use bytes::{Buf, Bytes, BytesMut}; -use http::Request; -use hyper::{body, Body, Response}; -use tower_http::decompression::DecompressionLayer; - -use crate::{http::HttpClient, internal_events::EndpointBytesSent, sinks::prelude::*}; - -use super::request::HttpRequest; - -/// Response type for use in the `Service` implementation of HTTP stream sinks. -pub struct HttpResponse { - pub(super) http_response: Response, - pub(super) events_byte_size: GroupedCountByteSize, - pub(super) raw_byte_size: usize, -} - -impl DriverResponse for HttpResponse { - fn event_status(&self) -> EventStatus { - if self.http_response.status().is_success() { - EventStatus::Delivered - } else { - EventStatus::Rejected - } - } - - fn events_sent(&self) -> &GroupedCountByteSize { - &self.events_byte_size - } - - fn bytes_sent(&self) -> Option { - Some(self.raw_byte_size) - } -} - -/// Build HTTP requests for the `HttpService`. -/// -/// This trait exists to allow HTTP based stream sinks to utilize the common `HttpService` -/// while being able to define sink-specific HTTP requests. -pub trait HttpServiceRequestBuilder { - fn build(&self, body: BytesMut) -> Request; -} - -/// `Service` implementation of HTTP stream sinks. -/// -/// `http_request_builder` `` must implement the `HttpServiceRequestBuilder` trait, which is -/// used in the `Service::call()` function to handle sink-specific HTTP request building. -#[derive(Debug, Clone)] -pub struct HttpService { - http_request_builder: R, - client: HttpClient, - protocol: String, -} - -impl HttpService { - /// Creates a new `HttpService`. - pub const fn new(http_request_builder: R, client: HttpClient, protocol: String) -> Self { - Self { - http_request_builder, - client, - protocol, - } - } -} - -impl Service for HttpService -where - R: HttpServiceRequestBuilder, -{ - type Response = HttpResponse; - type Error = crate::Error; - type Future = BoxFuture<'static, Result>; - - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, request: HttpRequest) -> Self::Future { - // for internal metrics reporting - let raw_byte_size = request.payload.len(); - - let events_byte_size = request - .request_metadata - .into_events_estimated_json_encoded_byte_size(); - - // build the request - let mut bytes_mut = BytesMut::new(); - bytes_mut.extend(request.payload); - let http_request = self.http_request_builder.build(bytes_mut); - let req = http_request.map(Body::from); - - // for internal metrics reporting - let endpoint = req.uri().to_string(); - let protocol = self.protocol.clone(); - - let mut decompression_service = ServiceBuilder::new() - .layer(DecompressionLayer::new()) - .service(self.client.clone()); - - Box::pin(async move { - let response = decompression_service.call(req).await?; - - if response.status().is_success() { - emit!(EndpointBytesSent { - byte_size: raw_byte_size, - protocol: protocol.as_str(), - endpoint: endpoint.as_str(), - }); - } - - let (parts, body) = response.into_parts(); - let mut body = body::aggregate(body).await?; - let http_response = Response::from_parts(parts, body.copy_to_bytes(body.remaining())); - - Ok(HttpResponse { - http_response, - events_byte_size, - raw_byte_size, - }) - }) - } -} diff --git a/src/sinks/util/mod.rs b/src/sinks/util/mod.rs index 0b783cac6d6da..4f222063312fd 100644 --- a/src/sinks/util/mod.rs +++ b/src/sinks/util/mod.rs @@ -5,7 +5,6 @@ pub mod builder; pub mod compressor; pub mod encoding; pub mod http; -pub mod http_service; pub mod metadata; pub mod normalizer; pub mod partitioner;