Skip to content

Commit

Permalink
[receiver/discovery] Do not emit endpoint that are not evaluated yet (#…
Browse files Browse the repository at this point in the history
…5032)

This change fixes the issue when "unknown" service can be shown in the inventory service right at the collector startup. We don't want to send entity events for endpoints that are not evaluated yet
  • Loading branch information
dmitryax authored Jun 27, 2024
1 parent e61cb7a commit a43ec90
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 168 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## Unreleased

### 🧰 Bug fixes 🧰

- (Splunk) `receiver/discovery`: Do not emit entity events for discovered endpoints that are not evaluated yet
to avoid showing "unknown" services on the Service Inventory page ([#5032](https://github.com/signalfx/splunk-otel-collector/pull/5032))

## v0.103.0

This Splunk OpenTelemetry Collector release includes changes from the [opentelemetry-collector v0.103.0](https://github.com/open-telemetry/opentelemetry-collector/releases/tag/v0.103.0) and the [opentelemetry-collector-contrib v0.103.0](https://github.com/open-telemetry/opentelemetry-collector-contrib/releases/tag/v0.103.0) releases where appropriate.
Expand Down
10 changes: 9 additions & 1 deletion internal/receiver/discoveryreceiver/endpoint_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ func (n *notify) OnChange(changed []observer.Endpoint) {
n.endpointTracker.updateEndpoints(changed, n.observerID)
}

// entityStateEvents converts observer endpoints to entity state events excluding those
// that don't have a discovery status attribute yet.
func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, correlations correlationStore, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) {
entityEvents := experimentalmetricmetadata.NewEntityEventsSlice()
for _, endpoint := range endpoints {
Expand All @@ -242,6 +244,12 @@ func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, c
continue
}

endpointAttrs := correlations.Attrs(endpoint.ID)
if _, ok := endpointAttrs[discovery.StatusAttr]; !ok {
// If the endpoint doesn't have a status attribute, it's not ready to be emitted.
continue
}

entityEvent := entityEvents.AppendEmpty()
entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(ts))
entityState := entityEvent.SetEntityState()
Expand All @@ -259,7 +267,7 @@ func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, c
attrs.PutStr("endpoint", endpoint.Target)
attrs.PutStr(observerNameAttr, observerID.Name())
attrs.PutStr(observerTypeAttr, observerID.Type().String())
for k, v := range correlations.Attrs(endpoint.ID) {
for k, v := range endpointAttrs {
attrs.PutStr(k, v)
}
attrs.PutStr(serviceTypeAttr, deduceServiceType(attrs))
Expand Down
48 changes: 35 additions & 13 deletions internal/receiver/discoveryreceiver/endpoint_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "pod")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "my-mysql")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand Down Expand Up @@ -92,6 +93,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "port")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "redis-cart")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand All @@ -114,6 +116,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "hostport")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "process.name")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand Down Expand Up @@ -143,6 +146,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "container")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "container.name")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand Down Expand Up @@ -174,15 +178,18 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "k8s.node")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "unknown")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
} {
test := tt
t.Run(test.name, func(t *testing.T) {
corr := newCorrelationStore(zap.NewNop(), time.Hour)
corr.(*store).attrs.Store(test.endpoint.ID, map[string]string{discovery.StatusAttr: "successful"})
events, failed, err := entityStateEvents(
component.MustNewIDWithName("observer_type", "observer.name"),
[]observer.Endpoint{test.endpoint}, newCorrelationStore(zap.NewNop(), time.Hour), t0,
[]observer.Endpoint{test.endpoint}, corr, t0,
)
require.NoError(t, err)
require.Zero(t, failed)
Expand Down Expand Up @@ -226,6 +233,7 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) {
attrs.PutStr("type", "empty.details.env")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "unknown")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand All @@ -248,6 +256,7 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) {
attrs.PutStr("type", "unexpected.env")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "unknown")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand Down Expand Up @@ -290,10 +299,11 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) {
} {
test := tt
t.Run(test.name, func(t *testing.T) {
// Validate entity_state event
corr := newCorrelationStore(zap.NewNop(), time.Hour)
corr.(*store).attrs.Store(test.endpoint.ID, map[string]string{discovery.StatusAttr: "successful"})
events, failed, err := entityStateEvents(
component.MustNewIDWithName("observer_type", "observer.name"),
[]observer.Endpoint{test.endpoint}, newCorrelationStore(zap.NewNop(), time.Hour), t0,
[]observer.Endpoint{test.endpoint}, corr, t0,
)
if test.expectedError != "" {
require.Error(t, err)
Expand Down Expand Up @@ -337,6 +347,8 @@ func FuzzEndpointToPlogs(f *testing.F) {
if err != nil {
observerTypeSanitized = discovery.NoType.Type()
}
corr := newCorrelationStore(zap.NewNop(), time.Hour)
corr.(*store).attrs.Store(observer.EndpointID(endpointID), map[string]string{discovery.StatusAttr: "successful"})
events, failed, err := entityStateEvents(
component.MustNewIDWithName(observerTypeSanitized.String(), observerName), []observer.Endpoint{
{
Expand All @@ -361,7 +373,7 @@ func FuzzEndpointToPlogs(f *testing.F) {
Transport: observer.Transport(transport),
},
},
}, newCorrelationStore(zap.NewNop(), time.Hour), t0,
}, corr, t0,
)

expectedLogs := expectedPLogs()
Expand All @@ -372,6 +384,7 @@ func FuzzEndpointToPlogs(f *testing.F) {
entityIDAttr.Map().PutInt("source.port", int64(port))
attrs := lr.Attributes().PutEmptyMap(discovery.OtelEntityAttributesAttr)
attrs.PutStr(discovery.EndpointIDAttr, endpointID)
attrs.PutStr(discovery.StatusAttr, "successful")
attrs.PutStr(observerNameAttr, observerName)
attrs.PutStr(observerTypeAttr, observerTypeSanitized.String())
attrs.PutStr("endpoint", target)
Expand Down Expand Up @@ -655,13 +668,14 @@ func TestEntityEmittingLifecycle(t *testing.T) {
ch := make(chan plog.Logs, 10)
obsID := component.MustNewIDWithName("fake_observer", "")
obs := &fakeObservable{}
corr := newCorrelationStore(logger, cfg.CorrelationTTL)
et := &endpointTracker{
config: cfg,
observables: map[component.ID]observer.Observable{obsID: obs},
logger: logger,
pLogs: ch,
correlations: newCorrelationStore(logger, cfg.CorrelationTTL),
emitInterval: 50 * time.Millisecond,
correlations: corr,
emitInterval: 20 * time.Millisecond,
stopCh: make(chan struct{}),
}
et.start()
Expand All @@ -676,13 +690,19 @@ func TestEntityEmittingLifecycle(t *testing.T) {

obs.onAdd([]observer.Endpoint{portEndpoint})

// Ensure that no entities are emitted without discovery.status attribute.
time.Sleep(30 * time.Millisecond)
require.Empty(t, ch)

// Once the status attribute is set, the entity should be emitted.
corr.(*store).attrs.Store(portEndpoint.ID, map[string]string{discovery.StatusAttr: "successful"})

// Wait for at least 2 entity events to be emitted to confirm periodic emitting is working.
require.Eventually(t, func() bool { return len(ch) >= 2 }, 1*time.Second, 50*time.Millisecond)
require.Eventually(t, func() bool { return len(ch) >= 2 }, 1*time.Second, 60*time.Millisecond)

gotLogs := <-ch
require.Equal(t, 1, gotLogs.LogRecordCount())
expectedEvents, failed, err := entityStateEvents(obsID, []observer.Endpoint{portEndpoint},
newCorrelationStore(logger, time.Hour), t0)
expectedEvents, failed, err := entityStateEvents(obsID, []observer.Endpoint{portEndpoint}, corr, t0)
require.NoError(t, err)
require.Zero(t, failed)
require.NoError(t, plogtest.CompareLogs(expectedEvents.ConvertAndMoveToLogs(), gotLogs, plogtest.IgnoreTimestamp()))
Expand Down Expand Up @@ -732,6 +752,7 @@ func TestEntityStateEvents(t *testing.T) {
cStore := newCorrelationStore(logger, cfg.CorrelationTTL)
cStore.UpdateAttrs(portEndpoint.ID, map[string]string{
discovery.ReceiverTypeAttr: "redis",
discovery.StatusAttr: "successful",
"attr1": "val1",
"attr2": "val2",
})
Expand All @@ -747,10 +768,11 @@ func TestEntityStateEvents(t *testing.T) {
assert.Equal(t, t0, event.Timestamp().AsTime())
assert.Equal(t, map[string]any{"k8s.pod.uid": "uid", "source.port": int64(1)}, event.ID().AsRaw())
assert.Equal(t, map[string]any{
observerNameAttr: "observer.name",
observerTypeAttr: "observer_type",
"endpoint": "port.target",
"name": "port.name",
observerNameAttr: "observer.name",
observerTypeAttr: "observer_type",
discovery.StatusAttr: "successful",
"endpoint": "port.target",
"name": "port.name",
"annotations": map[string]any{
"annotation.one": "value.one",
"annotation.two": "value.two",
Expand Down
11 changes: 0 additions & 11 deletions tests/receivers/discovery/host_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,6 @@ import (
"github.com/signalfx/splunk-otel-collector/tests/testutils"
)

func TestDiscoveryReceiverWithHostObserverProvidesEndpointLogs(t *testing.T) {
testutils.SkipIfNotContainerTest(t)
if testutils.CollectorImageIsForArm(t) {
t.Skip("host_observer missing process info on arm")
}
testutils.AssertAllLogsReceived(
t, "host_observer_endpoints.yaml",
"host_observer_endpoints_config.yaml", nil, nil,
)
}

func TestDiscoveryReceiverWithHostObserverAndSimplePrometheusReceiverProvideStatusLogs(t *testing.T) {
testutils.SkipIfNotContainerTest(t)
if testutils.CollectorImageIsForArm(t) {
Expand Down

This file was deleted.

This file was deleted.

0 comments on commit a43ec90

Please sign in to comment.