From 5ce5ff19365a42afbb857acbcd48636bb2d99194 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Thu, 17 Aug 2023 13:17:02 -0400 Subject: [PATCH] feat: disable vrl 'string_path' feature (#18188) * feat: disable vrl 'string_path' feature * more path refactoring * more changes * lua changes * fix bad toml rebase --- Cargo.toml | 5 +- lib/codecs/src/decoding/format/gelf.rs | 2 +- lib/codecs/src/encoding/format/gelf.rs | 5 +- lib/vector-lookup/src/lookup_v2/mod.rs | 7 ++ src/api/schema/events/log.rs | 7 +- src/codecs/encoding/config.rs | 15 +--- src/codecs/encoding/transformer.rs | 78 +++++++++---------- src/internal_events/kafka.rs | 5 +- src/sinks/aws_kinesis/config.rs | 3 +- src/sinks/aws_kinesis/sink.rs | 14 ++-- src/sinks/aws_kinesis/streams/config.rs | 3 +- src/sinks/datadog/events/request_builder.rs | 4 +- src/sinks/datadog/events/sink.rs | 14 ++-- src/sinks/elasticsearch/config.rs | 3 +- src/sinks/elasticsearch/sink.rs | 15 ++-- src/sinks/elasticsearch/tests.rs | 27 +++---- src/sinks/gcp/stackdriver_logs.rs | 10 ++- src/sinks/humio/logs.rs | 4 +- src/sinks/humio/metrics.rs | 4 +- src/sinks/kafka/config.rs | 9 ++- src/sinks/kafka/request_builder.rs | 31 ++++---- src/sinks/kafka/sink.rs | 9 ++- src/sinks/kafka/tests.rs | 5 +- src/sinks/loki/sink.rs | 5 +- src/sinks/mezmo.rs | 7 +- src/sinks/sematext/logs.rs | 5 +- src/sinks/splunk_hec/logs/config.rs | 10 ++- .../splunk_hec/logs/integration_tests.rs | 29 +++---- src/sinks/splunk_hec/logs/sink.rs | 9 ++- src/sinks/splunk_hec/logs/tests.rs | 5 +- src/sources/amqp.rs | 15 ++-- src/sources/aws_s3/sqs.rs | 2 +- src/sources/docker_logs/mod.rs | 27 +++---- src/sources/gcp_pubsub.rs | 9 ++- src/sources/journald.rs | 9 ++- src/sources/kubernetes_logs/parser/cri.rs | 12 ++- src/sources/kubernetes_logs/parser/docker.rs | 17 ++-- src/sources/kubernetes_logs/parser/mod.rs | 10 +-- .../kubernetes_logs/transform_utils/mod.rs | 11 --- src/sources/nats.rs | 2 +- src/sources/splunk_hec/mod.rs | 25 +++--- src/sources/syslog.rs | 7 +- src/transforms/log_to_metric.rs | 8 +- src/transforms/lua/v1/mod.rs | 36 ++++++--- src/transforms/metric_to_log.rs | 2 +- src/transforms/reduce/merge_strategy.rs | 39 ++++++---- src/transforms/remap.rs | 16 ++-- .../reference/components/sinks/base/kafka.cue | 2 +- 48 files changed, 307 insertions(+), 291 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 78c9e596b50a1..9dcac6a4d6950 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -118,7 +118,8 @@ members = [ ] [workspace.dependencies] -vrl = { version = "0.6.0", features = ["cli", "test", "test_framework", "arbitrary"] } +vrl = { version = "0.6.0", default-features = false, features = ["cli", "test", "test_framework", "arbitrary", "compiler", "value", "diagnostic", "path", "parser", "stdlib", "datadog", "core"] } + pin-project = { version = "1.1.3", default-features = false } [dependencies] @@ -370,6 +371,8 @@ tokio = { version = "1.32.0", features = ["test-util"] } tokio-test = "0.4.2" tower-test = "0.4.0" vector-core = { path = "lib/vector-core", default-features = false, features = ["vrl", "test"] } +vrl = { version = "0.6.0", features = ["cli", "test", "test_framework", "arbitrary"] } + wiremock = "0.5.19" zstd = { version = "0.12.4", default-features = false } diff --git a/lib/codecs/src/decoding/format/gelf.rs b/lib/codecs/src/decoding/format/gelf.rs index 88d7989c7e210..77633c328d9d0 100644 --- a/lib/codecs/src/decoding/format/gelf.rs +++ b/lib/codecs/src/decoding/format/gelf.rs @@ -360,7 +360,7 @@ mod tests { let events = deserialize_gelf_input(&input).unwrap(); 
assert_eq!(events.len(), 1); let log = events[0].as_log(); - assert!(!log.contains("_id")); + assert!(!log.contains(event_path!("_id"))); } } diff --git a/lib/codecs/src/encoding/format/gelf.rs b/lib/codecs/src/encoding/format/gelf.rs index d35bad66decce..363a377d4c519 100644 --- a/lib/codecs/src/encoding/format/gelf.rs +++ b/lib/codecs/src/encoding/format/gelf.rs @@ -224,7 +224,10 @@ fn to_gelf_event(log: LogEvent) -> vector_common::Result { coerce_field_names_and_values(log).map(|(mut log, missing_prefix)| { // rename additional fields that were flagged as missing the underscore prefix for field in missing_prefix { - log.rename_key(event_path!(field.as_str()), format!("_{}", &field).as_str()); + log.rename_key( + event_path!(field.as_str()), + event_path!(format!("_{}", &field).as_str()), + ); } log }) diff --git a/lib/vector-lookup/src/lookup_v2/mod.rs b/lib/vector-lookup/src/lookup_v2/mod.rs index d066eb596e3e2..de95977844d62 100644 --- a/lib/vector-lookup/src/lookup_v2/mod.rs +++ b/lib/vector-lookup/src/lookup_v2/mod.rs @@ -38,6 +38,13 @@ impl<'a> ValuePath<'a> for &'a ConfigValuePath { } } +#[cfg(any(test, feature = "test"))] +impl From<&str> for ConfigValuePath { + fn from(path: &str) -> Self { + ConfigValuePath::try_from(path.to_string()).unwrap() + } +} + /// A wrapper around `OwnedTargetPath` that allows it to be used in Vector config /// with prefix default to `PathPrefix::Event` #[configurable_component] diff --git a/src/api/schema/events/log.rs b/src/api/schema/events/log.rs index 251de727c27e7..ad53474622510 100644 --- a/src/api/schema/events/log.rs +++ b/src/api/schema/events/log.rs @@ -3,6 +3,7 @@ use std::borrow::Cow; use async_graphql::Object; use chrono::{DateTime, Utc}; use vector_common::encode_logfmt; +use vrl::event_path; use super::EventEncodingType; use crate::{event, topology::TapOutput}; @@ -19,11 +20,11 @@ impl Log { } pub fn get_message(&self) -> Option> { - Some(self.event.get("message")?.to_string_lossy()) + Some(self.event.get(event_path!("message"))?.to_string_lossy()) } pub fn get_timestamp(&self) -> Option<&DateTime> { - self.event.get("timestamp")?.as_timestamp() + self.event.get(event_path!("timestamp"))?.as_timestamp() } } @@ -69,7 +70,7 @@ impl Log { /// Get JSON field data on the log event, by field name async fn json(&self, field: String) -> Option { - self.event.get(field.as_str()).map(|field| { + self.event.get(event_path!(field.as_str())).map(|field| { serde_json::to_string(field) .expect("JSON serialization of trace event field failed. 
Please report.") }) diff --git a/src/codecs/encoding/config.rs b/src/codecs/encoding/config.rs index 97096de1e0f3d..cdae762bf6484 100644 --- a/src/codecs/encoding/config.rs +++ b/src/codecs/encoding/config.rs @@ -172,10 +172,7 @@ mod test { transformer.only_fields(), &Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())]) ); - assert_eq!( - transformer.except_fields(), - &Some(vec!["ignore_me".to_owned()]) - ); + assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()])); assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix)); } @@ -207,10 +204,7 @@ mod test { transformer.only_fields(), &Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())]) ); - assert_eq!( - transformer.except_fields(), - &Some(vec!["ignore_me".to_owned()]) - ); + assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()])); assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix)); } @@ -239,10 +233,7 @@ mod test { transformer.only_fields(), &Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())]) ); - assert_eq!( - transformer.except_fields(), - &Some(vec!["ignore_me".to_owned()]) - ); + assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()])); assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix)); } } diff --git a/src/codecs/encoding/transformer.rs b/src/codecs/encoding/transformer.rs index 7a7be839d904f..bd5aee8b7f0bc 100644 --- a/src/codecs/encoding/transformer.rs +++ b/src/codecs/encoding/transformer.rs @@ -4,15 +4,12 @@ use core::fmt::Debug; use std::collections::BTreeMap; use lookup::lookup_v2::ConfigValuePath; -use lookup::{ - event_path, - lookup_v2::{parse_value_path, OwnedValuePath}, - PathPrefix, -}; +use lookup::{event_path, PathPrefix}; use serde::{Deserialize, Deserializer}; use vector_config::configurable_component; use vector_core::event::{LogEvent, MaybeAsLogMut}; use vector_core::schema::meaning; +use vrl::path::OwnedValuePath; use vrl::value::Value; use crate::{event::Event, serde::skip_serializing_if_default}; @@ -27,7 +24,7 @@ pub struct Transformer { /// List of fields that are excluded from the encoded event. #[serde(default, skip_serializing_if = "skip_serializing_if_default")] - except_fields: Option>, + except_fields: Option>, /// Format used for timestamp fields. #[serde(default, skip_serializing_if = "skip_serializing_if_default")] @@ -45,15 +42,19 @@ impl<'de> Deserialize<'de> for Transformer { #[serde(default)] only_fields: Option>, #[serde(default)] - except_fields: Option>, + except_fields: Option>, #[serde(default)] timestamp_format: Option, } let inner: TransformerInner = Deserialize::deserialize(deserializer)?; Self::new( - inner.only_fields, - inner.except_fields, + inner + .only_fields + .map(|v| v.iter().map(|p| ConfigValuePath(p.clone())).collect()), + inner + .except_fields + .map(|v| v.iter().map(|p| ConfigValuePath(p.clone())).collect()), inner.timestamp_format, ) .map_err(serde::de::Error::custom) @@ -66,13 +67,12 @@ impl Transformer { /// Returns `Err` if `only_fields` and `except_fields` fail validation, i.e. are not mutually /// exclusive. 
pub fn new( - only_fields: Option>, - except_fields: Option>, + only_fields: Option>, + except_fields: Option>, timestamp_format: Option, ) -> Result { Self::validate_fields(only_fields.as_ref(), except_fields.as_ref())?; - let only_fields = only_fields.map(|x| x.into_iter().map(ConfigValuePath).collect()); Ok(Self { only_fields, except_fields, @@ -87,7 +87,7 @@ impl Transformer { } /// Get the `Transformer`'s `except_fields`. - pub const fn except_fields(&self) -> &Option> { + pub const fn except_fields(&self) -> &Option> { &self.except_fields } @@ -100,14 +100,14 @@ impl Transformer { /// /// If an error is returned, the entire encoding configuration should be considered inoperable. fn validate_fields( - only_fields: Option<&Vec>, - except_fields: Option<&Vec>, + only_fields: Option<&Vec>, + except_fields: Option<&Vec>, ) -> crate::Result<()> { if let (Some(only_fields), Some(except_fields)) = (only_fields, except_fields) { - if except_fields.iter().any(|f| { - let path_iter = parse_value_path(f).unwrap(); - only_fields.iter().any(|v| v == &path_iter) - }) { + if except_fields + .iter() + .any(|f| only_fields.iter().any(|v| v == f)) + { return Err( "`except_fields` and `only_fields` should be mutually exclusive.".into(), ); @@ -155,22 +155,19 @@ impl Transformer { } fn apply_except_fields(&self, log: &mut LogEvent) { - use lookup::path::TargetPath; - if let Some(except_fields) = self.except_fields.as_ref() { - let service_path = log - .metadata() - .schema_definition() - .meaning_path(meaning::SERVICE) - .map(|path| path.value_path().to_string()); - for field in except_fields { - let value = log.remove(field.as_str()); + let value_path = &field.0; + let value = log.remove((PathPrefix::Event, value_path)); + let service_path = log + .metadata() + .schema_definition() + .meaning_path(meaning::SERVICE); // If we are removing the service field we need to store this in a `dropped_fields` list as we may need to // refer to this later when emitting metrics. - if let Some(v) = value { - if matches!(service_path.as_ref(), Some(path) if path == field) { + if let (Some(v), Some(service_path)) = (value, service_path) { + if service_path.path == *value_path { log.metadata_mut() .add_dropped_field(meaning::SERVICE.to_string(), v); } @@ -216,17 +213,12 @@ impl Transformer { /// Returns `Err` if the new `except_fields` fail validation, i.e. are not mutually exclusive /// with `only_fields`. 
#[cfg(test)] - pub fn set_except_fields(&mut self, except_fields: Option>) -> crate::Result<()> { - Self::validate_fields( - self.only_fields - .clone() - .map(|x| x.into_iter().map(|x| x.0).collect()) - .as_ref(), - except_fields.as_ref(), - )?; - + pub fn set_except_fields( + &mut self, + except_fields: Option>, + ) -> crate::Result<()> { + Self::validate_fields(self.only_fields.as_ref(), except_fields.as_ref())?; self.except_fields = except_fields; - Ok(()) } } @@ -282,7 +274,7 @@ mod tests { #[test] fn deserialize_and_transform_except() { let transformer: Transformer = - toml::from_str(r#"except_fields = ["a.b.c", "b", "c[0].y", "d\\.z", "e"]"#).unwrap(); + toml::from_str(r#"except_fields = ["a.b.c", "b", "c[0].y", "d.z", "e"]"#).unwrap(); let mut log = LogEvent::default(); { log.insert("a", 1); @@ -293,7 +285,7 @@ mod tests { log.insert("b[1].x", 1); log.insert("c[0].x", 1); log.insert("c[0].y", 1); - log.insert("d\\.z", 1); + log.insert("d.z", 1); log.insert("e.a", 1); log.insert("e.b", 1); } @@ -303,7 +295,7 @@ mod tests { assert!(!event.as_mut_log().contains("b")); assert!(!event.as_mut_log().contains("b[1].x")); assert!(!event.as_mut_log().contains("c[0].y")); - assert!(!event.as_mut_log().contains("d\\.z")); + assert!(!event.as_mut_log().contains("d.z")); assert!(!event.as_mut_log().contains("e.a")); assert!(event.as_mut_log().contains("a.b.d")); diff --git a/src/internal_events/kafka.rs b/src/internal_events/kafka.rs index 57c25d905c60f..7a968626d0cb9 100644 --- a/src/internal_events/kafka.rs +++ b/src/internal_events/kafka.rs @@ -1,5 +1,6 @@ use metrics::{counter, gauge}; use vector_core::{internal_event::InternalEvent, update_counter}; +use vrl::path::OwnedTargetPath; use vector_common::{ internal_event::{error_stage, error_type}, @@ -161,7 +162,7 @@ impl InternalEvent for KafkaStatisticsReceived<'_> { } pub struct KafkaHeaderExtractionError<'a> { - pub header_field: &'a str, + pub header_field: &'a OwnedTargetPath, } impl InternalEvent for KafkaHeaderExtractionError<'_> { @@ -171,7 +172,7 @@ impl InternalEvent for KafkaHeaderExtractionError<'_> { error_code = "extracting_header", error_type = error_type::PARSER_FAILED, stage = error_stage::RECEIVING, - header_field = self.header_field, + header_field = self.header_field.to_string(), internal_log_rate_limit = true, ); counter!( diff --git a/src/sinks/aws_kinesis/config.rs b/src/sinks/aws_kinesis/config.rs index 31c35e74c8fdc..1b3292afd29dd 100644 --- a/src/sinks/aws_kinesis/config.rs +++ b/src/sinks/aws_kinesis/config.rs @@ -1,3 +1,4 @@ +use lookup::lookup_v2::ConfigValuePath; use std::marker::PhantomData; use vector_core::stream::BatcherSettings; @@ -79,7 +80,7 @@ impl KinesisSinkBaseConfig { /// Builds an aws_kinesis sink. 
pub fn build_sink( config: &KinesisSinkBaseConfig, - partition_key_field: Option, + partition_key_field: Option, batch_settings: BatcherSettings, client: C, retry_logic: RT, diff --git a/src/sinks/aws_kinesis/sink.rs b/src/sinks/aws_kinesis/sink.rs index 0341c0e8244d6..c39911a0705bc 100644 --- a/src/sinks/aws_kinesis/sink.rs +++ b/src/sinks/aws_kinesis/sink.rs @@ -1,6 +1,8 @@ use std::{borrow::Cow, fmt::Debug, marker::PhantomData, num::NonZeroUsize}; +use lookup::lookup_v2::ConfigValuePath; use rand::random; +use vrl::path::PathPrefix; use crate::{ internal_events::{AwsKinesisStreamNoPartitionKeyError, SinkRequestBuildError}, @@ -27,7 +29,7 @@ pub struct KinesisSink { pub batch_settings: BatcherSettings, pub service: S, pub request_builder: KinesisRequestBuilder, - pub partition_key_field: Option, + pub partition_key_field: Option, pub _phantom: PhantomData, } @@ -42,13 +44,11 @@ where async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { let request_builder_concurrency_limit = NonZeroUsize::new(50); - let partition_key_field = self.partition_key_field.clone(); - input .filter_map(|event| { // Panic: This sink only accepts Logs, so this should never panic let log = event.into_log(); - let processed = process_log(log, &partition_key_field); + let processed = process_log(log, self.partition_key_field.as_ref()); future::ready(processed) }) @@ -106,14 +106,14 @@ where /// events are emitted and None is returned. pub(crate) fn process_log( log: LogEvent, - partition_key_field: &Option, + partition_key_field: Option<&ConfigValuePath>, ) -> Option { let partition_key = if let Some(partition_key_field) = partition_key_field { - if let Some(v) = log.get(partition_key_field.as_str()) { + if let Some(v) = log.get((PathPrefix::Event, partition_key_field)) { v.to_string_lossy() } else { emit!(AwsKinesisStreamNoPartitionKeyError { - partition_key_field + partition_key_field: partition_key_field.0.to_string().as_str() }); return None; } diff --git a/src/sinks/aws_kinesis/streams/config.rs b/src/sinks/aws_kinesis/streams/config.rs index 4ebcf2a724e64..8ace03b2b602b 100644 --- a/src/sinks/aws_kinesis/streams/config.rs +++ b/src/sinks/aws_kinesis/streams/config.rs @@ -3,6 +3,7 @@ use aws_sdk_kinesis::{ types::SdkError, }; use futures::FutureExt; +use lookup::lookup_v2::ConfigValuePath; use snafu::Snafu; use vector_config::{component::GenerateConfig, configurable_component}; @@ -80,7 +81,7 @@ pub struct KinesisStreamsSinkConfig { /// /// If not specified, a unique partition key is generated for each Kinesis record. #[configurable(metadata(docs::examples = "user_id"))] - pub partition_key_field: Option, + pub partition_key_field: Option, #[configurable(derived)] #[serde(default)] diff --git a/src/sinks/datadog/events/request_builder.rs b/src/sinks/datadog/events/request_builder.rs index 93e4eeeb17c31..bbb2c53e22fd7 100644 --- a/src/sinks/datadog/events/request_builder.rs +++ b/src/sinks/datadog/events/request_builder.rs @@ -2,7 +2,7 @@ use std::{io, sync::Arc}; use bytes::Bytes; use codecs::JsonSerializerConfig; -use lookup::lookup_v2::OwnedSegment; +use lookup::lookup_v2::ConfigValuePath; use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; use vector_core::ByteSizeOf; @@ -133,7 +133,7 @@ fn encoder() -> (Transformer, Encoder<()>) { "title", ] .iter() - .map(|field| vec![OwnedSegment::Field((*field).into())].into()) + .map(|field| ConfigValuePath::try_from((*field).to_string()).unwrap()) .collect(), ); // DataDog Event API requires unix timestamp. 
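The hunks that follow repeat the same migration as the ones above: sinks and sources stop passing bare `&str` field names into event lookups and instead carry typed paths (`event_path!`, `ConfigValuePath`, `OwnedTargetPath`), so paths are parsed and validated once rather than re-parsed per event. A minimal sketch of the before/after, assuming the workspace `lookup`/`vrl` re-exports that these hunks already import; the `user_id` field name is taken from the config examples in this patch and is purely illustrative:

use lookup::lookup_v2::ConfigValuePath;
use vector_core::event::LogEvent;
use vrl::event_path;
use vrl::path::PathPrefix;

fn path_migration_sketch(log: &mut LogEvent) {
    // Before this patch (vrl "string_path" feature enabled): bare strings were
    // accepted as paths, e.g. `log.get("message")` or `log.remove("env")`.

    // After: the path is typed at the call site with `event_path!` ...
    let _message = log.get(event_path!("message"));
    let _env = log.remove(event_path!("env"));

    // ... or parsed once from configuration into a `ConfigValuePath` and then
    // addressed as a (prefix, path) pair, as the kinesis/stackdriver hunks do.
    let field = ConfigValuePath::try_from("user_id".to_string()).unwrap();
    let _key = log.get((PathPrefix::Event, &field.0));
}

The apparent upside, visible throughout the diff, is that invalid paths are rejected when the config is deserialized instead of silently failing (or panicking) at event-processing time.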
diff --git a/src/sinks/datadog/events/sink.rs b/src/sinks/datadog/events/sink.rs index 4a7a1a04facd7..a85d23d829093 100644 --- a/src/sinks/datadog/events/sink.rs +++ b/src/sinks/datadog/events/sink.rs @@ -50,12 +50,12 @@ where async fn ensure_required_fields(event: Event) -> Option { let mut log = event.into_log(); - if !log.contains("title") { + if !log.contains(event_path!("title")) { emit!(ParserMissingFieldError:: { field: "title" }); return None; } - if !log.contains("text") { + if !log.contains(event_path!("text")) { let message_path = log .message_path() .expect("message is required (make sure the \"message\" semantic meaning is set)") @@ -63,21 +63,21 @@ async fn ensure_required_fields(event: Event) -> Option { log.rename_key(&message_path, event_path!("text")); } - if !log.contains("host") { + if !log.contains(event_path!("host")) { if let Some(host_path) = log.host_path().cloned().as_ref() { log.rename_key(host_path, event_path!("host")); } } - if !log.contains("date_happened") { + if !log.contains(event_path!("date_happened")) { if let Some(timestamp_path) = log.timestamp_path().cloned().as_ref() { - log.rename_key(timestamp_path, "date_happened"); + log.rename_key(timestamp_path, event_path!("date_happened")); } } - if !log.contains("source_type_name") { + if !log.contains(event_path!("source_type_name")) { if let Some(source_type_path) = log.source_type_path().cloned().as_ref() { - log.rename_key(source_type_path, "source_type_name"); + log.rename_key(source_type_path, event_path!("source_type_name")); } } diff --git a/src/sinks/elasticsearch/config.rs b/src/sinks/elasticsearch/config.rs index ca52fdde0899c..2295555dc8c72 100644 --- a/src/sinks/elasticsearch/config.rs +++ b/src/sinks/elasticsearch/config.rs @@ -33,6 +33,7 @@ use crate::{ transforms::metric_to_log::MetricToLogConfig, }; use lookup::event_path; +use lookup::lookup_v2::ConfigValuePath; use vector_core::schema::Requirement; use vrl::value::Kind; @@ -108,7 +109,7 @@ pub struct ElasticsearchConfig { #[configurable(metadata(docs::advanced))] #[configurable(metadata(docs::examples = "id"))] #[configurable(metadata(docs::examples = "_id"))] - pub id_key: Option, + pub id_key: Option, /// The name of the pipeline to apply. 
#[serde(default)] diff --git a/src/sinks/elasticsearch/sink.rs b/src/sinks/elasticsearch/sink.rs index 6e2fca3fb18d1..7800f4d975a91 100644 --- a/src/sinks/elasticsearch/sink.rs +++ b/src/sinks/elasticsearch/sink.rs @@ -2,8 +2,10 @@ use std::{fmt, num::NonZeroUsize}; use async_trait::async_trait; use futures::{future, stream::BoxStream, StreamExt}; +use lookup::lookup_v2::ConfigValuePath; use tower::Service; use vector_core::stream::{BatcherSettings, DriverResponse}; +use vrl::path::PathPrefix; use crate::{ codecs::Transformer, @@ -34,7 +36,7 @@ pub struct ElasticsearchSink { pub service: S, pub metric_to_log: MetricToLog, pub mode: ElasticsearchCommonMode, - pub id_key_field: Option, + pub id_key_field: Option, } impl ElasticsearchSink { @@ -68,7 +70,7 @@ where let request_builder_concurrency_limit = NonZeroUsize::new(50); let mode = self.mode; - let id_key_field = self.id_key_field; + let id_key_field = self.id_key_field.as_ref(); let transformer = self.transformer.clone(); input @@ -86,7 +88,7 @@ where }) .filter_map(|x| async move { x }) .filter_map(move |log| { - future::ready(process_log(log, &mode, &id_key_field, &transformer)) + future::ready(process_log(log, &mode, id_key_field, &transformer)) }) .batched(self.batch_settings.into_byte_size_config()) .request_builder(request_builder_concurrency_limit, self.request_builder) @@ -110,7 +112,7 @@ where pub(super) fn process_log( mut log: LogEvent, mode: &ElasticsearchCommonMode, - id_key_field: &Option, + id_key_field: Option<&ConfigValuePath>, transformer: &Transformer, ) -> Option { let index = mode.index(&log)?; @@ -120,9 +122,8 @@ pub(super) fn process_log( cfg.sync_fields(&mut log); cfg.remap_timestamp(&mut log); }; - let id = if let Some(Value::Bytes(key)) = id_key_field - .as_ref() - .and_then(|key| log.remove(key.as_str())) + let id = if let Some(Value::Bytes(key)) = + id_key_field.and_then(|key| log.remove((PathPrefix::Event, key))) { Some(String::from_utf8_lossy(&key).into_owned()) } else { diff --git a/src/sinks/elasticsearch/tests.rs b/src/sinks/elasticsearch/tests.rs index f2672228abfe3..f8a8a55ac7882 100644 --- a/src/sinks/elasticsearch/tests.rs +++ b/src/sinks/elasticsearch/tests.rs @@ -12,7 +12,6 @@ use crate::{ }, template::Template, }; -use lookup::owned_value_path; // helper to unwrap template strings for tests only fn parse_template(input: &str) -> Template { @@ -53,7 +52,7 @@ async fn sets_create_action_when_configured() { .request_builder .encoder .encode_input( - vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()], + vec![process_log(log, &es.mode, None, &config.encoding).unwrap()], &mut encoded, ) .unwrap(); @@ -129,7 +128,7 @@ async fn encode_datastream_mode() { .request_builder .encoder .encode_input( - vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()], + vec![process_log(log, &es.mode, None, &config.encoding).unwrap()], &mut encoded, ) .unwrap(); @@ -187,7 +186,7 @@ async fn encode_datastream_mode_no_routing() { .request_builder .encoder .encode_input( - vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()], + vec![process_log(log, &es.mode, None, &config.encoding).unwrap()], &mut encoded, ) .unwrap(); @@ -223,7 +222,7 @@ async fn handle_metrics() { es.request_builder .encoder .encode_input( - vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()], + vec![process_log(log, &es.mode, None, &config.encoding).unwrap()], &mut encoded, ) .unwrap(); @@ -339,7 +338,7 @@ async fn encode_datastream_mode_no_sync() { .request_builder .encoder .encode_input( - 
vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()], + vec![process_log(log, &es.mode, None, &config.encoding).unwrap()], &mut encoded, ) .unwrap(); @@ -358,12 +357,8 @@ async fn allows_using_except_fields() { index: parse_template("{{ idx }}"), ..Default::default() }, - encoding: Transformer::new( - None, - Some(vec!["idx".to_string(), "timestamp".to_string()]), - None, - ) - .unwrap(), + encoding: Transformer::new(None, Some(vec!["idx".into(), "timestamp".into()]), None) + .unwrap(), endpoints: vec![String::from("https://example.com")], api_version: ElasticsearchApiVersion::V6, ..Default::default() @@ -379,7 +374,7 @@ async fn allows_using_except_fields() { .request_builder .encoder .encode_input( - vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()], + vec![process_log(log, &es.mode, None, &config.encoding).unwrap()], &mut encoded, ) .unwrap(); @@ -398,7 +393,7 @@ async fn allows_using_only_fields() { index: parse_template("{{ idx }}"), ..Default::default() }, - encoding: Transformer::new(Some(vec![owned_value_path!("foo")]), None, None).unwrap(), + encoding: Transformer::new(Some(vec!["foo".into()]), None, None).unwrap(), endpoints: vec![String::from("https://example.com")], api_version: ElasticsearchApiVersion::V6, ..Default::default() @@ -414,7 +409,7 @@ async fn allows_using_only_fields() { .request_builder .encoder .encode_input( - vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()], + vec![process_log(log, &es.mode, None, &config.encoding).unwrap()], &mut encoded, ) .unwrap(); @@ -540,7 +535,7 @@ async fn datastream_index_name() { ), ); - let processed_event = process_log(log, &es.mode, &None, &config.encoding).unwrap(); + let processed_event = process_log(log, &es.mode, None, &config.encoding).unwrap(); assert_eq!(processed_event.index, test_case.want, "{test_case:?}"); } } diff --git a/src/sinks/gcp/stackdriver_logs.rs b/src/sinks/gcp/stackdriver_logs.rs index fb24bce81a40b..6cf277405fafd 100644 --- a/src/sinks/gcp/stackdriver_logs.rs +++ b/src/sinks/gcp/stackdriver_logs.rs @@ -4,9 +4,11 @@ use bytes::Bytes; use futures::{FutureExt, SinkExt}; use http::{Request, Uri}; use hyper::Body; +use lookup::lookup_v2::ConfigValuePath; use serde_json::{json, map}; use snafu::Snafu; use vector_config::configurable_component; +use vrl::path::PathPrefix; use vrl::value::Kind; use crate::{ @@ -71,7 +73,7 @@ pub struct StackdriverConfig { /// [sev_names]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity /// [logsev_docs]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity #[configurable(metadata(docs::examples = "severity"))] - pub severity_key: Option, + pub severity_key: Option, #[serde(flatten)] pub auth: GcpAuthConfig, @@ -111,7 +113,7 @@ fn default_endpoint() -> String { struct StackdriverSink { config: StackdriverConfig, auth: GcpAuthenticator, - severity_key: Option, + severity_key: Option, uri: Uri, } @@ -259,7 +261,7 @@ impl SinkConfig for StackdriverConfig { struct StackdriverEventEncoder { config: StackdriverConfig, - severity_key: Option, + severity_key: Option, } impl HttpEventEncoder for StackdriverEventEncoder { @@ -294,7 +296,7 @@ impl HttpEventEncoder for StackdriverEventEncoder { let severity = self .severity_key .as_ref() - .and_then(|key| log.remove(key.as_str())) + .and_then(|key| log.remove((PathPrefix::Event, &key.0))) .map(remap_severity) .unwrap_or_else(|| 0.into()); diff --git a/src/sinks/humio/logs.rs b/src/sinks/humio/logs.rs index 617bc9785ec89..bdfddfda59f2e 
100644 --- a/src/sinks/humio/logs.rs +++ b/src/sinks/humio/logs.rs @@ -1,5 +1,5 @@ use codecs::JsonSerializerConfig; -use lookup::lookup_v2::OptionalValuePath; +use lookup::lookup_v2::{ConfigValuePath, OptionalValuePath}; use vector_common::sensitive_string::SensitiveString; use vector_config::configurable_component; @@ -85,7 +85,7 @@ pub struct HumioLogsConfig { /// /// [humio_data_format]: https://docs.humio.com/integrations/data-shippers/hec/#format-of-data #[serde(default)] - pub(super) indexed_fields: Vec, + pub(super) indexed_fields: Vec, /// Optional name of the repository to ingest into. /// diff --git a/src/sinks/humio/metrics.rs b/src/sinks/humio/metrics.rs index d3ca01cbc33d5..1142206e1dfdf 100644 --- a/src/sinks/humio/metrics.rs +++ b/src/sinks/humio/metrics.rs @@ -3,7 +3,7 @@ use codecs::JsonSerializerConfig; use futures::StreamExt; use futures_util::stream::BoxStream; use indoc::indoc; -use lookup::lookup_v2::OptionalValuePath; +use lookup::lookup_v2::{ConfigValuePath, OptionalValuePath}; use vector_common::sensitive_string::SensitiveString; use vector_config::configurable_component; use vector_core::sink::StreamSink; @@ -97,7 +97,7 @@ pub struct HumioMetricsConfig { /// /// [humio_data_format]: https://docs.humio.com/integrations/data-shippers/hec/#format-of-data #[serde(default)] - indexed_fields: Vec, + indexed_fields: Vec, /// Optional name of the repository to ingest into. /// diff --git a/src/sinks/kafka/config.rs b/src/sinks/kafka/config.rs index 57cb86c2001dd..a40c6525da835 100644 --- a/src/sinks/kafka/config.rs +++ b/src/sinks/kafka/config.rs @@ -2,6 +2,7 @@ use std::{collections::HashMap, time::Duration}; use codecs::JsonSerializerConfig; use futures::FutureExt; +use lookup::lookup_v2::ConfigTargetPath; use rdkafka::ClientConfig; use serde_with::serde_as; use vector_config::configurable_component; @@ -53,7 +54,9 @@ pub struct KafkaSinkConfig { /// no key. 
#[configurable(metadata(docs::advanced))] #[configurable(metadata(docs::examples = "user_id"))] - pub key_field: Option, + #[configurable(metadata(docs::examples = ".my_topic"))] + #[configurable(metadata(docs::examples = "%my_topic"))] + pub key_field: Option, #[configurable(derived)] pub encoding: EncodingConfig, @@ -108,7 +111,7 @@ pub struct KafkaSinkConfig { #[configurable(metadata(docs::advanced))] #[serde(alias = "headers_field")] // accidentally released as `headers_field` in 0.18 #[configurable(metadata(docs::examples = "headers"))] - pub headers_key: Option, + pub headers_key: Option, #[configurable(derived)] #[serde( @@ -249,7 +252,7 @@ impl GenerateConfig for KafkaSinkConfig { toml::Value::try_from(Self { bootstrap_servers: "10.14.22.123:9092,10.14.23.332:9092".to_owned(), topic: Template::try_from("topic-1234".to_owned()).unwrap(), - key_field: Some("user_id".to_owned()), + key_field: Some(ConfigTargetPath::try_from("user_id".to_owned()).unwrap()), encoding: JsonSerializerConfig::default().into(), batch: Default::default(), compression: KafkaCompression::None, diff --git a/src/sinks/kafka/request_builder.rs b/src/sinks/kafka/request_builder.rs index c2d3c7aaa9219..3a8ce886ad085 100644 --- a/src/sinks/kafka/request_builder.rs +++ b/src/sinks/kafka/request_builder.rs @@ -3,6 +3,7 @@ use std::num::NonZeroUsize; use bytes::{Bytes, BytesMut}; use rdkafka::message::{Header, OwnedHeaders}; use tokio_util::codec::Encoder as _; +use vrl::path::OwnedTargetPath; use crate::{ codecs::{Encoder, Transformer}, @@ -16,8 +17,8 @@ use crate::{ }; pub struct KafkaRequestBuilder { - pub key_field: Option, - pub headers_key: Option, + pub key_field: Option, + pub headers_key: Option, pub topic_template: Template, pub transformer: Transformer, pub encoder: Encoder<()>, @@ -39,9 +40,9 @@ impl KafkaRequestBuilder { let metadata = KafkaRequestMetadata { finalizers: event.take_finalizers(), - key: get_key(&event, &self.key_field), + key: get_key(&event, self.key_field.as_ref()), timestamp_millis: get_timestamp_millis(&event), - headers: get_headers(&event, &self.headers_key), + headers: get_headers(&event, self.headers_key.as_ref()), topic, }; self.transformer.transform(&mut event); @@ -64,14 +65,12 @@ impl KafkaRequestBuilder { } } -fn get_key(event: &Event, key_field: &Option) -> Option { - key_field.as_ref().and_then(|key_field| match event { - Event::Log(log) => log - .get(key_field.as_str()) - .map(|value| value.coerce_to_bytes()), +fn get_key(event: &Event, key_field: Option<&OwnedTargetPath>) -> Option { + key_field.and_then(|key_field| match event { + Event::Log(log) => log.get(key_field).map(|value| value.coerce_to_bytes()), Event::Metric(metric) => metric .tags() - .and_then(|tags| tags.get(key_field)) + .and_then(|tags| tags.get(key_field.to_string().as_str())) .map(|value| value.to_owned().into()), _ => None, }) @@ -86,10 +85,10 @@ fn get_timestamp_millis(event: &Event) -> Option { .map(|ts| ts.timestamp_millis()) } -fn get_headers(event: &Event, headers_key: &Option) -> Option { - headers_key.as_ref().and_then(|headers_key| { +fn get_headers(event: &Event, headers_key: Option<&OwnedTargetPath>) -> Option { + headers_key.and_then(|headers_key| { if let Event::Log(log) = event { - if let Some(headers) = log.get(headers_key.as_str()) { + if let Some(headers) = log.get(headers_key) { match headers { Value::Object(headers_map) => { let mut owned_headers = OwnedHeaders::new_with_capacity(headers_map.len()); @@ -131,15 +130,15 @@ mod tests { #[test] fn kafka_get_headers() { - let headers_key = 
"headers"; + let headers_key = OwnedTargetPath::try_from("headers".to_string()).unwrap(); let mut header_values = BTreeMap::new(); header_values.insert("a-key".to_string(), Value::Bytes(Bytes::from("a-value"))); header_values.insert("b-key".to_string(), Value::Bytes(Bytes::from("b-value"))); let mut event = Event::Log(LogEvent::from("hello")); - event.as_mut_log().insert(headers_key, header_values); + event.as_mut_log().insert(&headers_key, header_values); - let headers = get_headers(&event, &Some(headers_key.to_string())).unwrap(); + let headers = get_headers(&event, Some(&headers_key)).unwrap(); assert_eq!(headers.get(0).key, "a-key"); assert_eq!(headers.get(0).value.unwrap(), "a-value".as_bytes()); assert_eq!(headers.get(1).key, "b-key"); diff --git a/src/sinks/kafka/sink.rs b/src/sinks/kafka/sink.rs index e3060900f71c2..f2d0107524310 100644 --- a/src/sinks/kafka/sink.rs +++ b/src/sinks/kafka/sink.rs @@ -8,6 +8,7 @@ use rdkafka::{ use snafu::{ResultExt, Snafu}; use tokio::time::Duration; use tower::limit::ConcurrencyLimit; +use vrl::path::OwnedTargetPath; use super::config::{KafkaRole, KafkaSinkConfig}; use crate::{ @@ -32,8 +33,8 @@ pub struct KafkaSink { encoder: Encoder<()>, service: KafkaService, topic: Template, - key_field: Option, - headers_key: Option, + key_field: Option, + headers_key: Option, } pub(crate) fn create_producer( @@ -54,12 +55,12 @@ impl KafkaSink { let encoder = Encoder::<()>::new(serializer); Ok(KafkaSink { - headers_key: config.headers_key, + headers_key: config.headers_key.map(|key| key.0), transformer, encoder, service: KafkaService::new(producer), topic: config.topic, - key_field: config.key_field, + key_field: config.key_field.map(|key| key.0), }) } diff --git a/src/sinks/kafka/tests.rs b/src/sinks/kafka/tests.rs index 9ed3b66ba3fc7..fee36ca9af5eb 100644 --- a/src/sinks/kafka/tests.rs +++ b/src/sinks/kafka/tests.rs @@ -13,6 +13,7 @@ mod integration_test { use bytes::Bytes; use codecs::TextSerializerConfig; use futures::StreamExt; + use lookup::lookup_v2::ConfigTargetPath; use rdkafka::{ consumer::{BaseConsumer, Consumer}, message::Headers, @@ -302,7 +303,7 @@ mod integration_test { } let topic = format!("test-{}", random_string(10)); - let headers_key = "headers_key".to_string(); + let headers_key = ConfigTargetPath::try_from("headers_key".to_string()).unwrap(); let kafka_auth = KafkaAuthConfig { sasl, tls }; let config = KafkaSinkConfig { bootstrap_servers: server.clone(), @@ -335,7 +336,7 @@ mod integration_test { Value::Bytes(Bytes::from(header_1_value)), ); events.iter_logs_mut().for_each(move |log| { - log.insert(headers_key.as_str(), header_values.clone()); + log.insert(&headers_key, header_values.clone()); }); events }); diff --git a/src/sinks/loki/sink.rs b/src/sinks/loki/sink.rs index fd6aa01bac23d..bb0db3853cf7f 100644 --- a/src/sinks/loki/sink.rs +++ b/src/sinks/loki/sink.rs @@ -5,6 +5,7 @@ use once_cell::sync::Lazy; use regex::Regex; use snafu::Snafu; use tokio_util::codec::Encoder as _; +use vrl::path::parse_target_path; use super::{ config::{LokiConfig, OutOfOrderAction}, @@ -239,7 +240,9 @@ impl EventEncoder { for template in self.labels.values() { if let Some(fields) = template.get_fields() { for field in fields { - event.as_mut_log().remove(field.as_str()); + if let Ok(path) = parse_target_path(field.as_str()) { + event.as_mut_log().remove(&path); + } } } } diff --git a/src/sinks/mezmo.rs b/src/sinks/mezmo.rs index 97d96f40c8c40..25999e73163f0 100644 --- a/src/sinks/mezmo.rs +++ b/src/sinks/mezmo.rs @@ -6,6 +6,7 @@ use http::{Request, 
StatusCode, Uri}; use serde_json::json; use vector_common::sensitive_string::SensitiveString; use vector_config::configurable_component; +use vrl::event_path; use vrl::value::{Kind, Value}; use crate::{ @@ -269,15 +270,15 @@ impl HttpEventEncoder> for map.insert("line".to_string(), json!(line)); map.insert("timestamp".to_string(), json!(timestamp)); - if let Some(env) = log.remove("env") { + if let Some(env) = log.remove(event_path!("env")) { map.insert("env".to_string(), json!(env)); } - if let Some(app) = log.remove("app") { + if let Some(app) = log.remove(event_path!("app")) { map.insert("app".to_string(), json!(app)); } - if let Some(file) = log.remove("file") { + if let Some(file) = log.remove(event_path!("file")) { map.insert("file".to_string(), json!(file)); } diff --git a/src/sinks/sematext/logs.rs b/src/sinks/sematext/logs.rs index e5484b5dbd859..e5ba64f7e9524 100644 --- a/src/sinks/sematext/logs.rs +++ b/src/sinks/sematext/logs.rs @@ -3,6 +3,7 @@ use futures::stream::{BoxStream, StreamExt}; use indoc::indoc; use vector_common::sensitive_string::SensitiveString; use vector_config::configurable_component; +use vrl::event_path; use super::Region; use crate::{ @@ -144,11 +145,11 @@ fn map_timestamp(mut events: EventArray) -> EventArray { EventArray::Logs(logs) => { for log in logs { if let Some(path) = log.timestamp_path().cloned().as_ref() { - log.rename_key(path, "@timestamp"); + log.rename_key(path, event_path!("@timestamp")); } if let Some(path) = log.host_path().cloned().as_ref() { - log.rename_key(path, "os.host"); + log.rename_key(path, event_path!("os.host")); } } } diff --git a/src/sinks/splunk_hec/logs/config.rs b/src/sinks/splunk_hec/logs/config.rs index a205f1a0110de..ef056693bdc53 100644 --- a/src/sinks/splunk_hec/logs/config.rs +++ b/src/sinks/splunk_hec/logs/config.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use codecs::TextSerializerConfig; use futures_util::FutureExt; -use lookup::lookup_v2::OptionalValuePath; +use lookup::lookup_v2::{ConfigValuePath, OptionalValuePath}; use tower::ServiceBuilder; use vector_common::sensitive_string::SensitiveString; use vector_config::configurable_component; @@ -73,7 +73,7 @@ pub struct HecLogsSinkConfig { #[configurable(metadata(docs::advanced))] #[serde(default)] #[configurable(metadata(docs::examples = "field1", docs::examples = "field2"))] - pub indexed_fields: Vec, + pub indexed_fields: Vec, /// The name of the index to send events to. 
/// @@ -272,7 +272,11 @@ impl HecLogsSinkConfig { sourcetype: self.sourcetype.clone(), source: self.source.clone(), index: self.index.clone(), - indexed_fields: self.indexed_fields.clone(), + indexed_fields: self + .indexed_fields + .iter() + .map(|config_path| config_path.0.clone()) + .collect(), host_key: self.host_key.path.clone(), timestamp_nanos_key: self.timestamp_nanos_key.clone(), timestamp_key: self.timestamp_key.path.clone(), diff --git a/src/sinks/splunk_hec/logs/integration_tests.rs b/src/sinks/splunk_hec/logs/integration_tests.rs index 8bfe02dfee305..63a3214265824 100644 --- a/src/sinks/splunk_hec/logs/integration_tests.rs +++ b/src/sinks/splunk_hec/logs/integration_tests.rs @@ -3,7 +3,7 @@ use std::{convert::TryFrom, iter, num::NonZeroU8}; use chrono::{TimeZone, Timelike, Utc}; use codecs::{JsonSerializerConfig, TextSerializerConfig}; use futures::{future::ready, stream}; -use lookup::lookup_v2::OptionalValuePath; +use lookup::lookup_v2::{ConfigValuePath, OptionalValuePath}; use serde_json::Value as JsonValue; use tokio::time::{sleep, Duration}; use vector_core::{ @@ -106,7 +106,10 @@ async fn find_entries(messages: &[String]) -> bool { found_all } -async fn config(encoding: EncodingConfig, indexed_fields: Vec) -> HecLogsSinkConfig { +async fn config( + encoding: EncodingConfig, + indexed_fields: Vec, +) -> HecLogsSinkConfig { let mut batch = BatchConfig::default(); batch.max_events = Some(5); @@ -302,7 +305,7 @@ async fn splunk_insert_index() { async fn splunk_index_is_interpolated() { let cx = SinkContext::default(); - let indexed_fields = vec!["asdf".to_string()]; + let indexed_fields = vec!["asdf".into()]; let mut config = config(JsonSerializerConfig::default().into(), indexed_fields).await; config.index = Template::try_from("{{ index_name }}".to_string()).ok(); @@ -379,7 +382,7 @@ async fn splunk_hostname() { async fn splunk_sourcetype() { let cx = SinkContext::default(); - let indexed_fields = vec!["asdf".to_string()]; + let indexed_fields = vec!["asdf".into()]; let mut config = config(JsonSerializerConfig::default().into(), indexed_fields).await; config.sourcetype = Template::try_from("_json".to_string()).ok(); @@ -405,11 +408,7 @@ async fn splunk_configure_hostname() { let config = HecLogsSinkConfig { host_key: OptionalValuePath::new("roast"), - ..config( - JsonSerializerConfig::default().into(), - vec!["asdf".to_string()], - ) - .await + ..config(JsonSerializerConfig::default().into(), vec!["asdf".into()]).await }; let (sink, _) = config.build(cx).await.unwrap(); @@ -443,11 +442,7 @@ async fn splunk_indexer_acknowledgements() { let config = HecLogsSinkConfig { default_token: String::from(ACK_TOKEN).into(), acknowledgements: acknowledgements_config, - ..config( - JsonSerializerConfig::default().into(), - vec!["asdf".to_string()], - ) - .await + ..config(JsonSerializerConfig::default().into(), vec!["asdf".into()]).await }; let (sink, _) = config.build(cx).await.unwrap(); @@ -464,11 +459,7 @@ async fn splunk_indexer_acknowledgements() { async fn splunk_indexer_acknowledgements_disabled_on_server() { let cx = SinkContext::default(); - let config = config( - JsonSerializerConfig::default().into(), - vec!["asdf".to_string()], - ) - .await; + let config = config(JsonSerializerConfig::default().into(), vec!["asdf".into()]).await; let (sink, _) = config.build(cx).await.unwrap(); let (tx, mut rx) = BatchNotifier::new_with_receiver(); diff --git a/src/sinks/splunk_hec/logs/sink.rs b/src/sinks/splunk_hec/logs/sink.rs index 595f8f2119dce..2533add3438a2 100644 --- 
a/src/sinks/splunk_hec/logs/sink.rs +++ b/src/sinks/splunk_hec/logs/sink.rs @@ -38,7 +38,7 @@ pub struct HecLogsSink { pub sourcetype: Option