Skip to content

Commit

Permalink
chore(observability): emit component_sent events by source and `s…
Browse files Browse the repository at this point in the history
…ervice` (#17549)

Closes #17580 
Closes #17581 

This is still in draft until I can get the following done.

- [ ] ~~There are way more clones that I am happy with here, especially
since this is in a hot path. These need reducing.~~

The remaining clones that I would like to remove are in the `get_tags`
functions. This didn't seem trivial, and given the fairly positive
regression numbers, I think it should be ok to defer for now.

- [x] Function documentation.
- [ ] Currently source schemas aren't being attached to the event at
runtime, so the service meaning can't be retrieved. That won't work
until this has been done. This will be a separate PR - #17692
- [x] I've only tested this with the kafka sink so far. I think it
should work with all Stream sinks without needing any further
modification - but further testing is needed.
- [x] Tests. A bunch of tests need writing. 
- [x] The Vector source tests are failing I think because we now have
`EventsSent` and `TaggedEventsSent` which both emit
`component_sent_event` events and the test framework doesn't like this.
This needs fixed.
- [ ] We will need to review every sink to ensure they work with this.
All the stream based sinks should, but the others are highly likely to
need some work.

---------

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
  • Loading branch information
StephenWakely authored Jun 26, 2023
1 parent 6a6b42b commit dcf7f9a
Show file tree
Hide file tree
Showing 77 changed files with 1,387 additions and 501 deletions.
4 changes: 2 additions & 2 deletions docs/tutorials/sinks/2_http_sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,9 @@ impl DriverResponse for BasicResponse {
EventStatus::Delivered
}

fn events_sent(&self) -> CountByteSize {
fn events_sent(&self) -> RequestCountByteSize {
// (events count, byte size)
CountByteSize(1, self.byte_size)
CountByteSize(1, self.byte_size).into()
}
}
```
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ bytes = { version = "1.4.0", default-features = false, optional = true }
chrono-tz = { version = "0.8.2", default-features = false, features = ["serde"] }
chrono = { version = "0.4", default-features = false, optional = true, features = ["clock"] }
crossbeam-utils = { version = "0.8.16", default-features = false }
derivative = "2.1.3"
derivative = { version = "2.2.0", default-features = false }
futures = { version = "0.3.28", default-features = false, features = ["std"] }
indexmap = { version = "~1.9.3", default-features = false }
metrics = "0.21.0"
Expand Down
69 changes: 69 additions & 0 deletions lib/vector-common/src/internal_event/cached_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::{
collections::BTreeMap,
sync::{Arc, RwLock},
};

use derivative::Derivative;

use super::{InternalEventHandle, RegisterInternalEvent};

/// Metrics (eg. `component_sent_event_bytes_total`) may need to emit tags based on
/// values contained within the events. These tags can't be determined in advance.
///
/// Metrics need to be registered and the handle needs to be held onto in order to
/// prevent them from expiring and being dropped (this would result in the counter
/// resetting to zero).
/// `CachedEvent` is used to maintain a store of these registered metrics. When a
/// new event is emitted for a previously unseen set of tags an event is registered
/// and stored in the cache.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Default(bound = ""))]
pub struct RegisteredEventCache<Event: RegisterTaggedInternalEvent> {
cache: Arc<
RwLock<
BTreeMap<
<Event as RegisterTaggedInternalEvent>::Tags,
<Event as RegisterInternalEvent>::Handle,
>,
>,
>,
}

/// This trait must be implemented by events that emit dynamic tags. `register` must
/// be implemented to register an event based on the set of tags passed.
pub trait RegisterTaggedInternalEvent: RegisterInternalEvent {
/// The type that will contain the data necessary to extract the tags
/// that will be used when registering the event.
type Tags;

fn register(tags: Self::Tags) -> <Self as RegisterInternalEvent>::Handle;
}

impl<Event, EventHandle, Data, Tags> RegisteredEventCache<Event>
where
Data: Sized,
EventHandle: InternalEventHandle<Data = Data>,
Tags: Ord + Clone,
Event: RegisterInternalEvent<Handle = EventHandle> + RegisterTaggedInternalEvent<Tags = Tags>,
{
/// Emits the event with the given tags.
/// It will register the event and store in the cache if this has not already
/// been done.
///
/// # Panics
///
/// This will panic if the lock is poisoned.
pub fn emit(&self, tags: &Tags, value: Data) {
let read = self.cache.read().unwrap();
if let Some(event) = read.get(tags) {
event.emit(value);
} else {
let event = <Event as RegisterTaggedInternalEvent>::register(tags.clone());
event.emit(value);

// Ensure the read lock is dropped so we can write.
drop(read);
self.cache.write().unwrap().insert(tags.clone(), event);
}
}
}
65 changes: 64 additions & 1 deletion lib/vector-common/src/internal_event/events_sent.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use std::sync::Arc;

use metrics::{register_counter, Counter};
use tracing::trace;

use super::{CountByteSize, Output, SharedString};
use crate::{config::ComponentKey, request_metadata::EventCountTags};

use super::{CountByteSize, OptionalTag, Output, SharedString};

pub const DEFAULT_OUTPUT: &str = "_default";

Expand Down Expand Up @@ -44,3 +48,62 @@ impl From<Output> for EventsSent {
Self { output: output.0 }
}
}

/// Makes a list of the tags to use with the events sent event.
fn make_tags(
source: &OptionalTag<Arc<ComponentKey>>,
service: &OptionalTag<String>,
) -> Vec<(&'static str, String)> {
let mut tags = Vec::new();
if let OptionalTag::Specified(tag) = source {
tags.push((
"source",
tag.as_ref()
.map_or_else(|| "-".to_string(), |tag| tag.id().to_string()),
));
}

if let OptionalTag::Specified(tag) = service {
tags.push(("service", tag.clone().unwrap_or("-".to_string())));
}

tags
}

crate::registered_event!(
TaggedEventsSent {
source: OptionalTag<Arc<ComponentKey>>,
service: OptionalTag<String>,
} => {
events: Counter = {
register_counter!("component_sent_events_total", &make_tags(&self.source, &self.service))
},
event_bytes: Counter = {
register_counter!("component_sent_event_bytes_total", &make_tags(&self.source, &self.service))
},
}

fn emit(&self, data: CountByteSize) {
let CountByteSize(count, byte_size) = data;
trace!(message = "Events sent.", %count, %byte_size);

self.events.increment(count as u64);
self.event_bytes.increment(byte_size.get() as u64);
}

fn register(tags: EventCountTags) {
super::register(TaggedEventsSent::new(
tags,
))
}
);

impl TaggedEventsSent {
#[must_use]
pub fn new(tags: EventCountTags) -> Self {
Self {
source: tags.source,
service: tags.service,
}
}
}
40 changes: 38 additions & 2 deletions lib/vector-common/src/internal_event/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
mod bytes_received;
mod bytes_sent;
mod cached_event;
pub mod component_events_dropped;
mod events_received;
mod events_sent;
mod optional_tag;
mod prelude;
pub mod service;

use std::ops::{Add, AddAssign};

pub use metrics::SharedString;

pub use bytes_received::BytesReceived;
pub use bytes_sent::BytesSent;
#[allow(clippy::module_name_repetitions)]
pub use cached_event::{RegisterTaggedInternalEvent, RegisteredEventCache};
pub use component_events_dropped::{ComponentEventsDropped, INTENTIONAL, UNINTENTIONAL};
pub use events_received::EventsReceived;
pub use events_sent::{EventsSent, DEFAULT_OUTPUT};
pub use events_sent::{EventsSent, TaggedEventsSent, DEFAULT_OUTPUT};
pub use optional_tag::OptionalTag;
pub use prelude::{error_stage, error_type};
pub use service::{CallError, PollReadyError};

Expand Down Expand Up @@ -109,9 +116,24 @@ pub struct ByteSize(pub usize);
pub struct Count(pub usize);

/// Holds the tuple `(count_of_events, estimated_json_size_of_events)`.
#[derive(Clone, Copy)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct CountByteSize(pub usize, pub JsonSize);

impl AddAssign for CountByteSize {
fn add_assign(&mut self, rhs: Self) {
self.0 += rhs.0;
self.1 += rhs.1;
}
}

impl Add<CountByteSize> for CountByteSize {
type Output = CountByteSize;

fn add(self, rhs: CountByteSize) -> Self::Output {
CountByteSize(self.0 + rhs.0, self.1 + rhs.1)
}
}

// Wrapper types used to hold parameters for registering events

pub struct Output(pub Option<SharedString>);
Expand Down Expand Up @@ -196,6 +218,9 @@ macro_rules! registered_event {

fn emit(&$slf:ident, $data_name:ident: $data:ident)
$emit_body:block

$(fn register($tags_name:ident: $tags:ty)
$register_body:block)?
) => {
paste::paste!{
#[derive(Clone)]
Expand Down Expand Up @@ -223,6 +248,17 @@ macro_rules! registered_event {
fn emit(&$slf, $data_name: $data)
$emit_body
}

$(impl $crate::internal_event::cached_event::RegisterTaggedInternalEvent for $event {
type Tags = $tags;

fn register(
$tags_name: $tags,
) -> <TaggedEventsSent as super::RegisterInternalEvent>::Handle {
$register_body
}
})?

}
};
}
14 changes: 14 additions & 0 deletions lib/vector-common/src/internal_event/optional_tag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/// The user can configure whether a tag should be emitted. If they configure it to
/// be emitted, but the value doesn't exist - we should emit the tag but with a value
/// of `-`.
#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash)]
pub enum OptionalTag<T> {
Ignored,
Specified(Option<T>),
}

impl<T> From<Option<T>> for OptionalTag<T> {
fn from(value: Option<T>) -> Self {
Self::Specified(value)
}
}
Loading

0 comments on commit dcf7f9a

Please sign in to comment.