From 2b406f270ed095bb82a453468f5b5f86b29eaed2 Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Wed, 26 Jun 2024 17:33:08 -0700 Subject: [PATCH] [receiver/discovery] Do not emit endpoint that are not evaluated yet This change fixes the issue when "unknown" service can be shown in the inventory service right at the collector startup. We don't want to send entity events for endpoints that are not evaluated yet --- .../discoveryreceiver/endpoint_tracker.go | 10 ++++- .../endpoint_tracker_test.go | 40 +++++++++++++------ 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/internal/receiver/discoveryreceiver/endpoint_tracker.go b/internal/receiver/discoveryreceiver/endpoint_tracker.go index a0f15dd3f6..e96b5d614e 100644 --- a/internal/receiver/discoveryreceiver/endpoint_tracker.go +++ b/internal/receiver/discoveryreceiver/endpoint_tracker.go @@ -233,6 +233,8 @@ func (n *notify) OnChange(changed []observer.Endpoint) { n.endpointTracker.updateEndpoints(changed, n.observerID) } +// entityStateEvents converts observer endpoints to entity state events excluding those +// that don't have a discovery status attribute yet. func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, correlations correlationStore, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) { entityEvents := experimentalmetricmetadata.NewEntityEventsSlice() for _, endpoint := range endpoints { @@ -242,6 +244,12 @@ func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, c continue } + endpointAttrs := correlations.Attrs(endpoint.ID) + if _, ok := endpointAttrs[discovery.StatusAttr]; !ok { + // If the endpoint doesn't have a status attribute, it's not ready to be emitted. + continue + } + entityEvent := entityEvents.AppendEmpty() entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(ts)) entityState := entityEvent.SetEntityState() @@ -259,7 +267,7 @@ func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, c attrs.PutStr("endpoint", endpoint.Target) attrs.PutStr(observerNameAttr, observerID.Name()) attrs.PutStr(observerTypeAttr, observerID.Type().String()) - for k, v := range correlations.Attrs(endpoint.ID) { + for k, v := range endpointAttrs { attrs.PutStr(k, v) } attrs.PutStr(serviceTypeAttr, deduceServiceType(attrs)) diff --git a/internal/receiver/discoveryreceiver/endpoint_tracker_test.go b/internal/receiver/discoveryreceiver/endpoint_tracker_test.go index 302826f980..0015203366 100644 --- a/internal/receiver/discoveryreceiver/endpoint_tracker_test.go +++ b/internal/receiver/discoveryreceiver/endpoint_tracker_test.go @@ -63,6 +63,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) { attrs.PutStr("type", "pod") attrs.PutStr("service.type", "unknown") attrs.PutStr("service.name", "my-mysql") + attrs.PutStr(discovery.StatusAttr, "successful") return plogs }(), }, @@ -92,6 +93,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) { attrs.PutStr("type", "port") attrs.PutStr("service.type", "unknown") attrs.PutStr("service.name", "redis-cart") + attrs.PutStr(discovery.StatusAttr, "successful") return plogs }(), }, @@ -114,6 +116,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) { attrs.PutStr("type", "hostport") attrs.PutStr("service.type", "unknown") attrs.PutStr("service.name", "process.name") + attrs.PutStr(discovery.StatusAttr, "successful") return plogs }(), }, @@ -143,6 +146,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) { attrs.PutStr("type", "container") attrs.PutStr("service.type", "unknown") attrs.PutStr("service.name", "container.name") + attrs.PutStr(discovery.StatusAttr, "successful") return plogs }(), }, @@ -174,15 +178,18 @@ func TestEndpointToPLogsHappyPath(t *testing.T) { attrs.PutStr("type", "k8s.node") attrs.PutStr("service.type", "unknown") attrs.PutStr("service.name", "unknown") + attrs.PutStr(discovery.StatusAttr, "successful") return plogs }(), }, } { test := tt t.Run(test.name, func(t *testing.T) { + corr := newCorrelationStore(zap.NewNop(), time.Hour) + corr.(*store).attrs.Store(test.endpoint.ID, map[string]string{discovery.StatusAttr: "successful"}) events, failed, err := entityStateEvents( component.MustNewIDWithName("observer_type", "observer.name"), - []observer.Endpoint{test.endpoint}, newCorrelationStore(zap.NewNop(), time.Hour), t0, + []observer.Endpoint{test.endpoint}, corr, t0, ) require.NoError(t, err) require.Zero(t, failed) @@ -226,6 +233,7 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) { attrs.PutStr("type", "empty.details.env") attrs.PutStr("service.type", "unknown") attrs.PutStr("service.name", "unknown") + attrs.PutStr(discovery.StatusAttr, "successful") return plogs }(), }, @@ -248,6 +256,7 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) { attrs.PutStr("type", "unexpected.env") attrs.PutStr("service.type", "unknown") attrs.PutStr("service.name", "unknown") + attrs.PutStr(discovery.StatusAttr, "successful") return plogs }(), }, @@ -290,10 +299,11 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) { } { test := tt t.Run(test.name, func(t *testing.T) { - // Validate entity_state event + corr := newCorrelationStore(zap.NewNop(), time.Hour) + corr.(*store).attrs.Store(test.endpoint.ID, map[string]string{discovery.StatusAttr: "successful"}) events, failed, err := entityStateEvents( component.MustNewIDWithName("observer_type", "observer.name"), - []observer.Endpoint{test.endpoint}, newCorrelationStore(zap.NewNop(), time.Hour), t0, + []observer.Endpoint{test.endpoint}, corr, t0, ) if test.expectedError != "" { require.Error(t, err) @@ -337,6 +347,8 @@ func FuzzEndpointToPlogs(f *testing.F) { if err != nil { observerTypeSanitized = discovery.NoType.Type() } + corr := newCorrelationStore(zap.NewNop(), time.Hour) + corr.(*store).attrs.Store(observer.EndpointID(endpointID), map[string]string{discovery.StatusAttr: "successful"}) events, failed, err := entityStateEvents( component.MustNewIDWithName(observerTypeSanitized.String(), observerName), []observer.Endpoint{ { @@ -361,7 +373,7 @@ func FuzzEndpointToPlogs(f *testing.F) { Transport: observer.Transport(transport), }, }, - }, newCorrelationStore(zap.NewNop(), time.Hour), t0, + }, corr, t0, ) expectedLogs := expectedPLogs() @@ -372,6 +384,7 @@ func FuzzEndpointToPlogs(f *testing.F) { entityIDAttr.Map().PutInt("source.port", int64(port)) attrs := lr.Attributes().PutEmptyMap(discovery.OtelEntityAttributesAttr) attrs.PutStr(discovery.EndpointIDAttr, endpointID) + attrs.PutStr(discovery.StatusAttr, "successful") attrs.PutStr(observerNameAttr, observerName) attrs.PutStr(observerTypeAttr, observerTypeSanitized.String()) attrs.PutStr("endpoint", target) @@ -655,12 +668,14 @@ func TestEntityEmittingLifecycle(t *testing.T) { ch := make(chan plog.Logs, 10) obsID := component.MustNewIDWithName("fake_observer", "") obs := &fakeObservable{} + corr := newCorrelationStore(logger, cfg.CorrelationTTL) + corr.(*store).attrs.Store(portEndpoint.ID, map[string]string{discovery.StatusAttr: "successful"}) et := &endpointTracker{ config: cfg, observables: map[component.ID]observer.Observable{obsID: obs}, logger: logger, pLogs: ch, - correlations: newCorrelationStore(logger, cfg.CorrelationTTL), + correlations: corr, emitInterval: 50 * time.Millisecond, stopCh: make(chan struct{}), } @@ -677,12 +692,11 @@ func TestEntityEmittingLifecycle(t *testing.T) { obs.onAdd([]observer.Endpoint{portEndpoint}) // Wait for at least 2 entity events to be emitted to confirm periodic emitting is working. - require.Eventually(t, func() bool { return len(ch) >= 2 }, 1*time.Second, 50*time.Millisecond) + require.Eventually(t, func() bool { return len(ch) >= 2 }, 1*time.Second, 100*time.Millisecond) gotLogs := <-ch require.Equal(t, 1, gotLogs.LogRecordCount()) - expectedEvents, failed, err := entityStateEvents(obsID, []observer.Endpoint{portEndpoint}, - newCorrelationStore(logger, time.Hour), t0) + expectedEvents, failed, err := entityStateEvents(obsID, []observer.Endpoint{portEndpoint}, corr, t0) require.NoError(t, err) require.Zero(t, failed) require.NoError(t, plogtest.CompareLogs(expectedEvents.ConvertAndMoveToLogs(), gotLogs, plogtest.IgnoreTimestamp())) @@ -732,6 +746,7 @@ func TestEntityStateEvents(t *testing.T) { cStore := newCorrelationStore(logger, cfg.CorrelationTTL) cStore.UpdateAttrs(portEndpoint.ID, map[string]string{ discovery.ReceiverTypeAttr: "redis", + discovery.StatusAttr: "successful", "attr1": "val1", "attr2": "val2", }) @@ -747,10 +762,11 @@ func TestEntityStateEvents(t *testing.T) { assert.Equal(t, t0, event.Timestamp().AsTime()) assert.Equal(t, map[string]any{"k8s.pod.uid": "uid", "source.port": int64(1)}, event.ID().AsRaw()) assert.Equal(t, map[string]any{ - observerNameAttr: "observer.name", - observerTypeAttr: "observer_type", - "endpoint": "port.target", - "name": "port.name", + observerNameAttr: "observer.name", + observerTypeAttr: "observer_type", + discovery.StatusAttr: "successful", + "endpoint": "port.target", + "name": "port.name", "annotations": map[string]any{ "annotation.one": "value.one", "annotation.two": "value.two",