Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: disable vrl 'string_path' feature #18188

Merged
merged 5 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,8 @@ tokio = { version = "1.30.0", features = ["test-util"] }
tokio-test = "0.4.2"
tower-test = "0.4.0"
vector-core = { path = "lib/vector-core", default-features = false, features = ["vrl", "test"] }
vrl = { version = "0.6.0", features = ["cli", "test", "test_framework", "arbitrary"] }
pront marked this conversation as resolved.
Show resolved Hide resolved

wiremock = "0.5.19"
zstd = { version = "0.12.4", default-features = false }

Expand Down
2 changes: 1 addition & 1 deletion lib/codecs/src/decoding/format/gelf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ mod tests {
let events = deserialize_gelf_input(&input).unwrap();
assert_eq!(events.len(), 1);
let log = events[0].as_log();
assert!(!log.contains("_id"));
assert!(!log.contains(event_path!("_id")));
}
}

Expand Down
5 changes: 4 additions & 1 deletion lib/codecs/src/encoding/format/gelf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,10 @@ fn to_gelf_event(log: LogEvent) -> vector_common::Result<LogEvent> {
coerce_field_names_and_values(log).map(|(mut log, missing_prefix)| {
// rename additional fields that were flagged as missing the underscore prefix
for field in missing_prefix {
log.rename_key(event_path!(field.as_str()), format!("_{}", &field).as_str());
log.rename_key(
event_path!(field.as_str()),
event_path!(format!("_{}", &field).as_str()),
);
}
log
})
Expand Down
7 changes: 7 additions & 0 deletions lib/vector-lookup/src/lookup_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ impl<'a> ValuePath<'a> for &'a ConfigValuePath {
}
}

#[cfg(any(test, feature = "test"))]
impl From<&str> for ConfigValuePath {
    /// Test-only convenience conversion so literals like `"a.b[0]"` can be
    /// used directly where a `ConfigValuePath` is expected.
    ///
    /// # Panics
    /// Panics when `path` is not a valid value path. This is acceptable here
    /// because the impl is only compiled for tests
    /// (`cfg(any(test, feature = "test"))`), where an invalid literal is a
    /// bug in the test itself — the `expect` message makes that failure
    /// diagnosable instead of an anonymous unwrap.
    fn from(path: &str) -> Self {
        ConfigValuePath::try_from(path.to_string())
            .expect("invalid value path literal used in test")
    }
}

/// A wrapper around `OwnedTargetPath` that allows it to be used in Vector config
/// with prefix default to `PathPrefix::Event`
#[configurable_component]
Expand Down
7 changes: 4 additions & 3 deletions src/api/schema/events/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::borrow::Cow;
use async_graphql::Object;
use chrono::{DateTime, Utc};
use vector_common::encode_logfmt;
use vrl::event_path;

use super::EventEncodingType;
use crate::{event, topology::TapOutput};
Expand All @@ -19,11 +20,11 @@ impl Log {
}

pub fn get_message(&self) -> Option<Cow<'_, str>> {
Some(self.event.get("message")?.to_string_lossy())
Some(self.event.get(event_path!("message"))?.to_string_lossy())
}

pub fn get_timestamp(&self) -> Option<&DateTime<Utc>> {
self.event.get("timestamp")?.as_timestamp()
self.event.get(event_path!("timestamp"))?.as_timestamp()
}
}

Expand Down Expand Up @@ -69,7 +70,7 @@ impl Log {

/// Get JSON field data on the log event, by field name
async fn json(&self, field: String) -> Option<String> {
self.event.get(field.as_str()).map(|field| {
self.event.get(event_path!(field.as_str())).map(|field| {
serde_json::to_string(field)
.expect("JSON serialization of trace event field failed. Please report.")
})
Expand Down
15 changes: 3 additions & 12 deletions src/codecs/encoding/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,7 @@ mod test {
transformer.only_fields(),
&Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())])
);
assert_eq!(
transformer.except_fields(),
&Some(vec!["ignore_me".to_owned()])
);
assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
}

Expand Down Expand Up @@ -207,10 +204,7 @@ mod test {
transformer.only_fields(),
&Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())])
);
assert_eq!(
transformer.except_fields(),
&Some(vec!["ignore_me".to_owned()])
);
assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
}

Expand Down Expand Up @@ -239,10 +233,7 @@ mod test {
transformer.only_fields(),
&Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())])
);
assert_eq!(
transformer.except_fields(),
&Some(vec!["ignore_me".to_owned()])
);
assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
}
}
78 changes: 35 additions & 43 deletions src/codecs/encoding/transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@ use core::fmt::Debug;
use std::collections::BTreeMap;

use lookup::lookup_v2::ConfigValuePath;
use lookup::{
event_path,
lookup_v2::{parse_value_path, OwnedValuePath},
PathPrefix,
};
use lookup::{event_path, PathPrefix};
use serde::{Deserialize, Deserializer};
use vector_config::configurable_component;
use vector_core::event::{LogEvent, MaybeAsLogMut};
use vector_core::schema::meaning;
use vrl::path::OwnedValuePath;
use vrl::value::Value;

use crate::{event::Event, serde::skip_serializing_if_default};
Expand All @@ -27,7 +24,7 @@ pub struct Transformer {

/// List of fields that are excluded from the encoded event.
#[serde(default, skip_serializing_if = "skip_serializing_if_default")]
except_fields: Option<Vec<String>>,
except_fields: Option<Vec<ConfigValuePath>>,

/// Format used for timestamp fields.
#[serde(default, skip_serializing_if = "skip_serializing_if_default")]
Expand All @@ -45,15 +42,19 @@ impl<'de> Deserialize<'de> for Transformer {
#[serde(default)]
only_fields: Option<Vec<OwnedValuePath>>,
#[serde(default)]
except_fields: Option<Vec<String>>,
except_fields: Option<Vec<OwnedValuePath>>,
#[serde(default)]
timestamp_format: Option<TimestampFormat>,
}

let inner: TransformerInner = Deserialize::deserialize(deserializer)?;
Self::new(
inner.only_fields,
inner.except_fields,
inner
.only_fields
.map(|v| v.iter().map(|p| ConfigValuePath(p.clone())).collect()),
inner
.except_fields
.map(|v| v.iter().map(|p| ConfigValuePath(p.clone())).collect()),
inner.timestamp_format,
)
.map_err(serde::de::Error::custom)
Expand All @@ -66,13 +67,12 @@ impl Transformer {
/// Returns `Err` if `only_fields` and `except_fields` fail validation, i.e. are not mutually
/// exclusive.
pub fn new(
only_fields: Option<Vec<OwnedValuePath>>,
except_fields: Option<Vec<String>>,
only_fields: Option<Vec<ConfigValuePath>>,
except_fields: Option<Vec<ConfigValuePath>>,
timestamp_format: Option<TimestampFormat>,
) -> Result<Self, crate::Error> {
Self::validate_fields(only_fields.as_ref(), except_fields.as_ref())?;

let only_fields = only_fields.map(|x| x.into_iter().map(ConfigValuePath).collect());
Ok(Self {
only_fields,
except_fields,
Expand All @@ -87,7 +87,7 @@ impl Transformer {
}

/// Get the `Transformer`'s `except_fields`.
pub const fn except_fields(&self) -> &Option<Vec<String>> {
pub const fn except_fields(&self) -> &Option<Vec<ConfigValuePath>> {
&self.except_fields
}

Expand All @@ -100,14 +100,14 @@ impl Transformer {
///
/// If an error is returned, the entire encoding configuration should be considered inoperable.
fn validate_fields(
only_fields: Option<&Vec<OwnedValuePath>>,
except_fields: Option<&Vec<String>>,
only_fields: Option<&Vec<ConfigValuePath>>,
except_fields: Option<&Vec<ConfigValuePath>>,
) -> crate::Result<()> {
if let (Some(only_fields), Some(except_fields)) = (only_fields, except_fields) {
if except_fields.iter().any(|f| {
let path_iter = parse_value_path(f).unwrap();
only_fields.iter().any(|v| v == &path_iter)
}) {
if except_fields
.iter()
.any(|f| only_fields.iter().any(|v| v == f))
{
return Err(
"`except_fields` and `only_fields` should be mutually exclusive.".into(),
);
Expand Down Expand Up @@ -155,22 +155,19 @@ impl Transformer {
}

fn apply_except_fields(&self, log: &mut LogEvent) {
use lookup::path::TargetPath;

if let Some(except_fields) = self.except_fields.as_ref() {
let service_path = log
.metadata()
.schema_definition()
.meaning_path(meaning::SERVICE)
.map(|path| path.value_path().to_string());

for field in except_fields {
let value = log.remove(field.as_str());
let value_path = &field.0;
let value = log.remove((PathPrefix::Event, value_path));

let service_path = log
.metadata()
.schema_definition()
.meaning_path(meaning::SERVICE);
// If we are removing the service field we need to store this in a `dropped_fields` list as we may need to
// refer to this later when emitting metrics.
if let Some(v) = value {
if matches!(service_path.as_ref(), Some(path) if path == field) {
if let (Some(v), Some(service_path)) = (value, service_path) {
if service_path.path == *value_path {
log.metadata_mut()
.add_dropped_field(meaning::SERVICE.to_string(), v);
}
Expand Down Expand Up @@ -216,17 +213,12 @@ impl Transformer {
/// Returns `Err` if the new `except_fields` fail validation, i.e. are not mutually exclusive
/// with `only_fields`.
#[cfg(test)]
pub fn set_except_fields(&mut self, except_fields: Option<Vec<String>>) -> crate::Result<()> {
Self::validate_fields(
self.only_fields
.clone()
.map(|x| x.into_iter().map(|x| x.0).collect())
.as_ref(),
except_fields.as_ref(),
)?;

pub fn set_except_fields(
&mut self,
except_fields: Option<Vec<ConfigValuePath>>,
) -> crate::Result<()> {
Self::validate_fields(self.only_fields.as_ref(), except_fields.as_ref())?;
self.except_fields = except_fields;

Ok(())
}
}
Expand Down Expand Up @@ -282,7 +274,7 @@ mod tests {
#[test]
fn deserialize_and_transform_except() {
let transformer: Transformer =
toml::from_str(r#"except_fields = ["a.b.c", "b", "c[0].y", "d\\.z", "e"]"#).unwrap();
toml::from_str(r#"except_fields = ["a.b.c", "b", "c[0].y", "d.z", "e"]"#).unwrap();
let mut log = LogEvent::default();
{
log.insert("a", 1);
Expand All @@ -293,7 +285,7 @@ mod tests {
log.insert("b[1].x", 1);
log.insert("c[0].x", 1);
log.insert("c[0].y", 1);
log.insert("d\\.z", 1);
log.insert("d.z", 1);
log.insert("e.a", 1);
log.insert("e.b", 1);
}
Expand All @@ -303,7 +295,7 @@ mod tests {
assert!(!event.as_mut_log().contains("b"));
assert!(!event.as_mut_log().contains("b[1].x"));
assert!(!event.as_mut_log().contains("c[0].y"));
assert!(!event.as_mut_log().contains("d\\.z"));
assert!(!event.as_mut_log().contains("d.z"));
assert!(!event.as_mut_log().contains("e.a"));

assert!(event.as_mut_log().contains("a.b.d"));
Expand Down
5 changes: 3 additions & 2 deletions src/internal_events/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use metrics::{counter, gauge};
use vector_core::{internal_event::InternalEvent, update_counter};
use vrl::path::OwnedTargetPath;

use vector_common::{
internal_event::{error_stage, error_type},
Expand Down Expand Up @@ -161,7 +162,7 @@ impl InternalEvent for KafkaStatisticsReceived<'_> {
}

pub struct KafkaHeaderExtractionError<'a> {
pub header_field: &'a str,
pub header_field: &'a OwnedTargetPath,
}

impl InternalEvent for KafkaHeaderExtractionError<'_> {
Expand All @@ -171,7 +172,7 @@ impl InternalEvent for KafkaHeaderExtractionError<'_> {
error_code = "extracting_header",
error_type = error_type::PARSER_FAILED,
stage = error_stage::RECEIVING,
header_field = self.header_field,
header_field = self.header_field.to_string(),
internal_log_rate_limit = true,
);
counter!(
Expand Down
3 changes: 2 additions & 1 deletion src/sinks/aws_kinesis/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use lookup::lookup_v2::ConfigValuePath;
use std::marker::PhantomData;

use vector_core::stream::BatcherSettings;
Expand Down Expand Up @@ -79,7 +80,7 @@ impl KinesisSinkBaseConfig {
/// Builds an aws_kinesis sink.
pub fn build_sink<C, R, RR, E, RT>(
config: &KinesisSinkBaseConfig,
partition_key_field: Option<String>,
partition_key_field: Option<ConfigValuePath>,
batch_settings: BatcherSettings,
client: C,
retry_logic: RT,
Expand Down
14 changes: 7 additions & 7 deletions src/sinks/aws_kinesis/sink.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::{borrow::Cow, fmt::Debug, marker::PhantomData, num::NonZeroUsize};

use lookup::lookup_v2::ConfigValuePath;
use rand::random;
use vrl::path::PathPrefix;

use crate::{
internal_events::{AwsKinesisStreamNoPartitionKeyError, SinkRequestBuildError},
Expand All @@ -27,7 +29,7 @@ pub struct KinesisSink<S, R> {
pub batch_settings: BatcherSettings,
pub service: S,
pub request_builder: KinesisRequestBuilder<R>,
pub partition_key_field: Option<String>,
pub partition_key_field: Option<ConfigValuePath>,
pub _phantom: PhantomData<R>,
}

Expand All @@ -42,13 +44,11 @@ where
async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
let request_builder_concurrency_limit = NonZeroUsize::new(50);

let partition_key_field = self.partition_key_field.clone();

input
.filter_map(|event| {
// Panic: This sink only accepts Logs, so this should never panic
let log = event.into_log();
let processed = process_log(log, &partition_key_field);
let processed = process_log(log, self.partition_key_field.as_ref());

future::ready(processed)
})
Expand Down Expand Up @@ -106,14 +106,14 @@ where
/// events are emitted and None is returned.
pub(crate) fn process_log(
log: LogEvent,
partition_key_field: &Option<String>,
partition_key_field: Option<&ConfigValuePath>,
) -> Option<KinesisProcessedEvent> {
let partition_key = if let Some(partition_key_field) = partition_key_field {
if let Some(v) = log.get(partition_key_field.as_str()) {
if let Some(v) = log.get((PathPrefix::Event, partition_key_field)) {
v.to_string_lossy()
} else {
emit!(AwsKinesisStreamNoPartitionKeyError {
partition_key_field
partition_key_field: partition_key_field.0.to_string().as_str()
});
return None;
}
Expand Down
3 changes: 2 additions & 1 deletion src/sinks/aws_kinesis/streams/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use aws_sdk_kinesis::{
types::SdkError,
};
use futures::FutureExt;
use lookup::lookup_v2::ConfigValuePath;
use snafu::Snafu;
use vector_config::{component::GenerateConfig, configurable_component};

Expand Down Expand Up @@ -80,7 +81,7 @@ pub struct KinesisStreamsSinkConfig {
///
/// If not specified, a unique partition key is generated for each Kinesis record.
#[configurable(metadata(docs::examples = "user_id"))]
pub partition_key_field: Option<String>,
pub partition_key_field: Option<ConfigValuePath>,

#[configurable(derived)]
#[serde(default)]
Expand Down
Loading