Skip to content

Commit

Permalink
chore(observability)!: remove peer_addr internal metric tag (#18982)
Browse files Browse the repository at this point in the history
* OPW-94 remove peer_addr tag

* fmt
  • Loading branch information
dsmith3197 authored and neuronull committed Oct 30, 2023
1 parent 25320d3 commit b033286
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 41 deletions.
3 changes: 1 addition & 2 deletions src/internal_events/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ impl InternalEvent for TcpBytesReceived {
);
counter!(
"component_received_bytes_total", self.byte_size as u64,
"protocol" => "tcp",
"peer_addr" => self.peer_addr.to_string()
"protocol" => "tcp"
);
}
}
54 changes: 27 additions & 27 deletions src/sources/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ mod test {
sources::util::net::SocketListenAddr,
test_util::{
collect_n, collect_n_limited,
components::{assert_source_compliance, SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS},
components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
next_addr, random_string, send_lines, send_lines_tls, wait_for_tcp,
},
tls::{self, TlsConfig, TlsEnableableConfig, TlsSourceConfig},
Expand All @@ -389,7 +389,7 @@ mod test {
//////// TCP TESTS ////////
#[tokio::test]
async fn tcp_it_includes_host() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

Expand All @@ -414,7 +414,7 @@ mod test {

#[tokio::test]
async fn tcp_it_includes_vector_namespaced_fields() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();
let mut conf = TcpConfig::from_address(addr.into());
Expand Down Expand Up @@ -454,7 +454,7 @@ mod test {

#[tokio::test]
async fn tcp_splits_on_newline() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let addr = next_addr();

Expand Down Expand Up @@ -486,7 +486,7 @@ mod test {

#[tokio::test]
async fn tcp_it_includes_source_type() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

Expand All @@ -512,7 +512,7 @@ mod test {

#[tokio::test]
async fn tcp_continue_after_long_line() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

Expand Down Expand Up @@ -553,7 +553,7 @@ mod test {

#[tokio::test]
async fn tcp_with_tls() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

Expand Down Expand Up @@ -617,7 +617,7 @@ mod test {

#[tokio::test]
async fn tcp_with_tls_vector_namespace() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

Expand Down Expand Up @@ -692,7 +692,7 @@ mod test {

#[tokio::test]
async fn tcp_shutdown_simple() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let source_id = ComponentKey::from("tcp_shutdown_simple");
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();
Expand Down Expand Up @@ -960,7 +960,7 @@ mod test {

#[tokio::test]
async fn udp_message() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

Expand All @@ -977,7 +977,7 @@ mod test {

#[tokio::test]
async fn udp_message_preserves_newline() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

Expand All @@ -994,7 +994,7 @@ mod test {

#[tokio::test]
async fn udp_multiple_packets() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

Expand All @@ -1015,7 +1015,7 @@ mod test {

#[tokio::test]
async fn udp_max_length() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = next_addr();
let mut config = UdpConfig::from_address(address.into());
Expand Down Expand Up @@ -1051,7 +1051,7 @@ mod test {
/// Windows will drop the entire packet if we exceed the max_length so we are unable to
/// extract anything.
async fn udp_max_length_delimited() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = next_addr();
let mut config = UdpConfig::from_address(address.into());
Expand Down Expand Up @@ -1082,7 +1082,7 @@ mod test {

#[tokio::test]
async fn udp_it_includes_host() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

Expand All @@ -1097,7 +1097,7 @@ mod test {

#[tokio::test]
async fn udp_it_includes_vector_namespaced_fields() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, true).await;

Expand Down Expand Up @@ -1125,7 +1125,7 @@ mod test {

#[tokio::test]
async fn udp_it_includes_source_type() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

Expand All @@ -1142,7 +1142,7 @@ mod test {

#[tokio::test]
async fn udp_shutdown_simple() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let source_id = ComponentKey::from("udp_shutdown_simple");

Expand Down Expand Up @@ -1172,7 +1172,7 @@ mod test {

#[tokio::test]
async fn udp_shutdown_infinite_stream() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let source_id = ComponentKey::from("udp_shutdown_infinite_stream");

Expand Down Expand Up @@ -1332,7 +1332,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_datagram_message() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("test", false, false).await;
let events = collect_n(rx, 1).await;

Expand Down Expand Up @@ -1399,7 +1399,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_datagram_message_with_vector_namespace() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("test", false, true).await;
let events = collect_n(rx, 1).await;
let log = events[0].as_log();
Expand All @@ -1424,7 +1424,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_datagram_message_preserves_newline() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("foo\nbar", false, false).await;
let events = collect_n(rx, 1).await;

Expand All @@ -1444,7 +1444,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_datagram_multiple_packets() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
unix_multiple_packets(false).await
})
.await;
Expand Down Expand Up @@ -1511,7 +1511,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_stream_message() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("test", true, false).await;
let events = collect_n(rx, 1).await;

Expand All @@ -1531,7 +1531,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_stream_message_with_vector_namespace() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("test", true, true).await;
let events = collect_n(rx, 1).await;
let log = events[0].as_log();
Expand All @@ -1554,7 +1554,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_stream_message_splits_on_newline() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("foo\nbar", true, false).await;
let events = collect_n(rx, 2).await;

Expand Down Expand Up @@ -1582,7 +1582,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_stream_multiple_packets() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
unix_multiple_packets(true).await
})
.await;
Expand Down
8 changes: 4 additions & 4 deletions src/sources/statsd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ mod test {
collect_limited,
components::{
assert_source_compliance, assert_source_error, COMPONENT_ERROR_TAGS,
SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS,
SOCKET_PUSH_SOURCE_TAGS,
},
metrics::{assert_counter, assert_distribution, assert_gauge, assert_set},
next_addr,
Expand All @@ -365,7 +365,7 @@ mod test {

#[tokio::test]
async fn test_statsd_udp() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async move {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
let in_addr = next_addr();
let config = StatsdConfig::Udp(UdpConfig::from_address(in_addr.into()));
let (sender, mut receiver) = mpsc::channel(200);
Expand All @@ -384,7 +384,7 @@ mod test {

#[tokio::test]
async fn test_statsd_tcp() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async move {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
let in_addr = next_addr();
let config = StatsdConfig::Tcp(TcpConfig::from_address(in_addr.into()));
let (sender, mut receiver) = mpsc::channel(200);
Expand Down Expand Up @@ -427,7 +427,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn test_statsd_unix() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async move {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
let in_path = tempfile::tempdir().unwrap().into_path().join("unix_test");
let config = StatsdConfig::Unix(UnixConfig {
path: in_path.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/sources/syslog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1190,14 +1190,14 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn test_unix_stream_syslog() {
use crate::test_util::components::SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS;
use crate::test_util::components::SOCKET_PUSH_SOURCE_TAGS;
use futures_util::{stream, SinkExt};
use std::os::unix::net::UnixStream as StdUnixStream;
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
use tokio_util::codec::{FramedWrite, LinesCodec};

assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let num_messages: usize = 1;
let in_path = tempfile::tempdir().unwrap().into_path().join("stream_test");

Expand Down
7 changes: 1 addition & 6 deletions src/test_util/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,7 @@ pub const HTTP_PULL_SOURCE_TAGS: [&str; 2] = ["endpoint", "protocol"];
pub const HTTP_PUSH_SOURCE_TAGS: [&str; 2] = ["http_path", "protocol"];

/// The standard set of tags for all generic socket-based sources that accept connections i.e. `TcpSource`.
pub const SOCKET_PUSH_SOURCE_TAGS: [&str; 2] = ["peer_addr", "protocol"];

/// The standard set of tags for all generic socket-based sources that accept connections i.e. `TcpSource`, but
/// specifically sources that experience high cardinality i.e. many many clients, where emitting metrics with the peer
/// address as a tag would represent too high of a cost to pay.
pub const SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS: [&str; 1] = ["protocol"];
pub const SOCKET_PUSH_SOURCE_TAGS: [&str; 1] = ["protocol"];

/// The standard set of tags for all generic socket-based sources that poll connections i.e. Redis.
pub const SOCKET_PULL_SOURCE_TAGS: [&str; 2] = ["remote_addr", "protocol"];
Expand Down
12 changes: 12 additions & 0 deletions website/content/en/highlights/2023-11-07-0-34-0-upgrade-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ Vector's 0.34.0 release includes **breaking changes**:

1. [Removal of Deprecated Datadog Component Config Options](#datadog-deprecated-config-options)
1. [Removal of Deprecated `component_name` Metric Tag](#deprecated-component-name)
1. [Blackhole sink no longer reports by default](#blackhole-sink-reporting)
1. [Removal of `peer_addr` Metric Tag](#remove-peer-addr)

We cover them below to help you upgrade quickly:

Expand All @@ -31,3 +33,13 @@ been removed from the Enterprise configuration. Instead of `region`, `site` shou
#### Removal of Deprecated `component_name` Metric Tag {#deprecated-component-name}

The deprecated `component_name` tag has been removed from all internal metrics. Instead the `component_id` tag should be used.

#### Blackhole sink no longer reports by default {#blackhole-sink-reporting}

The `blackhole` sink no longer reports events processed every second by default. Instead this
behavior can be opted into by setting `print_interval_secs` to `1` (or any other integer). This
change was made due to users being surprised that this sink generates output by default.

#### Removal of `peer_addr` Metric Tag {#remove-peer-addr}

The `peer_addr` tag has been removed from the `component_received_bytes_total` internal metric for TCP-based sources due to its unbounded cardinality.

0 comments on commit b033286

Please sign in to comment.