
Commit 27f524f

more path refactoring
pront committed Aug 16, 2023
1 parent 992d915 commit 27f524f
Showing 19 changed files with 111 additions and 114 deletions.
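Every file below follows the same pattern: stringly-typed field paths (`Option<Vec<String>>`, looked up via `as_str()` or re-parsed with `parse_value_path` per event) are replaced by `lookup::lookup_v2::ConfigValuePath`, a newtype over `OwnedValuePath` that is parsed once during config deserialization. A minimal sketch of the resulting call shape, assuming it compiles inside the Vector workspace (where `lookup`, `vector_core`, and `vrl` are workspace crates) and that `LogEvent` implements `Default` as it does in Vector's tests:

use lookup::{event_path, lookup_v2::ConfigValuePath};
use vector_core::event::LogEvent;
use vrl::path::PathPrefix;
use vrl::value::Value;

fn main() {
    // Build a small event with a nested `a.b` field.
    let mut log = LogEvent::default();
    log.insert(event_path!("a", "b"), 1_i64);

    // Parse the path once, at config time, instead of re-parsing a
    // string on every event.
    let field = ConfigValuePath::try_from("a.b".to_string()).unwrap();

    // Typed removal: the (prefix, path) tuple targets the event root
    // rather than the metadata root.
    let removed = log.remove((PathPrefix::Event, &field));
    assert_eq!(removed, Some(Value::from(1_i64)));
}

The `(PathPrefix::Event, &field)` tuple is the recurring idiom in the hunks below for addressing the event root with a pre-parsed path.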
62 changes: 27 additions & 35 deletions src/codecs/encoding/transformer.rs
@@ -4,15 +4,12 @@ use core::fmt::Debug;
 use std::collections::BTreeMap;
 
 use lookup::lookup_v2::ConfigValuePath;
-use lookup::{
-    event_path,
-    lookup_v2::{parse_value_path, OwnedValuePath},
-    PathPrefix,
-};
+use lookup::{event_path, PathPrefix};
 use serde::{Deserialize, Deserializer};
 use vector_config::configurable_component;
 use vector_core::event::{LogEvent, MaybeAsLogMut};
 use vector_core::schema::meaning;
+use vrl::path::OwnedValuePath;
 use vrl::value::Value;
 
 use crate::{event::Event, serde::skip_serializing_if_default};
@@ -27,7 +24,7 @@ pub struct Transformer {
 
     /// List of fields that are excluded from the encoded event.
     #[serde(default, skip_serializing_if = "skip_serializing_if_default")]
-    except_fields: Option<Vec<String>>,
+    except_fields: Option<Vec<ConfigValuePath>>,
 
     /// Format used for timestamp fields.
     #[serde(default, skip_serializing_if = "skip_serializing_if_default")]
@@ -45,15 +42,19 @@ impl<'de> Deserialize<'de> for Transformer {
             #[serde(default)]
             only_fields: Option<Vec<OwnedValuePath>>,
             #[serde(default)]
-            except_fields: Option<Vec<String>>,
+            except_fields: Option<Vec<OwnedValuePath>>,
             #[serde(default)]
             timestamp_format: Option<TimestampFormat>,
         }
 
         let inner: TransformerInner = Deserialize::deserialize(deserializer)?;
         Self::new(
-            inner.only_fields,
-            inner.except_fields,
+            inner
+                .only_fields
+                .map(|v| v.iter().map(|p| ConfigValuePath { 0: p.clone() }).collect()),
+            inner
+                .except_fields
+                .map(|v| v.iter().map(|p| ConfigValuePath { 0: p.clone() }).collect()),
             inner.timestamp_format,
         )
         .map_err(serde::de::Error::custom)
@@ -66,13 +67,12 @@ impl Transformer {
     /// Returns `Err` if `only_fields` and `except_fields` fail validation, i.e. are not mutually
     /// exclusive.
     pub fn new(
-        only_fields: Option<Vec<OwnedValuePath>>,
-        except_fields: Option<Vec<String>>,
+        only_fields: Option<Vec<ConfigValuePath>>,
+        except_fields: Option<Vec<ConfigValuePath>>,
         timestamp_format: Option<TimestampFormat>,
    ) -> Result<Self, crate::Error> {
         Self::validate_fields(only_fields.as_ref(), except_fields.as_ref())?;
 
-        let only_fields = only_fields.map(|x| x.into_iter().map(ConfigValuePath).collect());
         Ok(Self {
             only_fields,
             except_fields,
@@ -87,7 +87,7 @@ impl Transformer {
     }
 
     /// Get the `Transformer`'s `except_fields`.
-    pub const fn except_fields(&self) -> &Option<Vec<String>> {
+    pub const fn except_fields(&self) -> &Option<Vec<ConfigValuePath>> {
         &self.except_fields
     }
 
@@ -100,14 +100,14 @@
     ///
     /// If an error is returned, the entire encoding configuration should be considered inoperable.
     fn validate_fields(
-        only_fields: Option<&Vec<OwnedValuePath>>,
-        except_fields: Option<&Vec<String>>,
+        only_fields: Option<&Vec<ConfigValuePath>>,
+        except_fields: Option<&Vec<ConfigValuePath>>,
     ) -> crate::Result<()> {
         if let (Some(only_fields), Some(except_fields)) = (only_fields, except_fields) {
-            if except_fields.iter().any(|f| {
-                let path_iter = parse_value_path(f).unwrap();
-                only_fields.iter().any(|v| v == &path_iter)
-            }) {
+            if except_fields
+                .iter()
+                .any(|f| only_fields.iter().any(|v| v == f))
+            {
                 return Err(
                     "`except_fields` and `only_fields` should be mutually exclusive.".into(),
                 );
@@ -155,22 +155,19 @@ impl Transformer {
     }
 
     fn apply_except_fields(&self, log: &mut LogEvent) {
-        use lookup::path::TargetPath;
-
        if let Some(except_fields) = self.except_fields.as_ref() {
             let service_path = log
                 .metadata()
                 .schema_definition()
-                .meaning_path(meaning::SERVICE)
-                .map(|path| path.value_path().to_string());
+                .meaning_path(meaning::SERVICE);
 
             for field in except_fields {
-                let value = log.remove(field.as_str());
+                let value = log.remove((PathPrefix::Event, field));
 
                 // If we are removing the service field we need to store this in a `dropped_fields` list as we may need to
                 // refer to this later when emitting metrics.
                 if let Some(v) = value {
-                    if matches!(service_path.as_ref(), Some(path) if path == field) {
+                    if matches!(service_path, Some(target_path) if target_path.path == field.0) {
                         log.metadata_mut()
                             .add_dropped_field(meaning::SERVICE.to_string(), v);
                     }
@@ -216,17 +213,12 @@ impl Transformer {
     /// Returns `Err` if the new `except_fields` fail validation, i.e. are not mutually exclusive
     /// with `only_fields`.
     #[cfg(test)]
-    pub fn set_except_fields(&mut self, except_fields: Option<Vec<String>>) -> crate::Result<()> {
-        Self::validate_fields(
-            self.only_fields
-                .clone()
-                .map(|x| x.into_iter().map(|x| x.0).collect())
-                .as_ref(),
-            except_fields.as_ref(),
-        )?;
-
+    pub fn set_except_fields(
+        &mut self,
+        except_fields: Option<Vec<ConfigValuePath>>,
+    ) -> crate::Result<()> {
+        Self::validate_fields(self.only_fields.as_ref(), except_fields.as_ref())?;
         self.except_fields = except_fields;
-
         Ok(())
     }
 }
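With both lists typed as `ConfigValuePath`, `validate_fields` compares parsed paths directly. A hedged sketch of the behavior as an in-crate test; it assumes `ConfigValuePath` implements `Clone`, which its use in cloneable config structs suggests:

use lookup::lookup_v2::ConfigValuePath;

use crate::codecs::Transformer;

#[test]
fn only_and_except_fields_are_mutually_exclusive() {
    let field = ConfigValuePath::try_from("message".to_string()).unwrap();

    // The same parsed path in both lists should fail validation; no
    // parse_value_path round trip is involved anymore.
    let result = Transformer::new(Some(vec![field.clone()]), Some(vec![field]), None);
    assert!(result.is_err());
}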
3 changes: 2 additions & 1 deletion src/sinks/aws_kinesis/config.rs
@@ -1,3 +1,4 @@
+use lookup::lookup_v2::ConfigValuePath;
 use std::marker::PhantomData;
 
 use vector_core::stream::BatcherSettings;
@@ -79,7 +80,7 @@ impl KinesisSinkBaseConfig {
 /// Builds an aws_kinesis sink.
 pub fn build_sink<C, R, RR, E, RT>(
     config: &KinesisSinkBaseConfig,
-    partition_key_field: Option<String>,
+    partition_key_field: Option<ConfigValuePath>,
     batch_settings: BatcherSettings,
     client: C,
     retry_logic: RT,
14 changes: 7 additions & 7 deletions src/sinks/aws_kinesis/sink.rs
@@ -1,6 +1,8 @@
 use std::{borrow::Cow, fmt::Debug, marker::PhantomData, num::NonZeroUsize};
 
+use lookup::lookup_v2::ConfigValuePath;
 use rand::random;
+use vrl::path::PathPrefix;
 
 use crate::{
     internal_events::{AwsKinesisStreamNoPartitionKeyError, SinkRequestBuildError},
@@ -27,7 +29,7 @@ pub struct KinesisSink<S, R> {
     pub batch_settings: BatcherSettings,
     pub service: S,
     pub request_builder: KinesisRequestBuilder<R>,
-    pub partition_key_field: Option<String>,
+    pub partition_key_field: Option<ConfigValuePath>,
     pub _phantom: PhantomData<R>,
 }
 
@@ -42,13 +44,11 @@
     async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
         let request_builder_concurrency_limit = NonZeroUsize::new(50);
 
-        let partition_key_field = self.partition_key_field.clone();
-
         input
             .filter_map(|event| {
                 // Panic: This sink only accepts Logs, so this should never panic
                 let log = event.into_log();
-                let processed = process_log(log, &partition_key_field);
+                let processed = process_log(log, self.partition_key_field.as_ref());
 
                 future::ready(processed)
             })
@@ -106,14 +106,14 @@
 /// events are emitted and None is returned.
 pub(crate) fn process_log(
     log: LogEvent,
-    partition_key_field: &Option<String>,
+    partition_key_field: Option<&ConfigValuePath>,
 ) -> Option<KinesisProcessedEvent> {
     let partition_key = if let Some(partition_key_field) = partition_key_field {
-        if let Some(v) = log.get(partition_key_field.as_str()) {
+        if let Some(v) = log.get((PathPrefix::Event, partition_key_field)) {
             v.to_string_lossy()
         } else {
             emit!(AwsKinesisStreamNoPartitionKeyError {
-                partition_key_field
+                partition_key_field: partition_key_field.0.to_string().as_str()
             });
             return None;
         }
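`process_log` now borrows the parsed key (`Option<&ConfigValuePath>`) instead of receiving `&Option<String>`, which is why the per-stream `clone()` in `run_inner` disappears. A rough usage sketch, assuming a test module alongside `sink.rs` so the `pub(crate)` function resolves via `super::`:

use lookup::{event_path, lookup_v2::ConfigValuePath};
use vector_core::event::LogEvent;

use super::process_log;

#[test]
fn partition_key_is_read_via_a_typed_path() {
    let mut log = LogEvent::default();
    log.insert(event_path!("user_id"), "abc-123");

    let key_field = ConfigValuePath::try_from("user_id".to_string()).unwrap();

    // Borrow the path per call; no Option<String> clone per stream.
    let processed = process_log(log, Some(&key_field));
    assert!(processed.is_some());
}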
3 changes: 2 additions & 1 deletion src/sinks/aws_kinesis/streams/config.rs
@@ -3,6 +3,7 @@ use aws_sdk_kinesis::{
     types::SdkError,
 };
 use futures::FutureExt;
+use lookup::lookup_v2::ConfigValuePath;
 use snafu::Snafu;
 use vector_config::{component::GenerateConfig, configurable_component};
 
@@ -80,7 +81,7 @@ pub struct KinesisStreamsSinkConfig {
     ///
     /// If not specified, a unique partition key is generated for each Kinesis record.
     #[configurable(metadata(docs::examples = "user_id"))]
-    pub partition_key_field: Option<String>,
+    pub partition_key_field: Option<ConfigValuePath>,
 
     #[configurable(derived)]
     #[serde(default)]
4 changes: 2 additions & 2 deletions src/sinks/datadog/events/request_builder.rs
@@ -2,7 +2,7 @@ use std::{io, sync::Arc};
 
 use bytes::Bytes;
 use codecs::JsonSerializerConfig;
-use lookup::lookup_v2::OwnedSegment;
+use lookup::lookup_v2::ConfigValuePath;
 use vector_common::request_metadata::{MetaDescriptive, RequestMetadata};
 use vector_core::ByteSizeOf;
 
@@ -133,7 +133,7 @@ fn encoder() -> (Transformer, Encoder<()>) {
             "title",
         ]
         .iter()
-        .map(|field| vec![OwnedSegment::Field((*field).into())].into())
+        .map(|field| ConfigValuePath::try_from((*field).to_string()).unwrap())
         .collect(),
     );
     // DataDog Event API requires unix timestamp.
3 changes: 2 additions & 1 deletion src/sinks/elasticsearch/config.rs
@@ -33,6 +33,7 @@ use crate::{
     transforms::metric_to_log::MetricToLogConfig,
 };
 use lookup::event_path;
+use lookup::lookup_v2::ConfigValuePath;
 use vector_core::schema::Requirement;
 use vrl::value::Kind;
 
@@ -108,7 +109,7 @@ pub struct ElasticsearchConfig {
     #[configurable(metadata(docs::advanced))]
     #[configurable(metadata(docs::examples = "id"))]
     #[configurable(metadata(docs::examples = "_id"))]
-    pub id_key: Option<String>,
+    pub id_key: Option<ConfigValuePath>,
 
     /// The name of the pipeline to apply.
     #[serde(default)]
15 changes: 8 additions & 7 deletions src/sinks/elasticsearch/sink.rs
@@ -2,8 +2,10 @@ use std::{fmt, num::NonZeroUsize};
 
 use async_trait::async_trait;
 use futures::{future, stream::BoxStream, StreamExt};
+use lookup::lookup_v2::ConfigValuePath;
 use tower::Service;
 use vector_core::stream::{BatcherSettings, DriverResponse};
+use vrl::path::PathPrefix;
 
 use crate::{
     codecs::Transformer,
@@ -34,7 +36,7 @@ pub struct ElasticsearchSink<S> {
     pub service: S,
     pub metric_to_log: MetricToLog,
     pub mode: ElasticsearchCommonMode,
-    pub id_key_field: Option<String>,
+    pub id_key_field: Option<ConfigValuePath>,
 }
 
 impl<S> ElasticsearchSink<S> {
@@ -68,7 +70,7 @@
         let request_builder_concurrency_limit = NonZeroUsize::new(50);
 
         let mode = self.mode;
-        let id_key_field = self.id_key_field;
+        let id_key_field = self.id_key_field.as_ref();
         let transformer = self.transformer.clone();
 
         input
@@ -86,7 +88,7 @@
             })
             .filter_map(|x| async move { x })
             .filter_map(move |log| {
-                future::ready(process_log(log, &mode, &id_key_field, &transformer))
+                future::ready(process_log(log, &mode, id_key_field, &transformer))
             })
             .batched(self.batch_settings.into_byte_size_config())
             .request_builder(request_builder_concurrency_limit, self.request_builder)
@@ -110,7 +112,7 @@
 pub(super) fn process_log(
     mut log: LogEvent,
     mode: &ElasticsearchCommonMode,
-    id_key_field: &Option<String>,
+    id_key_field: Option<&ConfigValuePath>,
     transformer: &Transformer,
 ) -> Option<ProcessedEvent> {
     let index = mode.index(&log)?;
@@ -120,9 +122,8 @@ pub(super) fn process_log(
         cfg.sync_fields(&mut log);
         cfg.remap_timestamp(&mut log);
     };
-    let id = if let Some(Value::Bytes(key)) = id_key_field
-        .as_ref()
-        .and_then(|key| log.remove(key.as_str()))
+    let id = if let Some(Value::Bytes(key)) =
+        id_key_field.and_then(|key| log.remove((PathPrefix::Event, key)))
     {
         Some(String::from_utf8_lossy(&key).into_owned())
     } else {
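The same `&Option<String>` to `Option<&ConfigValuePath>` signature change recurs in the Kinesis and Elasticsearch sinks, with `Option::as_ref()` bridging the owned field to the borrowed parameter at each call site. A plain-Rust illustration of that idiom, independent of Vector's types:

fn process(id_key_field: Option<&String>) -> usize {
    // The callee borrows; nothing is moved and no clone is forced.
    id_key_field.map(|k| k.len()).unwrap_or(0)
}

fn main() {
    let owned: Option<String> = Some("_id".to_string());

    // `.as_ref()` converts Option<String> into Option<&String>.
    assert_eq!(process(owned.as_ref()), 3);

    // `owned` is still usable afterwards because nothing moved.
    assert_eq!(owned.as_deref(), Some("_id"));
}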
10 changes: 6 additions & 4 deletions src/sinks/gcp/stackdriver_logs.rs
@@ -4,9 +4,11 @@ use bytes::Bytes;
 use futures::{FutureExt, SinkExt};
 use http::{Request, Uri};
 use hyper::Body;
+use lookup::lookup_v2::ConfigValuePath;
 use serde_json::{json, map};
 use snafu::Snafu;
 use vector_config::configurable_component;
+use vrl::path::PathPrefix;
 use vrl::value::Kind;
 
 use crate::{
@@ -71,7 +73,7 @@ pub struct StackdriverConfig {
     /// [sev_names]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity
     /// [logsev_docs]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity
     #[configurable(metadata(docs::examples = "severity"))]
-    pub severity_key: Option<String>,
+    pub severity_key: Option<ConfigValuePath>,
 
     #[serde(flatten)]
     pub auth: GcpAuthConfig,
@@ -111,7 +113,7 @@ fn default_endpoint() -> String {
 struct StackdriverSink {
     config: StackdriverConfig,
     auth: GcpAuthenticator,
-    severity_key: Option<String>,
+    severity_key: Option<ConfigValuePath>,
     uri: Uri,
 }
 
@@ -259,7 +261,7 @@ impl SinkConfig for StackdriverConfig {
 
 struct StackdriverEventEncoder {
     config: StackdriverConfig,
-    severity_key: Option<String>,
+    severity_key: Option<ConfigValuePath>,
 }
 
 impl HttpEventEncoder<serde_json::Value> for StackdriverEventEncoder {
@@ -294,7 +296,7 @@ impl HttpEventEncoder<serde_json::Value> for StackdriverEventEncoder {
         let severity = self
             .severity_key
             .as_ref()
-            .and_then(|key| log.remove(key.as_str()))
+            .and_then(|key| log.remove((PathPrefix::Event, &key.0)))
             .map(remap_severity)
             .unwrap_or_else(|| 0.into());
 
4 changes: 2 additions & 2 deletions src/sinks/humio/logs.rs
@@ -1,5 +1,5 @@
 use codecs::JsonSerializerConfig;
-use lookup::lookup_v2::OptionalValuePath;
+use lookup::lookup_v2::{ConfigValuePath, OptionalValuePath};
 use vector_common::sensitive_string::SensitiveString;
 use vector_config::configurable_component;
 
@@ -85,7 +85,7 @@ pub struct HumioLogsConfig {
     ///
     /// [humio_data_format]: https://docs.humio.com/integrations/data-shippers/hec/#format-of-data
     #[serde(default)]
-    pub(super) indexed_fields: Vec<String>,
+    pub(super) indexed_fields: Vec<ConfigValuePath>,
 
     /// Optional name of the repository to ingest into.
     ///
[Diff truncated: 10 of the 19 changed files are not shown.]
