diff --git a/lib/vector-common/src/internal_event/cached_event.rs b/lib/vector-common/src/internal_event/cached_event.rs index daa3f8ea803af..54f6b5ed5c64f 100644 --- a/lib/vector-common/src/internal_event/cached_event.rs +++ b/lib/vector-common/src/internal_event/cached_event.rs @@ -1,5 +1,6 @@ use std::{ - collections::BTreeMap, + collections::HashMap, + hash::Hash, sync::{Arc, RwLock}, }; @@ -22,7 +23,7 @@ pub struct RegisteredEventCache { fixed_tags: T, cache: Arc< RwLock< - BTreeMap< + HashMap< ::Tags, ::Handle, >, @@ -48,7 +49,7 @@ impl RegisteredEventCache, - Tags: Ord + Clone, + Tags: Clone + Eq + Hash, FixedTags: Clone, Event: RegisterInternalEvent + RegisterTaggedInternalEvent, diff --git a/lib/vector-common/src/internal_event/events_sent.rs b/lib/vector-common/src/internal_event/events_sent.rs index 22d614b0016eb..b902b6afaff65 100644 --- a/lib/vector-common/src/internal_event/events_sent.rs +++ b/lib/vector-common/src/internal_event/events_sent.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use metrics::{register_counter, Counter}; use tracing::trace; -use crate::{config::ComponentKey, request_metadata::EventCountTags}; +use crate::config::ComponentKey; use super::{CountByteSize, OptionalTag, Output, SharedString}; @@ -91,19 +91,25 @@ crate::registered_event!( self.event_bytes.increment(byte_size.get() as u64); } - fn register(_fixed: (), tags: EventCountTags) { - super::register(TaggedEventsSent::new( - tags, - )) + fn register(_fixed: (), tags: TaggedEventsSent) { + super::register(tags) } ); impl TaggedEventsSent { #[must_use] - pub fn new(tags: EventCountTags) -> Self { + pub fn new_empty() -> Self { Self { - source: tags.source, - service: tags.service, + source: OptionalTag::Specified(None), + service: OptionalTag::Specified(None), + } + } + + #[must_use] + pub fn new_unspecified() -> Self { + Self { + source: OptionalTag::Ignored, + service: OptionalTag::Ignored, } } } diff --git a/lib/vector-common/src/internal_event/mod.rs b/lib/vector-common/src/internal_event/mod.rs index 8785fa45757a8..ee3747ed4ce38 100644 --- a/lib/vector-common/src/internal_event/mod.rs +++ b/lib/vector-common/src/internal_event/mod.rs @@ -194,7 +194,7 @@ impl From for SharedString { macro_rules! registered_event { // A registered event struct with no fields (zero-sized type). ($event:ident => $($tail:tt)*) => { - #[derive(Debug)] + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct $event; $crate::registered_event!(=> $event $($tail)*); @@ -202,7 +202,7 @@ macro_rules! registered_event { // A normal registered event struct. ($event:ident { $( $field:ident: $type:ty, )* } => $($tail:tt)*) => { - #[derive(Debug)] + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct $event { $( pub $field: $type, )* } diff --git a/lib/vector-common/src/request_metadata.rs b/lib/vector-common/src/request_metadata.rs index 881918c899f8c..5b2ffeaef0eb4 100644 --- a/lib/vector-common/src/request_metadata.rs +++ b/lib/vector-common/src/request_metadata.rs @@ -1,44 +1,18 @@ +use std::collections::HashMap; use std::ops::Add; -use std::{collections::HashMap, sync::Arc}; use crate::{ - config::ComponentKey, internal_event::{ - CountByteSize, InternalEventHandle, OptionalTag, RegisterTaggedInternalEvent, - RegisteredEventCache, + CountByteSize, InternalEventHandle, RegisterTaggedInternalEvent, RegisteredEventCache, + TaggedEventsSent, }, json_size::JsonSize, }; -/// Tags that are used to group the events within a batch for emitting telemetry. -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct EventCountTags { - pub source: OptionalTag>, - pub service: OptionalTag, -} - -impl EventCountTags { - #[must_use] - pub fn new_empty() -> Self { - Self { - source: OptionalTag::Specified(None), - service: OptionalTag::Specified(None), - } - } - - #[must_use] - pub fn new_unspecified() -> Self { - Self { - source: OptionalTag::Ignored, - service: OptionalTag::Ignored, - } - } -} - /// Must be implemented by events to get the tags that will be attached to /// the `component_sent_event_*` emitted metrics. pub trait GetEventCountTags { - fn get_tags(&self) -> EventCountTags; + fn get_tags(&self) -> TaggedEventsSent; } /// Keeps track of the estimated json size of a given batch of events by @@ -48,7 +22,7 @@ pub enum GroupedCountByteSize { /// When we need to keep track of the events by certain tags we use this /// variant. Tagged { - sizes: HashMap, + sizes: HashMap, }, /// If we don't need to track the events by certain tags we can use /// this variant to avoid allocating a `HashMap`, @@ -86,7 +60,7 @@ impl GroupedCountByteSize { /// Returns `None` if we are not tracking by tags. #[must_use] #[cfg(test)] - pub fn sizes(&self) -> Option<&HashMap> { + pub fn sizes(&self) -> Option<&HashMap> { match self { Self::Tagged { sizes } => Some(sizes), Self::Untagged { .. } => None, @@ -131,7 +105,7 @@ impl GroupedCountByteSize { /// Emits our counts to a `RegisteredEvent` cached event. pub fn emit_event(&self, event_cache: &RegisteredEventCache<(), T>) where - T: RegisterTaggedInternalEvent, + T: RegisterTaggedInternalEvent, H: InternalEventHandle, { match self { @@ -141,7 +115,7 @@ impl GroupedCountByteSize { } } GroupedCountByteSize::Untagged { size } => { - event_cache.emit(&EventCountTags::new_unspecified(), *size); + event_cache.emit(&TaggedEventsSent::new_unspecified(), *size); } } } @@ -177,10 +151,10 @@ impl<'a> Add<&'a GroupedCountByteSize> for GroupedCountByteSize { // The following two scenarios shouldn't really occur in practice, but are provided for completeness. (Self::Tagged { mut sizes }, Self::Untagged { size }) => { - match sizes.get_mut(&EventCountTags::new_empty()) { + match sizes.get_mut(&TaggedEventsSent::new_empty()) { Some(empty_size) => *empty_size += *size, None => { - sizes.insert(EventCountTags::new_empty(), *size); + sizes.insert(TaggedEventsSent::new_empty(), *size); } } @@ -188,10 +162,10 @@ impl<'a> Add<&'a GroupedCountByteSize> for GroupedCountByteSize { } (Self::Untagged { size }, Self::Tagged { sizes }) => { let mut sizes = sizes.clone(); - match sizes.get_mut(&EventCountTags::new_empty()) { + match sizes.get_mut(&TaggedEventsSent::new_empty()) { Some(empty_size) => *empty_size += size, None => { - sizes.insert(EventCountTags::new_empty(), size); + sizes.insert(TaggedEventsSent::new_empty(), size); } } @@ -307,6 +281,10 @@ pub trait MetaDescriptive { #[cfg(test)] mod tests { + use std::sync::Arc; + + use crate::{config::ComponentKey, internal_event::OptionalTag}; + use super::*; struct DummyEvent { @@ -315,8 +293,8 @@ mod tests { } impl GetEventCountTags for DummyEvent { - fn get_tags(&self) -> EventCountTags { - EventCountTags { + fn get_tags(&self) -> TaggedEventsSent { + TaggedEventsSent { source: self.source.clone(), service: self.service.clone(), } @@ -380,14 +358,14 @@ mod tests { assert_eq!( vec![ ( - EventCountTags { + TaggedEventsSent { source: OptionalTag::Ignored, service: Some("cabbage".to_string()).into() }, CountByteSize(2, JsonSize::new(78)) ), ( - EventCountTags { + TaggedEventsSent { source: OptionalTag::Ignored, service: Some("tomato".to_string()).into() }, diff --git a/lib/vector-core/src/event/log_event.rs b/lib/vector-core/src/event/log_event.rs index 87da44d6040c6..d615a5ba340c9 100644 --- a/lib/vector-core/src/event/log_event.rs +++ b/lib/vector-core/src/event/log_event.rs @@ -15,9 +15,9 @@ use lookup::lookup_v2::TargetPath; use lookup::PathPrefix; use serde::{Deserialize, Serialize, Serializer}; use vector_common::{ - internal_event::OptionalTag, + internal_event::{OptionalTag, TaggedEventsSent}, json_size::{JsonSize, NonZeroJsonSize}, - request_metadata::{EventCountTags, GetEventCountTags}, + request_metadata::GetEventCountTags, EventDataEq, }; @@ -215,7 +215,7 @@ impl EstimatedJsonEncodedSizeOf for LogEvent { } impl GetEventCountTags for LogEvent { - fn get_tags(&self) -> EventCountTags { + fn get_tags(&self) -> TaggedEventsSent { let source = if telemetry().tags().emit_source { self.metadata().source_id().cloned().into() } else { @@ -230,7 +230,7 @@ impl GetEventCountTags for LogEvent { OptionalTag::Ignored }; - EventCountTags { source, service } + TaggedEventsSent { source, service } } } diff --git a/lib/vector-core/src/event/metric/mod.rs b/lib/vector-core/src/event/metric/mod.rs index fa62bec7ec52c..9d29e4fd054ae 100644 --- a/lib/vector-core/src/event/metric/mod.rs +++ b/lib/vector-core/src/event/metric/mod.rs @@ -12,9 +12,9 @@ use std::{ use chrono::{DateTime, Utc}; use vector_common::{ - internal_event::OptionalTag, + internal_event::{OptionalTag, TaggedEventsSent}, json_size::JsonSize, - request_metadata::{EventCountTags, GetEventCountTags}, + request_metadata::GetEventCountTags, EventDataEq, }; use vector_config::configurable_component; @@ -483,7 +483,7 @@ impl Finalizable for Metric { } impl GetEventCountTags for Metric { - fn get_tags(&self) -> EventCountTags { + fn get_tags(&self) -> TaggedEventsSent { let source = if telemetry().tags().emit_source { self.metadata().source_id().cloned().into() } else { @@ -500,7 +500,7 @@ impl GetEventCountTags for Metric { OptionalTag::Ignored }; - EventCountTags { source, service } + TaggedEventsSent { source, service } } } diff --git a/lib/vector-core/src/event/mod.rs b/lib/vector-core/src/event/mod.rs index 9547f58dc5ed3..5c385df1fd913 100644 --- a/lib/vector-core/src/event/mod.rs +++ b/lib/vector-core/src/event/mod.rs @@ -20,11 +20,8 @@ use serde::{Deserialize, Serialize}; pub use trace::TraceEvent; use vector_buffers::EventCount; use vector_common::{ - config::ComponentKey, - finalization, - json_size::JsonSize, - request_metadata::{EventCountTags, GetEventCountTags}, - EventDataEq, + config::ComponentKey, finalization, internal_event::TaggedEventsSent, json_size::JsonSize, + request_metadata::GetEventCountTags, EventDataEq, }; pub use vrl::value::Value; #[cfg(feature = "vrl")] @@ -97,7 +94,7 @@ impl Finalizable for Event { } impl GetEventCountTags for Event { - fn get_tags(&self) -> EventCountTags { + fn get_tags(&self) -> TaggedEventsSent { match self { Event::Log(log) => log.get_tags(), Event::Metric(metric) => metric.get_tags(), diff --git a/lib/vector-core/src/event/trace.rs b/lib/vector-core/src/event/trace.rs index 3885b50b9f13d..3760b7ad286af 100644 --- a/lib/vector-core/src/event/trace.rs +++ b/lib/vector-core/src/event/trace.rs @@ -4,8 +4,7 @@ use lookup::lookup_v2::TargetPath; use serde::{Deserialize, Serialize}; use vector_buffers::EventCount; use vector_common::{ - json_size::JsonSize, - request_metadata::{EventCountTags, GetEventCountTags}, + internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags, EventDataEq, }; @@ -149,7 +148,7 @@ impl AsMut for TraceEvent { } impl GetEventCountTags for TraceEvent { - fn get_tags(&self) -> EventCountTags { + fn get_tags(&self) -> TaggedEventsSent { self.0.get_tags() } } diff --git a/src/sinks/elasticsearch/encoder.rs b/src/sinks/elasticsearch/encoder.rs index 8f136bd02a195..cdee7812097e6 100644 --- a/src/sinks/elasticsearch/encoder.rs +++ b/src/sinks/elasticsearch/encoder.rs @@ -3,8 +3,9 @@ use std::{io, io::Write}; use serde::Serialize; use vector_buffers::EventCount; use vector_common::{ + internal_event::TaggedEventsSent, json_size::JsonSize, - request_metadata::{EventCountTags, GetEventCountTags, GroupedCountByteSize}, + request_metadata::{GetEventCountTags, GroupedCountByteSize}, }; use vector_core::{config::telemetry, event::Event, ByteSizeOf, EstimatedJsonEncodedSizeOf}; @@ -51,7 +52,7 @@ impl EventCount for ProcessedEvent { } impl GetEventCountTags for ProcessedEvent { - fn get_tags(&self) -> EventCountTags { + fn get_tags(&self) -> TaggedEventsSent { self.log.get_tags() } } diff --git a/src/sinks/loki/event.rs b/src/sinks/loki/event.rs index 28a115243c3e8..bae1fa3d2448e 100644 --- a/src/sinks/loki/event.rs +++ b/src/sinks/loki/event.rs @@ -158,7 +158,7 @@ pub struct LokiRecord { pub event: LokiEvent, pub json_byte_size: JsonSize, pub finalizers: EventFinalizers, - pub event_count_tags: EventCountTags, + pub event_count_tags: TaggedEventsSent, } impl ByteSizeOf for LokiRecord { @@ -191,7 +191,7 @@ impl Finalizable for LokiRecord { } impl GetEventCountTags for LokiRecord { - fn get_tags(&self) -> EventCountTags { + fn get_tags(&self) -> TaggedEventsSent { self.event_count_tags.clone() } } diff --git a/src/sinks/prelude.rs b/src/sinks/prelude.rs index e3b3adc4327b6..b66de220ed5ed 100644 --- a/src/sinks/prelude.rs +++ b/src/sinks/prelude.rs @@ -28,11 +28,9 @@ pub use tower::{Service, ServiceBuilder}; pub use vector_buffers::EventCount; pub use vector_common::{ finalization::{EventFinalizers, EventStatus, Finalizable}, - internal_event::CountByteSize, + internal_event::{CountByteSize, TaggedEventsSent}, json_size::JsonSize, - request_metadata::{ - EventCountTags, GetEventCountTags, GroupedCountByteSize, MetaDescriptive, RequestMetadata, - }, + request_metadata::{GetEventCountTags, GroupedCountByteSize, MetaDescriptive, RequestMetadata}, }; pub use vector_config::configurable_component; pub use vector_core::{ diff --git a/src/sinks/util/processed_event.rs b/src/sinks/util/processed_event.rs index fe4a50a8eb42d..63b6ba8e1425b 100644 --- a/src/sinks/util/processed_event.rs +++ b/src/sinks/util/processed_event.rs @@ -1,7 +1,6 @@ use serde::Serialize; use vector_common::{ - json_size::JsonSize, - request_metadata::{EventCountTags, GetEventCountTags}, + internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags, }; use vector_core::{ event::{EventFinalizers, Finalizable, LogEvent, MaybeAsLogMut}, @@ -57,7 +56,7 @@ impl GetEventCountTags for ProcessedEvent where E: GetEventCountTags, { - fn get_tags(&self) -> EventCountTags { + fn get_tags(&self) -> TaggedEventsSent { self.event.get_tags() } }