Skip to content

Commit

Permalink
[receiver/discovery] Do not emit endpoint that are not evaluated yet
Browse files Browse the repository at this point in the history
This change fixes the issue when "unknown" service can be shown in the inventory service right at the collector startup. We don't want to send entity events for endpoints that are not evaluated yet
  • Loading branch information
dmitryax committed Jun 27, 2024
1 parent e61cb7a commit 23dc993
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 167 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## Unreleased

### 🧰 Bug fixes 🧰

- (Splunk) `receiver/discovery`: Do not emit entity events for discovered endpoints that are not evaluated yet
to avoid showing "unknown" services on the Service Inventory page ([#5032](https://github.com/signalfx/splunk-otel-collector/pull/5032))

## v0.103.0

This Splunk OpenTelemetry Collector release includes changes from the [opentelemetry-collector v0.103.0](https://github.com/open-telemetry/opentelemetry-collector/releases/tag/v0.103.0) and the [opentelemetry-collector-contrib v0.103.0](https://github.com/open-telemetry/opentelemetry-collector-contrib/releases/tag/v0.103.0) releases where appropriate.
Expand Down
10 changes: 9 additions & 1 deletion internal/receiver/discoveryreceiver/endpoint_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ func (n *notify) OnChange(changed []observer.Endpoint) {
n.endpointTracker.updateEndpoints(changed, n.observerID)
}

// entityStateEvents converts observer endpoints to entity state events excluding those
// that don't have a discovery status attribute yet.
func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, correlations correlationStore, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) {
entityEvents := experimentalmetricmetadata.NewEntityEventsSlice()
for _, endpoint := range endpoints {
Expand All @@ -242,6 +244,12 @@ func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, c
continue
}

endpointAttrs := correlations.Attrs(endpoint.ID)
if _, ok := endpointAttrs[discovery.StatusAttr]; !ok {
// If the endpoint doesn't have a status attribute, it's not ready to be emitted.
continue
}

entityEvent := entityEvents.AppendEmpty()
entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(ts))
entityState := entityEvent.SetEntityState()
Expand All @@ -259,7 +267,7 @@ func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, c
attrs.PutStr("endpoint", endpoint.Target)
attrs.PutStr(observerNameAttr, observerID.Name())
attrs.PutStr(observerTypeAttr, observerID.Type().String())
for k, v := range correlations.Attrs(endpoint.ID) {
for k, v := range endpointAttrs {
attrs.PutStr(k, v)
}
attrs.PutStr(serviceTypeAttr, deduceServiceType(attrs))
Expand Down
40 changes: 28 additions & 12 deletions internal/receiver/discoveryreceiver/endpoint_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "pod")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "my-mysql")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand Down Expand Up @@ -92,6 +93,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "port")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "redis-cart")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand All @@ -114,6 +116,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "hostport")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "process.name")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand Down Expand Up @@ -143,6 +146,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "container")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "container.name")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand Down Expand Up @@ -174,15 +178,18 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "k8s.node")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "unknown")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
} {
test := tt
t.Run(test.name, func(t *testing.T) {
corr := newCorrelationStore(zap.NewNop(), time.Hour)
corr.(*store).attrs.Store(test.endpoint.ID, map[string]string{discovery.StatusAttr: "successful"})
events, failed, err := entityStateEvents(
component.MustNewIDWithName("observer_type", "observer.name"),
[]observer.Endpoint{test.endpoint}, newCorrelationStore(zap.NewNop(), time.Hour), t0,
[]observer.Endpoint{test.endpoint}, corr, t0,
)
require.NoError(t, err)
require.Zero(t, failed)
Expand Down Expand Up @@ -226,6 +233,7 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) {
attrs.PutStr("type", "empty.details.env")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "unknown")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand All @@ -248,6 +256,7 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) {
attrs.PutStr("type", "unexpected.env")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "unknown")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand Down Expand Up @@ -290,10 +299,11 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) {
} {
test := tt
t.Run(test.name, func(t *testing.T) {
// Validate entity_state event
corr := newCorrelationStore(zap.NewNop(), time.Hour)
corr.(*store).attrs.Store(test.endpoint.ID, map[string]string{discovery.StatusAttr: "successful"})
events, failed, err := entityStateEvents(
component.MustNewIDWithName("observer_type", "observer.name"),
[]observer.Endpoint{test.endpoint}, newCorrelationStore(zap.NewNop(), time.Hour), t0,
[]observer.Endpoint{test.endpoint}, corr, t0,
)
if test.expectedError != "" {
require.Error(t, err)
Expand Down Expand Up @@ -337,6 +347,8 @@ func FuzzEndpointToPlogs(f *testing.F) {
if err != nil {
observerTypeSanitized = discovery.NoType.Type()
}
corr := newCorrelationStore(zap.NewNop(), time.Hour)
corr.(*store).attrs.Store(observer.EndpointID(endpointID), map[string]string{discovery.StatusAttr: "successful"})
events, failed, err := entityStateEvents(
component.MustNewIDWithName(observerTypeSanitized.String(), observerName), []observer.Endpoint{
{
Expand All @@ -361,7 +373,7 @@ func FuzzEndpointToPlogs(f *testing.F) {
Transport: observer.Transport(transport),
},
},
}, newCorrelationStore(zap.NewNop(), time.Hour), t0,
}, corr, t0,
)

expectedLogs := expectedPLogs()
Expand All @@ -372,6 +384,7 @@ func FuzzEndpointToPlogs(f *testing.F) {
entityIDAttr.Map().PutInt("source.port", int64(port))
attrs := lr.Attributes().PutEmptyMap(discovery.OtelEntityAttributesAttr)
attrs.PutStr(discovery.EndpointIDAttr, endpointID)
attrs.PutStr(discovery.StatusAttr, "successful")
attrs.PutStr(observerNameAttr, observerName)
attrs.PutStr(observerTypeAttr, observerTypeSanitized.String())
attrs.PutStr("endpoint", target)
Expand Down Expand Up @@ -655,12 +668,14 @@ func TestEntityEmittingLifecycle(t *testing.T) {
ch := make(chan plog.Logs, 10)
obsID := component.MustNewIDWithName("fake_observer", "")
obs := &fakeObservable{}
corr := newCorrelationStore(logger, cfg.CorrelationTTL)
corr.(*store).attrs.Store(portEndpoint.ID, map[string]string{discovery.StatusAttr: "successful"})
et := &endpointTracker{
config: cfg,
observables: map[component.ID]observer.Observable{obsID: obs},
logger: logger,
pLogs: ch,
correlations: newCorrelationStore(logger, cfg.CorrelationTTL),
correlations: corr,
emitInterval: 50 * time.Millisecond,
stopCh: make(chan struct{}),
}
Expand All @@ -677,12 +692,11 @@ func TestEntityEmittingLifecycle(t *testing.T) {
obs.onAdd([]observer.Endpoint{portEndpoint})

// Wait for at least 2 entity events to be emitted to confirm periodic emitting is working.
require.Eventually(t, func() bool { return len(ch) >= 2 }, 1*time.Second, 50*time.Millisecond)
require.Eventually(t, func() bool { return len(ch) >= 2 }, 1*time.Second, 100*time.Millisecond)

gotLogs := <-ch
require.Equal(t, 1, gotLogs.LogRecordCount())
expectedEvents, failed, err := entityStateEvents(obsID, []observer.Endpoint{portEndpoint},
newCorrelationStore(logger, time.Hour), t0)
expectedEvents, failed, err := entityStateEvents(obsID, []observer.Endpoint{portEndpoint}, corr, t0)
require.NoError(t, err)
require.Zero(t, failed)
require.NoError(t, plogtest.CompareLogs(expectedEvents.ConvertAndMoveToLogs(), gotLogs, plogtest.IgnoreTimestamp()))
Expand Down Expand Up @@ -732,6 +746,7 @@ func TestEntityStateEvents(t *testing.T) {
cStore := newCorrelationStore(logger, cfg.CorrelationTTL)
cStore.UpdateAttrs(portEndpoint.ID, map[string]string{
discovery.ReceiverTypeAttr: "redis",
discovery.StatusAttr: "successful",
"attr1": "val1",
"attr2": "val2",
})
Expand All @@ -747,10 +762,11 @@ func TestEntityStateEvents(t *testing.T) {
assert.Equal(t, t0, event.Timestamp().AsTime())
assert.Equal(t, map[string]any{"k8s.pod.uid": "uid", "source.port": int64(1)}, event.ID().AsRaw())
assert.Equal(t, map[string]any{
observerNameAttr: "observer.name",
observerTypeAttr: "observer_type",
"endpoint": "port.target",
"name": "port.name",
observerNameAttr: "observer.name",
observerTypeAttr: "observer_type",
discovery.StatusAttr: "successful",
"endpoint": "port.target",
"name": "port.name",
"annotations": map[string]any{
"annotation.one": "value.one",
"annotation.two": "value.two",
Expand Down
11 changes: 0 additions & 11 deletions tests/receivers/discovery/host_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,6 @@ import (
"github.com/signalfx/splunk-otel-collector/tests/testutils"
)

func TestDiscoveryReceiverWithHostObserverProvidesEndpointLogs(t *testing.T) {
testutils.SkipIfNotContainerTest(t)
if testutils.CollectorImageIsForArm(t) {
t.Skip("host_observer missing process info on arm")
}
testutils.AssertAllLogsReceived(
t, "host_observer_endpoints.yaml",
"host_observer_endpoints_config.yaml", nil, nil,
)
}

func TestDiscoveryReceiverWithHostObserverAndSimplePrometheusReceiverProvideStatusLogs(t *testing.T) {
testutils.SkipIfNotContainerTest(t)
if testutils.CollectorImageIsForArm(t) {
Expand Down

This file was deleted.

This file was deleted.

0 comments on commit 23dc993

Please sign in to comment.