chore(datadog_metrics sink): incrementally encode sketches (#17764)
## Context

When support was added for encoding/sending sketches in #9178, logic was
added to "split" payloads when a payload exceeded the (un)compressed
payload limits. As we lacked (at the time) the ability to encode sketch
metrics one-by-one, we were forced to collect all of them and then encode
them all at once, which tended to push the payload size past the
(un)compressed limits. The "splitting" mechanism allowed us to compensate
for that.

However, to avoid getting stuck in pathological loops where a payload was
still too big after a split and needed to be split again, the logic only
ever split a batch of metrics once: if the two resulting slices still
couldn't be encoded within the limits, they were dropped and we gave up
on splitting further.

Despite the gut feeling during that work that needing to split more than
once should be exceedingly rare, real life has shown otherwise: #13175

## Solution

This PR introduces proper incremental encoding of sketches. This doesn't
eliminate the possibility of needing to split (more below), but it reduces
the likelihood of splitting to a purely theoretical level.

We take advantage of hidden-from-docs methods in `prost` to encode each
`SketchPayload` object and append the bytes to a single buffer, which is
possible because of how Protocol Buffers encodes messages (explained
below). Additionally, we now generate "file descriptors" for our compiled
Protocol Buffers definitions, which lets us programmatically query the
field number of the `sketches` field in the `SketchPayload` message. That
is slightly more robust than hardcoding the field number and hoping it
never changes.
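
For illustration, here is a minimal sketch of that lookup with `prost-reflect`; the fully qualified message name is an assumption for the example, not necessarily the exact name in the compiled descriptors:

```rust
use prost_reflect::DescriptorPool;

// Hypothetical helper: resolve the field number of `SketchPayload.sketches`
// from the file descriptor set generated at build time. The message's full
// name is assumed for this sketch.
fn sketches_field_number(pool: &DescriptorPool) -> Option<u32> {
    let message = pool.get_message_by_name("datadog.agentpayload.SketchPayload")?;
    let field = message.get_field_by_name("sketches")?;
    Some(field.number())
}
```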

In Protocol Buffers, each field in a message is written out with the field
data preceded by the field number. This is part and parcel of its support
for backwards-compatible changes to a definition. Further, for repeated
fields, i.e. `Vec<Sketch>`, repetition is expressed simply by writing the
same field multiple times, rather than by writing the collection as one
unit. Practically speaking, this means we can encode a vector of two
messages, or encode those two messages individually, and end up with the
same encoded output of `[field N][field data][field N][field data]`.
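
Here's a minimal, self-contained sketch of that property using `prost` derive types; the two stand-in messages below are illustrative, not the sink's actual `SketchPayload`/`Sketch` definitions:

```rust
use prost::Message;

#[derive(Clone, PartialEq, ::prost::Message)]
struct Sketch {
    #[prost(string, tag = "1")]
    metric: String,
}

#[derive(Clone, PartialEq, ::prost::Message)]
struct SketchPayload {
    #[prost(message, repeated, tag = "1")]
    sketches: Vec<Sketch>,
}

fn main() {
    let a = Sketch { metric: "cpu.usage".into() };
    let b = Sketch { metric: "mem.usage".into() };

    // Encode both sketches in one payload...
    let all_at_once = SketchPayload { sketches: vec![a.clone(), b.clone()] }.encode_to_vec();

    // ...or encode two single-sketch payloads and concatenate the bytes.
    let mut incremental = Vec::new();
    SketchPayload { sketches: vec![a] }.encode(&mut incremental).unwrap();
    SketchPayload { sketches: vec![b] }.encode(&mut incremental).unwrap();

    // A repeated field is just the same tagged field written multiple times,
    // so the two encodings are byte-for-byte identical.
    assert_eq!(all_at_once, incremental);
}
```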

### Ancillary changes

We've additionally fixed a bug with the "bytes sent" metric reported by
the `datadog_metrics` sink, caused by some tangled, miswired code around
how the compressed/uncompressed/event byte sizes were shuttled from the
request builder logic down to `Driver`.

We've also reworked some of the encoder error types to clean them up and
simplify things a bit.

## Reviewer notes

### Still needing to handle splits

The encoder still needs to care about splits, in a theoretical sense:
while we can accurately track, and so avoid ever exceeding, the
uncompressed payload limit, we can't know the final compressed payload
size until we finalize the builder/payload.

Currently, the encoder checks whether adding the current metric would
exceed the compressed payload limit under the assumption that the
compressor couldn't compress the encoded metric at all. This is a fairly
robust check, since it tries to account for the worst-case overhead of an
entirely incompressible payload, but we obviously want to avoid dropping
events wherever possible, and that's why the splitting code is still in
place.
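
As a rough sketch of the kind of worst-case check described above (the limit and overhead constants are made up for illustration, not Vector's actual numbers):

```rust
// Assumed limit and framing overhead, purely for illustration.
const MAX_COMPRESSED_BYTES: usize = 3_200_000;
const STORED_BLOCK_OVERHEAD: usize = 64;

/// Returns true if appending `encoded_metric_len` more bytes could push the
/// final compressed payload past the limit, assuming the compressor can't
/// shrink those bytes at all (i.e. it must emit them as stored blocks).
fn would_exceed_compressed_limit(compressed_so_far: usize, encoded_metric_len: usize) -> bool {
    compressed_so_far + encoded_metric_len + STORED_BLOCK_OVERHEAD > MAX_COMPRESSED_BYTES
}
```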
tobz authored Jun 29, 2023
1 parent e66e285 commit 0ac94d4
Showing 13 changed files with 545 additions and 359 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock


5 changes: 3 additions & 2 deletions Cargo.toml
```diff
@@ -200,8 +200,9 @@ serde_yaml = { version = "0.9.22", default-features = false }
 rmp-serde = { version = "1.1.1", default-features = false, optional = true }
 rmpv = { version = "1.0.0", default-features = false, features = ["with-serde"], optional = true }
 
-# Prost
+# Prost / Protocol Buffers
 prost = { version = "0.11", default-features = false, features = ["std"] }
+prost-reflect = { version = "0.11", default-features = false, optional = true }
 prost-types = { version = "0.11", default-features = false, optional = true }
 
 # GCP
@@ -671,7 +672,7 @@ sinks-console = []
 sinks-databend = []
 sinks-datadog_events = []
 sinks-datadog_logs = []
-sinks-datadog_metrics = ["protobuf-build"]
+sinks-datadog_metrics = ["protobuf-build", "dep:prost-reflect"]
 sinks-datadog_traces = ["protobuf-build", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"]
 sinks-elasticsearch = ["aws-core", "transforms-metric_to_log"]
 sinks-file = ["dep:async-compression"]
```

1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
```diff
@@ -398,6 +398,7 @@ proc-macro2,https://github.com/dtolnay/proc-macro2,MIT OR Apache-2.0,"David Toln
 proptest,https://github.com/proptest-rs/proptest,MIT OR Apache-2.0,Jason Lingle
 prost,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert <dan@danburkert.com>, Lucio Franco <luciofranco14@gmail.com, Tokio Contributors <team@tokio.rs>"
 prost-derive,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert <dan@danburkert.com>, Lucio Franco <luciofranco14@gmail.com>, Tokio Contributors <team@tokio.rs>"
+prost-reflect,https://github.com/andrewhickman/prost-reflect,MIT OR Apache-2.0,Andrew Hickman <andrew.hickman1@sky.com>
 ptr_meta,https://github.com/djkoloski/ptr_meta,MIT,David Koloski <djkoloski@gmail.com>
 pulsar,https://github.com/streamnative/pulsar-rs,MIT OR Apache-2.0,"Colin Stearns <cstearns@developers.wyyerd.com>, Kevin Stenerson <kstenerson@developers.wyyerd.com>, Geoffroy Couprie <contact@geoffroycouprie.com>"
 quad-rand,https://github.com/not-fl3/quad-rand,MIT,not-fl3 <not.fl3@gmail.com>
```

22 changes: 20 additions & 2 deletions build.rs
```diff
@@ -1,4 +1,11 @@
-use std::{collections::HashSet, env, fs::File, io::Write, path::Path, process::Command};
+use std::{
+    collections::HashSet,
+    env,
+    fs::File,
+    io::Write,
+    path::{Path, PathBuf},
+    process::Command,
+};
 
 struct TrackedEnv {
     tracked: HashSet<String>,
@@ -124,8 +131,19 @@ fn main() {
     println!("cargo:rerun-if-changed=proto/google/rpc/status.proto");
     println!("cargo:rerun-if-changed=proto/vector.proto");
 
+    // Create and store the "file descriptor set" from the compiled Protocol Buffers packages.
+    //
+    // This allows us to use runtime reflection to manually build Protocol Buffers payloads
+    // in a type-safe way, which is necessary for incrementally building certain payloads, like
+    // the ones generated in the `datadog_metrics` sink.
+    let protobuf_fds_path =
+        PathBuf::from(std::env::var("OUT_DIR").expect("OUT_DIR environment variable not set"))
+            .join("protobuf-fds.bin");
+
     let mut prost_build = prost_build::Config::new();
-    prost_build.btree_map(["."]);
+    prost_build
+        .btree_map(["."])
+        .file_descriptor_set_path(protobuf_fds_path);
 
     tonic_build::configure()
         .protoc_arg("--experimental_allow_proto3_optional")
```

14 changes: 6 additions & 8 deletions src/internal_events/datadog_metrics.rs
```diff
@@ -7,19 +7,17 @@ use vector_common::internal_event::{
 };
 
 #[derive(Debug)]
-pub struct DatadogMetricsEncodingError {
-    pub error_message: &'static str,
+pub struct DatadogMetricsEncodingError<'a> {
+    pub reason: &'a str,
     pub error_code: &'static str,
     pub dropped_events: usize,
 }
 
-impl InternalEvent for DatadogMetricsEncodingError {
+impl<'a> InternalEvent for DatadogMetricsEncodingError<'a> {
     fn emit(self) {
-        let reason = "Failed to encode Datadog metrics.";
         error!(
-            message = reason,
-            error = %self.error_message,
-            error_code = %self.error_code,
+            message = self.reason,
+            error_code = self.error_code,
             error_type = error_type::ENCODER_FAILED,
             intentional = "false",
             stage = error_stage::PROCESSING,
@@ -35,7 +33,7 @@ impl InternalEvent for DatadogMetricsEncodingError {
         if self.dropped_events > 0 {
             emit!(ComponentEventsDropped::<UNINTENTIONAL> {
                 count: self.dropped_events,
-                reason,
+                reason: self.reason,
             });
         }
     }
```

5 changes: 0 additions & 5 deletions src/proto.rs

This file was deleted.

19 changes: 19 additions & 0 deletions src/proto/mod.rs
```diff
@@ -0,0 +1,19 @@
+#[cfg(any(feature = "sources-vector", feature = "sinks-vector"))]
+use crate::event::proto as event;
+
+#[cfg(any(feature = "sources-vector", feature = "sinks-vector"))]
+pub mod vector;
+
+#[cfg(feature = "sinks-datadog_metrics")]
+pub mod fds {
+    use once_cell::sync::OnceCell;
+    use prost_reflect::DescriptorPool;
+
+    pub fn protobuf_descriptors() -> &'static DescriptorPool {
+        static PROTOBUF_FDS: OnceCell<DescriptorPool> = OnceCell::new();
+        PROTOBUF_FDS.get_or_init(|| {
+            DescriptorPool::decode(include_bytes!(concat!(env!("OUT_DIR"), "/protobuf-fds.bin")).as_ref())
+                .expect("should not fail to decode protobuf file descriptor set generated from build script")
+        })
+    }
+}
```

5 changes: 5 additions & 0 deletions src/sinks/datadog/metrics/config.rs
```diff
@@ -59,6 +59,11 @@ impl DatadogMetricsEndpoint {
             DatadogMetricsEndpoint::Sketches => "application/x-protobuf",
         }
     }
+
+    // Gets whether or not this is a series endpoint.
+    pub const fn is_series(self) -> bool {
+        matches!(self, Self::Series)
+    }
 }
 
 /// Maps Datadog metric endpoints to their actual URI.
```
