From dcf7f9ae538c821eb7b3baf494d3e8938083832c Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Mon, 26 Jun 2023 14:03:39 +0100 Subject: [PATCH] chore(observability): emit `component_sent` events by `source` and `service` (#17549) Closes #17580 Closes #17581 This is still in draft until I can get the following done. - [ ] ~~There are way more clones than I am happy with here, especially since this is in a hot path. These need reducing.~~ The remaining clones that I would like to remove are in the `get_tags` functions. This didn't seem trivial, and given the fairly positive regression numbers, I think it should be ok to defer for now. - [x] Function documentation. - [ ] Currently source schemas aren't being attached to the event at runtime, so the service meaning can't be retrieved. That won't work until this has been done. This will be a separate PR - #17692 - [x] I've only tested this with the kafka sink so far. I think it should work with all Stream sinks without needing any further modification - but further testing is needed. - [x] Tests. A bunch of tests need writing. - [x] The Vector source tests are failing I think because we now have `EventsSent` and `TaggedEventsSent` which both emit `component_sent_event` events and the test framework doesn't like this. This needs to be fixed. - [ ] We will need to review every sink to ensure they work with this. All the stream based sinks should, but the others are highly likely to need some work. 
--------- Signed-off-by: Stephen Wakely --- docs/tutorials/sinks/2_http_sink.md | 4 +- lib/vector-common/Cargo.toml | 2 +- .../src/internal_event/cached_event.rs | 69 ++++ .../src/internal_event/events_sent.rs | 65 +++- lib/vector-common/src/internal_event/mod.rs | 40 ++- .../src/internal_event/optional_tag.rs | 14 + lib/vector-common/src/request_metadata.rs | 312 +++++++++++++++++- lib/vector-core/src/config/global_options.rs | 15 + lib/vector-core/src/config/mod.rs | 2 + lib/vector-core/src/config/telemetry.rs | 93 ++++++ lib/vector-core/src/event/array.rs | 5 +- lib/vector-core/src/event/log_event.rs | 24 +- lib/vector-core/src/event/metadata.rs | 16 +- lib/vector-core/src/event/metric/mod.rs | 30 +- lib/vector-core/src/event/mod.rs | 26 +- lib/vector-core/src/event/trace.rs | 12 +- lib/vector-core/src/stream/driver.rs | 55 +-- src/app.rs | 2 + src/config/mod.rs | 2 +- src/sinks/amqp/request_builder.rs | 5 +- src/sinks/amqp/service.rs | 21 +- src/sinks/amqp/sink.rs | 19 -- .../aws_cloudwatch_logs/request_builder.rs | 10 +- src/sinks/aws_cloudwatch_logs/service.rs | 42 +-- src/sinks/aws_cloudwatch_logs/sink.rs | 13 +- src/sinks/aws_kinesis/firehose/record.rs | 3 +- src/sinks/aws_kinesis/request_builder.rs | 10 +- src/sinks/aws_kinesis/service.rs | 15 +- src/sinks/aws_kinesis/sink.rs | 15 +- src/sinks/aws_kinesis/streams/record.rs | 3 +- src/sinks/aws_sqs/request_builder.rs | 10 +- src/sinks/aws_sqs/service.rs | 17 +- src/sinks/azure_common/config.rs | 19 +- src/sinks/azure_common/service.rs | 5 +- src/sinks/databend/service.rs | 20 +- src/sinks/datadog/events/request_builder.rs | 10 +- src/sinks/datadog/events/service.rs | 15 +- src/sinks/datadog/logs/service.rs | 30 +- src/sinks/datadog/logs/tests.rs | 58 +++- src/sinks/datadog/metrics/service.rs | 30 +- src/sinks/datadog/traces/service.rs | 30 +- src/sinks/elasticsearch/encoder.rs | 11 +- src/sinks/elasticsearch/retry.rs | 6 +- src/sinks/elasticsearch/service.rs | 23 +- src/sinks/gcp/chronicle_unstructured.rs | 
10 +- src/sinks/gcs_common/service.rs | 19 +- src/sinks/kafka/request_builder.rs | 2 +- src/sinks/kafka/service.rs | 20 +- src/sinks/kafka/tests.rs | 105 +++++- src/sinks/loki/event.rs | 12 +- src/sinks/loki/service.rs | 18 +- src/sinks/loki/sink.rs | 3 + src/sinks/new_relic/service.rs | 20 +- src/sinks/opendal_common.rs | 23 +- src/sinks/prelude.rs | 4 +- src/sinks/pulsar/request_builder.rs | 2 +- src/sinks/pulsar/service.rs | 17 +- src/sinks/s3_common/service.rs | 35 +- src/sinks/splunk_hec/common/request.rs | 8 +- src/sinks/splunk_hec/common/response.rs | 9 +- src/sinks/splunk_hec/common/service.rs | 7 +- src/sinks/statsd/service.rs | 18 +- src/sinks/util/metadata.rs | 63 +++- src/sinks/util/processed_event.rs | 14 +- src/sinks/vector/service.rs | 35 +- src/sources/kubernetes_logs/mod.rs | 5 +- src/sources/vector/mod.rs | 26 +- src/test_util/components.rs | 39 +++ src/topology/builder.rs | 5 +- src/topology/test/compliance.rs | 12 +- src/topology/test/mod.rs | 32 +- src/transforms/aggregate.rs | 4 +- src/transforms/dedupe.rs | 30 +- src/transforms/filter.rs | 3 +- src/transforms/log_to_metric.rs | 23 +- src/transforms/metric_to_log.rs | 13 +- src/transforms/tag_cardinality_limit/tests.rs | 24 +- 77 files changed, 1387 insertions(+), 501 deletions(-) create mode 100644 lib/vector-common/src/internal_event/cached_event.rs create mode 100644 lib/vector-common/src/internal_event/optional_tag.rs create mode 100644 lib/vector-core/src/config/telemetry.rs diff --git a/docs/tutorials/sinks/2_http_sink.md b/docs/tutorials/sinks/2_http_sink.md index 7090ef41a88d1..66fcb2e4d6f97 100644 --- a/docs/tutorials/sinks/2_http_sink.md +++ b/docs/tutorials/sinks/2_http_sink.md @@ -366,9 +366,9 @@ impl DriverResponse for BasicResponse { EventStatus::Delivered } - fn events_sent(&self) -> CountByteSize { + fn events_sent(&self) -> RequestCountByteSize { // (events count, byte size) - CountByteSize(1, self.byte_size) + CountByteSize(1, self.byte_size).into() } } ``` diff --git 
a/lib/vector-common/Cargo.toml b/lib/vector-common/Cargo.toml index 1fde826696bad..0f38b78fdea30 100644 --- a/lib/vector-common/Cargo.toml +++ b/lib/vector-common/Cargo.toml @@ -46,7 +46,7 @@ bytes = { version = "1.4.0", default-features = false, optional = true } chrono-tz = { version = "0.8.2", default-features = false, features = ["serde"] } chrono = { version = "0.4", default-features = false, optional = true, features = ["clock"] } crossbeam-utils = { version = "0.8.16", default-features = false } -derivative = "2.1.3" +derivative = { version = "2.2.0", default-features = false } futures = { version = "0.3.28", default-features = false, features = ["std"] } indexmap = { version = "~1.9.3", default-features = false } metrics = "0.21.0" diff --git a/lib/vector-common/src/internal_event/cached_event.rs b/lib/vector-common/src/internal_event/cached_event.rs new file mode 100644 index 0000000000000..e672848c93584 --- /dev/null +++ b/lib/vector-common/src/internal_event/cached_event.rs @@ -0,0 +1,69 @@ +use std::{ + collections::BTreeMap, + sync::{Arc, RwLock}, +}; + +use derivative::Derivative; + +use super::{InternalEventHandle, RegisterInternalEvent}; + +/// Metrics (eg. `component_sent_event_bytes_total`) may need to emit tags based on +/// values contained within the events. These tags can't be determined in advance. +/// +/// Metrics need to be registered and the handle needs to be held onto in order to +/// prevent them from expiring and being dropped (this would result in the counter +/// resetting to zero). +/// `CachedEvent` is used to maintain a store of these registered metrics. When a +/// new event is emitted for a previously unseen set of tags an event is registered +/// and stored in the cache. +#[derive(Derivative)] +#[derivative(Clone(bound = ""), Default(bound = ""))] +pub struct RegisteredEventCache { + cache: Arc< + RwLock< + BTreeMap< + ::Tags, + ::Handle, + >, + >, + >, +} + +/// This trait must be implemented by events that emit dynamic tags. 
`register` must +/// be implemented to register an event based on the set of tags passed. +pub trait RegisterTaggedInternalEvent: RegisterInternalEvent { + /// The type that will contain the data necessary to extract the tags + /// that will be used when registering the event. + type Tags; + + fn register(tags: Self::Tags) -> ::Handle; +} + +impl RegisteredEventCache +where + Data: Sized, + EventHandle: InternalEventHandle, + Tags: Ord + Clone, + Event: RegisterInternalEvent + RegisterTaggedInternalEvent, +{ + /// Emits the event with the given tags. + /// It will register the event and store in the cache if this has not already + /// been done. + /// + /// # Panics + /// + /// This will panic if the lock is poisoned. + pub fn emit(&self, tags: &Tags, value: Data) { + let read = self.cache.read().unwrap(); + if let Some(event) = read.get(tags) { + event.emit(value); + } else { + let event = ::register(tags.clone()); + event.emit(value); + + // Ensure the read lock is dropped so we can write. + drop(read); + self.cache.write().unwrap().insert(tags.clone(), event); + } + } +} diff --git a/lib/vector-common/src/internal_event/events_sent.rs b/lib/vector-common/src/internal_event/events_sent.rs index d329562afe7fc..d12a22bf17e8a 100644 --- a/lib/vector-common/src/internal_event/events_sent.rs +++ b/lib/vector-common/src/internal_event/events_sent.rs @@ -1,7 +1,11 @@ +use std::sync::Arc; + use metrics::{register_counter, Counter}; use tracing::trace; -use super::{CountByteSize, Output, SharedString}; +use crate::{config::ComponentKey, request_metadata::EventCountTags}; + +use super::{CountByteSize, OptionalTag, Output, SharedString}; pub const DEFAULT_OUTPUT: &str = "_default"; @@ -44,3 +48,62 @@ impl From for EventsSent { Self { output: output.0 } } } + +/// Makes a list of the tags to use with the events sent event. 
+fn make_tags( + source: &OptionalTag>, + service: &OptionalTag, +) -> Vec<(&'static str, String)> { + let mut tags = Vec::new(); + if let OptionalTag::Specified(tag) = source { + tags.push(( + "source", + tag.as_ref() + .map_or_else(|| "-".to_string(), |tag| tag.id().to_string()), + )); + } + + if let OptionalTag::Specified(tag) = service { + tags.push(("service", tag.clone().unwrap_or("-".to_string()))); + } + + tags +} + +crate::registered_event!( + TaggedEventsSent { + source: OptionalTag>, + service: OptionalTag, + } => { + events: Counter = { + register_counter!("component_sent_events_total", &make_tags(&self.source, &self.service)) + }, + event_bytes: Counter = { + register_counter!("component_sent_event_bytes_total", &make_tags(&self.source, &self.service)) + }, + } + + fn emit(&self, data: CountByteSize) { + let CountByteSize(count, byte_size) = data; + trace!(message = "Events sent.", %count, %byte_size); + + self.events.increment(count as u64); + self.event_bytes.increment(byte_size.get() as u64); + } + + fn register(tags: EventCountTags) { + super::register(TaggedEventsSent::new( + tags, + )) + } +); + +impl TaggedEventsSent { + #[must_use] + pub fn new(tags: EventCountTags) -> Self { + Self { + source: tags.source, + service: tags.service, + } + } +} diff --git a/lib/vector-common/src/internal_event/mod.rs b/lib/vector-common/src/internal_event/mod.rs index 7af70cc1322ee..4ce2f5335ac49 100644 --- a/lib/vector-common/src/internal_event/mod.rs +++ b/lib/vector-common/src/internal_event/mod.rs @@ -1,18 +1,25 @@ mod bytes_received; mod bytes_sent; +mod cached_event; pub mod component_events_dropped; mod events_received; mod events_sent; +mod optional_tag; mod prelude; pub mod service; +use std::ops::{Add, AddAssign}; + pub use metrics::SharedString; pub use bytes_received::BytesReceived; pub use bytes_sent::BytesSent; +#[allow(clippy::module_name_repetitions)] +pub use cached_event::{RegisterTaggedInternalEvent, RegisteredEventCache}; pub use 
component_events_dropped::{ComponentEventsDropped, INTENTIONAL, UNINTENTIONAL}; pub use events_received::EventsReceived; -pub use events_sent::{EventsSent, DEFAULT_OUTPUT}; +pub use events_sent::{EventsSent, TaggedEventsSent, DEFAULT_OUTPUT}; +pub use optional_tag::OptionalTag; pub use prelude::{error_stage, error_type}; pub use service::{CallError, PollReadyError}; @@ -109,9 +116,24 @@ pub struct ByteSize(pub usize); pub struct Count(pub usize); /// Holds the tuple `(count_of_events, estimated_json_size_of_events)`. -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct CountByteSize(pub usize, pub JsonSize); +impl AddAssign for CountByteSize { + fn add_assign(&mut self, rhs: Self) { + self.0 += rhs.0; + self.1 += rhs.1; + } +} + +impl Add for CountByteSize { + type Output = CountByteSize; + + fn add(self, rhs: CountByteSize) -> Self::Output { + CountByteSize(self.0 + rhs.0, self.1 + rhs.1) + } +} + // Wrapper types used to hold parameters for registering events pub struct Output(pub Option); @@ -196,6 +218,9 @@ macro_rules! registered_event { fn emit(&$slf:ident, $data_name:ident: $data:ident) $emit_body:block + + $(fn register($tags_name:ident: $tags:ty) + $register_body:block)? ) => { paste::paste!{ #[derive(Clone)] @@ -223,6 +248,17 @@ macro_rules! registered_event { fn emit(&$slf, $data_name: $data) $emit_body } + + $(impl $crate::internal_event::cached_event::RegisterTaggedInternalEvent for $event { + type Tags = $tags; + + fn register( + $tags_name: $tags, + ) -> ::Handle { + $register_body + } + })? + } }; } diff --git a/lib/vector-common/src/internal_event/optional_tag.rs b/lib/vector-common/src/internal_event/optional_tag.rs new file mode 100644 index 0000000000000..400bc554630d1 --- /dev/null +++ b/lib/vector-common/src/internal_event/optional_tag.rs @@ -0,0 +1,14 @@ +/// The user can configure whether a tag should be emitted. 
If they configure it to +/// be emitted, but the value doesn't exist - we should emit the tag but with a value +/// of `-`. +#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash)] +pub enum OptionalTag { + Ignored, + Specified(Option), +} + +impl From> for OptionalTag { + fn from(value: Option) -> Self { + Self::Specified(value) + } +} diff --git a/lib/vector-common/src/request_metadata.rs b/lib/vector-common/src/request_metadata.rs index cce6124361b60..d28d7da681a58 100644 --- a/lib/vector-common/src/request_metadata.rs +++ b/lib/vector-common/src/request_metadata.rs @@ -1,16 +1,207 @@ use std::ops::Add; +use std::{collections::HashMap, sync::Arc}; -use crate::json_size::JsonSize; +use crate::{ + config::ComponentKey, + internal_event::{ + CountByteSize, InternalEventHandle, OptionalTag, RegisterTaggedInternalEvent, + RegisteredEventCache, + }, + json_size::JsonSize, +}; + +/// Tags that are used to group the events within a batch for emitting telemetry. +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct EventCountTags { + pub source: OptionalTag>, + pub service: OptionalTag, +} + +impl EventCountTags { + #[must_use] + pub fn new_empty() -> Self { + Self { + source: OptionalTag::Specified(None), + service: OptionalTag::Specified(None), + } + } +} + +/// Must be implemented by events to get the tags that will be attached to +/// the `component_sent_event_*` emitted metrics. +pub trait GetEventCountTags { + fn get_tags(&self) -> EventCountTags; +} + +/// Keeps track of the estimated json size of a given batch of events by +/// source and service. +#[derive(Clone, Debug)] +pub enum GroupedCountByteSize { + /// When we need to keep track of the events by certain tags we use this + /// variant. 
+ Tagged { + sizes: HashMap, + }, + /// If we don't need to track the events by certain tags we can use + /// this variant to avoid allocating a `HashMap`, + Untagged { size: CountByteSize }, +} + +impl Default for GroupedCountByteSize { + fn default() -> Self { + Self::Untagged { + size: CountByteSize(0, JsonSize::zero()), + } + } +} + +impl GroupedCountByteSize { + /// Creates a new Tagged variant for when we need to track events by + /// certain tags. + #[must_use] + pub fn new_tagged() -> Self { + Self::Tagged { + sizes: HashMap::new(), + } + } + + /// Creates a new Tagged variant for when we do not need to track events by + /// tags. + #[must_use] + pub fn new_untagged() -> Self { + Self::Untagged { + size: CountByteSize(0, JsonSize::zero()), + } + } + + /// Returns a `HashMap` of tags => event counts for when we are tracking by tags. + /// Returns `None` if we are not tracking by tags. + #[must_use] + #[cfg(test)] + pub fn sizes(&self) -> Option<&HashMap> { + match self { + Self::Tagged { sizes } => Some(sizes), + Self::Untagged { .. } => None, + } + } + + /// Returns a single count for when we are not tracking by tags. + #[must_use] + #[cfg(test)] + fn size(&self) -> Option { + match self { + Self::Tagged { .. } => None, + Self::Untagged { size } => Some(*size), + } + } + + /// Adds the given estimated json size of the event to current count. + pub fn add_event(&mut self, event: &E, json_size: JsonSize) + where + E: GetEventCountTags, + { + match self { + Self::Tagged { sizes } => { + let size = CountByteSize(1, json_size); + let tags = event.get_tags(); + + match sizes.get_mut(&tags) { + Some(current) => { + *current += size; + } + None => { + sizes.insert(tags, size); + } + } + } + Self::Untagged { size } => { + *size += CountByteSize(1, json_size); + } + } + } + + /// Emits our counts to a `RegisteredEvent` cached event. 
+ pub fn emit_event(&self, event_cache: &RegisteredEventCache) + where + T: RegisterTaggedInternalEvent, + H: InternalEventHandle, + { + match self { + GroupedCountByteSize::Tagged { sizes } => { + for (tags, size) in sizes { + event_cache.emit(tags, *size); + } + } + GroupedCountByteSize::Untagged { size } => { + event_cache.emit(&EventCountTags::new_empty(), *size); + } + } + } +} + +impl From for GroupedCountByteSize { + fn from(value: CountByteSize) -> Self { + Self::Untagged { size: value } + } +} + +impl<'a> Add<&'a GroupedCountByteSize> for GroupedCountByteSize { + type Output = GroupedCountByteSize; + + fn add(self, other: &'a Self::Output) -> Self::Output { + match (self, other) { + (Self::Tagged { sizes: mut us }, Self::Tagged { sizes: them }) => { + for (key, value) in them { + match us.get_mut(key) { + Some(size) => *size += *value, + None => { + us.insert(key.clone(), *value); + } + } + } + + Self::Tagged { sizes: us } + } + + (Self::Untagged { size: us }, Self::Untagged { size: them }) => { + Self::Untagged { size: us + *them } + } + + // The following two scenarios shouldn't really occur in practice, but are provided for completeness. + (Self::Tagged { mut sizes }, Self::Untagged { size }) => { + match sizes.get_mut(&EventCountTags::new_empty()) { + Some(empty_size) => *empty_size += *size, + None => { + sizes.insert(EventCountTags::new_empty(), *size); + } + } + + Self::Tagged { sizes } + } + (Self::Untagged { size }, Self::Tagged { sizes }) => { + let mut sizes = sizes.clone(); + match sizes.get_mut(&EventCountTags::new_empty()) { + Some(empty_size) => *empty_size += size, + None => { + sizes.insert(EventCountTags::new_empty(), size); + } + } + + Self::Tagged { sizes } + } + } + } +} /// Metadata for batch requests. -#[derive(Clone, Copy, Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct RequestMetadata { /// Number of events represented by this batch request. 
event_count: usize, /// Size, in bytes, of the in-memory representation of all events in this batch request. events_byte_size: usize, /// Size, in bytes, of the estimated JSON-encoded representation of all events in this batch request. - events_estimated_json_encoded_byte_size: JsonSize, + events_estimated_json_encoded_byte_size: GroupedCountByteSize, /// Uncompressed size, in bytes, of the encoded events in this batch request. request_encoded_size: usize, /// On-the-wire size, in bytes, of the batch request itself after compression, etc. @@ -19,7 +210,6 @@ pub struct RequestMetadata { request_wire_size: usize, } -// TODO: Make this struct the object which emits the actual internal telemetry i.e. events sent, bytes sent, etc. impl RequestMetadata { #[must_use] pub fn new( @@ -27,7 +217,7 @@ impl RequestMetadata { events_byte_size: usize, request_encoded_size: usize, request_wire_size: usize, - events_estimated_json_encoded_byte_size: JsonSize, + events_estimated_json_encoded_byte_size: GroupedCountByteSize, ) -> Self { Self { event_count, @@ -49,7 +239,14 @@ impl RequestMetadata { } #[must_use] - pub const fn events_estimated_json_encoded_byte_size(&self) -> JsonSize { + pub fn events_estimated_json_encoded_byte_size(&self) -> &GroupedCountByteSize { + &self.events_estimated_json_encoded_byte_size + } + + /// Consumes the object and returns the byte size of the request grouped by + /// the tags (source and service). + #[must_use] + pub fn into_events_estimated_json_encoded_byte_size(self) -> GroupedCountByteSize { self.events_estimated_json_encoded_byte_size } @@ -66,7 +263,7 @@ impl RequestMetadata { /// Constructs a `RequestMetadata` by summation of the "batch" of `RequestMetadata` provided. 
#[must_use] pub fn from_batch>(metadata_iter: T) -> Self { - let mut metadata_sum = RequestMetadata::new(0, 0, 0, 0, JsonSize::zero()); + let mut metadata_sum = RequestMetadata::new(0, 0, 0, 0, GroupedCountByteSize::default()); for metadata in metadata_iter { metadata_sum = metadata_sum + &metadata; @@ -84,7 +281,7 @@ impl<'a> Add<&'a RequestMetadata> for RequestMetadata { event_count: self.event_count + other.event_count, events_byte_size: self.events_byte_size + other.events_byte_size, events_estimated_json_encoded_byte_size: self.events_estimated_json_encoded_byte_size - + other.events_estimated_json_encoded_byte_size, + + &other.events_estimated_json_encoded_byte_size, request_encoded_size: self.request_encoded_size + other.request_encoded_size, request_wire_size: self.request_wire_size + other.request_wire_size, } @@ -94,5 +291,102 @@ impl<'a> Add<&'a RequestMetadata> for RequestMetadata { /// Objects implementing this trait have metadata that describes the request. pub trait MetaDescriptive { /// Returns the `RequestMetadata` associated with this object. - fn get_metadata(&self) -> RequestMetadata; + fn get_metadata(&self) -> &RequestMetadata; + + // Returns a mutable reference to the `RequestMetadata` associated with this object. 
+ fn metadata_mut(&mut self) -> &mut RequestMetadata; +} + +#[cfg(test)] +mod tests { + use super::*; + + struct DummyEvent { + source: OptionalTag>, + service: OptionalTag, + } + + impl GetEventCountTags for DummyEvent { + fn get_tags(&self) -> EventCountTags { + EventCountTags { + source: self.source.clone(), + service: self.service.clone(), + } + } + } + + #[test] + fn add_request_count_bytesize_event_untagged() { + let mut bytesize = GroupedCountByteSize::new_untagged(); + let event = DummyEvent { + source: Some(Arc::new(ComponentKey::from("carrot"))).into(), + service: Some("cabbage".to_string()).into(), + }; + + bytesize.add_event(&event, JsonSize::new(42)); + + let event = DummyEvent { + source: Some(Arc::new(ComponentKey::from("pea"))).into(), + service: Some("potato".to_string()).into(), + }; + + bytesize.add_event(&event, JsonSize::new(36)); + + assert_eq!(Some(CountByteSize(2, JsonSize::new(78))), bytesize.size()); + assert_eq!(None, bytesize.sizes()); + } + + #[test] + fn add_request_count_bytesize_event_tagged() { + let mut bytesize = GroupedCountByteSize::new_tagged(); + let event = DummyEvent { + source: OptionalTag::Ignored, + service: Some("cabbage".to_string()).into(), + }; + + bytesize.add_event(&event, JsonSize::new(42)); + + let event = DummyEvent { + source: OptionalTag::Ignored, + service: Some("cabbage".to_string()).into(), + }; + + bytesize.add_event(&event, JsonSize::new(36)); + + let event = DummyEvent { + source: OptionalTag::Ignored, + service: Some("tomato".to_string()).into(), + }; + + bytesize.add_event(&event, JsonSize::new(23)); + + assert_eq!(None, bytesize.size()); + let mut sizes = bytesize + .sizes() + .unwrap() + .clone() + .into_iter() + .collect::>(); + sizes.sort(); + + assert_eq!( + vec![ + ( + EventCountTags { + source: OptionalTag::Ignored, + service: Some("cabbage".to_string()).into() + }, + CountByteSize(2, JsonSize::new(78)) + ), + ( + EventCountTags { + source: OptionalTag::Ignored, + service: 
Some("tomato".to_string()).into() + }, + CountByteSize(1, JsonSize::new(23)) + ), + ], + sizes + ); + } } diff --git a/lib/vector-core/src/config/global_options.rs b/lib/vector-core/src/config/global_options.rs index 3e63d863f5ab0..af86b177095d5 100644 --- a/lib/vector-core/src/config/global_options.rs +++ b/lib/vector-core/src/config/global_options.rs @@ -5,6 +5,7 @@ use vector_common::TimeZone; use vector_config::configurable_component; use super::super::default_data_dir; +use super::Telemetry; use super::{proxy::ProxyConfig, AcknowledgementsConfig, LogSchema}; use crate::serde::bool_or_struct; @@ -55,6 +56,16 @@ pub struct GlobalOptions { )] pub log_schema: LogSchema, + /// Telemetry options. + /// + /// Determines whether `source` and `service` tags should be emitted with the + /// `component_sent_*` and `component_received_*` events. + #[serde( + default, + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + pub telemetry: Telemetry, + /// The name of the time zone to apply to timestamp conversions that do not contain an explicit time zone. 
/// /// The time zone name may be any name in the [TZ database][tzdb] or `local` to indicate system @@ -218,10 +229,14 @@ impl GlobalOptions { errors.extend(merge_errors); } + let mut telemetry = self.telemetry.clone(); + telemetry.merge(&with.telemetry); + if errors.is_empty() { Ok(Self { data_dir, log_schema, + telemetry, acknowledgements: self.acknowledgements.merge_default(&with.acknowledgements), timezone: self.timezone.or(with.timezone), proxy: self.proxy.merge(&with.proxy), diff --git a/lib/vector-core/src/config/mod.rs b/lib/vector-core/src/config/mod.rs index 97cbc091d8f5e..3ff5152a293a7 100644 --- a/lib/vector-core/src/config/mod.rs +++ b/lib/vector-core/src/config/mod.rs @@ -8,6 +8,7 @@ mod global_options; mod log_schema; pub mod output_id; pub mod proxy; +mod telemetry; use crate::event::LogEvent; pub use global_options::GlobalOptions; @@ -15,6 +16,7 @@ pub use log_schema::{init_log_schema, log_schema, LogSchema}; use lookup::{lookup_v2::ValuePath, path, PathPrefix}; pub use output_id::OutputId; use serde::{Deserialize, Serialize}; +pub use telemetry::{init_telemetry, telemetry, Tags, Telemetry}; pub use vector_common::config::ComponentKey; use vector_config::configurable_component; use vrl::value::Value; diff --git a/lib/vector-core/src/config/telemetry.rs b/lib/vector-core/src/config/telemetry.rs new file mode 100644 index 0000000000000..71348c509ef94 --- /dev/null +++ b/lib/vector-core/src/config/telemetry.rs @@ -0,0 +1,93 @@ +use once_cell::sync::{Lazy, OnceCell}; +use vector_common::request_metadata::GroupedCountByteSize; +use vector_config::configurable_component; + +static TELEMETRY: OnceCell = OnceCell::new(); +static TELEMETRY_DEFAULT: Lazy = Lazy::new(Telemetry::default); + +/// Loads the telemetry options from configurations and sets the global options. +/// Once this is done, configurations can be correctly loaded using configured +/// log schema defaults. +/// +/// # Errors +/// +/// This function will fail if the `builder` fails. 
+/// +/// # Panics +/// +/// If deny is set, will panic if telemetry has already been set. +pub fn init_telemetry(telemetry: Telemetry, deny_if_set: bool) { + assert!( + !(TELEMETRY.set(telemetry).is_err() && deny_if_set), + "Couldn't set telemetry" + ); +} + +/// Returns the telemetry configuration options. +pub fn telemetry() -> &'static Telemetry { + TELEMETRY.get().unwrap_or(&TELEMETRY_DEFAULT) +} + +/// Sets options for the telemetry that Vector emits. +#[configurable_component] +#[derive(Clone, Debug, Eq, PartialEq, Default)] +#[serde(default)] +pub struct Telemetry { + #[configurable(derived)] + pub tags: Tags, +} + +impl Telemetry { + /// Merge two `Telemetry` instances together. + pub fn merge(&mut self, other: &Telemetry) { + self.tags.emit_service = self.tags.emit_service || other.tags.emit_service; + self.tags.emit_source = self.tags.emit_source || other.tags.emit_source; + } + + /// Returns true if any of the tag options are true. + pub fn has_tags(&self) -> bool { + self.tags.emit_service || self.tags.emit_source + } + + pub fn tags(&self) -> &Tags { + &self.tags + } + + /// The variant of `GroupedCountByteSize` + pub fn create_request_count_byte_size(&self) -> GroupedCountByteSize { + if self.has_tags() { + GroupedCountByteSize::new_tagged() + } else { + GroupedCountByteSize::new_untagged() + } + } +} + +/// Configures whether to emit certain tags +#[configurable_component] +#[derive(Clone, Debug, Eq, PartialEq, Default)] +#[serde(default)] +pub struct Tags { + /// True if the `service` tag should be emitted + /// in the `component_received_*` and `component_sent_*` + /// telemetry. + pub emit_service: bool, + + /// True if the `source` tag should be emitted + /// in the `component_received_*` and `component_sent_*` + /// telemetry. 
+ pub emit_source: bool, +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn partial_telemetry() { + let toml = r#" + emit_source = true + "#; + toml::from_str::(toml).unwrap(); + } +} diff --git a/lib/vector-core/src/event/array.rs b/lib/vector-core/src/event/array.rs index da87a6f6a8074..9cdafbcf569d4 100644 --- a/lib/vector-core/src/event/array.rs +++ b/lib/vector-core/src/event/array.rs @@ -9,6 +9,7 @@ use futures::{stream, Stream}; use quickcheck::{Arbitrary, Gen}; use vector_buffers::EventCount; use vector_common::{ + config::ComponentKey, finalization::{AddBatchNotifier, BatchNotifier, EventFinalizers, Finalizable}, json_size::JsonSize, }; @@ -17,7 +18,7 @@ use super::{ EstimatedJsonEncodedSizeOf, Event, EventDataEq, EventFinalizer, EventMutRef, EventRef, LogEvent, Metric, TraceEvent, }; -use crate::{config::OutputId, ByteSizeOf}; +use crate::ByteSizeOf; /// The type alias for an array of `LogEvent` elements. pub type LogArray = Vec; @@ -142,7 +143,7 @@ pub enum EventArray { impl EventArray { /// Sets the `OutputId` in the metadata for all the events in this array. 
- pub fn set_output_id(&mut self, output_id: &Arc) { + pub fn set_output_id(&mut self, output_id: &Arc) { match self { EventArray::Logs(logs) => { for log in logs { diff --git a/lib/vector-core/src/event/log_event.rs b/lib/vector-core/src/event/log_event.rs index beb0afdec1e32..c782476b5c515 100644 --- a/lib/vector-core/src/event/log_event.rs +++ b/lib/vector-core/src/event/log_event.rs @@ -15,7 +15,9 @@ use lookup::lookup_v2::TargetPath; use lookup::PathPrefix; use serde::{Deserialize, Serialize, Serializer}; use vector_common::{ + internal_event::OptionalTag, json_size::{JsonSize, NonZeroJsonSize}, + request_metadata::{EventCountTags, GetEventCountTags}, EventDataEq, }; @@ -25,8 +27,8 @@ use super::{ metadata::EventMetadata, util, EventFinalizers, Finalizable, Value, }; -use crate::config::log_schema; use crate::config::LogNamespace; +use crate::config::{log_schema, telemetry}; use crate::{event::MaybeAsLogMut, ByteSizeOf}; use lookup::{metadata_path, path}; @@ -212,6 +214,26 @@ impl EstimatedJsonEncodedSizeOf for LogEvent { } } +impl GetEventCountTags for LogEvent { + fn get_tags(&self) -> EventCountTags { + let source = if telemetry().tags().emit_source { + self.metadata().source_id().cloned().into() + } else { + OptionalTag::Ignored + }; + + let service = if telemetry().tags().emit_service { + self.get_by_meaning("service") + .map(ToString::to_string) + .into() + } else { + OptionalTag::Ignored + }; + + EventCountTags { source, service } + } +} + impl LogEvent { #[must_use] pub fn new_with_metadata(metadata: EventMetadata) -> Self { diff --git a/lib/vector-core/src/event/metadata.rs b/lib/vector-core/src/event/metadata.rs index 403b43bfc52b9..f13bee6a5e009 100644 --- a/lib/vector-core/src/event/metadata.rs +++ b/lib/vector-core/src/event/metadata.rs @@ -1,15 +1,13 @@ #![deny(missing_docs)] -use std::collections::BTreeMap; -use std::sync::Arc; +use std::{collections::BTreeMap, sync::Arc}; use serde::{Deserialize, Serialize}; -use vector_common::EventDataEq; 
+use vector_common::{config::ComponentKey, EventDataEq}; use vrl::value::{Kind, Secrets, Value}; use super::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus}; -use crate::config::{LogNamespace, OutputId}; -use crate::{schema, ByteSizeOf}; +use crate::{config::LogNamespace, schema, ByteSizeOf}; const DATADOG_API_KEY: &str = "datadog_api_key"; const SPLUNK_HEC_TOKEN: &str = "splunk_hec_token"; @@ -30,7 +28,7 @@ pub struct EventMetadata { finalizers: EventFinalizers, /// The id of the source - source_id: Option>, + source_id: Option>, /// An identifier for a globally registered schema definition which provides information about /// the event shape (type information, and semantic meaning of fields). @@ -75,12 +73,12 @@ impl EventMetadata { /// Returns a reference to the metadata source. #[must_use] - pub fn source_id(&self) -> Option<&OutputId> { - self.source_id.as_deref() + pub fn source_id(&self) -> Option<&Arc> { + self.source_id.as_ref() } /// Sets the `source_id` in the metadata to the provided value. 
- pub fn set_source_id(&mut self, source_id: Arc) { + pub fn set_source_id(&mut self, source_id: Arc) { self.source_id = Some(source_id); } diff --git a/lib/vector-core/src/event/metric/mod.rs b/lib/vector-core/src/event/metric/mod.rs index 141d3b28997d9..fa62bec7ec52c 100644 --- a/lib/vector-core/src/event/metric/mod.rs +++ b/lib/vector-core/src/event/metric/mod.rs @@ -11,10 +11,16 @@ use std::{ }; use chrono::{DateTime, Utc}; -use vector_common::{json_size::JsonSize, EventDataEq}; +use vector_common::{ + internal_event::OptionalTag, + json_size::JsonSize, + request_metadata::{EventCountTags, GetEventCountTags}, + EventDataEq, +}; use vector_config::configurable_component; use crate::{ + config::telemetry, event::{ estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf, BatchNotifier, EventFinalizer, EventFinalizers, EventMetadata, Finalizable, @@ -476,6 +482,28 @@ impl Finalizable for Metric { } } +impl GetEventCountTags for Metric { + fn get_tags(&self) -> EventCountTags { + let source = if telemetry().tags().emit_source { + self.metadata().source_id().cloned().into() + } else { + OptionalTag::Ignored + }; + + // Currently there is no way to specify a tag that means the service, + // so we will be hardcoding it to "service". + let service = if telemetry().tags().emit_service { + self.tags() + .and_then(|tags| tags.get("service").map(ToString::to_string)) + .into() + } else { + OptionalTag::Ignored + }; + + EventCountTags { source, service } + } +} + /// Metric kind. /// /// Metrics can be either absolute of incremental. 
Absolute metrics represent a sort of "last write wins" scenario, diff --git a/lib/vector-core/src/event/mod.rs b/lib/vector-core/src/event/mod.rs index 04522793e3436..ae2e51e8a23a8 100644 --- a/lib/vector-core/src/event/mod.rs +++ b/lib/vector-core/src/event/mod.rs @@ -5,7 +5,7 @@ use std::{ sync::Arc, }; -use crate::{config::OutputId, ByteSizeOf}; +use crate::ByteSizeOf; pub use array::{into_event_stream, EventArray, EventContainer, LogArray, MetricArray, TraceArray}; pub use estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf; pub use finalization::{ @@ -19,7 +19,13 @@ pub use r#ref::{EventMutRef, EventRef}; use serde::{Deserialize, Serialize}; pub use trace::TraceEvent; use vector_buffers::EventCount; -use vector_common::{finalization, json_size::JsonSize, EventDataEq}; +use vector_common::{ + config::ComponentKey, + finalization, + json_size::JsonSize, + request_metadata::{EventCountTags, GetEventCountTags}, + EventDataEq, +}; pub use vrl::value::Value; #[cfg(feature = "vrl")] pub use vrl_target::{TargetEvents, VrlTarget}; @@ -90,6 +96,16 @@ impl Finalizable for Event { } } +impl GetEventCountTags for Event { + fn get_tags(&self) -> EventCountTags { + match self { + Event::Log(log) => log.get_tags(), + Event::Metric(metric) => metric.get_tags(), + Event::Trace(trace) => trace.get_tags(), + } + } +} + impl Event { /// Return self as a `LogEvent` /// @@ -284,18 +300,18 @@ impl Event { /// Returns a reference to the event metadata source. #[must_use] - pub fn source_id(&self) -> Option<&OutputId> { + pub fn source_id(&self) -> Option<&Arc> { self.metadata().source_id() } /// Sets the `source_id` in the event metadata to the provided value. - pub fn set_source_id(&mut self, source_id: Arc) { + pub fn set_source_id(&mut self, source_id: Arc) { self.metadata_mut().set_source_id(source_id); } /// Sets the `source_id` in the event metadata to the provided value. 
#[must_use] - pub fn with_source_id(mut self, source_id: Arc) -> Self { + pub fn with_source_id(mut self, source_id: Arc) -> Self { self.metadata_mut().set_source_id(source_id); self } diff --git a/lib/vector-core/src/event/trace.rs b/lib/vector-core/src/event/trace.rs index bd10a9e3aaca5..3885b50b9f13d 100644 --- a/lib/vector-core/src/event/trace.rs +++ b/lib/vector-core/src/event/trace.rs @@ -3,7 +3,11 @@ use std::{collections::BTreeMap, fmt::Debug}; use lookup::lookup_v2::TargetPath; use serde::{Deserialize, Serialize}; use vector_buffers::EventCount; -use vector_common::{json_size::JsonSize, EventDataEq}; +use vector_common::{ + json_size::JsonSize, + request_metadata::{EventCountTags, GetEventCountTags}, + EventDataEq, +}; use super::{ BatchNotifier, EstimatedJsonEncodedSizeOf, EventFinalizer, EventFinalizers, EventMetadata, @@ -143,3 +147,9 @@ impl AsMut for TraceEvent { &mut self.0 } } + +impl GetEventCountTags for TraceEvent { + fn get_tags(&self) -> EventCountTags { + self.0.get_tags() + } +} diff --git a/lib/vector-core/src/stream/driver.rs b/lib/vector-core/src/stream/driver.rs index 093a7e0c4fad0..6ff23014c96d5 100644 --- a/lib/vector-core/src/stream/driver.rs +++ b/lib/vector-core/src/stream/driver.rs @@ -5,10 +5,10 @@ use tokio::{pin, select}; use tower::Service; use tracing::Instrument; use vector_common::internal_event::{ - register, ByteSize, BytesSent, CallError, CountByteSize, EventsSent, InternalEventHandle as _, - Output, PollReadyError, Registered, SharedString, + register, ByteSize, BytesSent, CallError, InternalEventHandle as _, PollReadyError, Registered, + RegisteredEventCache, SharedString, TaggedEventsSent, }; -use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; +use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive}; use super::FuturesUnorderedCount; use crate::{ @@ -18,7 +18,7 @@ use crate::{ pub trait DriverResponse { fn event_status(&self) -> EventStatus; - fn events_sent(&self) -> 
CountByteSize; + fn events_sent(&self) -> &GroupedCountByteSize; /// Return the number of bytes that were sent in the request that returned this response. // TODO, remove the default implementation once all sinks have @@ -99,7 +99,7 @@ where pin!(batched_input); let bytes_sent = protocol.map(|protocol| register(BytesSent { protocol })); - let events_sent = register(EventsSent::from(Output(None))); + let events_sent = RegisteredEventCache::default(); loop { // Core behavior of the loop: @@ -167,8 +167,7 @@ where let finalizers = req.take_finalizers(); let bytes_sent = bytes_sent.clone(); let events_sent = events_sent.clone(); - - let metadata = req.get_metadata(); + let event_count = req.get_metadata().event_count(); let fut = svc.call(req) .err_into() @@ -176,7 +175,7 @@ where result, request_id, finalizers, - &metadata, + event_count, &bytes_sent, &events_sent, )) @@ -202,13 +201,13 @@ where result: Result, request_id: usize, finalizers: EventFinalizers, - metadata: &RequestMetadata, + event_count: usize, bytes_sent: &Option>, - events_sent: &Registered, + events_sent: &RegisteredEventCache, ) { match result { Err(error) => { - Self::emit_call_error(Some(error), request_id, metadata.event_count()); + Self::emit_call_error(Some(error), request_id, event_count); finalizers.update_status(EventStatus::Rejected); } Ok(response) => { @@ -220,10 +219,12 @@ where bytes_sent.emit(ByteSize(byte_size)); } } - events_sent.emit(response.events_sent()); + + response.events_sent().emit_event(events_sent); + // This condition occurs specifically when the `HttpBatchService::call()` is called *within* the `Service::call()` } else if response.event_status() == EventStatus::Rejected { - Self::emit_call_error(None, request_id, metadata.event_count()); + Self::emit_call_error(None, request_id, event_count); finalizers.update_status(EventStatus::Rejected); } } @@ -264,7 +265,7 @@ mod tests { use vector_common::{ finalization::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus, 
Finalizable}, json_size::JsonSize, - request_metadata::RequestMetadata, + request_metadata::{GroupedCountByteSize, RequestMetadata}, }; use vector_common::{internal_event::CountByteSize, request_metadata::MetaDescriptive}; @@ -298,20 +299,34 @@ mod tests { } impl MetaDescriptive for DelayRequest { - fn get_metadata(&self) -> RequestMetadata { - self.2 + fn get_metadata(&self) -> &RequestMetadata { + &self.2 + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.2 } } - struct DelayResponse; + struct DelayResponse { + events_sent: GroupedCountByteSize, + } + + impl DelayResponse { + fn new() -> Self { + Self { + events_sent: CountByteSize(1, JsonSize::new(1)).into(), + } + } + } impl DriverResponse for DelayResponse { fn event_status(&self) -> EventStatus { EventStatus::Delivered } - fn events_sent(&self) -> CountByteSize { - CountByteSize(1, JsonSize::new(1)) + fn events_sent(&self) -> &GroupedCountByteSize { + &self.events_sent } } @@ -396,7 +411,7 @@ mod tests { drop(permit); drop(req); - Ok(DelayResponse) + Ok(DelayResponse::new()) }) } } diff --git a/src/app.rs b/src/app.rs index c1ab0ae12e0d1..12658a01115d3 100644 --- a/src/app.rs +++ b/src/app.rs @@ -472,6 +472,8 @@ pub async fn load_configs( #[cfg(not(feature = "enterprise-tests"))] config::init_log_schema(config.global.log_schema.clone(), true); + config::init_telemetry(config.global.telemetry.clone(), true); + if !config.healthchecks.enabled { info!("Health checks are disabled."); } diff --git a/src/config/mod.rs b/src/config/mod.rs index 61c1a219d9eed..b4591199ef886 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -63,7 +63,7 @@ pub use unit_test::{build_unit_tests, build_unit_tests_main, UnitTestResult}; pub use validation::warnings; pub use vars::{interpolate, ENVIRONMENT_VARIABLE_INTERPOLATION_REGEX}; pub use vector_core::config::{ - init_log_schema, log_schema, proxy::ProxyConfig, LogSchema, OutputId, + init_log_schema, init_telemetry, log_schema, proxy::ProxyConfig, 
telemetry, LogSchema, OutputId, }; #[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)] diff --git a/src/sinks/amqp/request_builder.rs b/src/sinks/amqp/request_builder.rs index ace1af1f66fe4..13aaeab81cfcd 100644 --- a/src/sinks/amqp/request_builder.rs +++ b/src/sinks/amqp/request_builder.rs @@ -13,7 +13,6 @@ pub(super) struct AmqpMetadata { routing_key: String, properties: BasicProperties, finalizers: EventFinalizers, - event_json_size: JsonSize, } /// Build the request to send to `AMQP` by using the encoder to convert it into @@ -43,14 +42,13 @@ impl RequestBuilder for AmqpRequestBuilder { &self, mut input: AmqpEvent, ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { - let builder = RequestMetadataBuilder::from_events(&input); + let builder = RequestMetadataBuilder::from_event(&input.event); let metadata = AmqpMetadata { exchange: input.exchange, routing_key: input.routing_key, properties: input.properties, finalizers: input.event.take_finalizers(), - event_json_size: input.event.estimated_json_encoded_size_of(), }; (metadata, builder, input.event) @@ -70,7 +68,6 @@ impl RequestBuilder for AmqpRequestBuilder { amqp_metadata.properties, amqp_metadata.finalizers, metadata, - amqp_metadata.event_json_size, ) } } diff --git a/src/sinks/amqp/service.rs b/src/sinks/amqp/service.rs index 20b16b99e6e39..42ccf467e5692 100644 --- a/src/sinks/amqp/service.rs +++ b/src/sinks/amqp/service.rs @@ -22,7 +22,6 @@ pub(super) struct AmqpRequest { properties: BasicProperties, finalizers: EventFinalizers, metadata: RequestMetadata, - event_json_size: JsonSize, } impl AmqpRequest { @@ -33,7 +32,6 @@ impl AmqpRequest { properties: BasicProperties, finalizers: EventFinalizers, metadata: RequestMetadata, - event_json_size: JsonSize, ) -> Self { Self { body, @@ -42,7 +40,6 @@ impl AmqpRequest { properties, finalizers, metadata, - event_json_size, } } } @@ -54,15 +51,19 @@ impl Finalizable for AmqpRequest { } impl MetaDescriptive for AmqpRequest { - fn get_metadata(&self) -> 
RequestMetadata { - self.metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata } } /// A successful response from `AMQP`. pub(super) struct AmqpResponse { byte_size: usize, - json_size: JsonSize, + json_size: GroupedCountByteSize, } impl DriverResponse for AmqpResponse { @@ -70,8 +71,8 @@ impl DriverResponse for AmqpResponse { EventStatus::Delivered } - fn events_sent(&self) -> CountByteSize { - CountByteSize(1, self.json_size) + fn events_sent(&self) -> &GroupedCountByteSize { + &self.json_size } fn bytes_sent(&self) -> Option { @@ -129,7 +130,7 @@ impl Service for AmqpService { Ok(lapin::publisher_confirm::Confirmation::Nack(_)) => { warn!("Received Negative Acknowledgement from AMQP server."); Ok(AmqpResponse { - json_size: req.event_json_size, + json_size: req.metadata.into_events_estimated_json_encoded_byte_size(), byte_size, }) } @@ -139,7 +140,7 @@ impl Service for AmqpService { Err(AmqpError::AmqpAcknowledgementFailed { error }) } Ok(_) => Ok(AmqpResponse { - json_size: req.event_json_size, + json_size: req.metadata.into_events_estimated_json_encoded_byte_size(), byte_size, }), }, diff --git a/src/sinks/amqp/sink.rs b/src/sinks/amqp/sink.rs index f1da0b8d944f0..287b002b935f2 100644 --- a/src/sinks/amqp/sink.rs +++ b/src/sinks/amqp/sink.rs @@ -26,25 +26,6 @@ pub(super) struct AmqpEvent { pub(super) properties: BasicProperties, } -impl EventCount for AmqpEvent { - fn event_count(&self) -> usize { - // An AmqpEvent represents one event. 
- 1 - } -} - -impl ByteSizeOf for AmqpEvent { - fn allocated_bytes(&self) -> usize { - self.event.size_of() - } -} - -impl EstimatedJsonEncodedSizeOf for AmqpEvent { - fn estimated_json_encoded_size_of(&self) -> JsonSize { - self.event.estimated_json_encoded_size_of() - } -} - pub(super) struct AmqpSink { pub(super) channel: Arc, exchange: Template, diff --git a/src/sinks/aws_cloudwatch_logs/request_builder.rs b/src/sinks/aws_cloudwatch_logs/request_builder.rs index 0d2b63fa3322d..edbf4a752233c 100644 --- a/src/sinks/aws_cloudwatch_logs/request_builder.rs +++ b/src/sinks/aws_cloudwatch_logs/request_builder.rs @@ -39,8 +39,12 @@ impl Finalizable for CloudwatchRequest { } impl MetaDescriptive for CloudwatchRequest { - fn get_metadata(&self) -> RequestMetadata { - self.metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata } } @@ -87,7 +91,7 @@ impl CloudwatchRequestBuilder { self.transformer.transform(&mut event); let mut message_bytes = BytesMut::new(); - let builder = RequestMetadataBuilder::from_events(&event); + let builder = RequestMetadataBuilder::from_event(&event); if self.encoder.encode(event, &mut message_bytes).is_err() { // The encoder handles internal event emission for Error and EventsDropped. 
diff --git a/src/sinks/aws_cloudwatch_logs/service.rs b/src/sinks/aws_cloudwatch_logs/service.rs index 93ed0b52252a2..9b02493536017 100644 --- a/src/sinks/aws_cloudwatch_logs/service.rs +++ b/src/sinks/aws_cloudwatch_logs/service.rs @@ -22,20 +22,18 @@ use tower::{ timeout::Timeout, Service, ServiceBuilder, ServiceExt, }; -use vector_common::{json_size::JsonSize, request_metadata::MetaDescriptive}; -use vector_core::{internal_event::CountByteSize, stream::DriverResponse}; - -use crate::{ - event::EventStatus, - sinks::{ - aws_cloudwatch_logs::{ - config::CloudwatchLogsSinkConfig, request, retry::CloudwatchRetryLogic, - sink::BatchCloudwatchRequest, CloudwatchKey, - }, - util::{ - retries::FixedRetryPolicy, EncodedLength, TowerRequestConfig, TowerRequestSettings, - }, +use vector_common::{ + finalization::EventStatus, + request_metadata::{GroupedCountByteSize, MetaDescriptive}, +}; +use vector_core::stream::DriverResponse; + +use crate::sinks::{ + aws_cloudwatch_logs::{ + config::CloudwatchLogsSinkConfig, request, retry::CloudwatchRetryLogic, + sink::BatchCloudwatchRequest, CloudwatchKey, }, + util::{retries::FixedRetryPolicy, EncodedLength, TowerRequestConfig, TowerRequestSettings}, }; type Svc = Buffer< @@ -98,8 +96,7 @@ impl From> for CloudwatchError { #[derive(Debug)] pub struct CloudwatchResponse { - events_count: usize, - events_byte_size: JsonSize, + events_byte_size: GroupedCountByteSize, } impl crate::sinks::util::sink::Response for CloudwatchResponse { @@ -117,8 +114,8 @@ impl DriverResponse for CloudwatchResponse { EventStatus::Delivered } - fn events_sent(&self) -> CountByteSize { - CountByteSize(self.events_count, self.events_byte_size) + fn events_sent(&self) -> &GroupedCountByteSize { + &self.events_byte_size } } @@ -156,9 +153,9 @@ impl Service for CloudwatchLogsPartitionSvc { Poll::Ready(Ok(())) } - fn call(&mut self, req: BatchCloudwatchRequest) -> Self::Future { - let events_count = req.get_metadata().event_count(); - let events_byte_size = 
req.get_metadata().events_estimated_json_encoded_byte_size(); + fn call(&mut self, mut req: BatchCloudwatchRequest) -> Self::Future { + let metadata = std::mem::take(req.metadata_mut()); + let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size(); let key = req.key; let events = req @@ -200,10 +197,7 @@ impl Service for CloudwatchLogsPartitionSvc { }; svc.oneshot(events) - .map_ok(move |_x| CloudwatchResponse { - events_count, - events_byte_size, - }) + .map_ok(move |_x| CloudwatchResponse { events_byte_size }) .map_err(Into::into) .boxed() } diff --git a/src/sinks/aws_cloudwatch_logs/sink.rs b/src/sinks/aws_cloudwatch_logs/sink.rs index a1546e8135687..3c320ad6236e7 100644 --- a/src/sinks/aws_cloudwatch_logs/sink.rs +++ b/src/sinks/aws_cloudwatch_logs/sink.rs @@ -51,8 +51,9 @@ where }) .batched_partitioned(CloudwatchPartitioner, batcher_settings) .map(|(key, events)| { - let metadata = - RequestMetadata::from_batch(events.iter().map(|req| req.get_metadata())); + let metadata = RequestMetadata::from_batch( + events.iter().map(|req| req.get_metadata().clone()), + ); BatchCloudwatchRequest { key, @@ -80,8 +81,12 @@ impl Finalizable for BatchCloudwatchRequest { } impl MetaDescriptive for BatchCloudwatchRequest { - fn get_metadata(&self) -> RequestMetadata { - self.metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata } } diff --git a/src/sinks/aws_kinesis/firehose/record.rs b/src/sinks/aws_kinesis/firehose/record.rs index 52a656240282c..114d487558558 100644 --- a/src/sinks/aws_kinesis/firehose/record.rs +++ b/src/sinks/aws_kinesis/firehose/record.rs @@ -66,9 +66,8 @@ impl SendRecord for KinesisFirehoseClient { .instrument(info_span!("request").or_current()) .await .map(|output: PutRecordBatchOutput| KinesisResponse { - count: rec_count, failure_count: output.failed_put_count().unwrap_or(0) as usize, - events_byte_size: JsonSize::new(total_size), + 
events_byte_size: CountByteSize(rec_count, JsonSize::new(total_size)).into(), }) } } diff --git a/src/sinks/aws_kinesis/request_builder.rs b/src/sinks/aws_kinesis/request_builder.rs index 1491d59b08ee7..0483dd01e318b 100644 --- a/src/sinks/aws_kinesis/request_builder.rs +++ b/src/sinks/aws_kinesis/request_builder.rs @@ -53,8 +53,12 @@ impl MetaDescriptive for KinesisRequest where R: Record, { - fn get_metadata(&self) -> RequestMetadata { - self.metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata } } @@ -102,7 +106,7 @@ where partition_key: processed_event.metadata.partition_key, }; let event = Event::from(processed_event.event); - let builder = RequestMetadataBuilder::from_events(&event); + let builder = RequestMetadataBuilder::from_event(&event); (kinesis_metadata, builder, event) } diff --git a/src/sinks/aws_kinesis/service.rs b/src/sinks/aws_kinesis/service.rs index 4ebc53f0d746a..ae3723638c6bc 100644 --- a/src/sinks/aws_kinesis/service.rs +++ b/src/sinks/aws_kinesis/service.rs @@ -5,7 +5,6 @@ use std::{ use aws_smithy_client::SdkError; use aws_types::region::Region; -use vector_core::internal_event::CountByteSize; use super::{ record::{Record, SendRecord}, @@ -37,9 +36,8 @@ where } pub struct KinesisResponse { - pub(crate) count: usize, pub(crate) failure_count: usize, - pub(crate) events_byte_size: JsonSize, + pub(crate) events_byte_size: GroupedCountByteSize, } impl DriverResponse for KinesisResponse { @@ -47,8 +45,8 @@ impl DriverResponse for KinesisResponse { EventStatus::Delivered } - fn events_sent(&self) -> CountByteSize { - CountByteSize(self.count, self.events_byte_size) + fn events_sent(&self) -> &GroupedCountByteSize { + &self.events_byte_size } } @@ -69,10 +67,9 @@ where } // Emission of internal events for errors and dropped events is handled upstream by the caller. 
- fn call(&mut self, requests: BatchKinesisRequest) -> Self::Future { - let events_byte_size = requests - .get_metadata() - .events_estimated_json_encoded_byte_size(); + fn call(&mut self, mut requests: BatchKinesisRequest) -> Self::Future { + let metadata = std::mem::take(requests.metadata_mut()); + let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size(); let records = requests .events diff --git a/src/sinks/aws_kinesis/sink.rs b/src/sinks/aws_kinesis/sink.rs index bc3d53947c338..0341c0e8244d6 100644 --- a/src/sinks/aws_kinesis/sink.rs +++ b/src/sinks/aws_kinesis/sink.rs @@ -69,8 +69,9 @@ where self.batch_settings, ) .map(|(key, events)| { - let metadata = - RequestMetadata::from_batch(events.iter().map(|req| req.get_metadata())); + let metadata = RequestMetadata::from_batch( + events.iter().map(|req| req.get_metadata().clone()), + ); BatchKinesisRequest { key, events, @@ -159,7 +160,7 @@ where partition_key: self.key.partition_key.clone(), }, events: self.events.to_vec(), - metadata: self.metadata, + metadata: self.metadata.clone(), } } } @@ -177,8 +178,12 @@ impl MetaDescriptive for BatchKinesisRequest where R: Record + Clone, { - fn get_metadata(&self) -> RequestMetadata { - self.metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata } } diff --git a/src/sinks/aws_kinesis/streams/record.rs b/src/sinks/aws_kinesis/streams/record.rs index 339d6997af63a..c7eebe4f6e0a7 100644 --- a/src/sinks/aws_kinesis/streams/record.rs +++ b/src/sinks/aws_kinesis/streams/record.rs @@ -82,9 +82,8 @@ impl SendRecord for KinesisStreamClient { .instrument(info_span!("request").or_current()) .await .map(|output: PutRecordsOutput| KinesisResponse { - count: rec_count, failure_count: output.failed_record_count().unwrap_or(0) as usize, - events_byte_size: JsonSize::new(total_size), + events_byte_size: CountByteSize(rec_count, JsonSize::new(total_size)).into(), }) } } 
diff --git a/src/sinks/aws_sqs/request_builder.rs b/src/sinks/aws_sqs/request_builder.rs index 03d30f34f3737..34f54b5c1c0a2 100644 --- a/src/sinks/aws_sqs/request_builder.rs +++ b/src/sinks/aws_sqs/request_builder.rs @@ -93,7 +93,7 @@ impl RequestBuilder for SqsRequestBuilder { None => None, }; - let builder = RequestMetadataBuilder::from_events(&event); + let builder = RequestMetadataBuilder::from_event(&event); let sqs_metadata = SqsMetadata { finalizers: event.take_finalizers(), @@ -154,7 +154,11 @@ impl Finalizable for SendMessageEntry { } impl MetaDescriptive for SendMessageEntry { - fn get_metadata(&self) -> RequestMetadata { - self.metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata } } diff --git a/src/sinks/aws_sqs/service.rs b/src/sinks/aws_sqs/service.rs index 38b20fddfe21f..e08f68c2753ba 100644 --- a/src/sinks/aws_sqs/service.rs +++ b/src/sinks/aws_sqs/service.rs @@ -4,10 +4,8 @@ use aws_sdk_sqs::{error::SendMessageError, types::SdkError, Client as SqsClient} use futures::{future::BoxFuture, TryFutureExt}; use tower::Service; use tracing::Instrument; -use vector_common::json_size::JsonSize; -use vector_core::{ - event::EventStatus, internal_event::CountByteSize, stream::DriverResponse, ByteSizeOf, -}; +use vector_common::request_metadata::GroupedCountByteSize; +use vector_core::{event::EventStatus, stream::DriverResponse, ByteSizeOf}; use super::request_builder::SendMessageEntry; @@ -47,7 +45,10 @@ impl Service for SqsService { .send() .map_ok(|_| SendMessageResponse { byte_size, - json_byte_size: entry.metadata.events_estimated_json_encoded_byte_size(), + json_byte_size: entry + .metadata + .events_estimated_json_encoded_byte_size() + .clone(), }) .instrument(info_span!("request").or_current()) .await @@ -57,7 +58,7 @@ impl Service for SqsService { pub(crate) struct SendMessageResponse { byte_size: usize, - json_byte_size: JsonSize, + json_byte_size: 
GroupedCountByteSize, } impl DriverResponse for SendMessageResponse { @@ -65,8 +66,8 @@ impl DriverResponse for SendMessageResponse { EventStatus::Delivered } - fn events_sent(&self) -> CountByteSize { - CountByteSize(1, self.json_byte_size) + fn events_sent(&self) -> &GroupedCountByteSize { + &self.json_byte_size } fn bytes_sent(&self) -> Option { diff --git a/src/sinks/azure_common/config.rs b/src/sinks/azure_common/config.rs index 56ed8d210f694..bfb61d616ce63 100644 --- a/src/sinks/azure_common/config.rs +++ b/src/sinks/azure_common/config.rs @@ -10,9 +10,9 @@ use http::StatusCode; use snafu::Snafu; use vector_common::{ json_size::JsonSize, - request_metadata::{MetaDescriptive, RequestMetadata}, + request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}, }; -use vector_core::{internal_event::CountByteSize, stream::DriverResponse}; +use vector_core::stream::DriverResponse; use crate::{ event::{EventFinalizers, EventStatus, Finalizable}, @@ -35,8 +35,12 @@ impl Finalizable for AzureBlobRequest { } impl MetaDescriptive for AzureBlobRequest { - fn get_metadata(&self) -> RequestMetadata { - self.request_metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.request_metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.request_metadata } } @@ -64,8 +68,7 @@ impl RetryLogic for AzureBlobRetryLogic { #[derive(Debug)] pub struct AzureBlobResponse { pub inner: PutBlockBlobResponse, - pub count: usize, - pub events_byte_size: JsonSize, + pub events_byte_size: GroupedCountByteSize, pub byte_size: usize, } @@ -74,8 +77,8 @@ impl DriverResponse for AzureBlobResponse { EventStatus::Delivered } - fn events_sent(&self) -> CountByteSize { - CountByteSize(self.count, self.events_byte_size) + fn events_sent(&self) -> &GroupedCountByteSize { + &self.events_byte_size } fn bytes_sent(&self) -> Option { diff --git a/src/sinks/azure_common/service.rs b/src/sinks/azure_common/service.rs index 9cbd07dfbf620..122bd66525b18 100644 --- 
a/src/sinks/azure_common/service.rs +++ b/src/sinks/azure_common/service.rs @@ -57,8 +57,9 @@ impl Service for AzureBlobService { result.map(|inner| AzureBlobResponse { inner, - count: request.metadata.count, - events_byte_size: request.metadata.byte_size, + events_byte_size: request + .request_metadata + .into_events_estimated_json_encoded_byte_size(), byte_size, }) }) diff --git a/src/sinks/databend/service.rs b/src/sinks/databend/service.rs index 473a2d3220ba3..05a8b4629a767 100644 --- a/src/sinks/databend/service.rs +++ b/src/sinks/databend/service.rs @@ -9,8 +9,7 @@ use rand_distr::Alphanumeric; use snafu::Snafu; use tower::Service; use vector_common::finalization::{EventFinalizers, EventStatus, Finalizable}; -use vector_common::internal_event::CountByteSize; -use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; +use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; use vector_core::stream::DriverResponse; use crate::{internal_events::EndpointBytesSent, sinks::util::retries::RetryLogic}; @@ -67,8 +66,12 @@ impl Finalizable for DatabendRequest { } impl MetaDescriptive for DatabendRequest { - fn get_metadata(&self) -> RequestMetadata { - self.metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata } } @@ -82,11 +85,8 @@ impl DriverResponse for DatabendResponse { EventStatus::Delivered } - fn events_sent(&self) -> CountByteSize { - CountByteSize( - self.metadata.event_count(), - self.metadata.events_estimated_json_encoded_byte_size(), - ) + fn events_sent(&self) -> &GroupedCountByteSize { + self.metadata.events_estimated_json_encoded_byte_size() } fn bytes_sent(&self) -> Option { @@ -205,7 +205,7 @@ impl Service for DatabendService { let service = self.clone(); let future = async move { - let metadata = request.get_metadata(); + let metadata = request.get_metadata().clone(); let stage_location = 
service.new_stage_location(); let protocol = service.client.get_protocol(); let endpoint = service.client.get_host(); diff --git a/src/sinks/datadog/events/request_builder.rs b/src/sinks/datadog/events/request_builder.rs index 664f99beca436..93e4eeeb17c31 100644 --- a/src/sinks/datadog/events/request_builder.rs +++ b/src/sinks/datadog/events/request_builder.rs @@ -42,8 +42,12 @@ impl ElementCount for DatadogEventsRequest { } impl MetaDescriptive for DatadogEventsRequest { - fn get_metadata(&self) -> RequestMetadata { - self.request_metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.request_metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.request_metadata } } @@ -86,7 +90,7 @@ impl RequestBuilder for DatadogEventsRequestBuilder { } fn split_input(&self, event: Event) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { - let builder = RequestMetadataBuilder::from_events(&event); + let builder = RequestMetadataBuilder::from_event(&event); let mut log = event.into_log(); let metadata = Metadata { diff --git a/src/sinks/datadog/events/service.rs b/src/sinks/datadog/events/service.rs index 693929d62e961..374bd3268b802 100644 --- a/src/sinks/datadog/events/service.rs +++ b/src/sinks/datadog/events/service.rs @@ -8,8 +8,8 @@ use futures::{ use http::Request; use hyper::Body; use tower::{Service, ServiceExt}; -use vector_common::{json_size::JsonSize, request_metadata::MetaDescriptive}; -use vector_core::{internal_event::CountByteSize, stream::DriverResponse}; +use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive}; +use vector_core::stream::DriverResponse; use crate::{ event::EventStatus, @@ -23,7 +23,7 @@ use crate::{ pub struct DatadogEventsResponse { pub(self) event_status: EventStatus, pub http_status: http::StatusCode, - pub event_byte_size: JsonSize, + pub event_byte_size: GroupedCountByteSize, } impl DriverResponse for DatadogEventsResponse { @@ -31,8 +31,8 @@ impl DriverResponse for 
DatadogEventsResponse { self.event_status } - fn events_sent(&self) -> CountByteSize { - CountByteSize(1, self.event_byte_size) + fn events_sent(&self) -> &GroupedCountByteSize { + &self.event_byte_size } fn bytes_sent(&self) -> Option { @@ -85,12 +85,13 @@ impl Service for DatadogEventsService { } // Emission of Error internal event is handled upstream by the caller - fn call(&mut self, req: DatadogEventsRequest) -> Self::Future { + fn call(&mut self, mut req: DatadogEventsRequest) -> Self::Future { let mut http_service = self.batch_http_service.clone(); Box::pin(async move { + let metadata = std::mem::take(req.metadata_mut()); http_service.ready().await?; - let event_byte_size = req.get_metadata().events_estimated_json_encoded_byte_size(); + let event_byte_size = metadata.into_events_estimated_json_encoded_byte_size(); let http_response = http_service.call(req).await?; let event_status = if http_response.is_successful() { EventStatus::Delivered diff --git a/src/sinks/datadog/logs/service.rs b/src/sinks/datadog/logs/service.rs index 06bc923ad3e36..47effa754df5e 100644 --- a/src/sinks/datadog/logs/service.rs +++ b/src/sinks/datadog/logs/service.rs @@ -14,13 +14,9 @@ use hyper::Body; use indexmap::IndexMap; use tower::Service; use tracing::Instrument; -use vector_common::{ - json_size::JsonSize, - request_metadata::{MetaDescriptive, RequestMetadata}, -}; +use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; use vector_core::{ event::{EventFinalizers, EventStatus, Finalizable}, - internal_event::CountByteSize, stream::DriverResponse, }; @@ -59,16 +55,19 @@ impl Finalizable for LogApiRequest { } impl MetaDescriptive for LogApiRequest { - fn get_metadata(&self) -> RequestMetadata { - self.metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata } } #[derive(Debug)] pub struct LogApiResponse { event_status: EventStatus, - count: 
usize, - events_byte_size: JsonSize, + events_byte_size: GroupedCountByteSize, raw_byte_size: usize, } @@ -77,8 +76,8 @@ impl DriverResponse for LogApiResponse { self.event_status } - fn events_sent(&self) -> CountByteSize { - CountByteSize(self.count, self.events_byte_size) + fn events_sent(&self) -> &GroupedCountByteSize { + &self.events_byte_size } fn bytes_sent(&self) -> Option { @@ -125,7 +124,7 @@ impl Service for LogApiService { } // Emission of Error internal event is handled upstream by the caller - fn call(&mut self, request: LogApiRequest) -> Self::Future { + fn call(&mut self, mut request: LogApiRequest) -> Self::Future { let mut client = self.client.clone(); let http_request = Request::post(&self.uri) .header(CONTENT_TYPE, "application/json") @@ -139,10 +138,8 @@ impl Service for LogApiService { http_request }; - let count = request.get_metadata().event_count(); - let events_byte_size = request - .get_metadata() - .events_estimated_json_encoded_byte_size(); + let metadata = std::mem::take(request.metadata_mut()); + let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size(); let raw_byte_size = request.uncompressed_size; let mut http_request = http_request.header(CONTENT_LENGTH, request.body.len()); @@ -162,7 +159,6 @@ impl Service for LogApiService { DatadogApiError::from_result(client.call(http_request).in_current_span().await).map( |_| LogApiResponse { event_status: EventStatus::Delivered, - count, events_byte_size, raw_byte_size, }, diff --git a/src/sinks/datadog/logs/tests.rs b/src/sinks/datadog/logs/tests.rs index efe366120100e..c8ef154280e4f 100644 --- a/src/sinks/datadog/logs/tests.rs +++ b/src/sinks/datadog/logs/tests.rs @@ -11,7 +11,10 @@ use futures::{ use http::request::Parts; use hyper::StatusCode; use indoc::indoc; -use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent}; +use vector_core::{ + config::{init_telemetry, Tags, Telemetry}, + event::{BatchNotifier, BatchStatus, Event, LogEvent}, +}; use 
crate::{ config::SinkConfig, @@ -22,8 +25,8 @@ use crate::{ }, test_util::{ components::{ - run_and_assert_sink_compliance, run_and_assert_sink_error, COMPONENT_ERROR_TAGS, - SINK_TAGS, + run_and_assert_data_volume_sink_compliance, run_and_assert_sink_compliance, + run_and_assert_sink_error, COMPONENT_ERROR_TAGS, DATA_VOLUME_SINK_TAGS, SINK_TAGS, }, next_addr, random_lines_with_stream, }, @@ -71,6 +74,13 @@ fn event_with_api_key(msg: &str, key: &str) -> Event { e } +#[derive(PartialEq)] +enum TestType { + Happy, + Telemetry, + Error, +} + /// Starts a test sink with random lines running into it /// /// This function starts a Datadog Logs sink with a simplistic configuration and @@ -83,8 +93,20 @@ fn event_with_api_key(msg: &str, key: &str) -> Event { async fn start_test_detail( api_status: ApiStatus, batch_status: BatchStatus, - is_error: bool, + test_type: TestType, ) -> (Vec, Receiver<(http::request::Parts, Bytes)>) { + if test_type == TestType::Telemetry { + init_telemetry( + Telemetry { + tags: Tags { + emit_service: true, + emit_source: true, + }, + }, + true, + ); + } + let config = indoc! 
{r#" default_api_key = "atoken" compression = "none" @@ -105,10 +127,12 @@ async fn start_test_detail( let (batch, receiver) = BatchNotifier::new_with_receiver(); let (expected, events) = random_lines_with_stream(100, 10, Some(batch)); - if is_error { - run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await; - } else { - run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await; + match test_type { + TestType::Happy => run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await, + TestType::Error => run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await, + TestType::Telemetry => { + run_and_assert_data_volume_sink_compliance(sink, events, &DATA_VOLUME_SINK_TAGS).await + } } assert_eq!(receiver.await, batch_status); @@ -120,14 +144,21 @@ async fn start_test_success( api_status: ApiStatus, batch_status: BatchStatus, ) -> (Vec, Receiver<(http::request::Parts, Bytes)>) { - start_test_detail(api_status, batch_status, false).await + start_test_detail(api_status, batch_status, TestType::Happy).await +} + +async fn start_test_telemetry( + api_status: ApiStatus, + batch_status: BatchStatus, +) -> (Vec, Receiver<(http::request::Parts, Bytes)>) { + start_test_detail(api_status, batch_status, TestType::Telemetry).await } async fn start_test_error( api_status: ApiStatus, batch_status: BatchStatus, ) -> (Vec, Receiver<(http::request::Parts, Bytes)>) { - start_test_detail(api_status, batch_status, true).await + start_test_detail(api_status, batch_status, TestType::Error).await } /// Assert the basic functionality of the sink in good conditions @@ -174,6 +205,13 @@ async fn smoke() { } } +/// Assert the sink emits source and service tags when run with telemetry configured. 
+#[tokio::test] +async fn telemetry() { + let (expected, rx) = start_test_telemetry(ApiStatus::OKv1, BatchStatus::Delivered).await; + let _ = rx.take(expected.len()).collect::>().await; +} + #[tokio::test] /// Assert delivery error behavior for v1 API /// diff --git a/src/sinks/datadog/metrics/service.rs b/src/sinks/datadog/metrics/service.rs index d15716b99d8ad..6abacfcc739b7 100644 --- a/src/sinks/datadog/metrics/service.rs +++ b/src/sinks/datadog/metrics/service.rs @@ -10,13 +10,9 @@ use http::{ use hyper::Body; use snafu::ResultExt; use tower::Service; -use vector_common::{ - json_size::JsonSize, - request_metadata::{MetaDescriptive, RequestMetadata}, -}; +use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; use vector_core::{ event::{EventFinalizers, EventStatus, Finalizable}, - internal_event::CountByteSize, stream::DriverResponse, }; @@ -115,8 +111,12 @@ impl Finalizable for DatadogMetricsRequest { } impl MetaDescriptive for DatadogMetricsRequest { - fn get_metadata(&self) -> RequestMetadata { - self.metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata } } @@ -125,8 +125,7 @@ impl MetaDescriptive for DatadogMetricsRequest { pub struct DatadogMetricsResponse { status_code: StatusCode, body: Bytes, - batch_size: usize, - byte_size: JsonSize, + byte_size: GroupedCountByteSize, raw_byte_size: usize, } @@ -141,8 +140,8 @@ impl DriverResponse for DatadogMetricsResponse { } } - fn events_sent(&self) -> CountByteSize { - CountByteSize(self.batch_size, self.byte_size) + fn events_sent(&self) -> &GroupedCountByteSize { + &self.byte_size } fn bytes_sent(&self) -> Option { @@ -180,15 +179,13 @@ impl Service for DatadogMetricsService { } // Emission of Error internal event is handled upstream by the caller - fn call(&mut self, request: DatadogMetricsRequest) -> Self::Future { + fn call(&mut self, mut request: 
DatadogMetricsRequest) -> Self::Future { let client = self.client.clone(); let api_key = self.api_key.clone(); Box::pin(async move { - let byte_size = request - .get_metadata() - .events_estimated_json_encoded_byte_size(); - let batch_size = request.get_metadata().event_count(); + let metadata = std::mem::take(request.metadata_mut()); + let byte_size = metadata.into_events_estimated_json_encoded_byte_size(); let raw_byte_size = request.raw_bytes; let request = request @@ -208,7 +205,6 @@ impl Service for DatadogMetricsService { Ok(DatadogMetricsResponse { status_code: parts.status, body, - batch_size, byte_size, raw_byte_size, }) diff --git a/src/sinks/datadog/traces/service.rs b/src/sinks/datadog/traces/service.rs index 66e46b8075ca1..5128d855edb0f 100644 --- a/src/sinks/datadog/traces/service.rs +++ b/src/sinks/datadog/traces/service.rs @@ -9,13 +9,9 @@ use http::{Request, StatusCode, Uri}; use hyper::Body; use snafu::ResultExt; use tower::Service; -use vector_common::{ - json_size::JsonSize, - request_metadata::{MetaDescriptive, RequestMetadata}, -}; +use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; use vector_core::{ event::{EventFinalizers, EventStatus, Finalizable}, - internal_event::CountByteSize, stream::DriverResponse, }; @@ -84,8 +80,12 @@ impl Finalizable for TraceApiRequest { } impl MetaDescriptive for TraceApiRequest { - fn get_metadata(&self) -> RequestMetadata { - self.metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata } } @@ -93,8 +93,7 @@ impl MetaDescriptive for TraceApiRequest { pub struct TraceApiResponse { status_code: StatusCode, body: Bytes, - batch_size: usize, - byte_size: JsonSize, + byte_size: GroupedCountByteSize, uncompressed_size: usize, } @@ -109,8 +108,8 @@ impl DriverResponse for TraceApiResponse { } } - fn events_sent(&self) -> CountByteSize { - CountByteSize(self.batch_size, 
self.byte_size) + fn events_sent(&self) -> &GroupedCountByteSize { + &self.byte_size } fn bytes_sent(&self) -> Option { @@ -145,14 +144,12 @@ impl Service for TraceApiService { } // Emission of Error internal event is handled upstream by the caller - fn call(&mut self, request: TraceApiRequest) -> Self::Future { + fn call(&mut self, mut request: TraceApiRequest) -> Self::Future { let client = self.client.clone(); Box::pin(async move { - let byte_size = request - .get_metadata() - .events_estimated_json_encoded_byte_size(); - let batch_size = request.get_metadata().event_count(); + let metadata = std::mem::take(request.metadata_mut()); + let byte_size = metadata.into_events_estimated_json_encoded_byte_size(); let uncompressed_size = request.uncompressed_size; let http_request = request.into_http_request().context(BuildRequestSnafu)?; @@ -166,7 +163,6 @@ impl Service for TraceApiService { Ok(TraceApiResponse { status_code: parts.status, body, - batch_size, byte_size, uncompressed_size, }) diff --git a/src/sinks/elasticsearch/encoder.rs b/src/sinks/elasticsearch/encoder.rs index 5d27ba891b596..f5b39a52b23a0 100644 --- a/src/sinks/elasticsearch/encoder.rs +++ b/src/sinks/elasticsearch/encoder.rs @@ -2,7 +2,10 @@ use std::{io, io::Write}; use serde::Serialize; use vector_buffers::EventCount; -use vector_common::json_size::JsonSize; +use vector_common::{ + json_size::JsonSize, + request_metadata::{EventCountTags, GetEventCountTags}, +}; use vector_core::{event::Event, ByteSizeOf, EstimatedJsonEncodedSizeOf}; use crate::{ @@ -47,6 +50,12 @@ impl EventCount for ProcessedEvent { } } +impl GetEventCountTags for ProcessedEvent { + fn get_tags(&self) -> EventCountTags { + self.log.get_tags() + } +} + #[derive(PartialEq, Eq, Default, Clone, Debug)] pub struct ElasticsearchEncoder { pub transformer: Transformer, diff --git a/src/sinks/elasticsearch/retry.rs b/src/sinks/elasticsearch/retry.rs index 4f5a6e0c73ed6..bf40d1751f051 100644 --- a/src/sinks/elasticsearch/retry.rs +++ 
b/src/sinks/elasticsearch/retry.rs @@ -160,7 +160,7 @@ mod tests { use bytes::Bytes; use http::Response; use similar_asserts::assert_eq; - use vector_common::json_size::JsonSize; + use vector_common::{internal_event::CountByteSize, json_size::JsonSize}; use super::*; use crate::event::EventStatus; @@ -180,7 +180,7 @@ mod tests { http_response: response, event_status: EventStatus::Rejected, batch_size: 1, - events_byte_size: JsonSize::new(1), + events_byte_size: CountByteSize(1, JsonSize::new(1)).into(), }), RetryAction::DontRetry(_) )); @@ -201,7 +201,7 @@ mod tests { http_response: response, event_status: EventStatus::Errored, batch_size: 1, - events_byte_size: JsonSize::new(1), + events_byte_size: CountByteSize(1, JsonSize::new(1)).into(), }), RetryAction::Retry(_) )); diff --git a/src/sinks/elasticsearch/service.rs b/src/sinks/elasticsearch/service.rs index bdf0824915de6..38ba9a41b3af3 100644 --- a/src/sinks/elasticsearch/service.rs +++ b/src/sinks/elasticsearch/service.rs @@ -13,9 +13,9 @@ use hyper::{service::Service, Body, Request}; use tower::ServiceExt; use vector_common::{ json_size::JsonSize, - request_metadata::{MetaDescriptive, RequestMetadata}, + request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}, }; -use vector_core::{internal_event::CountByteSize, stream::DriverResponse, ByteSizeOf}; +use vector_core::{stream::DriverResponse, ByteSizeOf}; use crate::sinks::elasticsearch::sign_request; use crate::{ @@ -57,8 +57,12 @@ impl Finalizable for ElasticsearchRequest { } impl MetaDescriptive for ElasticsearchRequest { - fn get_metadata(&self) -> RequestMetadata { - self.metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata } } @@ -149,7 +153,7 @@ pub struct ElasticsearchResponse { pub http_response: Response, pub event_status: EventStatus, pub batch_size: usize, - pub events_byte_size: JsonSize, + pub events_byte_size: 
GroupedCountByteSize, } impl DriverResponse for ElasticsearchResponse { @@ -157,8 +161,8 @@ impl DriverResponse for ElasticsearchResponse { self.event_status } - fn events_sent(&self) -> CountByteSize { - CountByteSize(self.batch_size, self.events_byte_size) + fn events_sent(&self) -> &GroupedCountByteSize { + &self.events_byte_size } } @@ -173,12 +177,13 @@ impl Service for ElasticsearchService { } // Emission of internal events for errors and dropped events is handled upstream by the caller. - fn call(&mut self, req: ElasticsearchRequest) -> Self::Future { + fn call(&mut self, mut req: ElasticsearchRequest) -> Self::Future { let mut http_service = self.batch_service.clone(); Box::pin(async move { http_service.ready().await?; let batch_size = req.batch_size; - let events_byte_size = req.events_byte_size; + let events_byte_size = + std::mem::take(req.metadata_mut()).into_events_estimated_json_encoded_byte_size(); let http_response = http_service.call(req).await?; let event_status = get_event_status(&http_response); diff --git a/src/sinks/gcp/chronicle_unstructured.rs b/src/sinks/gcp/chronicle_unstructured.rs index 3f7b3d4494d25..ddf70f31e4e25 100644 --- a/src/sinks/gcp/chronicle_unstructured.rs +++ b/src/sinks/gcp/chronicle_unstructured.rs @@ -286,8 +286,12 @@ impl Finalizable for ChronicleRequest { } impl MetaDescriptive for ChronicleRequest { - fn get_metadata(&self) -> RequestMetadata { - self.metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata } } @@ -474,7 +478,7 @@ impl Service for ChronicleService { HeaderValue::from_str(&request.body.len().to_string()).unwrap(), ); - let metadata = request.get_metadata(); + let metadata = request.get_metadata().clone(); let mut http_request = builder.body(Body::from(request.body)).unwrap(); self.creds.apply(&mut http_request); diff --git a/src/sinks/gcs_common/service.rs b/src/sinks/gcs_common/service.rs index 
502335e8430b2..9a75203629dfb 100644 --- a/src/sinks/gcs_common/service.rs +++ b/src/sinks/gcs_common/service.rs @@ -8,8 +8,8 @@ use http::{ }; use hyper::Body; use tower::Service; -use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; -use vector_core::{internal_event::CountByteSize, stream::DriverResponse}; +use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; +use vector_core::stream::DriverResponse; use crate::{ event::{EventFinalizers, EventStatus, Finalizable}, @@ -50,8 +50,12 @@ impl Finalizable for GcsRequest { } impl MetaDescriptive for GcsRequest { - fn get_metadata(&self) -> RequestMetadata { - self.metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata } } @@ -84,11 +88,8 @@ impl DriverResponse for GcsResponse { } } - fn events_sent(&self) -> CountByteSize { - CountByteSize( - self.metadata.event_count(), - self.metadata.events_estimated_json_encoded_byte_size(), - ) + fn events_sent(&self) -> &GroupedCountByteSize { + self.metadata.events_estimated_json_encoded_byte_size() } fn bytes_sent(&self) -> Option { diff --git a/src/sinks/kafka/request_builder.rs b/src/sinks/kafka/request_builder.rs index 794de6174396e..9d1edd0d97c43 100644 --- a/src/sinks/kafka/request_builder.rs +++ b/src/sinks/kafka/request_builder.rs @@ -37,7 +37,7 @@ impl KafkaRequestBuilder { }) .ok()?; - let metadata_builder = RequestMetadataBuilder::from_events(&event); + let metadata_builder = RequestMetadataBuilder::from_event(&event); let metadata = KafkaRequestMetadata { finalizers: event.take_finalizers(), diff --git a/src/sinks/kafka/service.rs b/src/sinks/kafka/service.rs index f271a7a580e53..299a9de4ee056 100644 --- a/src/sinks/kafka/service.rs +++ b/src/sinks/kafka/service.rs @@ -8,7 +8,7 @@ use rdkafka::{ util::Timeout, }; use vector_core::internal_event::{ - ByteSize, BytesSent, CountByteSize, InternalEventHandle as _, 
Protocol, Registered, + ByteSize, BytesSent, InternalEventHandle as _, Protocol, Registered, }; use crate::{kafka::KafkaStatisticsContext, sinks::prelude::*}; @@ -28,7 +28,7 @@ pub struct KafkaRequestMetadata { } pub struct KafkaResponse { - event_byte_size: JsonSize, + event_byte_size: GroupedCountByteSize, } impl DriverResponse for KafkaResponse { @@ -36,8 +36,8 @@ impl DriverResponse for KafkaResponse { EventStatus::Delivered } - fn events_sent(&self) -> CountByteSize { - CountByteSize(1, self.event_byte_size) + fn events_sent(&self) -> &GroupedCountByteSize { + &self.event_byte_size } } @@ -48,8 +48,12 @@ impl Finalizable for KafkaRequest { } impl MetaDescriptive for KafkaRequest { - fn get_metadata(&self) -> RequestMetadata { - self.request_metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.request_metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.request_metadata } } @@ -82,8 +86,8 @@ impl Service for KafkaService { Box::pin(async move { let event_byte_size = request - .get_metadata() - .events_estimated_json_encoded_byte_size(); + .request_metadata + .into_events_estimated_json_encoded_byte_size(); let mut record = FutureRecord::to(&request.metadata.topic).payload(request.body.as_ref()); diff --git a/src/sinks/kafka/tests.rs b/src/sinks/kafka/tests.rs index ba1e62e1eeb9e..9ed3b66ba3fc7 100644 --- a/src/sinks/kafka/tests.rs +++ b/src/sinks/kafka/tests.rs @@ -18,7 +18,10 @@ mod integration_test { message::Headers, Message, Offset, TopicPartitionList, }; - use vector_core::event::{BatchNotifier, BatchStatus}; + use vector_core::{ + config::{init_telemetry, Tags, Telemetry}, + event::{BatchNotifier, BatchStatus}, + }; use crate::{ event::Value, @@ -32,7 +35,10 @@ mod integration_test { prelude::*, }, test_util::{ - components::{assert_sink_compliance, SINK_TAGS}, + components::{ + assert_data_volume_sink_compliance, assert_sink_compliance, DATA_VOLUME_SINK_TAGS, + SINK_TAGS, + }, random_lines_with_stream, 
random_string, wait_for, }, tls::{TlsConfig, TlsEnableableConfig, TEST_PEM_INTERMEDIATE_CA_PATH}, @@ -72,31 +78,74 @@ mod integration_test { #[tokio::test] async fn kafka_happy_path_plaintext() { crate::test_util::trace_init(); - kafka_happy_path(kafka_address(9091), None, None, KafkaCompression::None).await; + kafka_happy_path( + kafka_address(9091), + None, + None, + KafkaCompression::None, + true, + ) + .await; + kafka_happy_path( + kafka_address(9091), + None, + None, + KafkaCompression::None, + false, + ) + .await; } #[tokio::test] async fn kafka_happy_path_gzip() { crate::test_util::trace_init(); - kafka_happy_path(kafka_address(9091), None, None, KafkaCompression::Gzip).await; + kafka_happy_path( + kafka_address(9091), + None, + None, + KafkaCompression::Gzip, + false, + ) + .await; } #[tokio::test] async fn kafka_happy_path_lz4() { crate::test_util::trace_init(); - kafka_happy_path(kafka_address(9091), None, None, KafkaCompression::Lz4).await; + kafka_happy_path( + kafka_address(9091), + None, + None, + KafkaCompression::Lz4, + false, + ) + .await; } #[tokio::test] async fn kafka_happy_path_snappy() { crate::test_util::trace_init(); - kafka_happy_path(kafka_address(9091), None, None, KafkaCompression::Snappy).await; + kafka_happy_path( + kafka_address(9091), + None, + None, + KafkaCompression::Snappy, + false, + ) + .await; } #[tokio::test] async fn kafka_happy_path_zstd() { crate::test_util::trace_init(); - kafka_happy_path(kafka_address(9091), None, None, KafkaCompression::Zstd).await; + kafka_happy_path( + kafka_address(9091), + None, + None, + KafkaCompression::Zstd, + false, + ) + .await; } async fn kafka_batch_options_overrides( @@ -208,6 +257,7 @@ mod integration_test { options: TlsConfig::test_config(), }), KafkaCompression::None, + false, ) .await; } @@ -225,6 +275,7 @@ mod integration_test { }), None, KafkaCompression::None, + false, ) .await; } @@ -234,7 +285,22 @@ mod integration_test { sasl: Option, tls: Option, compression: KafkaCompression, + 
test_telemetry_tags: bool, ) { + if test_telemetry_tags { + // We need to configure Vector to emit the service and source tags. + // The default is to not emit these. + init_telemetry( + Telemetry { + tags: Tags { + emit_service: true, + emit_source: true, + }, + }, + true, + ); + } + let topic = format!("test-{}", random_string(10)); let headers_key = "headers_key".to_string(); let kafka_auth = KafkaAuthConfig { sasl, tls }; @@ -273,13 +339,24 @@ mod integration_test { }); events }); - assert_sink_compliance(&SINK_TAGS, async move { - let sink = KafkaSink::new(config).unwrap(); - let sink = VectorSink::from_event_streamsink(sink); - sink.run(input_events).await - }) - .await - .expect("Running sink failed"); + + if test_telemetry_tags { + assert_data_volume_sink_compliance(&DATA_VOLUME_SINK_TAGS, async move { + let sink = KafkaSink::new(config).unwrap(); + let sink = VectorSink::from_event_streamsink(sink); + sink.run(input_events).await + }) + .await + .expect("Running sink failed"); + } else { + assert_sink_compliance(&SINK_TAGS, async move { + let sink = KafkaSink::new(config).unwrap(); + let sink = VectorSink::from_event_streamsink(sink); + sink.run(input_events).await + }) + .await + .expect("Running sink failed"); + } assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); // read back everything from the beginning diff --git a/src/sinks/loki/event.rs b/src/sinks/loki/event.rs index 6b85153c0655b..22d399f970710 100644 --- a/src/sinks/loki/event.rs +++ b/src/sinks/loki/event.rs @@ -1,11 +1,8 @@ use std::{collections::HashMap, io}; -use crate::sinks::prelude::*; +use crate::sinks::{prelude::*, util::encoding::Encoder}; use bytes::Bytes; use serde::{ser::SerializeSeq, Serialize}; -use vector_buffers::EventCount; - -use crate::sinks::util::encoding::{write_all, Encoder}; pub type Labels = Vec<(String, String)>; @@ -155,6 +152,7 @@ pub struct LokiRecord { pub event: LokiEvent, pub json_byte_size: JsonSize, pub finalizers: EventFinalizers, + pub 
event_count_tags: EventCountTags, } impl ByteSizeOf for LokiRecord { @@ -186,6 +184,12 @@ impl Finalizable for LokiRecord { } } +impl GetEventCountTags for LokiRecord { + fn get_tags(&self) -> EventCountTags { + self.event_count_tags.clone() + } +} + #[derive(Hash, Eq, PartialEq, Clone, Debug)] pub struct PartitionKey { pub tenant_id: Option, diff --git a/src/sinks/loki/service.rs b/src/sinks/loki/service.rs index 1ac3c871631cb..edcc762042fba 100644 --- a/src/sinks/loki/service.rs +++ b/src/sinks/loki/service.rs @@ -4,7 +4,6 @@ use bytes::Bytes; use http::StatusCode; use snafu::Snafu; use tracing::Instrument; -use vector_core::internal_event::CountByteSize; use crate::sinks::loki::config::{CompressionConfigAdapter, ExtendedCompression}; use crate::{ @@ -50,11 +49,8 @@ impl DriverResponse for LokiResponse { EventStatus::Delivered } - fn events_sent(&self) -> CountByteSize { - CountByteSize( - self.metadata.event_count(), - self.metadata.events_estimated_json_encoded_byte_size(), - ) + fn events_sent(&self) -> &GroupedCountByteSize { + self.metadata.events_estimated_json_encoded_byte_size() } fn bytes_sent(&self) -> Option { @@ -78,8 +74,12 @@ impl Finalizable for LokiRequest { } impl MetaDescriptive for LokiRequest { - fn get_metadata(&self) -> RequestMetadata { - self.metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata } } @@ -120,7 +120,7 @@ impl Service for LokiService { }; let mut req = http::Request::post(&self.endpoint.uri).header("Content-Type", content_type); - let metadata = request.get_metadata(); + let metadata = request.get_metadata().clone(); if let Some(tenant_id) = request.tenant_id { req = req.header("X-Scope-OrgID", tenant_id); diff --git a/src/sinks/loki/sink.rs b/src/sinks/loki/sink.rs index 1ba3cbee6268a..74e133887b6b4 100644 --- a/src/sinks/loki/sink.rs +++ b/src/sinks/loki/sink.rs @@ -262,6 +262,8 @@ impl EventEncoder { 
event.as_mut_log().remove_timestamp(); } + let event_count_tags = event.get_tags(); + self.transformer.transform(&mut event); let mut bytes = BytesMut::new(); self.encoder.encode(event, &mut bytes).ok(); @@ -285,6 +287,7 @@ impl EventEncoder { partition, finalizers, json_byte_size, + event_count_tags, }) } } diff --git a/src/sinks/new_relic/service.rs b/src/sinks/new_relic/service.rs index 67a2f27ad28f9..290276b72b5cf 100644 --- a/src/sinks/new_relic/service.rs +++ b/src/sinks/new_relic/service.rs @@ -13,10 +13,9 @@ use http::{ use hyper::Body; use tower::Service; use tracing::Instrument; -use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; +use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; use vector_core::{ event::{EventFinalizers, EventStatus, Finalizable}, - internal_event::CountByteSize, stream::DriverResponse, }; @@ -39,8 +38,12 @@ impl Finalizable for NewRelicApiRequest { } impl MetaDescriptive for NewRelicApiRequest { - fn get_metadata(&self) -> RequestMetadata { - self.metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata } } @@ -55,11 +58,8 @@ impl DriverResponse for NewRelicApiResponse { self.event_status } - fn events_sent(&self) -> CountByteSize { - CountByteSize( - self.metadata.event_count(), - self.metadata.events_estimated_json_encoded_byte_size(), - ) + fn events_sent(&self) -> &GroupedCountByteSize { + self.metadata.events_estimated_json_encoded_byte_size() } fn bytes_sent(&self) -> Option { @@ -97,7 +97,7 @@ impl Service for NewRelicApiService { }; let payload_len = request.payload.len(); - let metadata = request.get_metadata(); + let metadata = request.get_metadata().clone(); let http_request = http_request .header(CONTENT_LENGTH, payload_len) .body(Body::from(request.payload)) diff --git a/src/sinks/opendal_common.rs b/src/sinks/opendal_common.rs index 
ba961607be438..f8e5877b8e1ed 100644 --- a/src/sinks/opendal_common.rs +++ b/src/sinks/opendal_common.rs @@ -20,10 +20,9 @@ use tracing::Instrument; use vector_common::{ finalization::{EventStatus, Finalizable}, json_size::JsonSize, - request_metadata::{MetaDescriptive, RequestMetadata}, + request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}, }; use vector_core::{ - internal_event::CountByteSize, sink::StreamSink, stream::{BatcherSettings, DriverResponse}, EstimatedJsonEncodedSizeOf, @@ -153,8 +152,12 @@ pub struct OpenDalRequest { } impl MetaDescriptive for OpenDalRequest { - fn get_metadata(&self) -> RequestMetadata { - self.request_metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.request_metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.request_metadata } } @@ -237,8 +240,7 @@ impl RequestBuilder<(String, Vec)> for OpenDalRequestBuilder { /// OpenDalResponse is the response returned by OpenDAL services. #[derive(Debug)] pub struct OpenDalResponse { - pub count: usize, - pub events_byte_size: JsonSize, + pub events_byte_size: GroupedCountByteSize, pub byte_size: usize, } @@ -247,8 +249,8 @@ impl DriverResponse for OpenDalResponse { EventStatus::Delivered } - fn events_sent(&self) -> CountByteSize { - CountByteSize(self.count, self.events_byte_size) + fn events_sent(&self) -> &GroupedCountByteSize { + &self.events_byte_size } fn bytes_sent(&self) -> Option { @@ -277,8 +279,9 @@ impl Service for OpenDalService { .in_current_span() .await; result.map(|_| OpenDalResponse { - count: request.metadata.count, - events_byte_size: request.metadata.byte_size, + events_byte_size: request + .request_metadata + .into_events_estimated_json_encoded_byte_size(), byte_size, }) }) diff --git a/src/sinks/prelude.rs b/src/sinks/prelude.rs index 15f5d99376a0f..ffef449c78df1 100644 --- a/src/sinks/prelude.rs +++ b/src/sinks/prelude.rs @@ -30,7 +30,9 @@ pub use vector_common::{ finalization::{EventFinalizers, 
EventStatus, Finalizable}, internal_event::CountByteSize, json_size::JsonSize, - request_metadata::{MetaDescriptive, RequestMetadata}, + request_metadata::{ + EventCountTags, GetEventCountTags, GroupedCountByteSize, MetaDescriptive, RequestMetadata, + }, }; pub use vector_config::configurable_component; pub use vector_core::{ diff --git a/src/sinks/pulsar/request_builder.rs b/src/sinks/pulsar/request_builder.rs index b284ffef1ab26..de62f179dcb30 100644 --- a/src/sinks/pulsar/request_builder.rs +++ b/src/sinks/pulsar/request_builder.rs @@ -41,7 +41,7 @@ impl RequestBuilder for PulsarRequestBuilder { &self, mut input: PulsarEvent, ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { - let builder = RequestMetadataBuilder::from_events(&input); + let builder = RequestMetadataBuilder::from_event(&input.event); let metadata = PulsarMetadata { finalizers: input.event.take_finalizers(), key: input.key, diff --git a/src/sinks/pulsar/service.rs b/src/sinks/pulsar/service.rs index b04d2eb0d13e5..8afabb3260207 100644 --- a/src/sinks/pulsar/service.rs +++ b/src/sinks/pulsar/service.rs @@ -6,7 +6,6 @@ use bytes::Bytes; use pulsar::producer::Message; use pulsar::{Error as PulsarError, Executor, MultiTopicProducer, ProducerOptions, Pulsar}; use tokio::sync::Mutex; -use vector_common::internal_event::CountByteSize; use crate::internal_events::PulsarSendingError; use crate::sinks::{prelude::*, pulsar::request_builder::PulsarMetadata}; @@ -20,7 +19,7 @@ pub(super) struct PulsarRequest { pub struct PulsarResponse { byte_size: usize, - event_byte_size: JsonSize, + event_byte_size: GroupedCountByteSize, } impl DriverResponse for PulsarResponse { @@ -28,8 +27,8 @@ impl DriverResponse for PulsarResponse { EventStatus::Delivered } - fn events_sent(&self) -> CountByteSize { - CountByteSize(1, self.event_byte_size) + fn events_sent(&self) -> &GroupedCountByteSize { + &self.event_byte_size } fn bytes_sent(&self) -> Option { @@ -44,8 +43,12 @@ impl Finalizable for PulsarRequest { } 
impl MetaDescriptive for PulsarRequest { - fn get_metadata(&self) -> RequestMetadata { - self.request_metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.request_metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.request_metadata } } @@ -134,7 +137,7 @@ impl Service for PulsarService { byte_size, event_byte_size: request .request_metadata - .events_estimated_json_encoded_byte_size(), + .into_events_estimated_json_encoded_byte_size(), }), Err(e) => { emit!(PulsarSendingError { diff --git a/src/sinks/s3_common/service.rs b/src/sinks/s3_common/service.rs index c9c12ac4bcb69..5ad3bea516d5c 100644 --- a/src/sinks/s3_common/service.rs +++ b/src/sinks/s3_common/service.rs @@ -11,13 +11,9 @@ use futures::future::BoxFuture; use md5::Digest; use tower::Service; use tracing::Instrument; -use vector_common::{ - json_size::JsonSize, - request_metadata::{MetaDescriptive, RequestMetadata}, -}; +use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; use vector_core::{ event::{EventFinalizers, EventStatus, Finalizable}, - internal_event::CountByteSize, stream::DriverResponse, }; @@ -41,8 +37,12 @@ impl Finalizable for S3Request { } impl MetaDescriptive for S3Request { - fn get_metadata(&self) -> RequestMetadata { - self.request_metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.request_metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.request_metadata } } @@ -55,8 +55,7 @@ pub struct S3Metadata { #[derive(Debug)] pub struct S3Response { - count: usize, - events_byte_size: JsonSize, + events_byte_size: GroupedCountByteSize, } impl DriverResponse for S3Response { @@ -64,8 +63,8 @@ impl DriverResponse for S3Response { EventStatus::Delivered } - fn events_sent(&self) -> CountByteSize { - CountByteSize(self.count, self.events_byte_size) + fn events_sent(&self) -> &GroupedCountByteSize { + &self.events_byte_size } } @@ -102,11 +101,6 @@ impl Service for 
S3Service { // Emission of internal events for errors and dropped events is handled upstream by the caller. fn call(&mut self, request: S3Request) -> Self::Future { - let count = request.get_metadata().event_count(); - let events_byte_size = request - .get_metadata() - .events_estimated_json_encoded_byte_size(); - let options = request.options; let content_encoding = request.content_encoding; @@ -127,6 +121,10 @@ impl Service for S3Service { tagging.finish() }); + let events_byte_size = request + .request_metadata + .into_events_estimated_json_encoded_byte_size(); + let client = self.client.clone(); Box::pin(async move { @@ -150,10 +148,7 @@ impl Service for S3Service { let result = request.send().in_current_span().await; - result.map(|_| S3Response { - count, - events_byte_size, - }) + result.map(|_| S3Response { events_byte_size }) }) } } diff --git a/src/sinks/splunk_hec/common/request.rs b/src/sinks/splunk_hec/common/request.rs index ba0ab442076ac..f1fc2366b6b30 100644 --- a/src/sinks/splunk_hec/common/request.rs +++ b/src/sinks/splunk_hec/common/request.rs @@ -40,7 +40,11 @@ impl Finalizable for HecRequest { } impl MetaDescriptive for HecRequest { - fn get_metadata(&self) -> RequestMetadata { - self.metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata } } diff --git a/src/sinks/splunk_hec/common/response.rs b/src/sinks/splunk_hec/common/response.rs index 65eaea0f12bcf..16a7b74abc1ab 100644 --- a/src/sinks/splunk_hec/common/response.rs +++ b/src/sinks/splunk_hec/common/response.rs @@ -1,11 +1,10 @@ -use vector_common::json_size::JsonSize; -use vector_core::internal_event::CountByteSize; +use vector_common::request_metadata::GroupedCountByteSize; use vector_core::{event::EventStatus, stream::DriverResponse}; pub struct HecResponse { pub event_status: EventStatus, pub events_count: usize, - pub events_byte_size: JsonSize, + pub events_byte_size: 
GroupedCountByteSize, } impl AsRef for HecResponse { @@ -19,7 +18,7 @@ impl DriverResponse for HecResponse { self.event_status } - fn events_sent(&self) -> CountByteSize { - CountByteSize(self.events_count, self.events_byte_size) + fn events_sent(&self) -> &GroupedCountByteSize { + &self.events_byte_size } } diff --git a/src/sinks/splunk_hec/common/service.rs b/src/sinks/splunk_hec/common/service.rs index 9492f11137dbe..a8abc57e77b09 100644 --- a/src/sinks/splunk_hec/common/service.rs +++ b/src/sinks/splunk_hec/common/service.rs @@ -109,12 +109,13 @@ where } } - fn call(&mut self, req: HecRequest) -> Self::Future { + fn call(&mut self, mut req: HecRequest) -> Self::Future { let ack_finalizer_tx = self.ack_finalizer_tx.clone(); let ack_slot = self.current_ack_slot.take(); - let events_count = req.get_metadata().event_count(); - let events_byte_size = req.get_metadata().events_estimated_json_encoded_byte_size(); + let metadata = std::mem::take(req.metadata_mut()); + let events_count = metadata.event_count(); + let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size(); let response = self.inner.call(req); Box::pin(async move { diff --git a/src/sinks/statsd/service.rs b/src/sinks/statsd/service.rs index 5686dc22de1ea..5ab5e6092e7ed 100644 --- a/src/sinks/statsd/service.rs +++ b/src/sinks/statsd/service.rs @@ -4,8 +4,7 @@ use futures_util::future::BoxFuture; use tower::Service; use vector_common::{ finalization::{EventFinalizers, EventStatus, Finalizable}, - internal_event::CountByteSize, - request_metadata::{MetaDescriptive, RequestMetadata}, + request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}, }; use vector_core::stream::DriverResponse; @@ -24,8 +23,12 @@ impl Finalizable for StatsdRequest { } impl MetaDescriptive for StatsdRequest { - fn get_metadata(&self) -> RequestMetadata { - self.metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + 
&mut self.metadata } } @@ -46,11 +49,8 @@ impl DriverResponse for StatsdResponse { EventStatus::Delivered } - fn events_sent(&self) -> CountByteSize { - CountByteSize( - self.metadata.event_count(), - self.metadata.events_estimated_json_encoded_byte_size(), - ) + fn events_sent(&self) -> &GroupedCountByteSize { + self.metadata.events_estimated_json_encoded_byte_size() } fn bytes_sent(&self) -> Option { diff --git a/src/sinks/util/metadata.rs b/src/sinks/util/metadata.rs index d89b51140e5f6..521f1c080995c 100644 --- a/src/sinks/util/metadata.rs +++ b/src/sinks/util/metadata.rs @@ -1,9 +1,13 @@ use std::num::NonZeroUsize; use vector_buffers::EventCount; -use vector_core::{ByteSizeOf, EstimatedJsonEncodedSizeOf}; +use vector_core::{config, ByteSizeOf, EstimatedJsonEncodedSizeOf}; -use vector_common::{json_size::JsonSize, request_metadata::RequestMetadata}; +use vector_common::{ + internal_event::CountByteSize, + json_size::JsonSize, + request_metadata::{GetEventCountTags, GroupedCountByteSize, RequestMetadata}, +}; use super::request_builder::EncodeResult; @@ -11,22 +15,47 @@ use super::request_builder::EncodeResult; pub struct RequestMetadataBuilder { event_count: usize, events_byte_size: usize, - events_estimated_json_encoded_byte_size: JsonSize, + events_estimated_json_encoded_byte_size: GroupedCountByteSize, } impl RequestMetadataBuilder { - pub fn from_events(events: E) -> Self + pub fn from_events(events: &[E]) -> Self where - E: ByteSizeOf + EventCount + EstimatedJsonEncodedSizeOf, + E: ByteSizeOf + EventCount + GetEventCountTags + EstimatedJsonEncodedSizeOf, { + let mut size = config::telemetry().create_request_count_byte_size(); + + let mut event_count = 0; + let mut events_byte_size = 0; + + for event in events { + event_count += 1; + events_byte_size += event.size_of(); + size.add_event(event, event.estimated_json_encoded_size_of()); + } + + Self { + event_count, + events_byte_size, + events_estimated_json_encoded_byte_size: size, + } + } + + pub fn 
from_event(event: &E) -> Self + where + E: ByteSizeOf + GetEventCountTags + EstimatedJsonEncodedSizeOf, + { + let mut size = config::telemetry().create_request_count_byte_size(); + size.add_event(event, event.estimated_json_encoded_size_of()); + Self { - event_count: events.event_count(), - events_byte_size: events.size_of(), - events_estimated_json_encoded_byte_size: events.estimated_json_encoded_size_of(), + event_count: 1, + events_byte_size: event.size_of(), + events_estimated_json_encoded_byte_size: size, } } - pub const fn new( + pub fn new( event_count: usize, events_byte_size: usize, events_estimated_json_encoded_byte_size: JsonSize, @@ -34,17 +63,23 @@ impl RequestMetadataBuilder { Self { event_count, events_byte_size, - events_estimated_json_encoded_byte_size, + events_estimated_json_encoded_byte_size: CountByteSize( + event_count, + events_estimated_json_encoded_byte_size, + ) + .into(), } } pub fn track_event(&mut self, event: E) where - E: ByteSizeOf + EstimatedJsonEncodedSizeOf, + E: ByteSizeOf + GetEventCountTags + EstimatedJsonEncodedSizeOf, { self.event_count += 1; self.events_byte_size += event.size_of(); - self.events_estimated_json_encoded_byte_size += event.estimated_json_encoded_size_of(); + let json_size = event.estimated_json_encoded_size_of(); + self.events_estimated_json_encoded_byte_size + .add_event(&event, json_size); } pub fn with_request_size(&self, size: NonZeroUsize) -> RequestMetadata { @@ -55,7 +90,7 @@ impl RequestMetadataBuilder { self.events_byte_size, size, size, - self.events_estimated_json_encoded_byte_size, + self.events_estimated_json_encoded_byte_size.clone(), ) } @@ -67,7 +102,7 @@ impl RequestMetadataBuilder { result .compressed_byte_size .unwrap_or(result.uncompressed_byte_size), - self.events_estimated_json_encoded_byte_size, + self.events_estimated_json_encoded_byte_size.clone(), ) } } diff --git a/src/sinks/util/processed_event.rs b/src/sinks/util/processed_event.rs index dd13df8bd3f21..fe4a50a8eb42d 100644 --- 
a/src/sinks/util/processed_event.rs +++ b/src/sinks/util/processed_event.rs @@ -1,5 +1,8 @@ use serde::Serialize; -use vector_common::json_size::JsonSize; +use vector_common::{ + json_size::JsonSize, + request_metadata::{EventCountTags, GetEventCountTags}, +}; use vector_core::{ event::{EventFinalizers, Finalizable, LogEvent, MaybeAsLogMut}, ByteSizeOf, EstimatedJsonEncodedSizeOf, @@ -49,3 +52,12 @@ where self.event.estimated_json_encoded_size_of() } } + +impl GetEventCountTags for ProcessedEvent +where + E: GetEventCountTags, +{ + fn get_tags(&self) -> EventCountTags { + self.event.get_tags() + } +} diff --git a/src/sinks/vector/service.rs b/src/sinks/vector/service.rs index a93a196a58dc9..5277a408634ee 100644 --- a/src/sinks/vector/service.rs +++ b/src/sinks/vector/service.rs @@ -8,11 +8,8 @@ use hyper_proxy::ProxyConnector; use prost::Message; use tonic::{body::BoxBody, IntoRequest}; use tower::Service; -use vector_common::{ - json_size::JsonSize, - request_metadata::{MetaDescriptive, RequestMetadata}, -}; -use vector_core::{internal_event::CountByteSize, stream::DriverResponse}; +use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; +use vector_core::stream::DriverResponse; use super::VectorSinkError; use crate::{ @@ -31,8 +28,7 @@ pub struct VectorService { } pub struct VectorResponse { - events_count: usize, - events_byte_size: JsonSize, + events_byte_size: GroupedCountByteSize, } impl DriverResponse for VectorResponse { @@ -40,8 +36,8 @@ impl DriverResponse for VectorResponse { EventStatus::Delivered } - fn events_sent(&self) -> CountByteSize { - CountByteSize(self.events_count, self.events_byte_size) + fn events_sent(&self) -> &GroupedCountByteSize { + &self.events_byte_size } } @@ -59,8 +55,12 @@ impl Finalizable for VectorRequest { } impl MetaDescriptive for VectorRequest { - fn get_metadata(&self) -> RequestMetadata { - self.metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn 
metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata } } @@ -103,13 +103,11 @@ impl Service for VectorService { } // Emission of internal events for errors and dropped events is handled upstream by the caller. - fn call(&mut self, list: VectorRequest) -> Self::Future { + fn call(&mut self, mut list: VectorRequest) -> Self::Future { let mut service = self.clone(); let byte_size = list.request.encoded_len(); - let events_count = list.get_metadata().event_count(); - let events_byte_size = list - .get_metadata() - .events_estimated_json_encoded_byte_size(); + let metadata = std::mem::take(list.metadata_mut()); + let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size(); let future = async move { service @@ -121,10 +119,7 @@ impl Service for VectorService { protocol: &service.protocol, endpoint: &service.endpoint, }); - VectorResponse { - events_count, - events_byte_size, - } + VectorResponse { events_byte_size } }) .map_err(|source| VectorSinkError::Request { source }.into()) .await diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index 17878ec54c28b..bd004a778881b 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -21,10 +21,7 @@ use k8s_paths_provider::K8sPathsProvider; use kube::{ api::Api, config::{self, KubeConfigOptions}, - runtime::{ - reflector::{self}, - watcher, WatchStreamExt, - }, + runtime::{reflector, watcher, WatchStreamExt}, Client, Config as ClientConfig, }; use lifecycle::Lifecycle; diff --git a/src/sources/vector/mod.rs b/src/sources/vector/mod.rs index a6f5ad5b049f3..a6fdaa494d60e 100644 --- a/src/sources/vector/mod.rs +++ b/src/sources/vector/mod.rs @@ -280,11 +280,7 @@ mod tests { use crate::{ config::{SinkConfig as _, SinkContext}, sinks::vector::VectorConfig as SinkConfig, - test_util::{ - self, - components::{assert_source_compliance, SOURCE_TAGS}, - }, - SourceSender, + test_util, SourceSender, }; async fn 
run_test(vector_source_config_str: &str, addr: SocketAddr) { @@ -323,25 +319,19 @@ mod tests { async fn receive_message() { let addr = test_util::next_addr(); - assert_source_compliance(&SOURCE_TAGS, async { - let config = format!(r#"address = "{}""#, addr); - run_test(&config, addr).await; - }) - .await; + let config = format!(r#"address = "{}""#, addr); + run_test(&config, addr).await; } #[tokio::test] async fn receive_compressed_message() { let addr = test_util::next_addr(); - assert_source_compliance(&SOURCE_TAGS, async { - let config = format!( - r#"address = "{}" + let config = format!( + r#"address = "{}" compression=true"#, - addr - ); - run_test(&config, addr).await; - }) - .await; + addr + ); + run_test(&config, addr).await; } } diff --git a/src/test_util/components.rs b/src/test_util/components.rs index 8d999246628dc..e819bee0352bf 100644 --- a/src/test_util/components.rs +++ b/src/test_util/components.rs @@ -58,6 +58,8 @@ pub const FILE_SOURCE_TAGS: [&str; 1] = ["file"]; /// The most basic set of tags for sinks, regardless of whether or not they push data or have it pulled out. pub const SINK_TAGS: [&str; 1] = ["protocol"]; +pub const DATA_VOLUME_SINK_TAGS: [&str; 2] = ["source", "service"]; + /// The standard set of tags for all sinks that write a file. pub const FILE_SINK_TAGS: [&str; 2] = ["file", "protocol"]; @@ -120,6 +122,17 @@ pub static SINK_TESTS: Lazy = Lazy::new(|| { } }); +pub static DATA_VOLUME_SINK_TESTS: Lazy = Lazy::new(|| { + ComponentTests { + events: &["BytesSent", "EventsSent"], // EventsReceived is emitted in the topology + tagged_counters: &[ + "component_sent_events_total", + "component_sent_event_bytes_total", + ], + untagged_counters: &[], + } +}); + /// The component test specification for sinks which simply expose data, or do not otherwise "send" it anywhere. 
pub static NONSENDING_SINK_TESTS: Lazy = Lazy::new(|| ComponentTests { events: &["EventsSent"], @@ -432,6 +445,32 @@ where .await; } +/// Convenience wrapper for running sink tests +pub async fn assert_data_volume_sink_compliance(tags: &[&str], f: impl Future) -> T { + init_test(); + + let result = f.await; + + DATA_VOLUME_SINK_TESTS.assert(tags); + + result +} + +pub async fn run_and_assert_data_volume_sink_compliance( + sink: VectorSink, + events: S, + tags: &[&str], +) where + S: Stream + Send, + I: Into, +{ + assert_data_volume_sink_compliance(tags, async move { + let events = events.map(Into::into); + sink.run(events).await.expect("Running sink failed") + }) + .await; +} + pub async fn assert_nonsending_sink_compliance(tags: &[&str], f: impl Future) -> T { init_test(); diff --git a/src/topology/builder.rs b/src/topology/builder.rs index ba0d2dda8a761..b6385704b2b70 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -245,10 +245,7 @@ impl<'a> Builder<'a> { let mut rx = builder.add_source_output(output.clone()); let (mut fanout, control) = Fanout::new(); - let source = Arc::new(OutputId { - component: key.clone(), - port: output.port.clone(), - }); + let source = Arc::new(key.clone()); let pump = async move { debug!("Source pump starting."); diff --git a/src/topology/test/compliance.rs b/src/topology/test/compliance.rs index c7c9d3c349818..a716d29593998 100644 --- a/src/topology/test/compliance.rs +++ b/src/topology/test/compliance.rs @@ -1,10 +1,8 @@ use std::sync::Arc; use tokio::sync::oneshot::{channel, Receiver}; -use vector_core::{ - config::OutputId, - event::{Event, EventArray, EventContainer, LogEvent}, -}; +use vector_common::config::ComponentKey; +use vector_core::event::{Event, EventArray, EventContainer, LogEvent}; use crate::{ config::{unit_test::UnitTestSourceConfig, ConfigBuilder}, @@ -58,7 +56,7 @@ async fn test_function_transform_single_event() { let mut events = events.into_events().collect::>(); assert_eq!(events.len(), 1); 
- original_event.set_source_id(Arc::new(OutputId::from("in"))); + original_event.set_source_id(Arc::new(ComponentKey::from("in"))); let event = events.remove(0); assert_eq!(original_event, event); @@ -79,7 +77,7 @@ async fn test_sync_transform_single_event() { let mut events = events.into_events().collect::>(); assert_eq!(events.len(), 1); - original_event.set_source_id(Arc::new(OutputId::from("in"))); + original_event.set_source_id(Arc::new(ComponentKey::from("in"))); let event = events.remove(0); assert_eq!(original_event, event); @@ -99,7 +97,7 @@ async fn test_task_transform_single_event() { let mut events = events.into_events().collect::>(); assert_eq!(events.len(), 1); - original_event.set_source_id(Arc::new(OutputId::from("in"))); + original_event.set_source_id(Arc::new(ComponentKey::from("in"))); let event = events.remove(0); assert_eq!(original_event, event); diff --git a/src/topology/test/mod.rs b/src/topology/test/mod.rs index babce13f20d90..aa5720382e96c 100644 --- a/src/topology/test/mod.rs +++ b/src/topology/test/mod.rs @@ -26,7 +26,7 @@ use tokio::{ time::{sleep, Duration}, }; use vector_buffers::{BufferConfig, BufferType, WhenFull}; -use vector_core::config::OutputId; +use vector_common::config::ComponentKey; mod backpressure; mod compliance; @@ -148,7 +148,7 @@ async fn topology_source_and_sink() { let res = out1.flat_map(into_event_stream).collect::>().await; - event.set_source_id(Arc::new(OutputId::from("in1"))); + event.set_source_id(Arc::new(ComponentKey::from("in1"))); assert_eq!(vec![event], res); } @@ -181,8 +181,8 @@ async fn topology_multiple_sources() { topology.stop().await; - event1.set_source_id(Arc::new(OutputId::from("in1"))); - event2.set_source_id(Arc::new(OutputId::from("in2"))); + event1.set_source_id(Arc::new(ComponentKey::from("in1"))); + event2.set_source_id(Arc::new(ComponentKey::from("in2"))); assert_eq!(out_event1, Some(event1.into())); assert_eq!(out_event2, Some(event2.into())); @@ -217,7 +217,7 @@ async fn 
topology_multiple_sinks() { let res2 = out2.flat_map(into_event_stream).collect::>().await; // We should see that both sinks got the exact same event: - event.set_source_id(Arc::new(OutputId::from("in1"))); + event.set_source_id(Arc::new(ComponentKey::from("in1"))); let expected = vec![event]; assert_eq!(expected, res1); assert_eq!(expected, res2); @@ -291,7 +291,7 @@ async fn topology_remove_one_source() { drop(in2); topology.stop().await; - event1.set_source_id(Arc::new(OutputId::from("in1"))); + event1.set_source_id(Arc::new(ComponentKey::from("in1"))); let res = h_out1.await.unwrap(); assert_eq!(vec![event1], res); @@ -330,7 +330,7 @@ async fn topology_remove_one_sink() { let res1 = out1.flat_map(into_event_stream).collect::>().await; let res2 = out2.flat_map(into_event_stream).collect::>().await; - event.set_source_id(Arc::new(OutputId::from("in1"))); + event.set_source_id(Arc::new(ComponentKey::from("in1"))); assert_eq!(vec![event], res1); assert_eq!(Vec::::new(), res2); @@ -441,7 +441,7 @@ async fn topology_swap_source() { // as we've removed it from the topology prior to the sends. 
assert_eq!(Vec::::new(), res1); - event2.set_source_id(Arc::new(OutputId::from("in2"))); + event2.set_source_id(Arc::new(ComponentKey::from("in2"))); assert_eq!(vec![event2], res2); } @@ -553,7 +553,7 @@ async fn topology_swap_sink() { // the new sink, which _was_ rebuilt: assert_eq!(Vec::::new(), res1); - event1.set_source_id(Arc::new(OutputId::from("in1"))); + event1.set_source_id(Arc::new(ComponentKey::from("in1"))); assert_eq!(vec![event1], res2); } @@ -660,8 +660,8 @@ async fn topology_rebuild_connected() { let res = h_out1.await.unwrap(); - event1.set_source_id(Arc::new(OutputId::from("in1"))); - event2.set_source_id(Arc::new(OutputId::from("in1"))); + event1.set_source_id(Arc::new(ComponentKey::from("in1"))); + event2.set_source_id(Arc::new(ComponentKey::from("in1"))); assert_eq!(vec![event1, event2], res); } @@ -714,7 +714,7 @@ async fn topology_rebuild_connected_transform() { let res2 = h_out2.await.unwrap(); assert_eq!(Vec::::new(), res1); - event.set_source_id(Arc::new(OutputId::from("in1"))); + event.set_source_id(Arc::new(ComponentKey::from("in1"))); assert_eq!(vec![event], res2); } @@ -897,11 +897,11 @@ async fn source_metadata_reaches_sink() { topology.stop().await; assert_eq!( - out_event1.into_log().metadata().source_id().unwrap(), - &OutputId::from("in1") + **out_event1.into_log().metadata().source_id().unwrap(), + ComponentKey::from("in1") ); assert_eq!( - out_event2.into_log().metadata().source_id().unwrap(), - &OutputId::from("in2") + **out_event2.into_log().metadata().source_id().unwrap(), + ComponentKey::from("in2") ); } diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index de97b69eafe41..a591305764df1 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -155,6 +155,7 @@ mod tests { use futures::stream; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; + use vector_common::config::ComponentKey; use super::*; use crate::{ @@ -173,7 +174,8 @@ mod tests { kind: metric::MetricKind, 
value: metric::MetricValue, ) -> Event { - Event::Metric(Metric::new(name, kind, value)).with_source_id(Arc::new(OutputId::from("in"))) + Event::Metric(Metric::new(name, kind, value)) + .with_source_id(Arc::new(ComponentKey::from("in"))) } #[test] diff --git a/src/transforms/dedupe.rs b/src/transforms/dedupe.rs index 8375e125763f1..4a6497628d78a 100644 --- a/src/transforms/dedupe.rs +++ b/src/transforms/dedupe.rs @@ -288,7 +288,7 @@ mod tests { use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; - use vector_core::config::OutputId; + use vector_common::config::ComponentKey; use crate::{ event::{Event, LogEvent, Value}, @@ -362,7 +362,7 @@ mod tests { tx.send(event1.clone()).await.unwrap(); let new_event = out.recv().await.unwrap(); - event1.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_source_id(Arc::new(ComponentKey::from("in"))); assert_eq!(new_event, event1); // Second event differs in matched field so should be output even though it @@ -370,7 +370,7 @@ mod tests { tx.send(event2.clone()).await.unwrap(); let new_event = out.recv().await.unwrap(); - event2.set_source_id(Arc::new(OutputId::from("in"))); + event2.set_source_id(Arc::new(ComponentKey::from("in"))); assert_eq!(new_event, event2); // Third event has the same value for "matched" as first event, so it should be dropped. 
@@ -412,7 +412,7 @@ mod tests { tx.send(event1.clone()).await.unwrap(); let new_event = out.recv().await.unwrap(); - event1.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_source_id(Arc::new(ComponentKey::from("in"))); assert_eq!(new_event, event1); // Second event has a different matched field name with the same value, @@ -420,7 +420,7 @@ mod tests { tx.send(event2.clone()).await.unwrap(); let new_event = out.recv().await.unwrap(); - event2.set_source_id(Arc::new(OutputId::from("in"))); + event2.set_source_id(Arc::new(ComponentKey::from("in"))); assert_eq!(new_event, event2); drop(tx); @@ -465,7 +465,7 @@ mod tests { tx.send(event1.clone()).await.unwrap(); let new_event = out.recv().await.unwrap(); - event1.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_source_id(Arc::new(ComponentKey::from("in"))); assert_eq!(new_event, event1); // Second event is the same just with different field order, so it @@ -510,7 +510,7 @@ mod tests { tx.send(event1.clone()).await.unwrap(); let new_event = out.recv().await.unwrap(); - event1.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_source_id(Arc::new(ComponentKey::from("in"))); assert_eq!(new_event, event1); // Second event gets output because it's not a dupe. 
This causes the first @@ -518,7 +518,7 @@ mod tests { tx.send(event2.clone()).await.unwrap(); let new_event = out.recv().await.unwrap(); - event2.set_source_id(Arc::new(OutputId::from("in"))); + event2.set_source_id(Arc::new(ComponentKey::from("in"))); assert_eq!(new_event, event2); // Third event is a dupe but gets output anyway because the first @@ -526,7 +526,7 @@ mod tests { tx.send(event1.clone()).await.unwrap(); let new_event = out.recv().await.unwrap(); - event1.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_source_id(Arc::new(ComponentKey::from("in"))); assert_eq!(new_event, event1); drop(tx); @@ -567,7 +567,7 @@ mod tests { tx.send(event1.clone()).await.unwrap(); let new_event = out.recv().await.unwrap(); - event1.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_source_id(Arc::new(ComponentKey::from("in"))); assert_eq!(new_event, event1); // Second event should also get passed through even though the string @@ -575,7 +575,7 @@ mod tests { tx.send(event2.clone()).await.unwrap(); let new_event = out.recv().await.unwrap(); - event2.set_source_id(Arc::new(OutputId::from("in"))); + event2.set_source_id(Arc::new(ComponentKey::from("in"))); assert_eq!(new_event, event2); drop(tx); @@ -620,7 +620,7 @@ mod tests { tx.send(event1.clone()).await.unwrap(); let new_event = out.recv().await.unwrap(); - event1.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_source_id(Arc::new(ComponentKey::from("in"))); assert_eq!(new_event, event1); // Second event should also get passed through even though the string @@ -628,7 +628,7 @@ mod tests { tx.send(event2.clone()).await.unwrap(); let new_event = out.recv().await.unwrap(); - event2.set_source_id(Arc::new(OutputId::from("in"))); + event2.set_source_id(Arc::new(ComponentKey::from("in"))); assert_eq!(new_event, event2); drop(tx); @@ -666,7 +666,7 @@ mod tests { tx.send(event1.clone()).await.unwrap(); let new_event = out.recv().await.unwrap(); - 
event1.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_source_id(Arc::new(ComponentKey::from("in"))); assert_eq!(new_event, event1); // Second event should also get passed through as null is different than @@ -674,7 +674,7 @@ mod tests { tx.send(event2.clone()).await.unwrap(); let new_event = out.recv().await.unwrap(); - event2.set_source_id(Arc::new(OutputId::from("in"))); + event2.set_source_id(Arc::new(ComponentKey::from("in"))); assert_eq!(new_event, event2); drop(tx); diff --git a/src/transforms/filter.rs b/src/transforms/filter.rs index 85bee0a071ba1..95e8877bee255 100644 --- a/src/transforms/filter.rs +++ b/src/transforms/filter.rs @@ -100,6 +100,7 @@ mod test { use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; + use vector_common::config::ComponentKey; use vector_core::event::{Metric, MetricKind, MetricValue}; use super::*; @@ -127,7 +128,7 @@ mod test { let mut log = Event::from(LogEvent::from("message")); tx.send(log.clone()).await.unwrap(); - log.set_source_id(Arc::new(OutputId::from("in"))); + log.set_source_id(Arc::new(ComponentKey::from("in"))); assert_eq!(out.recv().await.unwrap(), log); let metric = Event::from(Metric::new( diff --git a/src/transforms/log_to_metric.rs b/src/transforms/log_to_metric.rs index aaf7a68e42225..ad44b0a9e6d55 100644 --- a/src/transforms/log_to_metric.rs +++ b/src/transforms/log_to_metric.rs @@ -409,6 +409,7 @@ mod tests { use std::time::Duration; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; + use vector_common::config::ComponentKey; use vector_core::metric_tags; use super::*; @@ -508,7 +509,7 @@ mod tests { let event = create_event("status", "42"); let mut metadata = event.metadata().clone(); - metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_source_id(Arc::new(ComponentKey::from("in"))); let metric = do_transform(config, event).await.unwrap(); assert_eq!( @@ -540,7 +541,7 @@ mod tests { event.as_mut_log().insert("method", "post"); 
event.as_mut_log().insert("code", "200"); let mut metadata = event.metadata().clone(); - metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_source_id(Arc::new(ComponentKey::from("in"))); let metric = do_transform(config, event).await.unwrap(); @@ -629,7 +630,7 @@ mod tests { let event = create_event("backtrace", "message"); let mut metadata = event.metadata().clone(); - metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_source_id(Arc::new(ComponentKey::from("in"))); let metric = do_transform(config, event).await.unwrap(); assert_eq!( @@ -673,7 +674,7 @@ mod tests { let event = create_event("amount", "33.99"); let mut metadata = event.metadata().clone(); - metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_source_id(Arc::new(ComponentKey::from("in"))); let metric = do_transform(config, event).await.unwrap(); assert_eq!( @@ -703,7 +704,7 @@ mod tests { let event = create_event("amount", "33.99"); let mut metadata = event.metadata().clone(); - metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_source_id(Arc::new(ComponentKey::from("in"))); let metric = do_transform(config, event).await.unwrap(); assert_eq!( @@ -731,7 +732,7 @@ mod tests { let event = create_event("memory_rss", "123"); let mut metadata = event.metadata().clone(); - metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_source_id(Arc::new(ComponentKey::from("in"))); let metric = do_transform(config, event).await.unwrap(); assert_eq!( @@ -815,7 +816,7 @@ mod tests { event.as_mut_log().insert("status", "42"); event.as_mut_log().insert("backtrace", "message"); let mut metadata = event.metadata().clone(); - metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_source_id(Arc::new(ComponentKey::from("in"))); let output = do_transform_multiple_events(config, event, 2).await; assert_eq!(2, output.len()); @@ -869,7 +870,7 @@ mod tests { event.as_mut_log().insert("worker", "abc"); 
event.as_mut_log().insert("service", "xyz"); let mut metadata = event.metadata().clone(); - metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_source_id(Arc::new(ComponentKey::from("in"))); let output = do_transform_multiple_events(config, event, 2).await; @@ -912,7 +913,7 @@ mod tests { let event = create_event("user_ip", "1.2.3.4"); let mut metadata = event.metadata().clone(); - metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_source_id(Arc::new(ComponentKey::from("in"))); let metric = do_transform(config, event).await.unwrap(); assert_eq!( @@ -941,7 +942,7 @@ mod tests { let event = create_event("response_time", "2.5"); let mut metadata = event.metadata().clone(); - metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_source_id(Arc::new(ComponentKey::from("in"))); let metric = do_transform(config, event).await.unwrap(); assert_eq!( @@ -971,7 +972,7 @@ mod tests { let event = create_event("response_time", "2.5"); let mut metadata = event.metadata().clone(); - metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_source_id(Arc::new(ComponentKey::from("in"))); let metric = do_transform(config, event).await.unwrap(); assert_eq!( diff --git a/src/transforms/metric_to_log.rs b/src/transforms/metric_to_log.rs index b1c7c7394fca2..7e23962aa990c 100644 --- a/src/transforms/metric_to_log.rs +++ b/src/transforms/metric_to_log.rs @@ -346,6 +346,7 @@ mod tests { use similar_asserts::assert_eq; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; + use vector_common::config::ComponentKey; use vector_core::metric_tags; use super::*; @@ -410,7 +411,7 @@ mod tests { .with_tags(Some(tags())) .with_timestamp(Some(ts())); let mut metadata = counter.metadata().clone(); - metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_source_id(Arc::new(ComponentKey::from("in"))); let log = do_transform(counter).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); 
@@ -438,7 +439,7 @@ mod tests { ) .with_timestamp(Some(ts())); let mut metadata = gauge.metadata().clone(); - metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_source_id(Arc::new(ComponentKey::from("in"))); let log = do_transform(gauge).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -466,7 +467,7 @@ mod tests { ) .with_timestamp(Some(ts())); let mut metadata = set.metadata().clone(); - metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_source_id(Arc::new(ComponentKey::from("in"))); let log = do_transform(set).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -496,7 +497,7 @@ mod tests { ) .with_timestamp(Some(ts())); let mut metadata = distro.metadata().clone(); - metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_source_id(Arc::new(ComponentKey::from("in"))); let log = do_transform(distro).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -545,7 +546,7 @@ mod tests { ) .with_timestamp(Some(ts())); let mut metadata = histo.metadata().clone(); - metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_source_id(Arc::new(ComponentKey::from("in"))); let log = do_transform(histo).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -592,7 +593,7 @@ mod tests { ) .with_timestamp(Some(ts())); let mut metadata = summary.metadata().clone(); - metadata.set_source_id(Arc::new(OutputId::from("in"))); + metadata.set_source_id(Arc::new(ComponentKey::from("in"))); let log = do_transform(summary).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); diff --git a/src/transforms/tag_cardinality_limit/tests.rs b/src/transforms/tag_cardinality_limit/tests.rs index 845883f4ae045..8488658e8ea55 100644 --- a/src/transforms/tag_cardinality_limit/tests.rs +++ b/src/transforms/tag_cardinality_limit/tests.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use 
vector_core::config::OutputId; +use vector_common::config::ComponentKey; use vector_core::metric_tags; use super::*; @@ -85,8 +85,8 @@ async fn drop_event(config: TagCardinalityLimitConfig) { let new_event3 = out.recv().await; - event1.set_source_id(Arc::new(OutputId::from("in"))); - event2.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_source_id(Arc::new(ComponentKey::from("in"))); assert_eq!(new_event1, Some(event1)); assert_eq!(new_event2, Some(event2)); @@ -131,9 +131,9 @@ async fn drop_tag(config: TagCardinalityLimitConfig) { drop(tx); topology.stop().await; - event1.set_source_id(Arc::new(OutputId::from("in"))); - event2.set_source_id(Arc::new(OutputId::from("in"))); - event3.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event3.set_source_id(Arc::new(ComponentKey::from("in"))); assert_eq!(new_event1, Some(event1)); assert_eq!(new_event2, Some(event2)); @@ -203,9 +203,9 @@ async fn drop_tag_multi_value(config: TagCardinalityLimitConfig) { let new_event2 = out.recv().await; let new_event3 = out.recv().await; - event1.set_source_id(Arc::new(OutputId::from("in"))); - event2.set_source_id(Arc::new(OutputId::from("in"))); - event3.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event3.set_source_id(Arc::new(ComponentKey::from("in"))); drop(tx); topology.stop().await; @@ -253,9 +253,9 @@ async fn separate_value_limit_per_tag(config: TagCardinalityLimitConfig) { drop(tx); topology.stop().await; - event1.set_source_id(Arc::new(OutputId::from("in"))); - event2.set_source_id(Arc::new(OutputId::from("in"))); - event3.set_source_id(Arc::new(OutputId::from("in"))); + event1.set_source_id(Arc::new(ComponentKey::from("in"))); + 
event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event3.set_source_id(Arc::new(ComponentKey::from("in"))); assert_eq!(new_event1, Some(event1)); assert_eq!(new_event2, Some(event2));