From 5efc7893598d9f08e54cf63ea09385e44e8f16a1 Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Mon, 26 Jun 2023 13:33:06 -0400 Subject: [PATCH 1/6] chore(datadog_metrics sink): incrementally encode sketches --- Cargo.lock | 12 + Cargo.toml | 5 +- build.rs | 22 +- src/internal_events/datadog_metrics.rs | 14 +- src/proto.rs | 5 - src/proto/mod.rs | 20 + src/sinks/datadog/metrics/config.rs | 5 + src/sinks/datadog/metrics/encoder.rs | 578 ++++++++++++------- src/sinks/datadog/metrics/request_builder.rs | 139 ++--- src/sinks/datadog/metrics/service.rs | 28 +- src/sinks/datadog/metrics/sink.rs | 4 +- 11 files changed, 504 insertions(+), 328 deletions(-) delete mode 100644 src/proto.rs create mode 100644 src/proto/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 132e547f43b48..683c505264492 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6300,6 +6300,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "prost-reflect" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "000e1e05ebf7b26e1eba298e66fe4eee6eb19c567d0ffb35e0dd34231cdac4c8" +dependencies = [ + "once_cell", + "prost", + "prost-types", +] + [[package]] name = "prost-types" version = "0.11.9" @@ -9206,6 +9217,7 @@ dependencies = [ "proptest", "prost", "prost-build", + "prost-reflect", "prost-types", "pulsar", "quickcheck", diff --git a/Cargo.toml b/Cargo.toml index 2615e080e739c..f3673b1e2bbd7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -198,8 +198,9 @@ serde_yaml = { version = "0.9.21", default-features = false } rmp-serde = { version = "1.1.1", default-features = false, optional = true } rmpv = { version = "1.0.0", default-features = false, features = ["with-serde"], optional = true } -# Prost +# Prost / Protocol Buffers prost = { version = "0.11", default-features = false, features = ["std"] } +prost-reflect = { version = "0.11", default-features = false, optional = true } prost-types = { version = "0.11", default-features = false, optional = true } # GCP @@ -670,7 +671,7 @@ sinks-databend = [] sinks-datadog_archives = ["sinks-aws_s3", "sinks-azure_blob", "sinks-gcp"] sinks-datadog_events = [] sinks-datadog_logs = [] -sinks-datadog_metrics = ["protobuf-build"] +sinks-datadog_metrics = ["protobuf-build", "dep:prost-reflect"] sinks-datadog_traces = ["protobuf-build", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"] sinks-elasticsearch = ["aws-core", "transforms-metric_to_log"] sinks-file = ["dep:async-compression"] diff --git a/build.rs b/build.rs index 20cfd23d52a88..3db5ce0508997 100644 --- a/build.rs +++ b/build.rs @@ -1,4 +1,11 @@ -use std::{collections::HashSet, env, fs::File, io::Write, path::Path, process::Command}; +use std::{ + collections::HashSet, + env, + fs::File, + io::Write, + path::{Path, PathBuf}, + process::Command, +}; struct TrackedEnv { tracked: HashSet, @@ -124,8 +131,19 @@ fn main() { println!("cargo:rerun-if-changed=proto/google/rpc/status.proto"); println!("cargo:rerun-if-changed=proto/vector.proto"); + // Create and store the "file descriptor set" from the compiled Protocol Buffers packages. + // This allows us to use runtime reflection to manually build Protocol Buffers payloads + // in a type-safe way, which is necessary for incrementally building certain payloads, like + // the ones generated in the `datadog_metrics` sink. 
+ let protobuf_fds_path = std::env::var("OUT_DIR") + .map(PathBuf::from) + .map(|path| path.join("protobuf-fds.bin")) + .expect("OUT_DIR environment variable not set"); + let mut prost_build = prost_build::Config::new(); - prost_build.btree_map(["."]); + prost_build + .btree_map(["."]) + .file_descriptor_set_path(protobuf_fds_path); tonic_build::configure() .protoc_arg("--experimental_allow_proto3_optional") diff --git a/src/internal_events/datadog_metrics.rs b/src/internal_events/datadog_metrics.rs index 792d8496f041d..c4daf1d3ce7f8 100644 --- a/src/internal_events/datadog_metrics.rs +++ b/src/internal_events/datadog_metrics.rs @@ -7,19 +7,17 @@ use vector_common::internal_event::{ }; #[derive(Debug)] -pub struct DatadogMetricsEncodingError { - pub error_message: &'static str, +pub struct DatadogMetricsEncodingError<'a> { + pub reason: &'a str, pub error_code: &'static str, pub dropped_events: usize, } -impl InternalEvent for DatadogMetricsEncodingError { +impl<'a> InternalEvent for DatadogMetricsEncodingError<'a> { fn emit(self) { - let reason = "Failed to encode Datadog metrics."; error!( - message = reason, - error = %self.error_message, - error_code = %self.error_code, + message = self.reason, + error_code = self.error_code, error_type = error_type::ENCODER_FAILED, intentional = "false", stage = error_stage::PROCESSING, @@ -35,7 +33,7 @@ impl InternalEvent for DatadogMetricsEncodingError { if self.dropped_events > 0 { emit!(ComponentEventsDropped:: { count: self.dropped_events, - reason, + reason: self.reason, }); } } diff --git a/src/proto.rs b/src/proto.rs deleted file mode 100644 index b77e94c30f793..0000000000000 --- a/src/proto.rs +++ /dev/null @@ -1,5 +0,0 @@ -#[cfg(any(feature = "sources-vector", feature = "sinks-vector"))] -use crate::event::proto as event; - -#[cfg(any(feature = "sources-vector", feature = "sinks-vector"))] -pub mod vector; diff --git a/src/proto/mod.rs b/src/proto/mod.rs new file mode 100644 index 0000000000000..876d6ebc7ee54 --- /dev/null +++ b/src/proto/mod.rs @@ -0,0 +1,20 @@ +#[cfg(any(feature = "sources-vector", feature = "sinks-vector"))] +use crate::event::proto as event; + +#[cfg(any(feature = "sources-vector", feature = "sinks-vector"))] +pub mod vector; + +#[cfg(feature = "sinks-datadog_metrics")] +pub mod fds { + use once_cell::sync::OnceCell; + use prost_reflect::DescriptorPool; + + static PROTOBUF_FDS: OnceCell = OnceCell::new(); + + pub fn get_protobuf_descriptors() -> &'static DescriptorPool { + PROTOBUF_FDS.get_or_init(|| { + DescriptorPool::decode(include_bytes!(concat!(env!("OUT_DIR"), "/protobuf-fds.bin")).as_ref()) + .expect("should not fail to decode protobuf file descriptor set generated from build script") + }) + } +} diff --git a/src/sinks/datadog/metrics/config.rs b/src/sinks/datadog/metrics/config.rs index 1acedc003c079..9fb8c4cd48137 100644 --- a/src/sinks/datadog/metrics/config.rs +++ b/src/sinks/datadog/metrics/config.rs @@ -59,6 +59,11 @@ impl DatadogMetricsEndpoint { DatadogMetricsEndpoint::Sketches => "application/x-protobuf", } } + + // Gets whether or not this is a series endpoint. + pub const fn is_series(self) -> bool { + matches!(self, Self::Series) + } } /// Maps Datadog metric endpoints to their actual URI. 
diff --git a/src/sinks/datadog/metrics/encoder.rs b/src/sinks/datadog/metrics/encoder.rs index a2bd8330c5f35..c043626f82571 100644 --- a/src/sinks/datadog/metrics/encoder.rs +++ b/src/sinks/datadog/metrics/encoder.rs @@ -7,11 +7,13 @@ use std::{ use bytes::{BufMut, Bytes}; use chrono::{DateTime, Utc}; +use once_cell::sync::OnceCell; use prost::Message; use snafu::{ResultExt, Snafu}; use vector_core::{ config::{log_schema, LogSchema}, event::{metric::MetricSketch, Metric, MetricTags, MetricValue}, + metrics::AgentDDSketch, }; use super::config::{ @@ -19,6 +21,7 @@ use super::config::{ }; use crate::{ common::datadog::{DatadogMetricType, DatadogPoint, DatadogSeriesMetric}, + proto::fds::get_protobuf_descriptors, sinks::util::{encode_namespace, Compression, Compressor}, }; @@ -37,6 +40,17 @@ pub enum CreateError { InvalidLimits, } +impl CreateError { + /// Gets the telemetry-friendly string version of this error. + /// + /// The value will be a short string with only lowercase letters and underscores. + pub const fn as_error_type(&self) -> &'static str { + match self { + Self::InvalidLimits => "invalid_payload_limits", + } + } +} + #[derive(Debug, Snafu)] pub enum EncoderError { #[snafu(display( @@ -49,11 +63,31 @@ pub enum EncoderError { metric_value: &'static str, }, - #[snafu(display("Failed to encode series metrics to JSON: {}", source))] + #[snafu( + context(false), + display("Failed to encode series metric to JSON: {source}") + )] JsonEncodingFailed { source: serde_json::Error }, - #[snafu(display("Failed to encode sketch metrics to Protocol Buffers: {}", source))] - ProtoEncodingFailed { source: prost::EncodeError }, + // Currently, the only time `prost` ever emits `EncodeError` is when there is insufficient + // buffer capacity, so we don't need to hold on to the error, and we can just hardcode this. + #[snafu(display( + "Failed to encode sketch metric to Protocol Buffers: insufficient buffer capacity." + ))] + ProtoEncodingFailed, +} + +impl EncoderError { + /// Gets the telemetry-friendly string version of this error. + /// + /// The value will be a short string with only lowercase letters and underscores. + pub const fn as_error_type(&self) -> &'static str { + match self { + Self::InvalidMetric { .. } => "invalid_metric", + Self::JsonEncodingFailed { .. } => "failed_to_encode_series", + Self::ProtoEncodingFailed => "failed_to_encode_sketch", + } + } } #[derive(Debug, Snafu)] @@ -64,9 +98,6 @@ pub enum FinishError { ))] CompressionFailed { source: io::Error }, - #[snafu(display("Failed to encode pending metrics: {}", source))] - PendingEncodeFailed { source: EncoderError }, - #[snafu(display("Finished payload exceeded the (un)compressed size limits"))] TooLarge { metrics: Vec, @@ -81,7 +112,6 @@ impl FinishError { pub const fn as_error_type(&self) -> &'static str { match self { Self::CompressionFailed { .. } => "compression_failed", - Self::PendingEncodeFailed { .. } => "pending_encode_failed", Self::TooLarge { .. } => "too_large", } } @@ -91,21 +121,15 @@ struct EncoderState { writer: Compressor, written: usize, buf: Vec, - - pending: Vec, processed: Vec, } impl Default for EncoderState { fn default() -> Self { - EncoderState { - // We use the "zlib default" compressor because it's all Datadog supports, and adding it - // generically to `Compression` would make things a little weird because of the - // conversion trait implementations that are also only none vs gzip. 
+ Self { writer: get_compressor(), written: 0, buf: Vec::with_capacity(1024), - pending: Vec::new(), processed: Vec::new(), } } @@ -145,7 +169,7 @@ impl DatadogMetricsEncoder { compressed_limit: usize, ) -> Result { let (uncompressed_limit, compressed_limit) = - validate_payload_size_limits(uncompressed_limit, compressed_limit) + validate_payload_size_limits(endpoint, uncompressed_limit, compressed_limit) .ok_or(CreateError::InvalidLimits)?; Ok(Self { @@ -195,15 +219,23 @@ impl DatadogMetricsEncoder { { return Ok(Some(metric)); } - serde_json::to_writer(&mut self.state.buf, series) - .context(JsonEncodingFailedSnafu)?; + serde_json::to_writer(&mut self.state.buf, series)?; } } - // We can't encode sketches incrementally (yet), so we don't do any encoding here. We - // simply store it for later, and in `try_encode_pending`, any such pending metrics will be - // encoded in a single operation. + // Sketches are encoded via ProtoBuf, also in an incremental fashion. DatadogMetricsEndpoint::Sketches => match metric.value() { - MetricValue::Sketch { .. } => {} + MetricValue::Sketch { sketch } => match sketch { + MetricSketch::AgentDDSketch(ddsketch) => { + encode_sketch_incremental( + &metric, + ddsketch, + &self.default_namespace, + self.log_schema, + &mut self.state.buf, + ) + .map_err(|_| EncoderError::ProtoEncodingFailed)?; + } + }, value => { return Err(EncoderError::InvalidMetric { expected: "sketches", @@ -213,21 +245,14 @@ impl DatadogMetricsEncoder { }, } - // If we actually encoded a metric, we try to see if our temporary buffer can be compressed - // and added to the overall payload. Otherwise, it means we're deferring the metric for - // later encoding, so we store it off to the side. - if !self.state.buf.is_empty() { - match self.try_compress_buffer() { - Err(_) | Ok(false) => return Ok(Some(metric)), - Ok(true) => {} + // Try and see if our temporary buffer can be written to the compressor. + match self.try_compress_buffer() { + Err(_) | Ok(false) => Ok(Some(metric)), + Ok(true) => { + self.state.processed.push(metric); + Ok(None) } - - self.state.processed.push(metric); - } else { - self.state.pending.push(metric); } - - Ok(None) } fn try_compress_buffer(&mut self) -> io::Result { @@ -292,56 +317,7 @@ impl DatadogMetricsEncoder { self.encode_single_metric(metric) } - fn try_encode_pending(&mut self) -> Result<(), FinishError> { - // The Datadog Agent uses a particular Protocol Buffers library to incrementally encode the - // DDSketch structures into a payload, similar to how we incrementally encode the series - // metrics. Unfortunately, there's no existing Rust crate that allows writing out Protocol - // Buffers payloads by hand, so we have to cheat a little and buffer up the metrics until - // the very end. - // - // `try_encode`, and thus `encode_single_metric`, specifically store sketch-oriented metrics - // off to the side for this very purpose, letting us gather them all here, encoding them - // into a single Protocol Buffers payload. - // - // Naturally, this means we might actually generate a payload that's too big. This is a - // problem for the caller to figure out. Presently, the only usage of this encoder will - // naively attempt to split the batch into two and try again. - - // Only go through this if we're targeting the sketch endpoint. - if !(matches!(self.endpoint, DatadogMetricsEndpoint::Sketches)) { - return Ok(()); - } - - // Consume of all of the "pending" metrics and try to write them out as sketches. 
-        let pending = mem::take(&mut self.state.pending);
-        write_sketches(
-            &pending,
-            &self.default_namespace,
-            self.log_schema,
-            &mut self.state.buf,
-        )
-        .context(PendingEncodeFailedSnafu)?;
-
-        if self.try_compress_buffer().context(CompressionFailedSnafu)? {
-            // Since we encoded and compressed them successfully, add them to the "processed" list.
-            self.state.processed.extend(pending);
-            Ok(())
-        } else {
-            // The payload was too big overall, which we can't do anything about. Up to the caller
-            // now to try to encode them again after splitting the batch.
-            Err(FinishError::TooLarge {
-                metrics: pending,
-                // TODO: Hard-coded split code for now because we need to hoist up the logic for
-                // calculating the recommended splits to an instance method or something.
-                recommended_splits: 2,
-            })
-        }
-    }
-
     pub fn finish(&mut self) -> Result<(Bytes, Vec<Metric>, usize), FinishError> {
-        // Try to encode any pending metrics we had stored up.
-        self.try_encode_pending()?;
-
         // Write any payload footer necessary for the configured endpoint.
         let n = write_payload_footer(self.endpoint, &mut self.state.writer)
             .context(CompressionFailedSnafu)?;
@@ -381,6 +357,104 @@ impl DatadogMetricsEncoder {
     }
 }
 
+fn get_sketch_payload_sketches_field_number() -> u32 {
+    static SKETCH_PAYLOAD_SKETCHES_FIELD_NUM: OnceCell<u32> = OnceCell::new();
+    *SKETCH_PAYLOAD_SKETCHES_FIELD_NUM.get_or_init(|| {
+        let descriptors = get_protobuf_descriptors();
+        let descriptor = descriptors
+            .get_message_by_name("datadog.agentpayload.SketchPayload")
+            .expect("should not fail to find `SketchPayload` message in descriptor pool");
+
+        descriptor
+            .get_field_by_name("sketches")
+            .map(|field| field.number())
+            .expect("`sketches` field must exist in `SketchPayload` message")
+    })
+}
+
+fn sketch_to_proto_message(
+    metric: &Metric,
+    ddsketch: &AgentDDSketch,
+    default_namespace: &Option<Arc<str>>,
+    log_schema: &'static LogSchema,
+) -> ddmetric_proto::sketch_payload::Sketch {
+    let name = get_namespaced_name(metric, default_namespace);
+    let ts = encode_timestamp(metric.timestamp());
+    let mut tags = metric.tags().cloned().unwrap_or_default();
+    let host = tags.remove(log_schema.host_key()).unwrap_or_default();
+    let tags = encode_tags(&tags);
+
+    let cnt = ddsketch.count() as i64;
+    let min = ddsketch
+        .min()
+        .expect("min should be present for non-empty sketch");
+    let max = ddsketch
+        .max()
+        .expect("max should be present for non-empty sketch");
+    let avg = ddsketch
+        .avg()
+        .expect("avg should be present for non-empty sketch");
+    let sum = ddsketch
+        .sum()
+        .expect("sum should be present for non-empty sketch");
+
+    let (bins, counts) = ddsketch.bin_map().into_parts();
+    let k = bins.into_iter().map(Into::into).collect();
+    let n = counts.into_iter().map(Into::into).collect();
+
+    ddmetric_proto::sketch_payload::Sketch {
+        metric: name,
+        tags,
+        host,
+        distributions: Vec::new(),
+        dogsketches: vec![ddmetric_proto::sketch_payload::sketch::Dogsketch {
+            ts,
+            cnt,
+            min,
+            max,
+            avg,
+            sum,
+            k,
+            n,
+        }],
+    }
+}
+
+fn encode_sketch_incremental<B>(
+    metric: &Metric,
+    ddsketch: &AgentDDSketch,
+    default_namespace: &Option<Arc<str>>,
+    log_schema: &'static LogSchema,
+    buf: &mut B,
+) -> Result<(), prost::EncodeError>
+where
+    B: BufMut,
+{
+    // This encodes a single sketch metric incrementally, which means that we specifically write it
+    // as if we were writing a single field entry in the overall `SketchPayload` message
+    // type.
+ // + // By doing so, we can encode multiple sketches and concatenate all the buffers, and have the + // resulting buffer appear as if it's a normal `SketchPayload` message with a bunch of repeats + // of the `sketches` field. + // + // Crucially, this code works because `SketchPayload` has two fields -- metadata and sketches -- + // and we never actually set the metadata field... so the resulting message generated overall + // for `SketchPayload` with a single sketch looks just like as if we literally wrote out a + // single value for the given field. + + let sketch_proto = sketch_to_proto_message(metric, ddsketch, default_namespace, log_schema); + + // Manually write the field tag for `sketches` and then encode the sketch payload directly as a + // length-delimited message. + prost::encoding::encode_key( + get_sketch_payload_sketches_field_number(), + prost::encoding::WireType::LengthDelimited, + buf, + ); + sketch_proto.encode_length_delimited(buf) +} + fn get_namespaced_name(metric: &Metric, default_namespace: &Option>) -> String { encode_namespace( metric @@ -481,89 +555,10 @@ fn generate_series_metrics( Ok(results) } -fn write_sketches( - metrics: &[Metric], - default_namespace: &Option>, - log_schema: &'static LogSchema, - buf: &mut B, -) -> Result<(), EncoderError> -where - B: BufMut, -{ - let mut sketches = Vec::new(); - for metric in metrics { - match metric.value() { - MetricValue::Sketch { sketch } => match sketch { - MetricSketch::AgentDDSketch(ddsketch) => { - // Don't encode any empty sketches. - if ddsketch.is_empty() { - continue; - } - - let name = get_namespaced_name(metric, default_namespace); - let ts = encode_timestamp(metric.timestamp()); - let mut tags = metric.tags().cloned().unwrap_or_default(); - let host = tags.remove(log_schema.host_key()).unwrap_or_default(); - let tags = encode_tags(&tags); - - let cnt = ddsketch.count() as i64; - let min = ddsketch - .min() - .expect("min should be present for non-empty sketch"); - let max = ddsketch - .max() - .expect("max should be present for non-empty sketch"); - let avg = ddsketch - .avg() - .expect("avg should be present for non-empty sketch"); - let sum = ddsketch - .sum() - .expect("sum should be present for non-empty sketch"); - - let (bins, counts) = ddsketch.bin_map().into_parts(); - let k = bins.into_iter().map(Into::into).collect(); - let n = counts.into_iter().map(Into::into).collect(); - - let sketch = ddmetric_proto::sketch_payload::Sketch { - metric: name, - tags, - host, - distributions: Vec::new(), - dogsketches: vec![ddmetric_proto::sketch_payload::sketch::Dogsketch { - ts, - cnt, - min, - max, - avg, - sum, - k, - n, - }], - }; - - sketches.push(sketch); - } - }, - // We filter out non-sketch metrics during `encode_single_metric` if we're targeting - // the sketches endpoint. - _ => unreachable!(), - } - } - - let sketch_payload = ddmetric_proto::SketchPayload { - // TODO: The "common metadata" fields are things that only very loosely apply to Vector, or - // are hard to characterize -- for example, what's the API key for a sketch that didn't originate - // from the Datadog Agent? -- so we're just omitting it here in the hopes it doesn't - // actually matter. - metadata: None, - sketches, - }; - - // Now try encoding this sketch payload, and then try to compress it. 
- sketch_payload.encode(buf).context(ProtoEncodingFailedSnafu) -} - fn get_compressor() -> Compressor { + // We use the "zlib default" compressor because it's all Datadog supports, and adding it + // generically to `Compression` would make things a little weird because of the conversion trait + // implementations that are also only none vs gzip. Compression::zlib_default().into() } @@ -593,17 +588,20 @@ const fn max_compression_overhead_len(compressed_limit: usize) -> usize { } const fn validate_payload_size_limits( + endpoint: DatadogMetricsEndpoint, uncompressed_limit: usize, compressed_limit: usize, ) -> Option<(usize, usize)> { - // Get the maximum possible length of the header/footer combined. - // - // This only matters for series metrics at the moment, since sketches are encoded in a single - // shot to their Protocol Buffers representation. We're "wasting" `header_len` bytes in the - // case of sketches, but we're also talking about like 10 bytes: not enough to care about. - let header_len = max_uncompressed_header_len(); - if uncompressed_limit <= header_len { - return None; + if endpoint.is_series() { + // For series, we need to make sure the uncompressed limit can account for the header/footer + // we would add that wraps the encoded metrics up in the expected JSON object. This does + // imply that adding 1 to this limit would be allowed, and obviously we can't encode a + // series metric in a single byte, but this is just a simple sanity check, not an exhaustive + // search of the absolute bare minimum size. + let header_len = max_uncompressed_header_len(); + if uncompressed_limit <= header_len { + return None; + } } // Get the maximum possible overhead of the compression container, based on the incoming @@ -659,6 +657,7 @@ mod tests { use std::{ io::{self, copy}, num::NonZeroU32, + sync::Arc, }; use bytes::{BufMut, Bytes, BytesMut}; @@ -668,16 +667,21 @@ mod tests { arbitrary::any, collection::btree_map, num::f64::POSITIVE as ARB_POSITIVE_F64, prop_assert, proptest, strategy::Strategy, string::string_regex, }; + use prost::Message; use vector_core::{ - config::log_schema, - event::{metric::TagValue, Metric, MetricKind, MetricTags, MetricValue}, + config::{log_schema, LogSchema}, + event::{ + metric::{MetricSketch, TagValue}, + Metric, MetricKind, MetricTags, MetricValue, + }, metric_tags, metrics::AgentDDSketch, }; use super::{ - encode_tags, encode_timestamp, generate_series_metrics, get_compressor, - max_compression_overhead_len, max_uncompressed_header_len, validate_payload_size_limits, + ddmetric_proto, encode_sketch_incremental, encode_tags, encode_timestamp, + generate_series_metrics, get_compressor, max_compression_overhead_len, + max_uncompressed_header_len, sketch_to_proto_message, validate_payload_size_limits, write_payload_footer, write_payload_header, DatadogMetricsEncoder, EncoderError, }; use crate::{ @@ -714,6 +718,10 @@ mod tests { compressor.finish().expect("should not fail").freeze() } + fn get_compressed_empty_sketches_payload() -> Bytes { + get_compressor().finish().expect("should not fail").freeze() + } + fn decompress_payload(payload: Bytes) -> io::Result { let mut decompressor = ZlibDecoder::new(&payload[..]); let mut decompressed = BytesMut::new().writer(); @@ -738,6 +746,47 @@ mod tests { } } + fn encode_sketches_normal( + metrics: &[Metric], + default_namespace: &Option>, + log_schema: &'static LogSchema, + buf: &mut B, + ) where + B: BufMut, + { + let mut sketches = Vec::new(); + for metric in metrics { + match metric.value() { + MetricValue::Sketch 
{ sketch } => match sketch { + MetricSketch::AgentDDSketch(ddsketch) => { + // Don't encode any empty sketches. + if ddsketch.is_empty() { + continue; + } + + let sketch = sketch_to_proto_message( + metric, + ddsketch, + default_namespace, + log_schema, + ); + + sketches.push(sketch); + } + }, + _ => panic!("bad"), + } + } + + let sketch_payload = ddmetric_proto::SketchPayload { + metadata: None, + sketches, + }; + + // Now try encoding this sketch payload, and then try to compress it. + sketch_payload.encode(buf).unwrap() + } + #[test] fn test_encode_tags() { assert_eq!( @@ -864,16 +913,54 @@ mod tests { } #[test] - fn payload_size_limits() { + fn encode_multiple_sketch_metrics_normal_vs_incremental() { + // This tests our incremental sketch encoding against the more straightforward approach of + // just building/encoding a full `SketchPayload` message. + let metrics = vec![ + get_simple_sketch(), + get_simple_sketch(), + get_simple_sketch(), + ]; + + let mut normal_buf = Vec::new(); + encode_sketches_normal(&metrics, &None, log_schema(), &mut normal_buf); + + let mut incremental_buf = Vec::new(); + for metric in &metrics { + match metric.value() { + MetricValue::Sketch { sketch } => match sketch { + MetricSketch::AgentDDSketch(ddsketch) => encode_sketch_incremental( + metric, + ddsketch, + &None, + log_schema(), + &mut incremental_buf, + ) + .unwrap(), + }, + _ => panic!("should be a sketch"), + } + } + + assert_eq!(normal_buf, incremental_buf); + } + + #[test] + fn payload_size_limits_series() { // Get the maximum length of the header/trailer data. let header_len = max_uncompressed_header_len(); // This is too small. - let result = validate_payload_size_limits(header_len, usize::MAX); + let result = + validate_payload_size_limits(DatadogMetricsEndpoint::Series, header_len, usize::MAX); assert_eq!(result, None); // This is just right. - let result = validate_payload_size_limits(header_len + 1, usize::MAX); + let result = validate_payload_size_limits( + DatadogMetricsEndpoint::Series, + header_len + 1, + usize::MAX, + ); assert_eq!(result, Some((header_len + 1, usize::MAX))); // Get the maximum compressed overhead length, based on our input uncompressed size. This @@ -882,16 +969,52 @@ mod tests { let compression_overhead_len = max_compression_overhead_len(usize::MAX); // This is too small. - let result = validate_payload_size_limits(usize::MAX, compression_overhead_len); + let result = validate_payload_size_limits( + DatadogMetricsEndpoint::Series, + usize::MAX, + compression_overhead_len, + ); assert_eq!(result, None); // This is just right. - let result = validate_payload_size_limits(usize::MAX, compression_overhead_len + 1); + let result = validate_payload_size_limits( + DatadogMetricsEndpoint::Series, + usize::MAX, + compression_overhead_len + 1, + ); assert_eq!(result, Some((usize::MAX, compression_overhead_len + 1))); } #[test] - fn encode_breaks_out_when_limit_reached_uncompressed() { + fn payload_size_limits_sketches() { + // There's no lower bound on uncompressed size for the sketches payload. + let result = validate_payload_size_limits(DatadogMetricsEndpoint::Sketches, 0, usize::MAX); + assert_eq!(result, Some((0, usize::MAX))); + + // Get the maximum compressed overhead length, based on our input uncompressed size. This + // represents the worst case overhead based on the input data (of length usize::MAX, in this + // case) being entirely incompressible. + let compression_overhead_len = max_compression_overhead_len(usize::MAX); + + // This is too small. 
+ let result = validate_payload_size_limits( + DatadogMetricsEndpoint::Sketches, + usize::MAX, + compression_overhead_len, + ); + assert_eq!(result, None); + + // This is just right. + let result = validate_payload_size_limits( + DatadogMetricsEndpoint::Sketches, + usize::MAX, + compression_overhead_len + 1, + ); + assert_eq!(result, Some((usize::MAX, compression_overhead_len + 1))); + } + + #[test] + fn encode_series_breaks_out_when_limit_reached_uncompressed() { // We manually create the encoder with an arbitrarily low "uncompressed" limit but high // "compressed" limit to exercise the codepath that should avoid encoding a metric when the // uncompressed payload would exceed the limit. @@ -905,7 +1028,8 @@ mod tests { .expect("payload size limits should be valid"); // Trying to encode a metric that would cause us to exceed our uncompressed limits will - // _not_ return an error from `try_encode`. + // _not_ return an error from `try_encode`, but instead will simply return back the metric + // as it could not be added. let counter = get_simple_counter(); let result = encoder.try_encode(counter.clone()); assert!(result.is_ok()); @@ -917,17 +1041,46 @@ mod tests { let result = encoder.finish(); assert!(result.is_ok()); - let (payload, processed, raw_bytes) = result.unwrap(); - let empty_payload = get_compressed_empty_series_payload(); - assert_eq!(payload, empty_payload); + let (payload, processed, raw_bytes_written) = result.unwrap(); + assert_eq!(payload, get_compressed_empty_series_payload()); assert_eq!(processed.len(), 0); + assert_eq!(raw_bytes_written, max_uncompressed_header_len()); + } - // Just the header and footer. - assert_eq!(13, raw_bytes); + #[test] + fn encode_sketches_breaks_out_when_limit_reached_uncompressed() { + // We manually create the encoder with an arbitrarily low "uncompressed" limit but high + // "compressed" limit to exercise the codepath that should avoid encoding a metric when the + // uncompressed payload would exceed the limit. + let mut encoder = DatadogMetricsEncoder::with_payload_limits( + DatadogMetricsEndpoint::Sketches, + None, + 1, + usize::MAX, + ) + .expect("payload size limits should be valid"); + + // Trying to encode a metric that would cause us to exceed our uncompressed limits will + // _not_ return an error from `try_encode`, but instead will simply return back the metric + // as it could not be added. + let sketch = get_simple_sketch(); + let result = encoder.try_encode(sketch.clone()); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), Some(sketch)); + + // And similarly, since we didn't actually encode a metric, we _should_ be able to finish + // this payload, but it will be empty and no processed metrics should be returned. + let result = encoder.finish(); + assert!(result.is_ok()); + + let (payload, processed, raw_bytes_written) = result.unwrap(); + assert_eq!(payload, get_compressed_empty_sketches_payload()); + assert_eq!(processed.len(), 0); + assert_eq!(raw_bytes_written, 0); } #[test] - fn encode_breaks_out_when_limit_reached_compressed() { + fn encode_series_breaks_out_when_limit_reached_compressed() { // We manually create the encoder with an arbitrarily low "compressed" limit but high // "uncompressed" limit to exercise the codepath that should avoid encoding a metric when the // compressed payload would exceed the limit. 
@@ -942,7 +1095,8 @@ mod tests { .expect("payload size limits should be valid"); // Trying to encode a metric that would cause us to exceed our compressed limits will - // _not_ return an error from `try_encode`. + // _not_ return an error from `try_encode`, but instead will simply return back the metric + // as it could not be added. let counter = get_simple_counter(); let result = encoder.try_encode(counter.clone()); assert!(result.is_ok()); @@ -954,13 +1108,45 @@ mod tests { let result = encoder.finish(); assert!(result.is_ok()); - let (payload, processed, raw_bytes) = result.unwrap(); - let empty_payload = get_compressed_empty_series_payload(); - assert_eq!(payload, empty_payload); + let (payload, processed, raw_bytes_written) = result.unwrap(); + assert_eq!(payload, get_compressed_empty_series_payload()); assert_eq!(processed.len(), 0); + assert_eq!(raw_bytes_written, max_uncompressed_header_len()); + } - // Just the header and footer. - assert_eq!(13, raw_bytes); + #[test] + fn encode_sketches_breaks_out_when_limit_reached_compressed() { + // We manually create the encoder with an arbitrarily low "compressed" limit but high + // "uncompressed" limit to exercise the codepath that should avoid encoding a metric when the + // compressed payload would exceed the limit. + let uncompressed_limit = 128; + let compressed_limit = 16; + let mut encoder = DatadogMetricsEncoder::with_payload_limits( + DatadogMetricsEndpoint::Sketches, + None, + uncompressed_limit, + compressed_limit, + ) + .expect("payload size limits should be valid"); + + // Trying to encode a metric that would cause us to exceed our compressed limits will + // _not_ return an error from `try_encode`, but instead will simply return back the metric + // as it could not be added. + let sketch = get_simple_sketch(); + let result = encoder.try_encode(sketch.clone()); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), Some(sketch)); + + // And similarly, since we didn't actually encode a metric, we _should_ be able to finish + // this payload, but it will be empty (effectively, the header/footer will exist) and no + // processed metrics should be returned. 
+ let result = encoder.finish(); + assert!(result.is_ok()); + + let (payload, processed, raw_bytes_written) = result.unwrap(); + assert_eq!(payload, get_compressed_empty_sketches_payload()); + assert_eq!(processed.len(), 0); + assert_eq!(raw_bytes_written, 0); } fn arb_counter_metric() -> impl Strategy { diff --git a/src/sinks/datadog/metrics/request_builder.rs b/src/sinks/datadog/metrics/request_builder.rs index 64b1226b661bf..1125e56607879 100644 --- a/src/sinks/datadog/metrics/request_builder.rs +++ b/src/sinks/datadog/metrics/request_builder.rs @@ -1,11 +1,10 @@ use bytes::Bytes; -use serde_json::error::Category; use snafu::Snafu; -use std::{num::NonZeroUsize, sync::Arc}; +use std::sync::Arc; use vector_common::request_metadata::RequestMetadata; use vector_core::{ event::{EventFinalizers, Finalizable, Metric}, - EstimatedJsonEncodedSizeOf, + ByteSizeOf, EstimatedJsonEncodedSizeOf, }; use super::{ @@ -13,23 +12,23 @@ use super::{ encoder::{CreateError, DatadogMetricsEncoder, EncoderError, FinishError}, service::DatadogMetricsRequest, }; -use crate::sinks::util::{metadata::RequestMetadataBuilder, IncrementalRequestBuilder}; +use crate::sinks::util::IncrementalRequestBuilder; #[derive(Debug, Snafu)] pub enum RequestBuilderError { - #[snafu(display("Failed to build the request builder: {}", error_type))] - FailedToBuild { error_type: &'static str }, + #[snafu( + context(false), + display("Failed to build the request builder: {source}") + )] + FailedToBuild { source: CreateError }, - #[snafu(display("Encoding of a metric failed ({})", reason))] - FailedToEncode { - reason: &'static str, - dropped_events: u64, - }, + #[snafu(context(false), display("Failed to encode metric: {source}"))] + FailedToEncode { source: EncoderError }, - #[snafu(display("A split payload was still too big to encode/compress within size limits"))] + #[snafu(display("A split payload was still too big to encode/compress within size limits."))] FailedToSplit { dropped_events: u64 }, - #[snafu(display("An unexpected error occurred"))] + #[snafu(display("An unexpected error occurred: {error_type}"))] Unexpected { error_type: &'static str, dropped_events: u64, @@ -37,78 +36,28 @@ pub enum RequestBuilderError { } impl RequestBuilderError { - /// Converts this error into its constituent parts: the error reason, and how many events were - /// dropped as a result. - pub const fn into_parts(self) -> (&'static str, &'static str, u64) { + /// Converts this error into its constituent parts: the error reason, the error type, and how + /// many events were dropped as a result. + pub fn into_parts(self) -> (String, &'static str, u64) { match self { - Self::FailedToBuild { error_type } => { - ("Failed to build the request builder.", error_type, 0) - } - Self::FailedToEncode { - reason, - dropped_events, - } => ("Encoding of a metric failed.", reason, dropped_events), + Self::FailedToBuild { source } => (source.to_string(), source.as_error_type(), 0), + // Encoding errors always happen at the per-metric level, so we could only ever drop a + // single metric/event at a time. + Self::FailedToEncode { source } => (source.to_string(), source.as_error_type(), 1), Self::FailedToSplit { dropped_events } => ( - "A split payload was still too big to encode/compress withing size limits.", + "A split payload was still too big to encode/compress withing size limits." 
+ .to_string(), "split_failed", dropped_events, ), Self::Unexpected { error_type, dropped_events, - } => ("An unexpected error occurred.", error_type, dropped_events), - } - } -} - -impl From for RequestBuilderError { - fn from(e: CreateError) -> Self { - match e { - CreateError::InvalidLimits => Self::FailedToBuild { - error_type: "invalid_payload_limits", - }, - } - } -} - -impl From for RequestBuilderError { - fn from(e: EncoderError) -> Self { - match e { - // Series metrics (JSON) are encoded incrementally, so we can only ever lose a single - // metric for a JSON encoding failure. - EncoderError::JsonEncodingFailed { source } => Self::FailedToEncode { - reason: match source.classify() { - Category::Io => "json_io", - Category::Syntax => "json_syntax", - Category::Data => "json_data", - Category::Eof => "json_eof", - }, - dropped_events: 1, - }, - // Sketch metrics (Protocol Buffers) are encoded in a single shot, so naturally we would - // expect `dropped_events` to be 1-N, instead of always 1. We should never emit this - // metric when calling `try_encode`, which is where we'd see the JSON variant of it. - // This is because sketch encoding happens at the end. - // - // Thus, we default `dropped_events` to 1, and if we actually hit this error when - // finishing up a payload, we'll fix up the true number of dropped events at that point. - EncoderError::ProtoEncodingFailed { .. } => Self::FailedToEncode { - // `prost` states that for an encoding error specifically, it can only ever fail due - // to insufficient capacity in the encoding buffer. - reason: "protobuf_insufficient_buf_capacity", - dropped_events: 1, - }, - // Not all metric types for valid depending on the configured endpoint of the encoder. - EncoderError::InvalidMetric { metric_value, .. } => Self::FailedToEncode { - // TODO: At some point, it would be nice to use `const_format` to build the reason - // as " _via_" to better understand in what context - // metric X is being considered as invalid. Practically it's not a huge issue, - // because the number of metric types are fixed and we should be able to inspect the - // code for issues, or if it became a big problem, we could just go ahead and do the - // `const_format` work... but it'd be nice to be ahead of curve when trivially possible. - reason: metric_value, - dropped_events: 1, - }, + } => ( + "An unexpected error occurred.".to_string(), + error_type, + dropped_events, + ), } } } @@ -118,7 +67,6 @@ pub struct DDMetricsMetadata { api_key: Option>, endpoint: DatadogMetricsEndpoint, finalizers: EventFinalizers, - raw_bytes: usize, } /// Incremental request builder specific to Datadog metrics. @@ -211,23 +159,22 @@ impl IncrementalRequestBuilder<((Option>, DatadogMetricsEndpoint), Vec< // If we encoded one or more metrics this pass, finalize the payload. 
if n > 0 { match encoder.finish() { - Ok((payload, mut metrics, raw_bytes_written)) => { - let json_size = metrics.estimated_json_encoded_size_of(); + Ok((payload, mut metrics, uncompressed_payload_len)) => { let finalizers = metrics.take_finalizers(); let metadata = DDMetricsMetadata { api_key: api_key.as_ref().map(Arc::clone), endpoint, finalizers, - raw_bytes: raw_bytes_written, }; - let builder = RequestMetadataBuilder::new( + + let request_metadata = RequestMetadata::new( metrics.len(), - raw_bytes_written, - json_size, + metrics.size_of(), + uncompressed_payload_len, + payload.len(), + metrics.estimated_json_encoded_size_of(), ); - let bytes_len = NonZeroUsize::new(payload.len()) - .expect("payload should never be zero length"); - let request_metadata = builder.with_request_size(bytes_len); + results.push(Ok(((metadata, request_metadata), payload))); } Err(err) => match err { @@ -299,7 +246,6 @@ impl IncrementalRequestBuilder<((Option>, DatadogMetricsEndpoint), Vec< uri, content_type: ddmetrics_metadata.endpoint.content_type(), finalizers: ddmetrics_metadata.finalizers, - raw_bytes: ddmetrics_metadata.raw_bytes, metadata: request_metadata, } } @@ -332,19 +278,22 @@ fn encode_now_or_never( encoder .finish() - .map(|(payload, mut processed, raw_bytes_written)| { - let json_size = processed.estimated_json_encoded_size_of(); + .map(|(payload, mut processed, uncompressed_payload_len)| { let finalizers = processed.take_finalizers(); let ddmetrics_metadata = DDMetricsMetadata { api_key, endpoint, finalizers, - raw_bytes: raw_bytes_written, }; - let builder = RequestMetadataBuilder::new(metrics_len, raw_bytes_written, json_size); - let bytes_len = - NonZeroUsize::new(payload.len()).expect("payload should never be zero length"); - let request_metadata = builder.with_request_size(bytes_len); + + let request_metadata = RequestMetadata::new( + processed.len(), + processed.size_of(), + uncompressed_payload_len, + payload.len(), + processed.estimated_json_encoded_size_of(), + ); + debug!("dd metrics request metadata: {:?}", request_metadata); ((ddmetrics_metadata, request_metadata), payload) }) diff --git a/src/sinks/datadog/metrics/service.rs b/src/sinks/datadog/metrics/service.rs index d15716b99d8ad..af8170daa23bf 100644 --- a/src/sinks/datadog/metrics/service.rs +++ b/src/sinks/datadog/metrics/service.rs @@ -10,10 +10,7 @@ use http::{ use hyper::Body; use snafu::ResultExt; use tower::Service; -use vector_common::{ - json_size::JsonSize, - request_metadata::{MetaDescriptive, RequestMetadata}, -}; +use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; use vector_core::{ event::{EventFinalizers, EventStatus, Finalizable}, internal_event::CountByteSize, @@ -64,7 +61,6 @@ pub struct DatadogMetricsRequest { pub uri: Uri, pub content_type: &'static str, pub finalizers: EventFinalizers, - pub raw_bytes: usize, pub metadata: RequestMetadata, } @@ -125,9 +121,7 @@ impl MetaDescriptive for DatadogMetricsRequest { pub struct DatadogMetricsResponse { status_code: StatusCode, body: Bytes, - batch_size: usize, - byte_size: JsonSize, - raw_byte_size: usize, + request_metadata: RequestMetadata, } impl DriverResponse for DatadogMetricsResponse { @@ -142,11 +136,15 @@ impl DriverResponse for DatadogMetricsResponse { } fn events_sent(&self) -> CountByteSize { - CountByteSize(self.batch_size, self.byte_size) + CountByteSize( + self.request_metadata.event_count(), + self.request_metadata + .events_estimated_json_encoded_byte_size(), + ) } fn bytes_sent(&self) -> Option { - Some(self.raw_byte_size) 
+ Some(self.request_metadata.request_wire_size()) } } @@ -185,11 +183,7 @@ impl Service for DatadogMetricsService { let api_key = self.api_key.clone(); Box::pin(async move { - let byte_size = request - .get_metadata() - .events_estimated_json_encoded_byte_size(); - let batch_size = request.get_metadata().event_count(); - let raw_byte_size = request.raw_bytes; + let request_metadata = request.get_metadata(); let request = request .into_http_request(api_key) @@ -208,9 +202,7 @@ impl Service for DatadogMetricsService { Ok(DatadogMetricsResponse { status_code: parts.status, body, - batch_size, - byte_size, - raw_byte_size, + request_metadata, }) }) } diff --git a/src/sinks/datadog/metrics/sink.rs b/src/sinks/datadog/metrics/sink.rs index a85eaaf7a3a11..5ceefc3c487d2 100644 --- a/src/sinks/datadog/metrics/sink.rs +++ b/src/sinks/datadog/metrics/sink.rs @@ -123,9 +123,9 @@ where .filter_map(|request| async move { match request { Err(e) => { - let (error_message, error_code, dropped_events) = e.into_parts(); + let (reason, error_code, dropped_events) = e.into_parts(); emit!(DatadogMetricsEncodingError { - error_message, + reason: reason.as_str(), error_code, dropped_events: dropped_events as usize, }); From 5be36b4e0056fd8dbcc93706f4bcb789c705598b Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Mon, 26 Jun 2023 14:14:22 -0400 Subject: [PATCH 2/6] more precise value when doing our worst-case-compressed-len check --- src/sinks/datadog/metrics/encoder.rs | 40 +++++++++++++++++++--------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/src/sinks/datadog/metrics/encoder.rs b/src/sinks/datadog/metrics/encoder.rs index c043626f82571..b2e0de519af74 100644 --- a/src/sinks/datadog/metrics/encoder.rs +++ b/src/sinks/datadog/metrics/encoder.rs @@ -279,7 +279,8 @@ impl DatadogMetricsEncoder { // assume the worst case while our limits assume the worst case _overhead_. Maybe our // numbers are technically off in the end, but `finish` catches that for us, too. let compressed_len = self.state.writer.get_ref().len(); - if compressed_len + n > self.compressed_limit { + let max_compressed_metric_len = n + max_compressed_overhead_len(n); + if compressed_len + max_compressed_metric_len > self.compressed_limit { return Ok(false); } @@ -566,25 +567,38 @@ const fn max_uncompressed_header_len() -> usize { SERIES_PAYLOAD_HEADER.len() + SERIES_PAYLOAD_FOOTER.len() } +// Datadog ingest APIs accept zlib, which is what we're accounting for here. By default, zlib +// has a 2 byte header and 4 byte CRC trailer. [1] +// +// [1] https://www.zlib.net/zlib_tech.html +const ZLIB_HEADER_TRAILER: usize = 6; + const fn max_compression_overhead_len(compressed_limit: usize) -> usize { - // Datadog ingest APIs accept zlib, which is what we're accounting for here. By default, zlib - // has a 2 byte header and 4 byte CRC trailer. Additionally, Deflate, the underlying - // compression algorithm, has a technique to ensure that input data can't be encoded in such a - // way where it's expanded by a meaningful amount. + // We calculate the overhead as the zlib header/trailer plus the worst case overhead of + // compressing `compressed_limit` bytes, such that we assume all of the data we write may not be + // compressed at all. 
+ // + // [1] https://www.zlib.net/zlib_tech.html + // [2] https://www.bolet.org/~pornin/deflate-flush-fr.html + ZLIB_HEADER_TRAILER + max_compressed_overhead_len(compressed_limit) +} + +const fn max_compressed_overhead_len(len: usize) -> usize { + // Datadog ingest APIs accept zlib, which is what we're accounting for here. // - // This technique allows storing blocks of uncompressed data with only 5 bytes of overhead per - // block. Technically, the blocks can be up to 65KB in Deflate, but modern zlib implementations - // use block sizes of 16KB. [1][2] + // Deflate, the underlying compression algorithm, has a technique to ensure that input data + // can't be encoded in such a way where it's expanded by a meaningful amount. This technique + // allows storing blocks of uncompressed data with only 5 bytes of overhead per block. + // Technically, the blocks can be up to 65KB in Deflate, but modern zlib implementations use + // block sizes of 16KB. [1][2] // - // With all of that said, we calculate the overhead as the header plus trailer plus the given - // compressed size limit, minus the known overhead, multiplied such that it accounts for the - // worse case of entirely uncompressed data. + // We calculate the overhead of compressing a given `len` bytes as the worst case of that many + // bytes being written to the compressor and being unable to be compressed at all // // [1] https://www.zlib.net/zlib_tech.html // [2] https://www.bolet.org/~pornin/deflate-flush-fr.html - const HEADER_TRAILER: usize = 6; const STORED_BLOCK_SIZE: usize = 16384; - HEADER_TRAILER + (1 + compressed_limit.saturating_sub(HEADER_TRAILER) / STORED_BLOCK_SIZE) * 5 + (1 + len.saturating_sub(ZLIB_HEADER_TRAILER) / STORED_BLOCK_SIZE) * 5 } const fn validate_payload_size_limits( From a3c8227c61c1a362608126d34c927a04c24a5cd7 Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Mon, 26 Jun 2023 15:17:41 -0400 Subject: [PATCH 3/6] address PR feedback --- src/sinks/datadog/metrics/encoder.rs | 3 --- src/sinks/datadog/metrics/request_builder.rs | 1 - 2 files changed, 4 deletions(-) diff --git a/src/sinks/datadog/metrics/encoder.rs b/src/sinks/datadog/metrics/encoder.rs index b2e0de519af74..107066dee54a2 100644 --- a/src/sinks/datadog/metrics/encoder.rs +++ b/src/sinks/datadog/metrics/encoder.rs @@ -577,9 +577,6 @@ const fn max_compression_overhead_len(compressed_limit: usize) -> usize { // We calculate the overhead as the zlib header/trailer plus the worst case overhead of // compressing `compressed_limit` bytes, such that we assume all of the data we write may not be // compressed at all. 
- // - // [1] https://www.zlib.net/zlib_tech.html - // [2] https://www.bolet.org/~pornin/deflate-flush-fr.html ZLIB_HEADER_TRAILER + max_compressed_overhead_len(compressed_limit) } diff --git a/src/sinks/datadog/metrics/request_builder.rs b/src/sinks/datadog/metrics/request_builder.rs index 1125e56607879..88c39ff6bf820 100644 --- a/src/sinks/datadog/metrics/request_builder.rs +++ b/src/sinks/datadog/metrics/request_builder.rs @@ -293,7 +293,6 @@ fn encode_now_or_never( payload.len(), processed.estimated_json_encoded_size_of(), ); - debug!("dd metrics request metadata: {:?}", request_metadata); ((ddmetrics_metadata, request_metadata), payload) }) From 51ef2fd8341705f2e5b5c661d1276d4dcb752d00 Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Tue, 27 Jun 2023 11:50:28 -0400 Subject: [PATCH 4/6] fix broken tests --- src/sinks/datadog/metrics/encoder.rs | 59 ++++++++++++++++------------ 1 file changed, 34 insertions(+), 25 deletions(-) diff --git a/src/sinks/datadog/metrics/encoder.rs b/src/sinks/datadog/metrics/encoder.rs index ccf0c6654ebef..72bd5bc1a0843 100644 --- a/src/sinks/datadog/metrics/encoder.rs +++ b/src/sinks/datadog/metrics/encoder.rs @@ -888,16 +888,9 @@ mod tests { let result = encoder.finish(); assert!(result.is_ok()); - let (payload, mut processed, raw_bytes) = result.unwrap(); + let (_payload, mut processed) = result.unwrap(); assert_eq!(processed.len(), 1); assert_eq!(expected, processed.pop().unwrap()); - assert_eq!(100, payload.len()); - - // The payload is: - // {"series":[{"metric":"basic_counter","type":"count","interval":null,"points":[[1651664333,3.14]],"tags":[]}]} - // which comes to a total of 98 bytes. - // There are extra bytes that make up the header and footer. These should not be included in the raw bytes. - assert_eq!(109, raw_bytes); } #[test] @@ -918,12 +911,9 @@ mod tests { let result = encoder.finish(); assert!(result.is_ok()); - let (payload, mut processed, raw_bytes) = result.unwrap(); + let (_payload, mut processed) = result.unwrap(); assert_eq!(processed.len(), 1); assert_eq!(expected, processed.pop().unwrap()); - - assert_eq!(81, payload.len()); - assert_eq!(70, raw_bytes); } #[test] @@ -1055,10 +1045,16 @@ mod tests { let result = encoder.finish(); assert!(result.is_ok()); - let (payload, processed, raw_bytes_written) = result.unwrap(); - assert_eq!(payload, get_compressed_empty_series_payload()); + let (payload, processed) = result.unwrap(); + assert_eq!( + payload.uncompressed_byte_size, + max_uncompressed_header_len() + ); + assert_eq!( + payload.into_payload(), + get_compressed_empty_series_payload() + ); assert_eq!(processed.len(), 0); - assert_eq!(raw_bytes_written, max_uncompressed_header_len()); } #[test] @@ -1087,10 +1083,13 @@ mod tests { let result = encoder.finish(); assert!(result.is_ok()); - let (payload, processed, raw_bytes_written) = result.unwrap(); - assert_eq!(payload, get_compressed_empty_sketches_payload()); + let (payload, processed) = result.unwrap(); + assert_eq!(payload.uncompressed_byte_size, 0); + assert_eq!( + payload.into_payload(), + get_compressed_empty_sketches_payload() + ); assert_eq!(processed.len(), 0); - assert_eq!(raw_bytes_written, 0); } #[test] @@ -1122,10 +1121,16 @@ mod tests { let result = encoder.finish(); assert!(result.is_ok()); - let (payload, processed, raw_bytes_written) = result.unwrap(); - assert_eq!(payload, get_compressed_empty_series_payload()); + let (payload, processed) = result.unwrap(); + assert_eq!( + payload.uncompressed_byte_size, + max_uncompressed_header_len() + ); + assert_eq!( + 
payload.into_payload(), + get_compressed_empty_series_payload() + ); assert_eq!(processed.len(), 0); - assert_eq!(raw_bytes_written, max_uncompressed_header_len()); } #[test] @@ -1157,10 +1162,13 @@ mod tests { let result = encoder.finish(); assert!(result.is_ok()); - let (payload, processed, raw_bytes_written) = result.unwrap(); - assert_eq!(payload, get_compressed_empty_sketches_payload()); + let (payload, processed) = result.unwrap(); + assert_eq!(payload.uncompressed_byte_size, 0); + assert_eq!( + payload.into_payload(), + get_compressed_empty_sketches_payload() + ); assert_eq!(processed.len(), 0); - assert_eq!(raw_bytes_written, 0); } fn arb_counter_metric() -> impl Strategy { @@ -1203,7 +1211,8 @@ mod tests { if let Ok(mut encoder) = result { _ = encoder.try_encode(metric); - if let Ok((payload, _processed, _raw_bytes)) = encoder.finish() { + if let Ok((payload, _processed)) = encoder.finish() { + let payload = payload.into_payload(); prop_assert!(payload.len() <= compressed_limit); let result = decompress_payload(payload); From c72aaddedc99ff0f31c35da90fea31e088953c5d Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Tue, 27 Jun 2023 13:48:33 -0400 Subject: [PATCH 5/6] update licenses --- LICENSE-3rdparty.csv | 1 + 1 file changed, 1 insertion(+) diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index c137f340d73e9..c956f00af7ab0 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -394,6 +394,7 @@ proc-macro2,https://github.com/dtolnay/proc-macro2,MIT OR Apache-2.0,"David Toln proptest,https://github.com/proptest-rs/proptest,MIT OR Apache-2.0,Jason Lingle prost,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert , Lucio Franco " prost-derive,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert , Lucio Franco , Tokio Contributors " +prost-reflect,https://github.com/andrewhickman/prost-reflect,MIT OR Apache-2.0,Andrew Hickman ptr_meta,https://github.com/djkoloski/ptr_meta,MIT,David Koloski pulsar,https://github.com/streamnative/pulsar-rs,MIT OR Apache-2.0,"Colin Stearns , Kevin Stenerson , Geoffroy Couprie " quad-rand,https://github.com/not-fl3/quad-rand,MIT,not-fl3 From f533dd7eff0f36003547ad284ad98d6875928567 Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Thu, 29 Jun 2023 09:12:14 -0400 Subject: [PATCH 6/6] address PR feedback --- build.rs | 8 +++---- src/proto/mod.rs | 5 ++--- src/sinks/datadog/metrics/encoder.rs | 32 +++++++++++----------------- 3 files changed, 19 insertions(+), 26 deletions(-) diff --git a/build.rs b/build.rs index 3db5ce0508997..31c151f4b61d7 100644 --- a/build.rs +++ b/build.rs @@ -132,13 +132,13 @@ fn main() { println!("cargo:rerun-if-changed=proto/vector.proto"); // Create and store the "file descriptor set" from the compiled Protocol Buffers packages. + // // This allows us to use runtime reflection to manually build Protocol Buffers payloads // in a type-safe way, which is necessary for incrementally building certain payloads, like // the ones generated in the `datadog_metrics` sink. 
-    let protobuf_fds_path = std::env::var("OUT_DIR")
-        .map(PathBuf::from)
-        .map(|path| path.join("protobuf-fds.bin"))
-        .expect("OUT_DIR environment variable not set");
+    let protobuf_fds_path =
+        PathBuf::from(std::env::var("OUT_DIR").expect("OUT_DIR environment variable not set"))
+            .join("protobuf-fds.bin");
 
     let mut prost_build = prost_build::Config::new();
     prost_build
diff --git a/src/proto/mod.rs b/src/proto/mod.rs
index 876d6ebc7ee54..efa1728fb6988 100644
--- a/src/proto/mod.rs
+++ b/src/proto/mod.rs
@@ -9,9 +9,8 @@ pub mod fds {
     use once_cell::sync::OnceCell;
     use prost_reflect::DescriptorPool;
 
-    static PROTOBUF_FDS: OnceCell<DescriptorPool> = OnceCell::new();
-
-    pub fn get_protobuf_descriptors() -> &'static DescriptorPool {
+    pub fn protobuf_descriptors() -> &'static DescriptorPool {
+        static PROTOBUF_FDS: OnceCell<DescriptorPool> = OnceCell::new();
         PROTOBUF_FDS.get_or_init(|| {
             DescriptorPool::decode(include_bytes!(concat!(env!("OUT_DIR"), "/protobuf-fds.bin")).as_ref())
                 .expect("should not fail to decode protobuf file descriptor set generated from build script")
         })
     }
diff --git a/src/sinks/datadog/metrics/encoder.rs b/src/sinks/datadog/metrics/encoder.rs
index 72bd5bc1a0843..0dd6c393e31b5 100644
--- a/src/sinks/datadog/metrics/encoder.rs
+++ b/src/sinks/datadog/metrics/encoder.rs
@@ -21,7 +21,7 @@ use super::config::{
 };
 use crate::{
     common::datadog::{DatadogMetricType, DatadogPoint, DatadogSeriesMetric},
-    proto::fds::get_protobuf_descriptors,
+    proto::fds::protobuf_descriptors,
     sinks::util::{encode_namespace, request_builder::EncodeResult, Compression, Compressor},
 };
 
@@ -364,7 +364,7 @@ impl DatadogMetricsEncoder {
 fn get_sketch_payload_sketches_field_number() -> u32 {
     static SKETCH_PAYLOAD_SKETCHES_FIELD_NUM: OnceCell<u32> = OnceCell::new();
     *SKETCH_PAYLOAD_SKETCHES_FIELD_NUM.get_or_init(|| {
-        let descriptors = get_protobuf_descriptors();
+        let descriptors = protobuf_descriptors();
         let descriptor = descriptors
             .get_message_by_name("datadog.agentpayload.SketchPayload")
             .expect("should not fail to find `SketchPayload` message in descriptor pool");
@@ -770,25 +770,19 @@ mod tests {
     {
         let mut sketches = Vec::new();
         for metric in metrics {
-            match metric.value() {
-                MetricValue::Sketch { sketch } => match sketch {
-                    MetricSketch::AgentDDSketch(ddsketch) => {
-                        // Don't encode any empty sketches.
-                        if ddsketch.is_empty() {
-                            continue;
-                        }
+            let MetricValue::Sketch { sketch } = metric.value() else { panic!("must be sketch") };
+            match sketch {
+                MetricSketch::AgentDDSketch(ddsketch) => {
+                    // Don't encode any empty sketches.
+                    if ddsketch.is_empty() {
+                        continue;
+                    }
 
-                        let sketch = sketch_to_proto_message(
-                            metric,
-                            ddsketch,
-                            default_namespace,
-                            log_schema,
-                        );
+                    let sketch =
+                        sketch_to_proto_message(metric, ddsketch, default_namespace, log_schema);
 
-                        sketches.push(sketch);
-                    }
-                },
-                _ => panic!("bad"),
+                    sketches.push(sketch);
+                }
             }
         }
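
Aside, for readers following the encoder change in this series: it hinges on the fact that a Protocol Buffers message whose only populated field is a repeated message field encodes to exactly the concatenation of its elements, each prefixed with the field key and a length. The standalone sketch below illustrates that equivalence with prost 0.11. The `SketchPayload`/`Sketch` types and field numbers here are simplified stand-ins invented for the example, not Vector's generated `ddmetric_proto` types; only the prost calls (`encode_key`, `encode_length_delimited`) mirror what the sink actually does.

use prost::encoding::{encode_key, WireType};
use prost::Message;

// Simplified stand-ins for the real generated types; field names and numbers are
// assumptions made purely for this example.
#[derive(Clone, PartialEq, Message)]
struct Sketch {
    #[prost(string, tag = "1")]
    metric: String,
    #[prost(sint64, repeated, tag = "2")]
    counts: Vec<i64>,
}

#[derive(Clone, PartialEq, Message)]
struct SketchPayload {
    // A repeated message field: each element is encoded as `key + length + bytes`.
    #[prost(message, repeated, tag = "1")]
    sketches: Vec<Sketch>,
}

fn main() {
    let sketches = vec![
        Sketch { metric: "a".into(), counts: vec![1, 2, 3] },
        Sketch { metric: "b".into(), counts: vec![4, 5] },
    ];

    // One-shot encoding: build the whole payload up front, then encode it.
    let one_shot = SketchPayload { sketches: sketches.clone() }.encode_to_vec();

    // Incremental encoding: write each element's field key and length-delimited body
    // directly into the buffer, one at a time, and simply concatenate the results.
    let mut incremental = Vec::new();
    for sketch in &sketches {
        encode_key(1, WireType::LengthDelimited, &mut incremental);
        sketch
            .encode_length_delimited(&mut incremental)
            .expect("Vec<u8> grows as needed, so encoding cannot run out of capacity");
    }

    // The two byte streams are identical, which is what lets the sink compress each
    // sketch as soon as it is encoded instead of buffering them all until the end.
    assert_eq!(one_shot, incremental);
}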