From 03d74c8b053307ef456ad4c33ebb21275ff8c3b4 Mon Sep 17 00:00:00 2001
From: Pavlos Rontidis
Date: Tue, 8 Aug 2023 17:29:16 -0400
Subject: [PATCH 1/5] feat: disable vrl 'string_path' feature

---
 Cargo.toml                              |  2 ++
 lib/codecs/src/decoding/format/gelf.rs  |  2 +-
 lib/codecs/src/encoding/format/gelf.rs  |  5 ++++-
 src/api/schema/events/log.rs            |  7 ++++---
 src/sinks/datadog/events/sink.rs        | 14 ++++++-------
 src/sinks/mezmo.rs                      |  7 ++++---
 src/sinks/sematext/logs.rs              |  5 +++--
 src/sources/amqp.rs                     | 15 ++++++--------
 src/sources/docker_logs/mod.rs          | 27 +++++++++++++------------
 src/sources/gcp_pubsub.rs               |  9 +++++----
 src/sources/nats.rs                     |  2 +-
 src/sources/splunk_hec/mod.rs           | 10 ++++-----
 src/sources/syslog.rs                   |  7 ++++---
 src/transforms/lua/v1/mod.rs            | 26 +++++++++++++++---------
 src/transforms/metric_to_log.rs         |  2 +-
 src/transforms/reduce/merge_strategy.rs | 18 ++++++++++-------
 src/transforms/remap.rs                 | 16 +++++++--------
 17 files changed, 97 insertions(+), 77 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 83babb5661e22..d0a7d591cca28 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -370,6 +370,8 @@ tokio = { version = "1.30.0", features = ["test-util"] }
 tokio-test = "0.4.2"
 tower-test = "0.4.0"
 vector-core = { path = "lib/vector-core", default-features = false, features = ["vrl", "test"] }
+vrl = { version = "0.6.0", features = ["cli", "test", "test_framework", "arbitrary"] }
+
 wiremock = "0.5.19"
 zstd = { version = "0.12.4", default-features = false }
diff --git a/lib/codecs/src/decoding/format/gelf.rs b/lib/codecs/src/decoding/format/gelf.rs
index 88d7989c7e210..77633c328d9d0 100644
--- a/lib/codecs/src/decoding/format/gelf.rs
+++ b/lib/codecs/src/decoding/format/gelf.rs
@@ -360,7 +360,7 @@ mod tests {
         let events = deserialize_gelf_input(&input).unwrap();
         assert_eq!(events.len(), 1);
         let log = events[0].as_log();
-        assert!(!log.contains("_id"));
+        assert!(!log.contains(event_path!("_id")));
     }
 }
diff --git a/lib/codecs/src/encoding/format/gelf.rs b/lib/codecs/src/encoding/format/gelf.rs
index d35bad66decce..363a377d4c519 100644
--- a/lib/codecs/src/encoding/format/gelf.rs
+++ b/lib/codecs/src/encoding/format/gelf.rs
@@ -224,7 +224,10 @@ fn to_gelf_event(log: LogEvent) -> vector_common::Result<LogEvent> {
     coerce_field_names_and_values(log).map(|(mut log, missing_prefix)| {
         // rename additional fields that were flagged as missing the underscore prefix
         for field in missing_prefix {
-            log.rename_key(event_path!(field.as_str()), format!("_{}", &field).as_str());
+            log.rename_key(
+                event_path!(field.as_str()),
+                event_path!(format!("_{}", &field).as_str()),
+            );
         }
         log
     })
diff --git a/src/api/schema/events/log.rs b/src/api/schema/events/log.rs
index 251de727c27e7..ad53474622510 100644
--- a/src/api/schema/events/log.rs
+++ b/src/api/schema/events/log.rs
@@ -3,6 +3,7 @@ use std::borrow::Cow;
 use async_graphql::Object;
 use chrono::{DateTime, Utc};
 use vector_common::encode_logfmt;
+use vrl::event_path;

 use super::EventEncodingType;
 use crate::{event, topology::TapOutput};
@@ -19,11 +20,11 @@ impl Log {
     }

     pub fn get_message(&self) -> Option<Cow<'_, str>> {
-        Some(self.event.get("message")?.to_string_lossy())
+        Some(self.event.get(event_path!("message"))?.to_string_lossy())
     }

     pub fn get_timestamp(&self) -> Option<&DateTime<Utc>> {
-        self.event.get("timestamp")?.as_timestamp()
+        self.event.get(event_path!("timestamp"))?.as_timestamp()
     }
 }

@@ -69,7 +70,7 @@ impl Log {
     /// Get JSON field data on the log event, by field name
     async fn json(&self, field: String) -> Option<String> {
-        self.event.get(field.as_str()).map(|field| {
+        self.event.get(event_path!(field.as_str())).map(|field| {
             serde_json::to_string(field)
                 .expect("JSON serialization of trace event field failed. Please report.")
         })
diff --git a/src/sinks/datadog/events/sink.rs b/src/sinks/datadog/events/sink.rs
index 4a7a1a04facd7..a85d23d829093 100644
--- a/src/sinks/datadog/events/sink.rs
+++ b/src/sinks/datadog/events/sink.rs
@@ -50,12 +50,12 @@ where
 async fn ensure_required_fields(event: Event) -> Option<Event> {
     let mut log = event.into_log();

-    if !log.contains("title") {
+    if !log.contains(event_path!("title")) {
         emit!(ParserMissingFieldError::<DROP_EVENT> { field: "title" });
         return None;
     }

-    if !log.contains("text") {
+    if !log.contains(event_path!("text")) {
         let message_path = log
             .message_path()
             .expect("message is required (make sure the \"message\" semantic meaning is set)")
             .clone();
         log.rename_key(&message_path, event_path!("text"));
     }

-    if !log.contains("host") {
+    if !log.contains(event_path!("host")) {
         if let Some(host_path) = log.host_path().cloned().as_ref() {
             log.rename_key(host_path, event_path!("host"));
         }
     }

-    if !log.contains("date_happened") {
+    if !log.contains(event_path!("date_happened")) {
         if let Some(timestamp_path) = log.timestamp_path().cloned().as_ref() {
-            log.rename_key(timestamp_path, "date_happened");
+            log.rename_key(timestamp_path, event_path!("date_happened"));
         }
     }

-    if !log.contains("source_type_name") {
+    if !log.contains(event_path!("source_type_name")) {
         if let Some(source_type_path) = log.source_type_path().cloned().as_ref() {
-            log.rename_key(source_type_path, "source_type_name");
+            log.rename_key(source_type_path, event_path!("source_type_name"));
         }
     }
diff --git a/src/sinks/mezmo.rs b/src/sinks/mezmo.rs
index 97d96f40c8c40..25999e73163f0 100644
--- a/src/sinks/mezmo.rs
+++ b/src/sinks/mezmo.rs
@@ -6,6 +6,7 @@ use http::{Request, StatusCode, Uri};
 use serde_json::json;
 use vector_common::sensitive_string::SensitiveString;
 use vector_config::configurable_component;
+use vrl::event_path;
 use vrl::value::{Kind, Value};

 use crate::{
@@ -269,15 +270,15 @@ impl HttpEventEncoder> for
         map.insert("line".to_string(), json!(line));
         map.insert("timestamp".to_string(), json!(timestamp));

-        if let Some(env) = log.remove("env") {
+        if let Some(env) = log.remove(event_path!("env")) {
             map.insert("env".to_string(), json!(env));
         }

-        if let Some(app) = log.remove("app") {
+        if let Some(app) = log.remove(event_path!("app")) {
             map.insert("app".to_string(), json!(app));
         }

-        if let Some(file) = log.remove("file") {
+        if let Some(file) = log.remove(event_path!("file")) {
             map.insert("file".to_string(), json!(file));
         }
diff --git a/src/sinks/sematext/logs.rs b/src/sinks/sematext/logs.rs
index e5484b5dbd859..e5ba64f7e9524 100644
--- a/src/sinks/sematext/logs.rs
+++ b/src/sinks/sematext/logs.rs
@@ -3,6 +3,7 @@ use futures::stream::{BoxStream, StreamExt};
 use indoc::indoc;
 use vector_common::sensitive_string::SensitiveString;
 use vector_config::configurable_component;
+use vrl::event_path;

 use super::Region;
 use crate::{
@@ -144,11 +145,11 @@ fn map_timestamp(mut events: EventArray) -> EventArray {
         EventArray::Logs(logs) => {
             for log in logs {
                 if let Some(path) = log.timestamp_path().cloned().as_ref() {
-                    log.rename_key(path, "@timestamp");
+                    log.rename_key(path, event_path!("@timestamp"));
                 }

                 if let Some(path) = log.host_path().cloned().as_ref() {
-                    log.rename_key(path, "os.host");
+                    log.rename_key(path, event_path!("os.host"));
                 }
             }
         }
diff --git a/src/sources/amqp.rs b/src/sources/amqp.rs
index f6a9be1a46f09..ca010ff6e911c 100644
--- a/src/sources/amqp.rs
+++ b/src/sources/amqp.rs
@@ -20,7 +20,7 @@ use codecs::decoding::{DeserializerConfig, FramingConfig};
 use futures::{FutureExt, StreamExt};
 use futures_util::Stream;
 use lapin::{acker::Acker, message::Delivery, Channel};
-use lookup::{lookup_v2::OptionalValuePath, metadata_path, owned_value_path, path, PathPrefix};
+use lookup::{lookup_v2::OptionalValuePath, metadata_path, owned_value_path, path};
 use snafu::Snafu;
 use std::{io::Cursor, pin::Pin};
 use tokio_util::codec::FramedRead;
@@ -253,7 +253,7 @@ fn populate_event(
             .path
             .as_ref()
             .map(LegacyKey::InsertIfEmpty),
-        "routing",
+        path!("routing"),
         keys.routing.to_string(),
     );
@@ -264,7 +264,7 @@ fn populate_event(
             .path
             .as_ref()
             .map(LegacyKey::InsertIfEmpty),
-        "exchange",
+        path!("exchange"),
         keys.exchange.to_string(),
     );
@@ -272,7 +272,7 @@ fn populate_event(
         AmqpSourceConfig::NAME,
         log,
         keys.offset_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
-        "offset",
+        path!("offset"),
         keys.delivery_tag,
     );
@@ -298,11 +298,8 @@ fn populate_event(
             log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
         }
         LogNamespace::Legacy => {
-            if let Some(timestamp_key) = log_schema().timestamp_key() {
-                log.try_insert(
-                    (PathPrefix::Event, timestamp_key),
-                    timestamp.unwrap_or_else(Utc::now),
-                );
+            if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
+                log.try_insert(timestamp_key, timestamp.unwrap_or_else(Utc::now));
             }
         }
     };
diff --git a/src/sources/docker_logs/mod.rs b/src/sources/docker_logs/mod.rs
index bd32dfac4a431..c597c5ea5574f 100644
--- a/src/sources/docker_logs/mod.rs
+++ b/src/sources/docker_logs/mod.rs
@@ -25,6 +25,7 @@ use vector_common::internal_event::{
 };
 use vector_config::configurable_component;
 use vector_core::config::{LegacyKey, LogNamespace};
+use vrl::event_path;
 use vrl::value::{kind::Collection, Kind};

 use super::util::MultilineConfig;
@@ -1270,21 +1271,24 @@ fn line_agg_adapter(
 ) -> impl Stream<Item = LogEvent> {
     let line_agg_in = inner.map(move |mut log| {
         let message_value = match log_namespace {
-            LogNamespace::Vector => log.remove(".").expect("`.` must exist in the event"),
+            LogNamespace::Vector => log
+                .remove(event_path!())
+                .expect("`.` must exist in the event"),
             LogNamespace::Legacy => log
-                .remove((
-                    PathPrefix::Event,
+                .remove(
                     log_schema()
-                        .message_key()
+                        .message_key_target_path()
                         .expect("global log_schema.message_key to be valid path"),
-                ))
+                )
                 .expect("`message` must exist in the event"),
         };
         let stream_value = match log_namespace {
             LogNamespace::Vector => log
                 .get(metadata_path!(DockerLogsConfig::NAME, STREAM))
                 .expect("`docker_logs.stream` must exist in the metadata"),
-            LogNamespace::Legacy => log.get(STREAM).expect("stream must exist in the event"),
+            LogNamespace::Legacy => log
+                .get(event_path!(STREAM))
+                .expect("stream must exist in the event"),
         };

         let stream = stream_value.coerce_to_bytes();
@@ -1294,14 +1298,11 @@ fn line_agg_adapter(
     let line_agg_out = LineAgg::<_, Bytes, LogEvent>::new(line_agg_in, logic);
     line_agg_out.map(move |(_, message, mut log)| {
         match log_namespace {
-            LogNamespace::Vector => log.insert(".", message),
+            LogNamespace::Vector => log.insert(event_path!(), message),
             LogNamespace::Legacy => log.insert(
-                (
-                    PathPrefix::Event,
-                    log_schema()
-                        .message_key()
-                        .expect("global log_schema.message_key to be valid path"),
-                ),
+                log_schema()
+                    .message_key_target_path()
+                    .expect("global log_schema.message_key to be valid path"),
                 message,
             ),
         };
diff --git a/src/sources/gcp_pubsub.rs b/src/sources/gcp_pubsub.rs
index 29128200d9e6b..d160428b60986 100644
--- a/src/sources/gcp_pubsub.rs
+++ b/src/sources/gcp_pubsub.rs
@@ -26,6 +26,7 @@ use vector_common::internal_event::{
 use vector_common::{byte_size_of::ByteSizeOf, finalizer::UnorderedFinalizer};
 use vector_config::configurable_component;
 use vector_core::config::{LegacyKey, LogNamespace};
+use vrl::path;
 use vrl::value::{kind::Collection, Kind};

 use crate::{
@@ -687,15 +688,15 @@ impl PubsubSource {
             log_namespace.insert_source_metadata(
                 PubsubConfig::NAME,
                 log,
-                Some(LegacyKey::Overwrite("message_id")),
-                "message_id",
+                Some(LegacyKey::Overwrite(path!("message_id"))),
+                path!("message_id"),
                 message.message_id.clone(),
             );
             log_namespace.insert_source_metadata(
                 PubsubConfig::NAME,
                 log,
-                Some(LegacyKey::Overwrite("attributes")),
-                "attributes",
+                Some(LegacyKey::Overwrite(path!("attributes"))),
+                path!("attributes"),
                 attributes.clone(),
             )
         }
diff --git a/src/sources/nats.rs b/src/sources/nats.rs
index 2549a4771e40f..a6fb139759fef 100644
--- a/src/sources/nats.rs
+++ b/src/sources/nats.rs
@@ -224,7 +224,7 @@ async fn nats_source(
             NatsSourceConfig::NAME,
             log,
             legacy_subject_key_field,
-            "subject",
+            &owned_value_path!("subject"),
             msg.subject.as_str(),
         )
 }
diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs
index 78deae6cc60fc..e704a2e31cc49 100644
--- a/src/sources/splunk_hec/mod.rs
+++ b/src/sources/splunk_hec/mod.rs
@@ -676,7 +676,7 @@ impl<'de, R: JsonRead<'de>> EventIterator<'de, R> {
         self.log_namespace.insert_vector_metadata(
             &mut log,
             log_schema().source_type_key(),
-            lookup::path!("source_type"),
+            &owned_value_path!("source_type"),
             SplunkConfig::NAME,
         );
@@ -685,7 +685,7 @@ impl<'de, R: JsonRead<'de>> EventIterator<'de, R> {
             self.log_namespace.insert_source_metadata(
                 SplunkConfig::NAME,
                 &mut log,
-                Some(LegacyKey::Overwrite(CHANNEL)),
+                Some(LegacyKey::Overwrite(&owned_value_path!(CHANNEL))),
                 CHANNEL,
                 guid,
             );
@@ -693,7 +693,7 @@ impl<'de, R: JsonRead<'de>> EventIterator<'de, R> {
             self.log_namespace.insert_source_metadata(
                 SplunkConfig::NAME,
                 &mut log,
-                Some(LegacyKey::Overwrite(CHANNEL)),
+                Some(LegacyKey::Overwrite(&owned_value_path!(CHANNEL))),
                 CHANNEL,
                 guid.clone(),
             );
@@ -705,7 +705,7 @@ impl<'de, R: JsonRead<'de>> EventIterator<'de, R> {
             self.log_namespace.insert_source_metadata(
                 SplunkConfig::NAME,
                 &mut log,
-                Some(LegacyKey::Overwrite(key.as_str())),
+                Some(LegacyKey::Overwrite(&owned_value_path!(key.as_str()))),
                 key.as_str(),
                 value,
             );
@@ -1009,7 +1009,7 @@ fn raw_event(
     log_namespace.insert_source_metadata(
         SplunkConfig::NAME,
         &mut log,
-        Some(LegacyKey::Overwrite(CHANNEL)),
+        Some(LegacyKey::Overwrite(&owned_value_path!(CHANNEL))),
         CHANNEL,
         channel,
     );
diff --git a/src/sources/syslog.rs b/src/sources/syslog.rs
index c674939daef23..1418d77730d6f 100644
--- a/src/sources/syslog.rs
+++ b/src/sources/syslog.rs
@@ -15,6 +15,7 @@ use smallvec::SmallVec;
 use tokio_util::udp::UdpFramed;
 use vector_config::configurable_component;
 use vector_core::config::{LegacyKey, LogNamespace};
+use vrl::event_path;

 #[cfg(unix)]
 use crate::sources::util::build_unix_stream_source;
@@ -394,14 +395,14 @@ fn enrich_syslog_event(
         log_namespace.insert_source_metadata(
             SyslogConfig::NAME,
             log,
-            Some(LegacyKey::Overwrite("source_ip")),
+            Some(LegacyKey::Overwrite(path!("source_ip"))),
             path!("source_ip"),
             default_host.clone(),
         );
     }

     let parsed_hostname = log
-        .get("hostname")
+        .get(event_path!("hostname"))
         .map(|hostname| hostname.coerce_to_bytes());

     if let Some(parsed_host) = parsed_hostname.or(default_host) {
@@ -420,7 +421,7 @@
     if log_namespace == LogNamespace::Legacy {
         let timestamp = log
-            .get("timestamp")
+            .get(event_path!("timestamp"))
             .and_then(|timestamp| timestamp.as_timestamp().cloned())
             .unwrap_or_else(Utc::now);
         log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
diff --git a/src/transforms/lua/v1/mod.rs b/src/transforms/lua/v1/mod.rs
index f5b143214141b..58a94f939261d 100644
--- a/src/transforms/lua/v1/mod.rs
+++ b/src/transforms/lua/v1/mod.rs
@@ -1,9 +1,11 @@
 use std::{future::ready, pin::Pin};

 use futures::{stream, Stream, StreamExt};
+use mlua::ExternalError;
 use ordered_float::NotNan;
 use snafu::{ResultExt, Snafu};
 use vector_config::configurable_component;
+use vrl::path::parse_target_path;

 use crate::config::OutputId;
 use crate::schema::Definition;
@@ -223,6 +225,7 @@ impl mlua::UserData for LuaEvent {
         methods.add_meta_method_mut(
             mlua::MetaMethod::NewIndex,
             |_lua, this, (key, value): (String, Option<mlua::Value>)| {
+                let key_path = parse_target_path(key.as_str()).map_err(|e| e.to_lua_err())?;
                 match value {
                     Some(mlua::Value::String(string)) => {
                         this.inner.as_mut_log().insert(
                     Some(mlua::Value::Integer(integer)) => {
                         this.inner
                             .as_mut_log()
-                            .insert(key.as_str(), Value::Integer(integer));
+                            .insert(&key_path, Value::Integer(integer));
                     }
                     Some(mlua::Value::Number(number)) if !number.is_nan() => {
                         this.inner
                             .as_mut_log()
-                            .insert(key.as_str(), Value::Float(NotNan::new(number).unwrap()));
+                            .insert(&key_path, Value::Float(NotNan::new(number).unwrap()));
                     }
                     Some(mlua::Value::Boolean(boolean)) => {
                         this.inner
                             .as_mut_log()
-                            .insert(key.as_str(), Value::Boolean(boolean));
+                            .insert(&key_path, Value::Boolean(boolean));
                     }
                     Some(mlua::Value::Nil) | None => {
-                        this.inner.as_mut_log().remove(key.as_str());
+                        this.inner.as_mut_log().remove(&key_path);
                     }
                     _ => {
                         info!(
                             field = key.as_str(),
                             internal_log_rate_limit = true
                         );
-                        this.inner.as_mut_log().remove(key.as_str());
+                        this.inner.as_mut_log().remove(&key_path);
                     }
                 }
@@ -287,10 +290,15 @@ impl mlua::UserData for LuaEvent {
             let keys: mlua::Table = state.raw_get("keys")?;
             let next: mlua::Function = lua.globals().raw_get("next")?;
             let key: Option<String> = next.call((keys, prev))?;
-            match key
-                .clone()
-                .and_then(|k| event.inner.as_log().get(k.as_str()))
-            {
+            let value = key.clone().and_then(|k| {
+                event
+                    .inner
+                    .as_log()
+                    .parse_path_and_get_value(k.as_str())
+                    .ok()
+                    .flatten()
+            });
+            match value {
                 Some(value) => {
                     Ok((key, Some(lua.create_string(&value.coerce_to_bytes())?)))
                 }
diff --git a/src/transforms/metric_to_log.rs b/src/transforms/metric_to_log.rs
index 3748f6bac5c48..279646ecdf065 100644
--- a/src/transforms/metric_to_log.rs
+++ b/src/transforms/metric_to_log.rs
@@ -313,7 +313,7 @@ impl MetricToLog {
         if let Some(host_tag) = &self.host_tag {
             if let Some(host_value) =
-                log.remove_prune(host_tag.to_string().as_str(), true)
+                log.remove_prune((PathPrefix::Event, host_tag), true)
             {
                 log.maybe_insert(log_schema().host_key_target_path(), host_value);
             }
diff --git a/src/transforms/reduce/merge_strategy.rs b/src/transforms/reduce/merge_strategy.rs
index 7cb954fa91e78..beef0558c1e3b 100644
--- a/src/transforms/reduce/merge_strategy.rs
+++ b/src/transforms/reduce/merge_strategy.rs
@@ -4,6 +4,7 @@ use bytes::{Bytes, BytesMut};
 use chrono::{DateTime, Utc};
 use ordered_float::NotNan;
 use vector_config::configurable_component;
+use vrl::event_path;

 use crate::event::{LogEvent, Value};

@@ -68,7 +69,7 @@ impl ReduceValueMerger for DiscardMerger {
     }

     fn insert_into(self: Box<Self>, k: String, v: &mut LogEvent) -> Result<(), String> {
-        v.insert(k.as_str(), self.v);
+        v.insert(event_path!(k.as_str()), self.v);
         Ok(())
     }
 }
@@ -327,8 +328,11 @@ impl ReduceValueMerger for TimestampWindowMerger {
     }

     fn insert_into(self: Box<Self>, k: String, v: &mut LogEvent) -> Result<(), String> {
-        v.insert(format!("{}_end", k).as_str(), Value::Timestamp(self.latest));
-        v.insert(k.as_str(), Value::Timestamp(self.started));
+        v.insert(
+            event_path!(format!("{}_end", k).as_str()),
+            Value::Timestamp(self.latest),
+        );
+        v.insert(event_path!(k.as_str()), Value::Timestamp(self.started));
         Ok(())
     }
 }
@@ -448,8 +452,8 @@ impl ReduceValueMerger for MaxNumberMerger {
     fn insert_into(self: Box<Self>, k: String, v: &mut LogEvent) -> Result<(), String> {
         match self.v {
-            NumberMergerValue::Float(f) => v.insert(k.as_str(), Value::Float(f)),
-            NumberMergerValue::Int(i) => v.insert(k.as_str(), Value::Integer(i)),
+            NumberMergerValue::Float(f) => v.insert(event_path!(k.as_str()), Value::Float(f)),
+            NumberMergerValue::Int(i) => v.insert(event_path!(k.as_str()), Value::Integer(i)),
         };
         Ok(())
     }
@@ -507,8 +511,8 @@ impl ReduceValueMerger for MinNumberMerger {
     fn insert_into(self: Box<Self>, k: String, v: &mut LogEvent) -> Result<(), String> {
         match self.v {
-            NumberMergerValue::Float(f) => v.insert(k.as_str(), Value::Float(f)),
-            NumberMergerValue::Int(i) => v.insert(k.as_str(), Value::Integer(i)),
+            NumberMergerValue::Float(f) => v.insert(event_path!(k.as_str()), Value::Float(f)),
+            NumberMergerValue::Int(i) => v.insert(event_path!(k.as_str()), Value::Integer(i)),
         };
         Ok(())
     }
diff --git a/src/transforms/remap.rs b/src/transforms/remap.rs
index 50b01d4d6783b..94aa823d96463 100644
--- a/src/transforms/remap.rs
+++ b/src/transforms/remap.rs
@@ -609,8 +609,8 @@ mod tests {
     use indoc::{formatdoc, indoc};
     use vector_core::{config::GlobalOptions, event::EventMetadata, metric_tags};

-    use vrl::btreemap;
     use vrl::value::kind::Collection;
+    use vrl::{btreemap, event_path};

     use super::*;
     use crate::{
@@ -1174,12 +1174,12 @@ mod tests {
         let log = output.as_log();
         assert_eq!(log["hello"], "world".into());
         assert_eq!(log["foo"], "bar".into());
-        assert!(!log.contains("metadata"));
+        assert!(!log.contains(event_path!("metadata")));

         let output = transform_one_fallible(&mut tform, abort).unwrap_err();
         let log = output.as_log();
         assert_eq!(log["hello"], "goodbye".into());
-        assert!(!log.contains("foo"));
+        assert!(!log.contains(event_path!("foo")));
         assert_eq!(
             log["metadata"],
             serde_json::json!({
@@ -1198,7 +1198,7 @@ mod tests {
         let output = transform_one_fallible(&mut tform, error).unwrap_err();
         let log = output.as_log();
         assert_eq!(log["hello"], 42.into());
-        assert!(!log.contains("foo"));
+        assert!(!log.contains(event_path!("foo")));
         assert_eq!(
             log["metadata"],
             serde_json::json!({
@@ -1310,7 +1310,7 @@ mod tests {
             transform_one_fallible(&mut tform, error_trigger_assert_custom_message).unwrap_err();
         let log = output.as_log();
         assert_eq!(log["hello"], 42.into());
-        assert!(!log.contains("foo"));
+        assert!(!log.contains(event_path!("foo")));
         assert_eq!(
             log["metadata"],
             serde_json::json!({
@@ -1330,7 +1330,7 @@ mod tests {
             transform_one_fallible(&mut tform, error_trigger_default_assert_message).unwrap_err();
         let log = output.as_log();
         assert_eq!(log["hello"], 0.into());
-        assert!(!log.contains("foo"));
+        assert!(!log.contains(event_path!("foo")));
         assert_eq!(
             log["metadata"],
             serde_json::json!({
@@ -1368,7 +1368,7 @@ mod tests {
         let output =
            transform_one_fallible(&mut tform, error).unwrap_err();
         let log = output.as_log();
         assert_eq!(log["hello"], 42.into());
-        assert!(!log.contains("foo"));
+        assert!(!log.contains(event_path!("foo")));
         assert_eq!(
             log["metadata"],
             serde_json::json!({
@@ -1448,7 +1448,7 @@ mod tests {
         let log = output.as_log();
         assert_eq!(log["hello"], "world".into());
         assert_eq!(log["foo"], "bar".into());
-        assert!(!log.contains("metadata"));
+        assert!(!log.contains(event_path!("metadata")));

         let out = collect_outputs(&mut tform, abort);
         assert!(out.primary.is_empty());

From ebcefef161e210ed021f776c4cfb47c084101165 Mon Sep 17 00:00:00 2001
From: Pavlos Rontidis
Date: Fri, 11 Aug 2023 16:12:29 -0400
Subject: [PATCH 2/5] more path refactoring

---
 src/codecs/encoding/transformer.rs            | 62 ++++++++-----------
 src/sinks/aws_kinesis/config.rs               |  3 +-
 src/sinks/aws_kinesis/sink.rs                 | 14 ++---
 src/sinks/aws_kinesis/streams/config.rs       |  3 +-
 src/sinks/datadog/events/request_builder.rs   |  4 +-
 src/sinks/elasticsearch/config.rs             |  3 +-
 src/sinks/elasticsearch/sink.rs               | 15 ++---
 src/sinks/gcp/stackdriver_logs.rs             | 10 +--
 src/sinks/humio/logs.rs                       |  4 +-
 src/sinks/splunk_hec/logs/config.rs           |  4 +-
 src/sinks/splunk_hec/logs/sink.rs             |  9 ++-
 src/sources/journald.rs                       |  9 +--
 src/sources/kubernetes_logs/parser/cri.rs     | 10 ++-
 src/sources/kubernetes_logs/parser/docker.rs  | 15 ++---
 src/sources/kubernetes_logs/parser/mod.rs     |  6 +-
 .../kubernetes_logs/partial_events_merger.rs  |  4 +-
 .../kubernetes_logs/transform_utils/mod.rs    | 10 ++-
 src/sources/splunk_hec/mod.rs                 | 19 +++---
 src/transforms/reduce/merge_strategy.rs       | 21 ++++---
 19 files changed, 111 insertions(+), 114 deletions(-)

diff --git a/src/codecs/encoding/transformer.rs b/src/codecs/encoding/transformer.rs
index 7a7be839d904f..788ae68ad0fb2 100644
--- a/src/codecs/encoding/transformer.rs
+++ b/src/codecs/encoding/transformer.rs
@@ -4,15 +4,12 @@ use core::fmt::Debug;
 use std::collections::BTreeMap;

 use lookup::lookup_v2::ConfigValuePath;
-use lookup::{
-    event_path,
-    lookup_v2::{parse_value_path, OwnedValuePath},
-    PathPrefix,
-};
+use lookup::{event_path, PathPrefix};
 use serde::{Deserialize, Deserializer};
 use vector_config::configurable_component;
 use vector_core::event::{LogEvent, MaybeAsLogMut};
 use vector_core::schema::meaning;
+use vrl::path::OwnedValuePath;
 use vrl::value::Value;

 use crate::{event::Event, serde::skip_serializing_if_default};
@@ -27,7 +24,7 @@ pub struct Transformer {
     /// List of fields that are excluded from the encoded event.
     #[serde(default, skip_serializing_if = "skip_serializing_if_default")]
-    except_fields: Option<Vec<String>>,
+    except_fields: Option<Vec<ConfigValuePath>>,

     /// Format used for timestamp fields.
     #[serde(default, skip_serializing_if = "skip_serializing_if_default")]
@@ -45,15 +42,19 @@ impl<'de> Deserialize<'de> for Transformer {
             #[serde(default)]
             only_fields: Option<Vec<OwnedValuePath>>,
             #[serde(default)]
-            except_fields: Option<Vec<String>>,
+            except_fields: Option<Vec<OwnedValuePath>>,
             #[serde(default)]
             timestamp_format: Option<TimestampFormat>,
         }

         let inner: TransformerInner = Deserialize::deserialize(deserializer)?;
         Self::new(
-            inner.only_fields,
-            inner.except_fields,
+            inner
+                .only_fields
+                .map(|v| v.iter().map(|p| ConfigValuePath { 0: p.clone() }).collect()),
+            inner
+                .except_fields
+                .map(|v| v.iter().map(|p| ConfigValuePath { 0: p.clone() }).collect()),
             inner.timestamp_format,
         )
         .map_err(serde::de::Error::custom)
@@ -66,13 +67,12 @@ impl Transformer {
     /// Returns `Err` if `only_fields` and `except_fields` fail validation, i.e. are not mutually
     /// exclusive.
     pub fn new(
-        only_fields: Option<Vec<OwnedValuePath>>,
-        except_fields: Option<Vec<String>>,
+        only_fields: Option<Vec<ConfigValuePath>>,
+        except_fields: Option<Vec<ConfigValuePath>>,
         timestamp_format: Option<TimestampFormat>,
     ) -> crate::Result<Self> {
         Self::validate_fields(only_fields.as_ref(), except_fields.as_ref())?;

-        let only_fields = only_fields.map(|x| x.into_iter().map(ConfigValuePath).collect());
         Ok(Self {
             only_fields,
             except_fields,
@@ -87,7 +87,7 @@ impl Transformer {
     }

     /// Get the `Transformer`'s `except_fields`.
-    pub const fn except_fields(&self) -> &Option<Vec<String>> {
+    pub const fn except_fields(&self) -> &Option<Vec<ConfigValuePath>> {
         &self.except_fields
     }
@@ -100,14 +100,14 @@ impl Transformer {
     ///
     /// If an error is returned, the entire encoding configuration should be considered inoperable.
     fn validate_fields(
-        only_fields: Option<&Vec<OwnedValuePath>>,
-        except_fields: Option<&Vec<String>>,
+        only_fields: Option<&Vec<ConfigValuePath>>,
+        except_fields: Option<&Vec<ConfigValuePath>>,
     ) -> crate::Result<()> {
         if let (Some(only_fields), Some(except_fields)) = (only_fields, except_fields) {
-            if except_fields.iter().any(|f| {
-                let path_iter = parse_value_path(f).unwrap();
-                only_fields.iter().any(|v| v == &path_iter)
-            }) {
+            if except_fields
+                .iter()
+                .any(|f| only_fields.iter().any(|v| v == f))
+            {
                 return Err(
                     "`except_fields` and `only_fields` should be mutually exclusive.".into(),
                 );
@@ -155,22 +155,19 @@ impl Transformer {
     }

     fn apply_except_fields(&self, log: &mut LogEvent) {
-        use lookup::path::TargetPath;
-
         if let Some(except_fields) = self.except_fields.as_ref() {
             let service_path = log
                 .metadata()
                 .schema_definition()
-                .meaning_path(meaning::SERVICE)
-                .map(|path| path.value_path().to_string());
+                .meaning_path(meaning::SERVICE);

             for field in except_fields {
-                let value = log.remove(field.as_str());
+                let value = log.remove((PathPrefix::Event, field));

                 // If we are removing the service field we need to store this in a `dropped_fields` list as we may need to
                 // refer to this later when emitting metrics.
                 if let Some(v) = value {
-                    if matches!(service_path.as_ref(), Some(path) if path == field) {
+                    if matches!(service_path, Some(target_path) if target_path.path == field.0) {
                         log.metadata_mut()
                             .add_dropped_field(meaning::SERVICE.to_string(), v);
                     }
@@ -216,17 +213,12 @@ impl Transformer {
     /// Returns `Err` if the new `except_fields` fail validation, i.e. are not mutually exclusive
     /// with `only_fields`.
     #[cfg(test)]
-    pub fn set_except_fields(&mut self, except_fields: Option<Vec<String>>) -> crate::Result<()> {
-        Self::validate_fields(
-            self.only_fields
-                .clone()
-                .map(|x| x.into_iter().map(|x| x.0).collect())
-                .as_ref(),
-            except_fields.as_ref(),
-        )?;
-
+    pub fn set_except_fields(
+        &mut self,
+        except_fields: Option<Vec<ConfigValuePath>>,
+    ) -> crate::Result<()> {
+        Self::validate_fields(self.only_fields.as_ref(), except_fields.as_ref())?;
         self.except_fields = except_fields;
-
         Ok(())
     }
 }
diff --git a/src/sinks/aws_kinesis/config.rs b/src/sinks/aws_kinesis/config.rs
index 31c35e74c8fdc..1b3292afd29dd 100644
--- a/src/sinks/aws_kinesis/config.rs
+++ b/src/sinks/aws_kinesis/config.rs
@@ -1,3 +1,4 @@
+use lookup::lookup_v2::ConfigValuePath;
 use std::marker::PhantomData;

 use vector_core::stream::BatcherSettings;
@@ -79,7 +80,7 @@ impl KinesisSinkBaseConfig {
 /// Builds an aws_kinesis sink.
 pub fn build_sink(
     config: &KinesisSinkBaseConfig,
-    partition_key_field: Option<String>,
+    partition_key_field: Option<ConfigValuePath>,
     batch_settings: BatcherSettings,
     client: C,
     retry_logic: RT,
diff --git a/src/sinks/aws_kinesis/sink.rs b/src/sinks/aws_kinesis/sink.rs
index 0341c0e8244d6..c39911a0705bc 100644
--- a/src/sinks/aws_kinesis/sink.rs
+++ b/src/sinks/aws_kinesis/sink.rs
@@ -1,6 +1,8 @@
 use std::{borrow::Cow, fmt::Debug, marker::PhantomData, num::NonZeroUsize};

+use lookup::lookup_v2::ConfigValuePath;
 use rand::random;
+use vrl::path::PathPrefix;

 use crate::{
     internal_events::{AwsKinesisStreamNoPartitionKeyError, SinkRequestBuildError},
@@ -27,7 +29,7 @@ pub struct KinesisSink<S, R> {
     pub batch_settings: BatcherSettings,
     pub service: S,
     pub request_builder: KinesisRequestBuilder<R>,
-    pub partition_key_field: Option<String>,
+    pub partition_key_field: Option<ConfigValuePath>,
     pub _phantom: PhantomData<R>,
 }
@@ -42,13 +44,11 @@ where
     async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
         let request_builder_concurrency_limit = NonZeroUsize::new(50);

-        let partition_key_field = self.partition_key_field.clone();
-
         input
             .filter_map(|event| {
                 // Panic: This sink only accepts Logs, so this should never panic
                 let log = event.into_log();
-                let processed = process_log(log, &partition_key_field);
+                let processed = process_log(log, self.partition_key_field.as_ref());

                 future::ready(processed)
             })
@@ -106,14 +106,14 @@ where
 /// events are emitted and None is returned.
 pub(crate) fn process_log(
     log: LogEvent,
-    partition_key_field: &Option<String>,
+    partition_key_field: Option<&ConfigValuePath>,
 ) -> Option<KinesisProcessedEvent> {
     let partition_key = if let Some(partition_key_field) = partition_key_field {
-        if let Some(v) = log.get(partition_key_field.as_str()) {
+        if let Some(v) = log.get((PathPrefix::Event, partition_key_field)) {
             v.to_string_lossy()
         } else {
             emit!(AwsKinesisStreamNoPartitionKeyError {
-                partition_key_field
+                partition_key_field: partition_key_field.0.to_string().as_str()
             });
             return None;
         }
diff --git a/src/sinks/aws_kinesis/streams/config.rs b/src/sinks/aws_kinesis/streams/config.rs
index 4ebcf2a724e64..8ace03b2b602b 100644
--- a/src/sinks/aws_kinesis/streams/config.rs
+++ b/src/sinks/aws_kinesis/streams/config.rs
@@ -3,6 +3,7 @@ use aws_sdk_kinesis::{
     types::SdkError,
 };
 use futures::FutureExt;
+use lookup::lookup_v2::ConfigValuePath;
 use snafu::Snafu;
 use vector_config::{component::GenerateConfig, configurable_component};
@@ -80,7 +81,7 @@ pub struct KinesisStreamsSinkConfig {
     ///
     /// If not specified, a unique partition key is generated for each Kinesis record.
     #[configurable(metadata(docs::examples = "user_id"))]
-    pub partition_key_field: Option<String>,
+    pub partition_key_field: Option<ConfigValuePath>,

     #[configurable(derived)]
     #[serde(default)]
diff --git a/src/sinks/datadog/events/request_builder.rs b/src/sinks/datadog/events/request_builder.rs
index 93e4eeeb17c31..bbb2c53e22fd7 100644
--- a/src/sinks/datadog/events/request_builder.rs
+++ b/src/sinks/datadog/events/request_builder.rs
@@ -2,7 +2,7 @@ use std::{io, sync::Arc};

 use bytes::Bytes;
 use codecs::JsonSerializerConfig;
-use lookup::lookup_v2::OwnedSegment;
+use lookup::lookup_v2::ConfigValuePath;
 use vector_common::request_metadata::{MetaDescriptive, RequestMetadata};
 use vector_core::ByteSizeOf;
@@ -133,7 +133,7 @@ fn encoder() -> (Transformer, Encoder<()>) {
             "title",
         ]
         .iter()
-        .map(|field| vec![OwnedSegment::Field((*field).into())].into())
+        .map(|field| ConfigValuePath::try_from((*field).to_string()).unwrap())
         .collect(),
     );
     // DataDog Event API requires unix timestamp.
diff --git a/src/sinks/elasticsearch/config.rs b/src/sinks/elasticsearch/config.rs
index ca52fdde0899c..2295555dc8c72 100644
--- a/src/sinks/elasticsearch/config.rs
+++ b/src/sinks/elasticsearch/config.rs
@@ -33,6 +33,7 @@ use crate::{
     transforms::metric_to_log::MetricToLogConfig,
 };
 use lookup::event_path;
+use lookup::lookup_v2::ConfigValuePath;
 use vector_core::schema::Requirement;
 use vrl::value::Kind;
@@ -108,7 +109,7 @@ pub struct ElasticsearchConfig {
     #[configurable(metadata(docs::advanced))]
     #[configurable(metadata(docs::examples = "id"))]
     #[configurable(metadata(docs::examples = "_id"))]
-    pub id_key: Option<String>,
+    pub id_key: Option<ConfigValuePath>,

     /// The name of the pipeline to apply.
     #[serde(default)]
diff --git a/src/sinks/elasticsearch/sink.rs b/src/sinks/elasticsearch/sink.rs
index 6e2fca3fb18d1..7800f4d975a91 100644
--- a/src/sinks/elasticsearch/sink.rs
+++ b/src/sinks/elasticsearch/sink.rs
@@ -2,8 +2,10 @@ use std::{fmt, num::NonZeroUsize};

 use async_trait::async_trait;
 use futures::{future, stream::BoxStream, StreamExt};
+use lookup::lookup_v2::ConfigValuePath;
 use tower::Service;
 use vector_core::stream::{BatcherSettings, DriverResponse};
+use vrl::path::PathPrefix;

 use crate::{
     codecs::Transformer,
@@ -34,7 +36,7 @@ pub struct ElasticsearchSink<S> {
     pub service: S,
     pub metric_to_log: MetricToLog,
     pub mode: ElasticsearchCommonMode,
-    pub id_key_field: Option<String>,
+    pub id_key_field: Option<ConfigValuePath>,
 }

 impl<S> ElasticsearchSink<S> {
@@ -68,7 +70,7 @@ where
         let request_builder_concurrency_limit = NonZeroUsize::new(50);

         let mode = self.mode;
-        let id_key_field = self.id_key_field;
+        let id_key_field = self.id_key_field.as_ref();
         let transformer = self.transformer.clone();

         input
@@ -86,7 +88,7 @@ where
             })
             .filter_map(|x| async move { x })
             .filter_map(move |log| {
-                future::ready(process_log(log, &mode, &id_key_field, &transformer))
+                future::ready(process_log(log, &mode, id_key_field, &transformer))
             })
             .batched(self.batch_settings.into_byte_size_config())
             .request_builder(request_builder_concurrency_limit, self.request_builder)
@@ -110,7 +112,7 @@ where
 pub(super) fn process_log(
     mut log: LogEvent,
     mode: &ElasticsearchCommonMode,
-    id_key_field: &Option<String>,
+    id_key_field: Option<&ConfigValuePath>,
     transformer: &Transformer,
 ) -> Option<ProcessedEvent> {
     let index = mode.index(&log)?;
@@ -120,9 +122,8 @@ pub(super) fn process_log(
         cfg.sync_fields(&mut log);
         cfg.remap_timestamp(&mut log);
     };
-    let id = if let Some(Value::Bytes(key)) = id_key_field
-        .as_ref()
-        .and_then(|key| log.remove(key.as_str()))
+    let id = if let Some(Value::Bytes(key)) =
+        id_key_field.and_then(|key| log.remove((PathPrefix::Event, key)))
     {
         Some(String::from_utf8_lossy(&key).into_owned())
     } else {
diff --git a/src/sinks/gcp/stackdriver_logs.rs b/src/sinks/gcp/stackdriver_logs.rs
index fb24bce81a40b..6cf277405fafd 100644
--- a/src/sinks/gcp/stackdriver_logs.rs
+++ b/src/sinks/gcp/stackdriver_logs.rs
@@ -4,9 +4,11 @@ use bytes::Bytes;
 use futures::{FutureExt, SinkExt};
 use http::{Request, Uri};
 use hyper::Body;
+use lookup::lookup_v2::ConfigValuePath;
 use serde_json::{json, map};
 use snafu::Snafu;
 use vector_config::configurable_component;
+use vrl::path::PathPrefix;
 use vrl::value::Kind;

 use crate::{
@@ -71,7 +73,7 @@ pub struct StackdriverConfig {
     /// [sev_names]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity
     /// [logsev_docs]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity
     #[configurable(metadata(docs::examples = "severity"))]
-    pub severity_key: Option<String>,
+    pub severity_key: Option<ConfigValuePath>,
     #[serde(flatten)]
     pub auth: GcpAuthConfig,
@@ -111,7 +113,7 @@ fn default_endpoint() -> String {
 struct StackdriverSink {
     config: StackdriverConfig,
     auth: GcpAuthenticator,
-    severity_key: Option<String>,
+    severity_key: Option<ConfigValuePath>,
     uri: Uri,
 }
@@ -259,7 +261,7 @@ impl SinkConfig for StackdriverConfig {

 struct StackdriverEventEncoder {
     config: StackdriverConfig,
-    severity_key: Option<String>,
+    severity_key: Option<ConfigValuePath>,
 }

 impl HttpEventEncoder for StackdriverEventEncoder {
@@ -294,7 +296,7 @@ impl HttpEventEncoder for StackdriverEventEncoder {
         let severity = self
             .severity_key
             .as_ref()
-            .and_then(|key| log.remove(key.as_str()))
+            .and_then(|key| log.remove((PathPrefix::Event, &key.0)))
             .map(remap_severity)
             .unwrap_or_else(|| 0.into());
diff --git a/src/sinks/humio/logs.rs b/src/sinks/humio/logs.rs
index 617bc9785ec89..bdfddfda59f2e 100644
--- a/src/sinks/humio/logs.rs
+++ b/src/sinks/humio/logs.rs
@@ -1,5 +1,5 @@
 use codecs::JsonSerializerConfig;
-use lookup::lookup_v2::OptionalValuePath;
+use lookup::lookup_v2::{ConfigValuePath, OptionalValuePath};
 use vector_common::sensitive_string::SensitiveString;
 use vector_config::configurable_component;
@@ -85,7 +85,7 @@ pub struct HumioLogsConfig {
     ///
     /// [humio_data_format]: https://docs.humio.com/integrations/data-shippers/hec/#format-of-data
     #[serde(default)]
-    pub(super) indexed_fields: Vec<String>,
+    pub(super) indexed_fields: Vec<ConfigValuePath>,

     /// Optional name of the repository to ingest into.
     ///
diff --git a/src/sinks/splunk_hec/logs/config.rs b/src/sinks/splunk_hec/logs/config.rs
index a205f1a0110de..05b53c013e677 100644
--- a/src/sinks/splunk_hec/logs/config.rs
+++ b/src/sinks/splunk_hec/logs/config.rs
@@ -2,7 +2,7 @@ use std::sync::Arc;

 use codecs::TextSerializerConfig;
 use futures_util::FutureExt;
-use lookup::lookup_v2::OptionalValuePath;
+use lookup::lookup_v2::{ConfigValuePath, OptionalValuePath};
 use tower::ServiceBuilder;
 use vector_common::sensitive_string::SensitiveString;
 use vector_config::configurable_component;
@@ -73,7 +73,7 @@ pub struct HecLogsSinkConfig {
     #[configurable(metadata(docs::advanced))]
     #[serde(default)]
     #[configurable(metadata(docs::examples = "field1", docs::examples = "field2"))]
-    pub indexed_fields: Vec<String>,
+    pub indexed_fields: Vec<ConfigValuePath>,

     /// The name of the index to send events to.
     ///
diff --git a/src/sinks/splunk_hec/logs/sink.rs b/src/sinks/splunk_hec/logs/sink.rs
index 595f8f2119dce..5a69e95f1f34f 100644
--- a/src/sinks/splunk_hec/logs/sink.rs
+++ b/src/sinks/splunk_hec/logs/sink.rs
@@ -38,7 +38,7 @@ pub struct HecLogsSink {
     pub sourcetype: Option