From 63060998492e5be0d045982aab4e539cfb636de7 Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Wed, 26 Jun 2024 17:33:08 -0700 Subject: [PATCH] [receiver/discovery] Do not emit endpoint that are not evaluated yet This change fixes the issue when "unknown" service can be shown in the inventory service right at the collector startup. We don't want to send entity events for endpoints that are not evaluated yet --- CHANGELOG.md | 7 ++ .../discoveryreceiver/endpoint_tracker.go | 10 +- .../endpoint_tracker_test.go | 48 +++++--- .../receivers/discovery/host_observer_test.go | 11 -- .../host_observer_endpoints_config.yaml | 34 ------ .../host_observer_endpoints.yaml | 109 ------------------ 6 files changed, 51 insertions(+), 168 deletions(-) delete mode 100644 tests/receivers/discovery/testdata/host_observer_endpoints_config.yaml delete mode 100644 tests/receivers/discovery/testdata/resource_logs/host_observer_endpoints.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index e6e47b533b..42574d8fd9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## Unreleased + +### 🧰 Bug fixes 🧰 + +- (Splunk) `receiver/discovery`: Do not emit entity events for discovered endpoints that are not evaluated yet + to avoid showing "unknown" services on the Service Inventory page ([#5032](https://github.com/signalfx/splunk-otel-collector/pull/5032)) + ## v0.103.0 This Splunk OpenTelemetry Collector release includes changes from the [opentelemetry-collector v0.103.0](https://github.com/open-telemetry/opentelemetry-collector/releases/tag/v0.103.0) and the [opentelemetry-collector-contrib v0.103.0](https://github.com/open-telemetry/opentelemetry-collector-contrib/releases/tag/v0.103.0) releases where appropriate. diff --git a/internal/receiver/discoveryreceiver/endpoint_tracker.go b/internal/receiver/discoveryreceiver/endpoint_tracker.go index a0f15dd3f6..e96b5d614e 100644 --- a/internal/receiver/discoveryreceiver/endpoint_tracker.go +++ b/internal/receiver/discoveryreceiver/endpoint_tracker.go @@ -233,6 +233,8 @@ func (n *notify) OnChange(changed []observer.Endpoint) { n.endpointTracker.updateEndpoints(changed, n.observerID) } +// entityStateEvents converts observer endpoints to entity state events excluding those +// that don't have a discovery status attribute yet. func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, correlations correlationStore, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) { entityEvents := experimentalmetricmetadata.NewEntityEventsSlice() for _, endpoint := range endpoints { @@ -242,6 +244,12 @@ func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, c continue } + endpointAttrs := correlations.Attrs(endpoint.ID) + if _, ok := endpointAttrs[discovery.StatusAttr]; !ok { + // If the endpoint doesn't have a status attribute, it's not ready to be emitted. + continue + } + entityEvent := entityEvents.AppendEmpty() entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(ts)) entityState := entityEvent.SetEntityState() @@ -259,7 +267,7 @@ func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, c attrs.PutStr("endpoint", endpoint.Target) attrs.PutStr(observerNameAttr, observerID.Name()) attrs.PutStr(observerTypeAttr, observerID.Type().String()) - for k, v := range correlations.Attrs(endpoint.ID) { + for k, v := range endpointAttrs { attrs.PutStr(k, v) } attrs.PutStr(serviceTypeAttr, deduceServiceType(attrs)) diff --git a/internal/receiver/discoveryreceiver/endpoint_tracker_test.go b/internal/receiver/discoveryreceiver/endpoint_tracker_test.go index 302826f980..67f455ab54 100644 --- a/internal/receiver/discoveryreceiver/endpoint_tracker_test.go +++ b/internal/receiver/discoveryreceiver/endpoint_tracker_test.go @@ -63,6 +63,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) { attrs.PutStr("type", "pod") attrs.PutStr("service.type", "unknown") attrs.PutStr("service.name", "my-mysql") + attrs.PutStr(discovery.StatusAttr, "successful") return plogs }(), }, @@ -92,6 +93,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) { attrs.PutStr("type", "port") attrs.PutStr("service.type", "unknown") attrs.PutStr("service.name", "redis-cart") + attrs.PutStr(discovery.StatusAttr, "successful") return plogs }(), }, @@ -114,6 +116,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) { attrs.PutStr("type", "hostport") attrs.PutStr("service.type", "unknown") attrs.PutStr("service.name", "process.name") + attrs.PutStr(discovery.StatusAttr, "successful") return plogs }(), }, @@ -143,6 +146,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) { attrs.PutStr("type", "container") attrs.PutStr("service.type", "unknown") attrs.PutStr("service.name", "container.name") + attrs.PutStr(discovery.StatusAttr, "successful") return plogs }(), }, @@ -174,15 +178,18 @@ func TestEndpointToPLogsHappyPath(t *testing.T) { attrs.PutStr("type", "k8s.node") attrs.PutStr("service.type", "unknown") attrs.PutStr("service.name", "unknown") + attrs.PutStr(discovery.StatusAttr, "successful") return plogs }(), }, } { test := tt t.Run(test.name, func(t *testing.T) { + corr := newCorrelationStore(zap.NewNop(), time.Hour) + corr.(*store).attrs.Store(test.endpoint.ID, map[string]string{discovery.StatusAttr: "successful"}) events, failed, err := entityStateEvents( component.MustNewIDWithName("observer_type", "observer.name"), - []observer.Endpoint{test.endpoint}, newCorrelationStore(zap.NewNop(), time.Hour), t0, + []observer.Endpoint{test.endpoint}, corr, t0, ) require.NoError(t, err) require.Zero(t, failed) @@ -226,6 +233,7 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) { attrs.PutStr("type", "empty.details.env") attrs.PutStr("service.type", "unknown") attrs.PutStr("service.name", "unknown") + attrs.PutStr(discovery.StatusAttr, "successful") return plogs }(), }, @@ -248,6 +256,7 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) { attrs.PutStr("type", "unexpected.env") attrs.PutStr("service.type", "unknown") attrs.PutStr("service.name", "unknown") + attrs.PutStr(discovery.StatusAttr, "successful") return plogs }(), }, @@ -290,10 +299,11 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) { } { test := tt t.Run(test.name, func(t *testing.T) { - // Validate entity_state event + corr := newCorrelationStore(zap.NewNop(), time.Hour) + corr.(*store).attrs.Store(test.endpoint.ID, map[string]string{discovery.StatusAttr: "successful"}) events, failed, err := entityStateEvents( component.MustNewIDWithName("observer_type", "observer.name"), - []observer.Endpoint{test.endpoint}, newCorrelationStore(zap.NewNop(), time.Hour), t0, + []observer.Endpoint{test.endpoint}, corr, t0, ) if test.expectedError != "" { require.Error(t, err) @@ -337,6 +347,8 @@ func FuzzEndpointToPlogs(f *testing.F) { if err != nil { observerTypeSanitized = discovery.NoType.Type() } + corr := newCorrelationStore(zap.NewNop(), time.Hour) + corr.(*store).attrs.Store(observer.EndpointID(endpointID), map[string]string{discovery.StatusAttr: "successful"}) events, failed, err := entityStateEvents( component.MustNewIDWithName(observerTypeSanitized.String(), observerName), []observer.Endpoint{ { @@ -361,7 +373,7 @@ func FuzzEndpointToPlogs(f *testing.F) { Transport: observer.Transport(transport), }, }, - }, newCorrelationStore(zap.NewNop(), time.Hour), t0, + }, corr, t0, ) expectedLogs := expectedPLogs() @@ -372,6 +384,7 @@ func FuzzEndpointToPlogs(f *testing.F) { entityIDAttr.Map().PutInt("source.port", int64(port)) attrs := lr.Attributes().PutEmptyMap(discovery.OtelEntityAttributesAttr) attrs.PutStr(discovery.EndpointIDAttr, endpointID) + attrs.PutStr(discovery.StatusAttr, "successful") attrs.PutStr(observerNameAttr, observerName) attrs.PutStr(observerTypeAttr, observerTypeSanitized.String()) attrs.PutStr("endpoint", target) @@ -655,13 +668,14 @@ func TestEntityEmittingLifecycle(t *testing.T) { ch := make(chan plog.Logs, 10) obsID := component.MustNewIDWithName("fake_observer", "") obs := &fakeObservable{} + corr := newCorrelationStore(logger, cfg.CorrelationTTL) et := &endpointTracker{ config: cfg, observables: map[component.ID]observer.Observable{obsID: obs}, logger: logger, pLogs: ch, - correlations: newCorrelationStore(logger, cfg.CorrelationTTL), - emitInterval: 50 * time.Millisecond, + correlations: corr, + emitInterval: 20 * time.Millisecond, stopCh: make(chan struct{}), } et.start() @@ -676,13 +690,19 @@ func TestEntityEmittingLifecycle(t *testing.T) { obs.onAdd([]observer.Endpoint{portEndpoint}) + // Ensure that no entities are emitted without discovery.status attribute. + time.Sleep(30 * time.Millisecond) + require.Empty(t, ch) + + // Once the status attribute is set, the entity should be emitted. + corr.(*store).attrs.Store(portEndpoint.ID, map[string]string{discovery.StatusAttr: "successful"}) + // Wait for at least 2 entity events to be emitted to confirm periodic emitting is working. - require.Eventually(t, func() bool { return len(ch) >= 2 }, 1*time.Second, 50*time.Millisecond) + require.Eventually(t, func() bool { return len(ch) >= 2 }, 1*time.Second, 60*time.Millisecond) gotLogs := <-ch require.Equal(t, 1, gotLogs.LogRecordCount()) - expectedEvents, failed, err := entityStateEvents(obsID, []observer.Endpoint{portEndpoint}, - newCorrelationStore(logger, time.Hour), t0) + expectedEvents, failed, err := entityStateEvents(obsID, []observer.Endpoint{portEndpoint}, corr, t0) require.NoError(t, err) require.Zero(t, failed) require.NoError(t, plogtest.CompareLogs(expectedEvents.ConvertAndMoveToLogs(), gotLogs, plogtest.IgnoreTimestamp())) @@ -732,6 +752,7 @@ func TestEntityStateEvents(t *testing.T) { cStore := newCorrelationStore(logger, cfg.CorrelationTTL) cStore.UpdateAttrs(portEndpoint.ID, map[string]string{ discovery.ReceiverTypeAttr: "redis", + discovery.StatusAttr: "successful", "attr1": "val1", "attr2": "val2", }) @@ -747,10 +768,11 @@ func TestEntityStateEvents(t *testing.T) { assert.Equal(t, t0, event.Timestamp().AsTime()) assert.Equal(t, map[string]any{"k8s.pod.uid": "uid", "source.port": int64(1)}, event.ID().AsRaw()) assert.Equal(t, map[string]any{ - observerNameAttr: "observer.name", - observerTypeAttr: "observer_type", - "endpoint": "port.target", - "name": "port.name", + observerNameAttr: "observer.name", + observerTypeAttr: "observer_type", + discovery.StatusAttr: "successful", + "endpoint": "port.target", + "name": "port.name", "annotations": map[string]any{ "annotation.one": "value.one", "annotation.two": "value.two", diff --git a/tests/receivers/discovery/host_observer_test.go b/tests/receivers/discovery/host_observer_test.go index aed4314983..1f52509be5 100644 --- a/tests/receivers/discovery/host_observer_test.go +++ b/tests/receivers/discovery/host_observer_test.go @@ -22,17 +22,6 @@ import ( "github.com/signalfx/splunk-otel-collector/tests/testutils" ) -func TestDiscoveryReceiverWithHostObserverProvidesEndpointLogs(t *testing.T) { - testutils.SkipIfNotContainerTest(t) - if testutils.CollectorImageIsForArm(t) { - t.Skip("host_observer missing process info on arm") - } - testutils.AssertAllLogsReceived( - t, "host_observer_endpoints.yaml", - "host_observer_endpoints_config.yaml", nil, nil, - ) -} - func TestDiscoveryReceiverWithHostObserverAndSimplePrometheusReceiverProvideStatusLogs(t *testing.T) { testutils.SkipIfNotContainerTest(t) if testutils.CollectorImageIsForArm(t) { diff --git a/tests/receivers/discovery/testdata/host_observer_endpoints_config.yaml b/tests/receivers/discovery/testdata/host_observer_endpoints_config.yaml deleted file mode 100644 index 7ade36f303..0000000000 --- a/tests/receivers/discovery/testdata/host_observer_endpoints_config.yaml +++ /dev/null @@ -1,34 +0,0 @@ -extensions: - host_observer: - host_observer/with_name: - host_observer/with/another/name: - -receivers: - discovery: - receivers: - hostmetrics: - rule: type == "hostport" - status: - metrics: - - status: successful - regexp: .* - watch_observers: - - host_observer - - host_observer/with_name - - host_observer/with/another/name - -exporters: - otlp: - endpoint: "${OTLP_ENDPOINT}" - tls: - insecure: true - -service: - extensions: - - host_observer - - host_observer/with_name - - host_observer/with/another/name - pipelines: - logs: - receivers: [discovery] - exporters: [otlp] diff --git a/tests/receivers/discovery/testdata/resource_logs/host_observer_endpoints.yaml b/tests/receivers/discovery/testdata/resource_logs/host_observer_endpoints.yaml deleted file mode 100644 index 6376036744..0000000000 --- a/tests/receivers/discovery/testdata/resource_logs/host_observer_endpoints.yaml +++ /dev/null @@ -1,109 +0,0 @@ -resource_logs: - - scope_logs: - - logs: - - attributes: - otel.entity.type: service - otel.entity.id: - source.port: 55554 - otel.entity.event.type: entity_state - otel.entity.attributes: - discovery.endpoint.id: (host_observer)127.0.0.1-55554-TCP-1 - discovery.observer.type: host_observer - discovery.observer.name: "" - command: /otelcol --config /etc/config.yaml - endpoint: 127.0.0.1:55554 - is_ipv6: false - process_name: otelcol - transport: TCP - type: hostport - service.type: unknown - service.name: otelcol - - attributes: - otel.entity.type: service - otel.entity.id: - source.port: 8888 - otel.entity.event.type: entity_state - otel.entity.attributes: - discovery.endpoint.id: (host_observer)[::]-8888-TCP-1 - discovery.observer.type: host_observer - discovery.observer.name: "" - command: /otelcol --config /etc/config.yaml - endpoint: '[::]:8888' - is_ipv6: true - process_name: otelcol - transport: TCP - type: hostport - service.type: unknown - service.name: otelcol - - scope_logs: - - logs: - - attributes: - otel.entity.type: service - otel.entity.id: - source.port: 55554 - otel.entity.event.type: entity_state - otel.entity.attributes: - discovery.endpoint.id: (host_observer/with_name)127.0.0.1-55554-TCP-1 - discovery.observer.type: host_observer - discovery.observer.name: with_name - command: /otelcol --config /etc/config.yaml - endpoint: 127.0.0.1:55554 - is_ipv6: false - process_name: otelcol - transport: TCP - type: hostport - service.type: unknown - service.name: otelcol - - attributes: - otel.entity.type: service - otel.entity.id: - source.port: 8888 - otel.entity.event.type: entity_state - otel.entity.attributes: - discovery.endpoint.id: (host_observer/with_name)[::]-8888-TCP-1 - discovery.observer.type: host_observer - discovery.observer.name: with_name - command: /otelcol --config /etc/config.yaml - endpoint: '[::]:8888' - is_ipv6: true - process_name: otelcol - transport: TCP - type: hostport - service.type: unknown - service.name: otelcol - - scope_logs: - - logs: - - attributes: - otel.entity.type: service - otel.entity.id: - source.port: 55554 - otel.entity.event.type: entity_state - otel.entity.attributes: - discovery.endpoint.id: (host_observer/with/another/name)127.0.0.1-55554-TCP-1 - discovery.observer.type: host_observer - discovery.observer.name: with/another/name - command: /otelcol --config /etc/config.yaml - endpoint: 127.0.0.1:55554 - is_ipv6: false - process_name: otelcol - transport: TCP - type: hostport - service.type: unknown - service.name: otelcol - - attributes: - otel.entity.type: service - otel.entity.id: - source.port: 8888 - otel.entity.event.type: entity_state - otel.entity.attributes: - discovery.endpoint.id: (host_observer/with/another/name)[::]-8888-TCP-1 - discovery.observer.type: host_observer - discovery.observer.name: with/another/name - command: /otelcol --config /etc/config.yaml - endpoint: '[::]:8888' - is_ipv6: true - process_name: otelcol - transport: TCP - type: hostport - service.type: unknown - service.name: otelcol