chore(datadog_logs sink): remove semantic encoder for the Datadog Logs sink (#16496)

simplify / fix datadog logs sink
fuchsnj authored Feb 17, 2023
1 parent ef4c5c0 commit faa96f8
Showing 1 changed file with 23 additions and 168 deletions.
src/sinks/datadog/logs/sink.rs (191 changes: 23 additions & 168 deletions)
@@ -3,13 +3,12 @@ use std::{fmt::Debug, io, num::NonZeroUsize, sync::Arc};
 use async_trait::async_trait;
 use bytes::Bytes;
 use codecs::{encoding::Framer, CharacterDelimitedEncoder, JsonSerializerConfig};
-use futures::{stream::BoxStream, StreamExt};
+use futures::stream::{BoxStream, StreamExt};
 use lookup::event_path;
 use snafu::Snafu;
 use tower::Service;
 use vector_common::request_metadata::RequestMetadata;
 use vector_core::{
-    config::{log_schema, LogSchema},
     event::{Event, EventFinalizers, Finalizable, Value},
     partition::Partitioner,
     sink::StreamSink,
@@ -76,7 +75,6 @@ impl<S> LogSinkBuilder<S> {
         LogSink {
             default_api_key: self.default_api_key,
             encoding: self.encoding,
-            schema_enabled: false,
             service: self.service,
             batch_settings: self.batch_settings,
             compression: self.compression.unwrap_or_default(),
@@ -97,8 +95,6 @@ pub struct LogSink<S> {
     service: S,
     /// The encoding of payloads
     encoding: JsonEncoding,
-    /// Whether to enable schema support.
-    schema_enabled: bool,
     /// The compression technique to use when building the request body
     compression: Compression,
     /// Batch settings: timeout, max events, max bytes, etc.
@@ -111,14 +107,12 @@ pub struct LogSink<S> {
 /// log lines, and requires some specific normalization of certain event fields.
 #[derive(Clone, Debug)]
 pub struct JsonEncoding {
-    log_schema: &'static LogSchema,
     encoder: (Transformer, Encoder<Framer>),
 }
 
 impl JsonEncoding {
     pub fn new(transformer: Transformer) -> Self {
         Self {
-            log_schema: log_schema(),
             encoder: (
                 transformer,
                 Encoder::<Framer>::new(
@@ -130,12 +124,6 @@ impl JsonEncoding {
     }
 }
 
-#[derive(Clone, Debug)]
-pub struct SemanticJsonEncoding {
-    log_schema: &'static LogSchema,
-    encoder: (Transformer, Encoder<Framer>),
-}
-
 impl crate::sinks::util::encoding::Encoder<Vec<Event>> for JsonEncoding {
     fn encode_input(&self, mut input: Vec<Event>, writer: &mut dyn io::Write) -> io::Result<usize> {
         for event in input.iter_mut() {
@@ -145,15 +133,16 @@ impl crate::sinks::util::encoding::Encoder<Vec<Event>> for JsonEncoding {
                 .expect("message is required (make sure the \"message\" semantic meaning is set)");
             log.rename_key(message_path.as_str(), event_path!("message"));
 
-            let host_path = log
-                .host_path()
-                .expect("message is required (make sure the \"host\" semantic meaning is set)");
-            log.rename_key(host_path.as_str(), event_path!("host"));
+            if let Some(host_path) = log.host_path() {
+                log.rename_key(host_path.as_str(), event_path!("hostname"));
+            }
 
-            if let Some(Value::Timestamp(ts)) = log
+            if let Some(Value::Timestamp(ts)) = log.remove(
+                log
                 .timestamp_path()
-                .and_then(|path| log.remove(path.as_str()))
-            {
+                .expect("timestamp is required (make sure the \"timestamp\" semantic meaning is set)")
+                .as_str()
+            ) {
                 log.insert(
                     event_path!("timestamp"),
                     Value::Integer(ts.timestamp_millis()),
@@ -165,36 +154,6 @@ impl crate::sinks::util::encoding::Encoder<Vec<Event>> for JsonEncoding {
         }
     }
 }
 
-impl crate::sinks::util::encoding::Encoder<Vec<Event>> for SemanticJsonEncoding {
-    fn encode_input(&self, mut input: Vec<Event>, writer: &mut dyn io::Write) -> io::Result<usize> {
-        for event in input.iter_mut() {
-            let log = event.as_mut_log();
-
-            // message
-            let message_key = log
-                .find_key_by_meaning("message")
-                .expect("enforced by schema");
-            log.rename_key(message_key.as_str(), event_path!("message"));
-
-            // host
-            let host_key = log
-                .find_key_by_meaning("host")
-                .unwrap_or_else(|| self.log_schema.host_key().into());
-            log.rename_key(host_key.as_str(), event_path!("host"));
-
-            // timestamp
-            let ts = log
-                .get_by_meaning("timestamp")
-                .expect("enforced by schema")
-                .as_timestamp_unwrap();
-            let ms = ts.timestamp_millis();
-            log.insert(event_path!("timestamp"), Value::Integer(ms));
-        }
-
-        self.encoder.encode_input(input, writer)
-    }
-}
-
 #[derive(Debug, Snafu)]
 pub enum RequestBuildError {
     #[snafu(display("Encoded payload is greater than the max limit."))]
@@ -297,97 +256,6 @@ impl RequestBuilder<(Option<Arc<str>>, Vec<Event>)> for LogRequestBuilder {
     }
 }
 
-struct SemanticLogRequestBuilder {
-    default_api_key: Arc<str>,
-    encoding: SemanticJsonEncoding,
-    compression: Compression,
-}
-
-impl RequestBuilder<(Option<Arc<str>>, Vec<Event>)> for SemanticLogRequestBuilder {
-    type Metadata = (Arc<str>, EventFinalizers);
-    type Events = Vec<Event>;
-    type Encoder = SemanticJsonEncoding;
-    type Payload = Bytes;
-    type Request = LogApiRequest;
-    type Error = RequestBuildError;
-
-    fn compression(&self) -> Compression {
-        self.compression
-    }
-
-    fn encoder(&self) -> &Self::Encoder {
-        &self.encoding
-    }
-
-    fn split_input(
-        &self,
-        input: (Option<Arc<str>>, Vec<Event>),
-    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
-        let (api_key, mut events) = input;
-
-        let builder = RequestMetadataBuilder::from_events(&events);
-
-        let finalizers = events.take_finalizers();
-
-        let api_key = api_key.unwrap_or_else(|| Arc::clone(&self.default_api_key));
-
-        ((api_key, finalizers), builder, events)
-    }
-
-    fn encode_events(
-        &self,
-        events: Self::Events,
-    ) -> Result<EncodeResult<Self::Payload>, Self::Error> {
-        // We need to first serialize the payload separately so that we can figure out how big it is
-        // before compression. The Datadog Logs API has a limit on uncompressed data, so we can't
-        // use the default implementation of this method.
-        //
-        // TODO: We should probably make `build_request` fallible itself, because then this override of `encode_events`
-        // wouldn't even need to exist, and we could handle it in `build_request` which is required by all implementors.
-        //
-        // On the flip side, it would mean that we'd potentially be compressing payloads that we would inevitably end up
-        // rejecting anyways, which is meh. This might be a signal that the true "right" fix is to actually switch this
-        // sink to incremental encoding and simply put up with suboptimal batch sizes if we need to end up splitting due
-        // to (un)compressed size limitations.
-        let mut buf = Vec::new();
-        let n_events = events.len();
-        let uncompressed_size = self.encoder().encode_input(events, &mut buf)?;
-        if uncompressed_size > MAX_PAYLOAD_BYTES {
-            return Err(RequestBuildError::PayloadTooBig);
-        }
-
-        // Now just compress it like normal.
-        let mut compressor = Compressor::from(self.compression);
-        write_all(&mut compressor, n_events, &buf)?;
-        let bytes = compressor.into_inner().freeze();
-
-        if self.compression.is_compressed() {
-            Ok(EncodeResult::compressed(bytes, uncompressed_size))
-        } else {
-            Ok(EncodeResult::uncompressed(bytes))
-        }
-    }
-
-    fn build_request(
-        &self,
-        dd_metadata: Self::Metadata,
-        metadata: RequestMetadata,
-        payload: EncodeResult<Self::Payload>,
-    ) -> Self::Request {
-        let (api_key, finalizers) = dd_metadata;
-        let uncompressed_size = payload.uncompressed_byte_size;
-
-        LogApiRequest {
-            api_key,
-            compression: self.compression,
-            body: payload.into_payload(),
-            finalizers,
-            uncompressed_size,
-            metadata,
-        }
-    }
-}
-
 impl<S> LogSink<S>
 where
     S: Service<LogApiRequest> + Send + 'static,
@@ -402,41 +270,28 @@ where
 
         let builder_limit = NonZeroUsize::new(64);
         let input = input.batched_partitioned(partitioner, self.batch_settings);
-        if self.schema_enabled {
-            input.request_builder(
-                builder_limit,
-                SemanticLogRequestBuilder {
-                    default_api_key,
-                    encoding: SemanticJsonEncoding {
-                        log_schema: self.encoding.log_schema,
-                        encoder: self.encoding.encoder,
-                    },
-                    compression: self.compression,
-                },
-            )
-        } else {
-            input.request_builder(
+        input
+            .request_builder(
                 builder_limit,
                 LogRequestBuilder {
                     default_api_key,
                     encoding: self.encoding,
                     compression: self.compression,
                 },
             )
-        }
-        .filter_map(|request| async move {
-            match request {
-                Err(error) => {
-                    emit!(SinkRequestBuildError { error });
-                    None
-                }
-                Ok(req) => Some(req),
-            }
-        })
-        .into_driver(self.service)
-        .protocol(self.protocol)
-        .run()
-        .await
+            .filter_map(|request| async move {
+                match request {
+                    Err(error) => {
+                        emit!(SinkRequestBuildError { error });
+                        None
+                    }
+                    Ok(req) => Some(req),
+                }
+            })
+            .into_driver(self.service)
+            .protocol(self.protocol)
+            .run()
+            .await
     }
 }


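The normalization kept in `JsonEncoding::encode_input` can be pictured outside of Vector's types. Below is a minimal sketch, assuming `serde_json` and `chrono` in place of Vector's `LogEvent` and `Value`; the source field names (`msg`, `source_host`, `ts`) are hypothetical stand-ins for paths resolved by semantic meaning. It renames the message field to `message`, the host field (only when present, matching the new `if let`) to `hostname`, and converts the timestamp to integer epoch milliseconds.

```rust
use serde_json::{json, Map, Value};

/// Sketch of the sink's normalization pass (not Vector's actual API).
fn normalize(log: &mut Map<String, Value>) {
    // "message" is required by the sink; in the real code a missing path panics
    // via `expect`, here we simply skip.
    if let Some(msg) = log.remove("msg") {
        log.insert("message".into(), msg);
    }

    // "host" is optional: only rename when the path resolves.
    if let Some(host) = log.remove("source_host") {
        log.insert("hostname".into(), host);
    }

    // Datadog expects the timestamp as milliseconds since the epoch.
    if let Some(Value::String(ts)) = log.remove("ts") {
        if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(&ts) {
            log.insert("timestamp".into(), json!(parsed.timestamp_millis()));
        }
    }
}

fn main() {
    let mut log = json!({
        "msg": "hello",
        "source_host": "web-1",
        "ts": "2023-02-17T00:00:00Z",
    });
    normalize(log.as_object_mut().unwrap());
    println!("{log}");
    // {"hostname":"web-1","message":"hello","timestamp":1676592000000}
}
```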

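The removed `SemanticLogRequestBuilder` duplicated the `encode_events` override that the surviving `LogRequestBuilder` keeps: serialize first so the uncompressed size can be checked against the API limit, then compress. A minimal sketch of that pattern, assuming `flate2` for gzip and a stand-in constant and error string (the real code uses Vector's `Compressor` and `RequestBuildError::PayloadTooBig`):

```rust
use std::io::Write;

use flate2::{write::GzEncoder, Compression};

/// Hypothetical stand-in for the Datadog Logs API's uncompressed payload cap.
const MAX_PAYLOAD_BYTES: usize = 5_000_000;

/// Measure-before-compress: the limit applies to the *uncompressed* size,
/// so it must be checked before compression shrinks the payload.
fn build_body(serialized: &[u8]) -> Result<Vec<u8>, String> {
    if serialized.len() > MAX_PAYLOAD_BYTES {
        return Err("encoded payload is greater than the max limit".into());
    }

    // Now just compress it like normal.
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    encoder.write_all(serialized).map_err(|e| e.to_string())?;
    encoder.finish().map_err(|e| e.to_string())
}
```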
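With the `schema_enabled` branch gone, `run_inner` builds a single pipeline: batch, build requests, drop failed builds after emitting an error, then drive the service. The `filter_map` step can be sketched with plain `futures` streams (a simplified sketch assuming `tokio` for the runtime; not Vector's driver API):

```rust
use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    // Stand-ins for the outputs of the request builder: Ok(request) or Err(build error).
    let requests = stream::iter(vec![
        Ok::<_, String>("req-1"),
        Err("boom".to_string()),
        Ok("req-2"),
    ]);

    // Mirror of the sink's `.filter_map(..)` stage: log failures, keep successes.
    let deliverable: Vec<_> = requests
        .filter_map(|request| async move {
            match request {
                Err(error) => {
                    eprintln!("failed to build request: {error}");
                    None
                }
                Ok(req) => Some(req),
            }
        })
        .collect()
        .await;

    assert_eq!(deliverable, vec!["req-1", "req-2"]);
}
```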