Skip to content

Commit

Permalink
Refactor a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
neuronull committed Aug 15, 2023
1 parent 4aa3eca commit 68ff83c
Show file tree
Hide file tree
Showing 11 changed files with 260 additions and 359 deletions.
53 changes: 38 additions & 15 deletions src/sinks/http/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,27 @@ use indexmap::IndexMap;

use crate::{
codecs::{EncodingConfigWithFraming, SinkType},
http::{get_http_scheme_from_uri, Auth, HttpClient, MaybeAuth},
http::{Auth, HttpClient, MaybeAuth},
sinks::{
prelude::*,
util::{
http::RequestConfig,
http_service::{HttpRetryLogic, HttpService},
http::{HttpStatusRetryLogic, RequestConfig},
RealtimeSizeBasedDefaultBatchSettings, UriSerde,
},
},
};

use super::{
encoder::HttpEncoder, request_builder::HttpRequestBuilder, service::HttpSinkRequestBuilder,
encoder::HttpEncoder,
request_builder::HttpRequestBuilder,
service::{HttpResponse, HttpService, HttpSinkRequestBuilder},
sink::HttpSink,
};

const CONTENT_TYPE_TEXT: &str = "text/plain";
const CONTENT_TYPE_NDJSON: &str = "application/x-ndjson";
const CONTENT_TYPE_JSON: &str = "application/json";

/// Configuration for the `http` sink.
#[configurable_component(sink("http", "Deliver observability event data to an HTTP server."))]
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -238,10 +243,6 @@ impl SinkConfig for HttpSinkConfig {
let (payload_prefix, payload_suffix) =
validate_payload_wrapper(&self.payload_prefix, &self.payload_suffix, &encoder)?;

let endpoint = self.uri.with_default_parts();

let protocol = get_http_scheme_from_uri(&endpoint.uri);

let client = self.build_http_client(&cx)?;

let healthcheck = match cx.healthcheck.uri {
Expand All @@ -251,27 +252,49 @@ impl SinkConfig for HttpSinkConfig {
None => future::ok(()).boxed(),
};

let content_type = {
use Framer::*;
use Serializer::*;
match (encoder.serializer(), encoder.framer()) {
(RawMessage(_) | Text(_), _) => Some(CONTENT_TYPE_TEXT.to_owned()),
(Json(_), NewlineDelimited(_)) => Some(CONTENT_TYPE_NDJSON.to_owned()),
(Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) => {
Some(CONTENT_TYPE_JSON.to_owned())
}
_ => None,
}
};

let request_builder = HttpRequestBuilder {
encoder: HttpEncoder::new(encoder.clone(), transformer),
encoder: HttpEncoder::new(encoder, transformer, payload_prefix, payload_suffix),
compression: self.compression,
};

let content_encoding = self.compression.is_compressed().then(|| {
self.compression
.content_encoding()
.expect("Encoding should be specified for compression.")
.to_string()
});

let http_service_request_builder = HttpSinkRequestBuilder {
uri: self.uri.with_default_parts(),
method: self.method,
auth: self.auth.choose_one(&self.uri.auth)?,
headers,
payload_prefix,
payload_suffix,
compression: self.compression,
encoder,
content_type,
content_encoding,
};

let service = HttpService::new(http_service_request_builder, client, protocol.to_string());
let service = HttpService::new(client, http_service_request_builder);

let request_limits = self.request.tower.unwrap_with(&Default::default());

let retry_logic =
HttpStatusRetryLogic::new(|req: &HttpResponse| req.http_response.status());

let service = ServiceBuilder::new()
.settings(request_limits, HttpRetryLogic)
.settings(request_limits, retry_logic)
.service(service);

let sink = HttpSink::new(service, batch_settings, request_builder);
Expand Down
48 changes: 45 additions & 3 deletions src/sinks/http/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,15 @@ use crate::{
event::Event,
sinks::util::encoding::{write_all, Encoder as SinkEncoder},
};
use bytes::BytesMut;
use codecs::encoding::Framer;
use bytes::{BufMut, BytesMut};
use codecs::{
encoding::{
Framer,
Framer::{CharacterDelimited, NewlineDelimited},
Serializer::Json,
},
CharacterDelimitedEncoder,
};
use std::io;
use tokio_util::codec::Encoder as _;

Expand All @@ -15,14 +22,23 @@ use crate::sinks::prelude::*;
pub(super) struct HttpEncoder {
pub(super) encoder: Encoder<Framer>,
pub(super) transformer: Transformer,
pub(super) payload_prefix: String,
pub(super) payload_suffix: String,
}

impl HttpEncoder {
/// Creates a new `HttpEncoder`.
pub(super) const fn new(encoder: Encoder<Framer>, transformer: Transformer) -> Self {
pub(super) const fn new(
encoder: Encoder<Framer>,
transformer: Transformer,
payload_prefix: String,
payload_suffix: String,
) -> Self {
Self {
encoder,
transformer,
payload_prefix,
payload_suffix,
}
}
}
Expand All @@ -47,6 +63,32 @@ impl SinkEncoder<Vec<Event>> for HttpEncoder {
.map_err(|_| io::Error::new(io::ErrorKind::Other, "unable to encode event"))?;
}

match (self.encoder.serializer(), self.encoder.framer()) {
(Json(_), NewlineDelimited(_)) => {
if !body.is_empty() {
// Remove trailing newline for backwards-compatibility
// with Vector `0.20.x`.
body.truncate(body.len() - 1);
}
}
(Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) => {
// TODO(https://github.com/vectordotdev/vector/issues/11253):
// Prepend before building a request body to eliminate the
// additional copy here.
let message = body.split();
body.put(self.payload_prefix.as_bytes());
body.put_u8(b'[');
if !message.is_empty() {
body.unsplit(message);
// remove trailing comma from last record
body.truncate(body.len() - 1);
}
body.put_u8(b']');
body.put(self.payload_suffix.as_bytes());
}
_ => {}
}

let body = body.freeze();

write_all(writer, 1, body.as_ref()).map(|()| (body.len(), byte_size))
Expand Down
8 changes: 5 additions & 3 deletions src/sinks/http/request_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
use bytes::Bytes;
use std::io;

use crate::sinks::{prelude::*, util::http_service::HttpRequest};
use crate::sinks::prelude::*;

use super::encoder::HttpEncoder;
use super::{encoder::HttpEncoder, service::HttpRequest};

pub(super) struct HttpRequestBuilder {
pub(super) encoder: HttpEncoder,
pub(super) compression: Compression,
}

impl RequestBuilder<Vec<Event>> for HttpRequestBuilder {
Expand All @@ -20,8 +21,9 @@ impl RequestBuilder<Vec<Event>> for HttpRequestBuilder {
type Error = io::Error;

fn compression(&self) -> Compression {
self.compression
// Compression is handled in the Service
Compression::None
//Compression::None
}

fn encoder(&self) -> &Self::Encoder {
Expand Down
Loading

0 comments on commit 68ff83c

Please sign in to comment.