Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(codecs): add lossy option to gelf, native_json, and syslog deserializers #17680

Merged
merged 3 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 50 additions & 19 deletions lib/codecs/src/decoding/format/gelf.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use bytes::Bytes;
use chrono::{DateTime, NaiveDateTime, Utc};
use derivative::Derivative;
use lookup::{event_path, owned_value_path, PathPrefix};
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use std::collections::HashMap;
use vector_config::configurable_component;
use vector_core::config::LogNamespace;
use vector_core::{
config::{log_schema, DataType},
Expand All @@ -14,7 +16,7 @@ use vector_core::{
use vrl::value::kind::Collection;
use vrl::value::{Kind, Value};

use super::Deserializer;
use super::{default_lossy, Deserializer};
use crate::{gelf_fields::*, VALID_FIELD_REGEX};

/// On GELF decoding behavior:
Expand All @@ -25,12 +27,26 @@ use crate::{gelf_fields::*, VALID_FIELD_REGEX};

/// Config used to build a `GelfDeserializer`.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct GelfDeserializerConfig;
pub struct GelfDeserializerConfig {
#[serde(
default,
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
)]
/// GELF-specific decoding options.
pub gelf: GelfDeserializerOptions,
}

impl GelfDeserializerConfig {
/// Creates a new `GelfDeserializerConfig`.
pub fn new(options: GelfDeserializerOptions) -> Self {
Self { gelf: options }
}

/// Build the `GelfDeserializer` from this configuration.
pub fn build(&self) -> GelfDeserializer {
GelfDeserializer::default()
GelfDeserializer {
lossy: self.gelf.lossy,
}
}

/// Return the type of event built by this deserializer.
Expand Down Expand Up @@ -60,21 +76,36 @@ impl GelfDeserializerConfig {
}
}

/// Deserializer that builds an `Event` from a byte frame containing a GELF log
/// message.
#[derive(Debug, Clone)]
pub struct GelfDeserializer;
/// GELF-specific decoding options.
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct GelfDeserializerOptions {
/// Determines whether or not to replace invalid UTF-8 sequences instead of failing.
///
/// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
///
/// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
#[serde(
default = "default_lossy",
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
)]
#[derivative(Default(value = "default_lossy()"))]
pub lossy: bool,
}

impl Default for GelfDeserializer {
fn default() -> Self {
Self::new()
}
/// Deserializer that builds an `Event` from a byte frame containing a GELF log message.
#[derive(Debug, Clone, Derivative)]
#[derivative(Default)]
pub struct GelfDeserializer {
#[derivative(Default(value = "default_lossy()"))]
lossy: bool,
}

impl GelfDeserializer {
/// Create a new GelfDeserializer
pub fn new() -> GelfDeserializer {
GelfDeserializer
/// Create a new `GelfDeserializer`.
pub fn new(lossy: bool) -> GelfDeserializer {
GelfDeserializer { lossy }
}

/// Builds a LogEvent from the parsed GelfMessage.
Expand Down Expand Up @@ -195,10 +226,10 @@ impl Deserializer for GelfDeserializer {
bytes: Bytes,
_log_namespace: LogNamespace,
) -> vector_common::Result<SmallVec<[Event; 1]>> {
let line = std::str::from_utf8(&bytes)?;
let line = line.trim();

let parsed: GelfMessage = serde_json::from_str(line)?;
let parsed: GelfMessage = match self.lossy {
true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
false => serde_json::from_slice(&bytes),
}?;
let event = self.message_to_event(&parsed)?;

Ok(smallvec![event])
Expand All @@ -220,7 +251,7 @@ mod tests {
fn deserialize_gelf_input(
input: &serde_json::Value,
) -> vector_common::Result<SmallVec<[Event; 1]>> {
let config = GelfDeserializerConfig;
let config = GelfDeserializerConfig::default();
let deserializer = config.build();
let buffer = Bytes::from(serde_json::to_vec(&input).unwrap());
deserializer.parse(buffer, LogNamespace::Legacy)
Expand Down
57 changes: 26 additions & 31 deletions lib/codecs/src/decoding/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use bytes::Bytes;
use chrono::Utc;
use derivative::Derivative;
use lookup::PathPrefix;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use vector_config::configurable_component;
use vector_core::{
Expand All @@ -14,42 +13,27 @@ use vector_core::{
};
use vrl::value::Kind;

use super::Deserializer;
use super::{default_lossy, Deserializer};

/// Config used to build a `JsonDeserializer`.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct JsonDeserializerConfig {
#[serde(
default,
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
)]
/// Options for the JSON deserializer.
pub json: JsonDeserializerOptions,
}

/// JSON-specific decoding options.
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct JsonDeserializerOptions {
/// Determines whether or not to replace invalid UTF-8 sequences instead of returning an error.
///
/// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
///
/// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
pub struct JsonDeserializerConfig {
#[serde(
default = "default_lossy",
default,
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
)]
#[derivative(Default(value = "default_lossy()"))]
lossy: bool,
}

const fn default_lossy() -> bool {
true
/// JSON-specific decoding options.
pub json: JsonDeserializerOptions,
}

impl JsonDeserializerConfig {
/// Creates a new `JsonDeserializerConfig`.
pub fn new(options: JsonDeserializerOptions) -> Self {
Self { json: options }
}

/// Build the `JsonDeserializer` from this configuration.
pub fn build(&self) -> JsonDeserializer {
Into::<JsonDeserializer>::into(self)
Expand Down Expand Up @@ -85,11 +69,22 @@ impl JsonDeserializerConfig {
}
}

impl JsonDeserializerConfig {
/// Creates a new `JsonDeserializerConfig`.
pub fn new(options: JsonDeserializerOptions) -> Self {
Self { json: options }
}
/// JSON-specific decoding options.
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct JsonDeserializerOptions {
/// Determines whether or not to replace invalid UTF-8 sequences instead of failing.
///
/// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
///
/// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
#[serde(
default = "default_lossy",
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
)]
#[derivative(Default(value = "default_lossy()"))]
pub lossy: bool,
}

/// Deserializer that builds `Event`s from a byte frame containing JSON.
Expand Down
15 changes: 11 additions & 4 deletions lib/codecs/src/decoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@ mod syslog;

use ::bytes::Bytes;
use dyn_clone::DynClone;
pub use gelf::{GelfDeserializer, GelfDeserializerConfig};
pub use gelf::{GelfDeserializer, GelfDeserializerConfig, GelfDeserializerOptions};
pub use json::{JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions};
pub use native::{NativeDeserializer, NativeDeserializerConfig};
pub use native_json::{NativeJsonDeserializer, NativeJsonDeserializerConfig};
pub use native_json::{
NativeJsonDeserializer, NativeJsonDeserializerConfig, NativeJsonDeserializerOptions,
};
use smallvec::SmallVec;
#[cfg(feature = "syslog")]
pub use syslog::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions};
use vector_core::config::LogNamespace;
use vector_core::event::Event;

pub use self::bytes::{BytesDeserializer, BytesDeserializerConfig};
#[cfg(feature = "syslog")]
pub use self::syslog::{SyslogDeserializer, SyslogDeserializerConfig};

/// Parse structured events from bytes.
pub trait Deserializer: DynClone + Send + Sync {
Expand All @@ -44,3 +46,8 @@ dyn_clone::clone_trait_object!(Deserializer);

/// A `Box` containing a `Deserializer`.
pub type BoxedDeserializer = Box<dyn Deserializer>;

/// Default value for the UTF-8 lossy option.
const fn default_lossy() -> bool {
true
}
57 changes: 48 additions & 9 deletions lib/codecs/src/decoding/format/native_json.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,35 @@
use bytes::Bytes;
use derivative::Derivative;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use vector_config::configurable_component;
use vector_core::{config::DataType, event::Event, schema};
use vrl::value::kind::Collection;
use vrl::value::Kind;

use super::Deserializer;
use super::{default_lossy, Deserializer};
use vector_core::config::LogNamespace;

/// Config used to build a `NativeJsonDeserializer`.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct NativeJsonDeserializerConfig;
pub struct NativeJsonDeserializerConfig {
/// Vector's native JSON-specific decoding options.
pub native_json: NativeJsonDeserializerOptions,
}

impl NativeJsonDeserializerConfig {
/// Creates a new `NativeJsonDeserializerConfig`.
pub fn new(options: NativeJsonDeserializerOptions) -> Self {
Self {
native_json: options,
}
}

/// Build the `NativeJsonDeserializer` from this configuration.
pub const fn build(&self) -> NativeJsonDeserializer {
NativeJsonDeserializer
pub fn build(&self) -> NativeJsonDeserializer {
NativeJsonDeserializer {
lossy: self.native_json.lossy,
}
}

/// Return the type of event build by this deserializer.
Expand All @@ -37,10 +51,32 @@ impl NativeJsonDeserializerConfig {
}
}

/// Vector's native JSON-specific decoding options.
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct NativeJsonDeserializerOptions {
/// Determines whether or not to replace invalid UTF-8 sequences instead of failing.
///
/// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
///
/// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
#[serde(
default = "default_lossy",
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
)]
#[derivative(Default(value = "default_lossy()"))]
pub lossy: bool,
}

/// Deserializer that builds `Event`s from a byte frame containing Vector's native JSON
/// representation.
#[derive(Debug, Clone, Default)]
pub struct NativeJsonDeserializer;
#[derive(Debug, Clone, Derivative)]
#[derivative(Default)]
pub struct NativeJsonDeserializer {
#[derivative(Default(value = "default_lossy()"))]
lossy: bool,
}

impl Deserializer for NativeJsonDeserializer {
fn parse(
Expand All @@ -56,8 +92,11 @@ impl Deserializer for NativeJsonDeserializer {
return Ok(smallvec![]);
}

let json: serde_json::Value = serde_json::from_slice(&bytes)
.map_err(|error| format!("Error parsing JSON: {:?}", error))?;
let json: serde_json::Value = match self.lossy {
true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
false => serde_json::from_slice(&bytes),
}
.map_err(|error| format!("Error parsing JSON: {:?}", error))?;

let events = match json {
serde_json::Value::Array(values) => values
Expand All @@ -79,7 +118,7 @@ mod test {

#[test]
fn parses_top_level_arrays() {
let config = NativeJsonDeserializerConfig;
let config = NativeJsonDeserializerConfig::default();
let deserializer = config.build();

let json1 = json!({"a": "b", "c": "d"});
Expand Down
Loading