From f15144bb16a5d7a7389b20e2625c162101907f02 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Thu, 24 Aug 2023 15:50:07 -0400 Subject: [PATCH] fix(json codec): Fix deserializing non-object values with the `Vector` namespace (#18379) * fix remap array return values when using the Vector namespace * fix json codec when using vector namespace --- lib/codecs/src/decoding/format/json.rs | 23 ++++++-- lib/codecs/src/decoding/format/native_json.rs | 4 +- lib/vector-core/src/event/mod.rs | 52 ++++++++++--------- src/transforms/remap.rs | 47 ++++++++++++----- src/transforms/route.rs | 17 ++++-- 5 files changed, 95 insertions(+), 48 deletions(-) diff --git a/lib/codecs/src/decoding/format/json.rs b/lib/codecs/src/decoding/format/json.rs index 0906f739b803a..e842898cf5d6f 100644 --- a/lib/codecs/src/decoding/format/json.rs +++ b/lib/codecs/src/decoding/format/json.rs @@ -1,5 +1,3 @@ -use std::convert::TryInto; - use bytes::Bytes; use chrono::Utc; use derivative::Derivative; @@ -122,9 +120,9 @@ impl Deserializer for JsonDeserializer { let mut events = match json { serde_json::Value::Array(values) => values .into_iter() - .map(TryInto::try_into) + .map(|json| Event::from_json_value(json, log_namespace)) .collect::, _>>()?, - _ => smallvec![json.try_into()?], + _ => smallvec![Event::from_json_value(json, log_namespace)?], }; let events = match log_namespace { @@ -160,6 +158,7 @@ impl From<&JsonDeserializerConfig> for JsonDeserializer { #[cfg(test)] mod tests { use vector_core::config::log_schema; + use vrl::core::Value; use super::*; @@ -190,6 +189,22 @@ mod tests { } } + #[test] + fn deserialize_non_object_vector_namespace() { + let input = Bytes::from(r#"null"#); + let deserializer = JsonDeserializer::default(); + + let namespace = LogNamespace::Vector; + let events = deserializer.parse(input.clone(), namespace).unwrap(); + let mut events = events.into_iter(); + + let event = events.next().unwrap(); + let log = event.as_log(); + assert_eq!(log["."], Value::Null); + + assert_eq!(events.next(), None); + } + #[test] fn deserialize_json_array() { let input = Bytes::from(r#"[{ "foo": 123 }, { "bar": 456 }]"#); diff --git a/lib/codecs/src/decoding/format/native_json.rs b/lib/codecs/src/decoding/format/native_json.rs index 43e09ec86f5f6..d23c50a996cc9 100644 --- a/lib/codecs/src/decoding/format/native_json.rs +++ b/lib/codecs/src/decoding/format/native_json.rs @@ -132,8 +132,8 @@ mod test { let events = deserializer.parse(input, LogNamespace::Legacy).unwrap(); - let event1 = Event::try_from(json1).unwrap(); - let event2 = Event::try_from(json2).unwrap(); + let event1 = Event::from_json_value(json1, LogNamespace::Legacy).unwrap(); + let event2 = Event::from_json_value(json2, LogNamespace::Legacy).unwrap(); let expected: SmallVec<[Event; 1]> = smallvec![event1, event2]; assert_eq!(events, expected); } diff --git a/lib/vector-core/src/event/mod.rs b/lib/vector-core/src/event/mod.rs index 5c385df1fd913..fb38633c811f8 100644 --- a/lib/vector-core/src/event/mod.rs +++ b/lib/vector-core/src/event/mod.rs @@ -1,10 +1,6 @@ -use std::{ - collections::BTreeMap, - convert::{TryFrom, TryInto}, - fmt::Debug, - sync::Arc, -}; +use std::{collections::BTreeMap, convert::TryInto, fmt::Debug, sync::Arc}; +use crate::config::LogNamespace; use crate::{config::OutputId, ByteSizeOf}; pub use array::{into_event_stream, EventArray, EventContainer, LogArray, MetricArray, TraceArray}; pub use estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf; @@ -324,6 +320,31 @@ impl Event { self.metadata_mut().set_upstream_id(upstream_id); self } + + /// Creates an Event from a JSON value. + /// + /// # Errors + /// If a non-object JSON value is passed in with the `Legacy` namespace, this will return an error. + pub fn from_json_value( + value: serde_json::Value, + log_namespace: LogNamespace, + ) -> crate::Result { + match log_namespace { + LogNamespace::Vector => Ok(LogEvent::from(Value::from(value)).into()), + LogNamespace::Legacy => match value { + serde_json::Value::Object(fields) => Ok(LogEvent::from( + fields + .into_iter() + .map(|(k, v)| (k, v.into())) + .collect::>(), + ) + .into()), + _ => Err(crate::Error::from( + "Attempted to convert non-Object JSON into an Event.", + )), + }, + } + } } impl EventDataEq for Event { @@ -348,25 +369,6 @@ impl finalization::AddBatchNotifier for Event { } } -impl TryFrom for Event { - type Error = crate::Error; - - fn try_from(map: serde_json::Value) -> Result { - match map { - serde_json::Value::Object(fields) => Ok(LogEvent::from( - fields - .into_iter() - .map(|(k, v)| (k, v.into())) - .collect::>(), - ) - .into()), - _ => Err(crate::Error::from( - "Attempted to convert non-Object JSON into an Event.", - )), - } - } -} - impl TryInto for Event { type Error = serde_json::Error; diff --git a/src/transforms/remap.rs b/src/transforms/remap.rs index 18b5abe0e5516..dad1e54d95c62 100644 --- a/src/transforms/remap.rs +++ b/src/transforms/remap.rs @@ -1024,8 +1024,11 @@ mod tests { #[test] fn remap_timezone_fallback() { - let error = - Event::try_from(serde_json::json!({"timestamp": "2022-12-27 00:00:00"})).unwrap(); + let error = Event::from_json_value( + serde_json::json!({"timestamp": "2022-12-27 00:00:00"}), + LogNamespace::Legacy, + ) + .unwrap(); let conf = RemapConfig { source: Some(formatdoc! {r#" .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S") @@ -1058,8 +1061,11 @@ mod tests { #[test] fn remap_timezone_override() { - let error = - Event::try_from(serde_json::json!({"timestamp": "2022-12-27 00:00:00"})).unwrap(); + let error = Event::from_json_value( + serde_json::json!({"timestamp": "2022-12-27 00:00:00"}), + LogNamespace::Legacy, + ) + .unwrap(); let conf = RemapConfig { source: Some(formatdoc! {r#" .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S") @@ -1093,9 +1099,16 @@ mod tests { #[test] fn check_remap_branching() { - let happy = Event::try_from(serde_json::json!({"hello": "world"})).unwrap(); - let abort = Event::try_from(serde_json::json!({"hello": "goodbye"})).unwrap(); - let error = Event::try_from(serde_json::json!({"hello": 42})).unwrap(); + let happy = + Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy) + .unwrap(); + let abort = Event::from_json_value( + serde_json::json!({"hello": "goodbye"}), + LogNamespace::Legacy, + ) + .unwrap(); + let error = + Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap(); let happy_metric = { let mut metric = Metric::new( @@ -1287,9 +1300,9 @@ mod tests { #[test] fn check_remap_branching_assert_with_message() { let error_trigger_assert_custom_message = - Event::try_from(serde_json::json!({"hello": 42})).unwrap(); + Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap(); let error_trigger_default_assert_message = - Event::try_from(serde_json::json!({"hello": 0})).unwrap(); + Event::from_json_value(serde_json::json!({"hello": 0}), LogNamespace::Legacy).unwrap(); let conf = RemapConfig { source: Some(formatdoc! {r#" assert_eq!(.hello, 0, "custom message here") @@ -1349,7 +1362,8 @@ mod tests { #[test] fn check_remap_branching_abort_with_message() { - let error = Event::try_from(serde_json::json!({"hello": 42})).unwrap(); + let error = + Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap(); let conf = RemapConfig { source: Some(formatdoc! {r#" abort "custom message here" @@ -1387,9 +1401,16 @@ mod tests { #[test] fn check_remap_branching_disabled() { - let happy = Event::try_from(serde_json::json!({"hello": "world"})).unwrap(); - let abort = Event::try_from(serde_json::json!({"hello": "goodbye"})).unwrap(); - let error = Event::try_from(serde_json::json!({"hello": 42})).unwrap(); + let happy = + Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy) + .unwrap(); + let abort = Event::from_json_value( + serde_json::json!({"hello": "goodbye"}), + LogNamespace::Legacy, + ) + .unwrap(); + let error = + Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap(); let conf = RemapConfig { source: Some(formatdoc! {r#" diff --git a/src/transforms/route.rs b/src/transforms/route.rs index 721d92a9789ac..a7d71c4fc4696 100644 --- a/src/transforms/route.rs +++ b/src/transforms/route.rs @@ -189,8 +189,9 @@ mod test { #[test] fn route_pass_all_route_conditions() { let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE]; - let event = Event::try_from( + let event = Event::from_json_value( serde_json::json!({"message": "hello world", "second": "second", "third": "third"}), + LogNamespace::Legacy, ) .unwrap(); let config = toml::from_str::( @@ -234,7 +235,11 @@ mod test { #[test] fn route_pass_one_route_condition() { let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE]; - let event = Event::try_from(serde_json::json!({"message": "hello world"})).unwrap(); + let event = Event::from_json_value( + serde_json::json!({"message": "hello world"}), + LogNamespace::Legacy, + ) + .unwrap(); let config = toml::from_str::( r#" route.first.type = "vrl" @@ -275,7 +280,9 @@ mod test { #[test] fn route_pass_no_route_condition() { let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE]; - let event = Event::try_from(serde_json::json!({"message": "NOPE"})).unwrap(); + let event = + Event::from_json_value(serde_json::json!({"message": "NOPE"}), LogNamespace::Legacy) + .unwrap(); let config = toml::from_str::( r#" route.first.type = "vrl" @@ -316,7 +323,9 @@ mod test { #[test] fn route_no_unmatched_output() { let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE]; - let event = Event::try_from(serde_json::json!({"message": "NOPE"})).unwrap(); + let event = + Event::from_json_value(serde_json::json!({"message": "NOPE"}), LogNamespace::Legacy) + .unwrap(); let config = toml::from_str::( r#" reroute_unmatched = false