Skip to content

Commit

Permalink
fix(observability): Emit processed_events_total in topology for tra…
Browse files Browse the repository at this point in the history
…nsforms (#5492)

* Remove excess metric emits

Signed-off-by: ktf <krunotf@gmail.com>

* Remove unneded labels

Signed-off-by: ktf <krunotf@gmail.com>

* Emit EventProcessed for transforms

Signed-off-by: ktf <krunotf@gmail.com>

* Remove transform emits of events_processed

Signed-off-by: ktf <krunotf@gmail.com>
  • Loading branch information
ktff authored Dec 18, 2020
1 parent 01e726a commit 9f760f2
Show file tree
Hide file tree
Showing 60 changed files with 47 additions and 410 deletions.
9 changes: 0 additions & 9 deletions src/internal_events/add_fields.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub struct AddFieldsEventProcessed;

impl InternalEvent for AddFieldsEventProcessed {
fn emit_metrics(&self) {
counter!("processed_events_total", 1);
}
}

#[derive(Debug)]
pub struct AddFieldsTemplateRenderingError<'a> {
pub field: &'a str,
Expand Down
10 changes: 0 additions & 10 deletions src/internal_events/add_tags.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,4 @@
use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub struct AddTagsEventProcessed;

impl InternalEvent for AddTagsEventProcessed {
fn emit_metrics(&self) {
counter!("processed_events_total", 1);
}
}

#[derive(Debug)]
pub struct AddTagsTagOverwritten<'a> {
Expand Down
9 changes: 0 additions & 9 deletions src/internal_events/ansi_stripper.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub struct ANSIStripperEventProcessed;

impl InternalEvent for ANSIStripperEventProcessed {
fn emit_metrics(&self) {
counter!("processed_events_total", 1);
}
}

#[derive(Debug)]
pub struct ANSIStripperFieldMissing<'a> {
pub field: &'a str,
Expand Down
13 changes: 0 additions & 13 deletions src/internal_events/aws_cloudwatch_logs_subscription_parser.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,6 @@
use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub(crate) struct AwsCloudwatchLogsSubscriptionParserEventProcessed;

impl InternalEvent for AwsCloudwatchLogsSubscriptionParserEventProcessed {
fn emit_logs(&self) {
trace!(message = "Received one event.");
}

fn emit_metrics(&self) {
counter!("processed_events_total", 1);
}
}

#[derive(Debug)]
pub(crate) struct AwsCloudwatchLogsSubscriptionParserFailedParse {
pub error: serde_json::Error,
Expand Down
13 changes: 0 additions & 13 deletions src/internal_events/aws_ec2_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,6 @@
use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub struct AwsEc2MetadataEventProcessed;

impl InternalEvent for AwsEc2MetadataEventProcessed {
fn emit_logs(&self) {
trace!(message = "Processed one event.");
}

fn emit_metrics(&self) {
counter!("processed_events_total", 1);
}
}

#[derive(Debug)]
pub struct AwsEc2MetadataRefreshSuccessful;

Expand Down
9 changes: 0 additions & 9 deletions src/internal_events/coercer.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub struct CoercerEventProcessed;

impl InternalEvent for CoercerEventProcessed {
fn emit_metrics(&self) {
counter!("processed_events_total", 1);
}
}

#[derive(Debug)]
pub(crate) struct CoercerConversionFailed<'a> {
pub field: &'a str,
Expand Down
9 changes: 0 additions & 9 deletions src/internal_events/concat.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub struct ConcatEventProcessed;

impl InternalEvent for ConcatEventProcessed {
fn emit_metrics(&self) {
counter!("processed_events_total", 1);
}
}

#[derive(Debug)]
pub struct ConcatSubstringError<'a> {
pub source: &'a str,
Expand Down
9 changes: 0 additions & 9 deletions src/internal_events/dedupe.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub(crate) struct DedupeEventProcessed;

impl InternalEvent for DedupeEventProcessed {
fn emit_metrics(&self) {
counter!("processed_events_total", 1);
}
}

#[derive(Debug)]
pub(crate) struct DedupeEventDiscarded {
pub event: crate::Event,
Expand Down
9 changes: 0 additions & 9 deletions src/internal_events/geoip.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub struct GeoipEventProcessed;

impl InternalEvent for GeoipEventProcessed {
fn emit_metrics(&self) {
counter!("processed_events_total", 1);
}
}

#[derive(Debug)]
pub(crate) struct GeoipIpAddressParseError<'a> {
pub address: &'a str,
Expand Down
13 changes: 0 additions & 13 deletions src/internal_events/grok_parser.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,6 @@
use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub(crate) struct GrokParserEventProcessed;

impl InternalEvent for GrokParserEventProcessed {
fn emit_logs(&self) {
trace!(message = "Processed one event.");
}

fn emit_metrics(&self) {
counter!("processed_events_total", 1);
}
}

#[derive(Debug)]
pub(crate) struct GrokParserFailedMatch<'a> {
pub value: &'a str,
Expand Down
13 changes: 0 additions & 13 deletions src/internal_events/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,6 @@ use super::InternalEvent;
use metrics::counter;
use serde_json::Error;

#[derive(Debug)]
pub(crate) struct JsonParserEventProcessed;

impl InternalEvent for JsonParserEventProcessed {
fn emit_logs(&self) {
trace!(message = "Received one event.");
}

fn emit_metrics(&self) {
counter!("processed_events_total", 1);
}
}

#[derive(Debug)]
pub(crate) struct JsonParserFailedParse<'a> {
pub field: &'a str,
Expand Down
18 changes: 0 additions & 18 deletions src/internal_events/key_value_parser.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,6 @@
use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub(crate) struct KeyValueEventProcessed;

impl InternalEvent for KeyValueEventProcessed {
fn emit_metrics(&self) {
counter!("processed_events_total", 1,
"component_kind" => "transform",
"component_type" => "key_value",
);
}
}

#[derive(Debug)]
pub(crate) struct KeyValueParseFailed {
pub key: String,
Expand All @@ -31,8 +19,6 @@ impl InternalEvent for KeyValueParseFailed {

fn emit_metrics(&self) {
counter!("processing_errors_total", 1,
"component_kind" => "transform",
"component_type" => "key_value_parser",
"error_type" => "failed_parse",
);
}
Expand All @@ -54,8 +40,6 @@ impl<'a> InternalEvent for KeyValueTargetExists<'a> {

fn emit_metrics(&self) {
counter!("processing_errors_total", 1,
"component_kind" => "transform",
"component_type" => "key_value_parser",
"error_type" => "target_field_exists",
);
}
Expand All @@ -77,8 +61,6 @@ impl InternalEvent for KeyValueFieldDoesNotExist {

fn emit_metrics(&self) {
counter!("processing_errors_total", 1,
"component_kind" => "transform",
"component_type" => "key_value_parser",
"error_type" => "failed_parse",
);
}
Expand Down
12 changes: 0 additions & 12 deletions src/internal_events/log_to_metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,6 @@ use super::InternalEvent;
use metrics::counter;
use std::num::ParseFloatError;

pub(crate) struct LogToMetricEventProcessed;

impl InternalEvent for LogToMetricEventProcessed {
fn emit_logs(&self) {
trace!(message = "Processed one event.");
}

fn emit_metrics(&self) {
counter!("processed_events_total", 1);
}
}

pub(crate) struct LogToMetricFieldNotFound<'a> {
pub field: &'a str,
}
Expand Down
13 changes: 0 additions & 13 deletions src/internal_events/logfmt_parser.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,6 @@
use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub struct LogfmtParserEventProcessed;

impl InternalEvent for LogfmtParserEventProcessed {
fn emit_logs(&self) {
trace!(message = "Processed one event.");
}

fn emit_metrics(&self) {
counter!("processed_events_total", 1);
}
}

#[derive(Debug)]
pub struct LogfmtParserMissingField<'a> {
pub field: &'a str,
Expand Down
9 changes: 0 additions & 9 deletions src/internal_events/lua.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
use super::InternalEvent;
use metrics::{counter, gauge};

#[derive(Debug)]
pub struct LuaEventProcessed;

impl InternalEvent for LuaEventProcessed {
fn emit_metrics(&self) {
counter!("processed_events_total", 1);
}
}

#[derive(Debug)]
pub struct LuaGcTriggered {
pub used_memory: usize,
Expand Down
13 changes: 0 additions & 13 deletions src/internal_events/metric_to_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,6 @@ use super::InternalEvent;
use metrics::counter;
use serde_json::Error;

#[derive(Debug)]
pub(crate) struct MetricToLogEventProcessed;

impl InternalEvent for MetricToLogEventProcessed {
fn emit_logs(&self) {
trace!(message = "Processed one event.");
}

fn emit_metrics(&self) {
counter!("processed_events_total", 1);
}
}

#[derive(Debug)]
pub(crate) struct MetricToLogFailedSerialize {
pub error: Error,
Expand Down
6 changes: 2 additions & 4 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ mod regex_parser;
mod remap;
#[cfg(feature = "transforms-remove_fields")]
mod remove_fields;
#[cfg(feature = "transforms-remove_tags")]
mod remove_tags;
#[cfg(feature = "transforms-rename_fields")]
mod rename_fields;
mod sampler;
Expand All @@ -108,6 +106,7 @@ mod tag_cardinality_limit;
mod tcp;
#[cfg(feature = "transforms-tokenizer")]
mod tokenizer;
mod topology;
mod udp;
mod unix;
mod vector;
Expand Down Expand Up @@ -202,8 +201,6 @@ pub(crate) use self::regex_parser::*;
pub use self::remap::*;
#[cfg(feature = "transforms-remove_fields")]
pub use self::remove_fields::*;
#[cfg(feature = "transforms-remove_tags")]
pub use self::remove_tags::*;
#[cfg(feature = "transforms-rename_fields")]
pub use self::rename_fields::*;
pub use self::sampler::*;
Expand All @@ -226,6 +223,7 @@ pub(crate) use self::tag_cardinality_limit::*;
pub use self::tcp::*;
#[cfg(feature = "transforms-tokenizer")]
pub(crate) use self::tokenizer::*;
pub use self::topology::*;
pub use self::udp::*;
pub use self::unix::*;
pub use self::vector::*;
Expand Down
6 changes: 0 additions & 6 deletions src/internal_events/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,19 +129,13 @@ impl InternalEvent for PrometheusRemoteWriteSnapError {

#[derive(Debug)]
pub struct PrometheusRemoteWriteReceived {
pub byte_size: usize,
pub count: usize,
}

impl InternalEvent for PrometheusRemoteWriteReceived {
fn emit_logs(&self) {
debug!(message = "Received remote_write events.", count = ?self.count);
}

fn emit_metrics(&self) {
counter!("processed_events_total", self.count as u64);
counter!("processed_bytes_total", self.byte_size as u64);
}
}

#[derive(Debug)]
Expand Down
9 changes: 0 additions & 9 deletions src/internal_events/reduce.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub(crate) struct ReduceEventProcessed;

impl InternalEvent for ReduceEventProcessed {
fn emit_metrics(&self) {
counter!("processed_events_total", 1);
}
}

#[derive(Debug)]
pub(crate) struct ReduceStaleEventFlushed;

Expand Down
Loading

0 comments on commit 9f760f2

Please sign in to comment.