diff --git a/lib/codecs/src/encoding/format/native_json.rs b/lib/codecs/src/encoding/format/native_json.rs index 854bba9d97ec2..94fd6b3c14527 100644 --- a/lib/codecs/src/encoding/format/native_json.rs +++ b/lib/codecs/src/encoding/format/native_json.rs @@ -52,7 +52,8 @@ impl Encoder for NativeJsonSerializer { #[cfg(test)] mod tests { use bytes::BytesMut; - use vector_core::event::{LogEvent, Value}; + use vector_core::buckets; + use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, Value}; use vrl::btreemap; use super::*; @@ -84,4 +85,25 @@ mod tests { assert_eq!(bytes.freeze(), serde_json::to_string(&json).unwrap()); } + + #[test] + fn serialize_aggregated_histogram() { + let histogram_event = Event::from(Metric::new( + "histogram", + MetricKind::Absolute, + MetricValue::AggregatedHistogram { + count: 1, + sum: 1.0, + buckets: buckets!(f64::NEG_INFINITY => 0 ,2.0 => 1, f64::INFINITY => 0), + }, + )); + + let mut serializer = NativeJsonSerializer::new(); + let mut bytes = BytesMut::new(); + serializer + .encode(histogram_event.clone(), &mut bytes) + .unwrap(); + let json = serializer.to_json_value(histogram_event).unwrap(); + assert_eq!(bytes.freeze(), serde_json::to_string(&json).unwrap()); + } } diff --git a/lib/codecs/tests/native_json.rs b/lib/codecs/tests/native_json.rs new file mode 100644 index 0000000000000..dcc08bdaa187b --- /dev/null +++ b/lib/codecs/tests/native_json.rs @@ -0,0 +1,86 @@ +use bytes::BytesMut; +use codecs::decoding::format::Deserializer; +use codecs::encoding::format::Serializer; +use codecs::{NativeJsonDeserializerConfig, NativeJsonSerializerConfig}; +use vector_core::buckets; +use vector_core::config::LogNamespace; +use vector_core::event::{Event, Metric}; +use vector_core::event::{MetricKind, MetricValue}; + +fn assert_roundtrip( + input_event: Event, + serializer: &mut dyn Serializer, + deserializer: &dyn Deserializer, + expected_json_value: serde_json::Value, +) { + let mut bytes_mut = BytesMut::new(); + serializer + .encode(input_event.clone(), &mut bytes_mut) + .unwrap(); + let bytes = bytes_mut.freeze(); + let events = deserializer.parse(bytes, LogNamespace::Vector).unwrap(); + assert_eq!(events.len(), 1); + assert_eq!(events[0], input_event); + + let json_value = serde_json::to_value(input_event.as_metric()).unwrap(); + assert_eq!(json_value, expected_json_value); +} + +#[test] +fn histogram_metric_roundtrip() { + let histogram_event = Event::from(Metric::new( + "histogram", + MetricKind::Absolute, + MetricValue::AggregatedHistogram { + count: 1, + sum: 1.0, + buckets: buckets!( + f64::NEG_INFINITY => 10 , + f64::MIN => 10, 1.5 => 10, + f64::MAX => 10, + f64::INFINITY => 10), + }, + )); + + let expected_json_value = serde_json::from_str( + r#" + { + "aggregated_histogram": { + "buckets": [ + { + "count": 10, + "upper_limit": "-inf" + }, + { + "count": 10, + "upper_limit": -1.7976931348623157e308 + }, + { + "count": 10, + "upper_limit": 1.5 + }, + { + "count": 10, + "upper_limit": 1.7976931348623157e308 + }, + { + "count": 10, + "upper_limit": "inf" + } + ], + "count": 1, + "sum": 1.0 + }, + "kind": "absolute", + "name": "histogram" + }"#, + ) + .unwrap(); + + assert_roundtrip( + histogram_event, + &mut NativeJsonSerializerConfig.build(), + &NativeJsonDeserializerConfig::default().build(), + expected_json_value, + ) +} diff --git a/lib/vector-core/src/event/metric/value.rs b/lib/vector-core/src/event/metric/value.rs index 6e27e61f3848d..a260f72e8fa34 100644 --- a/lib/vector-core/src/event/metric/value.rs +++ b/lib/vector-core/src/event/metric/value.rs @@ -1,12 +1,19 @@ use core::fmt; use std::collections::BTreeSet; +use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; + use vector_common::byte_size_of::ByteSizeOf; use vector_config::configurable_component; -use super::{samples_to_buckets, write_list, write_word}; use crate::{float_eq, metrics::AgentDDSketch}; +use super::{samples_to_buckets, write_list, write_word}; + +const INFINITY: &str = "inf"; +const NEG_INFINITY: &str = "-inf"; +const NAN: &str = "NaN"; + /// Metric value. #[configurable_component] #[derive(Clone, Debug)] @@ -597,14 +604,62 @@ impl ByteSizeOf for Sample { } } +/// Custom serialization function which converts special `f64` values to strings. +/// Non-special values are serialized as numbers. +#[allow(clippy::trivially_copy_pass_by_ref)] +fn serialize_f64(value: &f64, serializer: S) -> Result +where + S: Serializer, +{ + if value.is_infinite() { + serializer.serialize_str(if *value > 0.0 { INFINITY } else { NEG_INFINITY }) + } else if value.is_nan() { + serializer.serialize_str(NAN) + } else { + serializer.serialize_f64(*value) + } +} + +/// Custom deserialization function for handling special f64 values. +fn deserialize_f64<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + struct UpperLimitVisitor; + + impl<'de> de::Visitor<'de> for UpperLimitVisitor { + type Value = f64; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a number or a special string value") + } + + fn visit_f64(self, value: f64) -> Result { + Ok(value) + } + + fn visit_str(self, value: &str) -> Result { + match value { + NAN => Ok(f64::NAN), + INFINITY => Ok(f64::INFINITY), + NEG_INFINITY => Ok(f64::NEG_INFINITY), + _ => Err(E::custom("unsupported string value")), + } + } + } + + deserializer.deserialize_any(UpperLimitVisitor) +} + /// A histogram bucket. /// /// Histogram buckets represent the `count` of observations where the value of the observations does /// not exceed the specified `upper_limit`. -#[configurable_component] -#[derive(Clone, Copy, Debug)] +#[configurable_component(no_deser, no_ser)] +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] pub struct Bucket { /// The upper limit of values in the bucket. + #[serde(serialize_with = "serialize_f64", deserialize_with = "deserialize_f64")] pub upper_limit: f64, /// The number of values tracked in this bucket.