From ae0aa11c409a5bb6b74c35adf22a06dcd1b42895 Mon Sep 17 00:00:00 2001 From: Luke Steensen Date: Mon, 11 Sep 2023 09:05:26 -0500 Subject: [PATCH] fix: skip encoding empty sketches (#18530) Signed-off-by: Luke Steensen --- src/sinks/datadog/metrics/encoder.rs | 75 ++++++++++++++++++++-------- 1 file changed, 54 insertions(+), 21 deletions(-) diff --git a/src/sinks/datadog/metrics/encoder.rs b/src/sinks/datadog/metrics/encoder.rs index 0e0ad6c685291..d6aa869999f11 100644 --- a/src/sinks/datadog/metrics/encoder.rs +++ b/src/sinks/datadog/metrics/encoder.rs @@ -390,7 +390,11 @@ fn sketch_to_proto_message( ddsketch: &AgentDDSketch, default_namespace: &Option>, log_schema: &'static LogSchema, -) -> ddmetric_proto::sketch_payload::Sketch { +) -> Option { + if ddsketch.is_empty() { + return None; + } + let name = get_namespaced_name(metric, default_namespace); let ts = encode_timestamp(metric.timestamp()); let mut tags = metric.tags().cloned().unwrap_or_default(); @@ -418,7 +422,7 @@ fn sketch_to_proto_message( let k = bins.into_iter().map(Into::into).collect(); let n = counts.into_iter().map(Into::into).collect(); - ddmetric_proto::sketch_payload::Sketch { + Some(ddmetric_proto::sketch_payload::Sketch { metric: name, tags, host, @@ -433,7 +437,7 @@ fn sketch_to_proto_message( k, n, }], - } + }) } fn encode_sketch_incremental( @@ -459,16 +463,21 @@ where // for `SketchPayload` with a single sketch looks just like as if we literally wrote out a // single value for the given field. - let sketch_proto = sketch_to_proto_message(metric, ddsketch, default_namespace, log_schema); - - // Manually write the field tag for `sketches` and then encode the sketch payload directly as a - // length-delimited message. - prost::encoding::encode_key( - get_sketch_payload_sketches_field_number(), - prost::encoding::WireType::LengthDelimited, - buf, - ); - sketch_proto.encode_length_delimited(buf) + if let Some(sketch_proto) = + sketch_to_proto_message(metric, ddsketch, default_namespace, log_schema) + { + // Manually write the field tag for `sketches` and then encode the sketch payload directly as a + // length-delimited message. + prost::encoding::encode_key( + get_sketch_payload_sketches_field_number(), + prost::encoding::WireType::LengthDelimited, + buf, + ); + sketch_proto.encode_length_delimited(buf) + } else { + // If the sketch was empty, that's fine too + Ok(()) + } } fn get_namespaced_name(metric: &Metric, default_namespace: &Option>) -> String { @@ -790,15 +799,11 @@ mod tests { }; match sketch { MetricSketch::AgentDDSketch(ddsketch) => { - // Don't encode any empty sketches. - if ddsketch.is_empty() { - continue; + if let Some(sketch) = + sketch_to_proto_message(metric, ddsketch, default_namespace, log_schema) + { + sketches.push(sketch); } - - let sketch = - sketch_to_proto_message(metric, ddsketch, default_namespace, log_schema); - - sketches.push(sketch); } } } @@ -927,6 +932,34 @@ mod tests { assert_eq!(expected, processed.pop().unwrap()); } + #[test] + fn encode_empty_sketch() { + // This is a simple test where we ensure that a single metric, with the default limits, can + // be encoded without hitting any errors. + let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None) + .expect("default payload size limits should be valid"); + let sketch = Metric::new( + "empty", + MetricKind::Incremental, + AgentDDSketch::with_agent_defaults().into(), + ) + .with_timestamp(Some(ts())); + let expected = sketch.clone(); + + // Encode the sketch. + let result = encoder.try_encode(sketch); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), None); + + // Finish the payload, make sure we got what we came for. + let result = encoder.finish(); + assert!(result.is_ok()); + + let (_payload, mut processed) = result.unwrap(); + assert_eq!(processed.len(), 1); + assert_eq!(expected, processed.pop().unwrap()); + } + #[test] fn encode_multiple_sketch_metrics_normal_vs_incremental() { // This tests our incremental sketch encoding against the more straightforward approach of