Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/discovery] Do not emit endpoint that are not evaluated yet #5032

Merged
merged 1 commit into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## Unreleased

### 🧰 Bug fixes 🧰

- (Splunk) `receiver/discovery`: Do not emit entity events for discovered endpoints that are not evaluated yet
to avoid showing "unknown" services on the Service Inventory page ([#5032](https://github.com/signalfx/splunk-otel-collector/pull/5032))

## v0.103.0

This Splunk OpenTelemetry Collector release includes changes from the [opentelemetry-collector v0.103.0](https://github.com/open-telemetry/opentelemetry-collector/releases/tag/v0.103.0) and the [opentelemetry-collector-contrib v0.103.0](https://github.com/open-telemetry/opentelemetry-collector-contrib/releases/tag/v0.103.0) releases where appropriate.
Expand Down
10 changes: 9 additions & 1 deletion internal/receiver/discoveryreceiver/endpoint_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ func (n *notify) OnChange(changed []observer.Endpoint) {
n.endpointTracker.updateEndpoints(changed, n.observerID)
}

// entityStateEvents converts observer endpoints to entity state events excluding those
// that don't have a discovery status attribute yet.
func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, correlations correlationStore, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) {
entityEvents := experimentalmetricmetadata.NewEntityEventsSlice()
for _, endpoint := range endpoints {
Expand All @@ -242,6 +244,12 @@ func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, c
continue
}

endpointAttrs := correlations.Attrs(endpoint.ID)
if _, ok := endpointAttrs[discovery.StatusAttr]; !ok {
// If the endpoint doesn't have a status attribute, it's not ready to be emitted.
continue
}

entityEvent := entityEvents.AppendEmpty()
entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(ts))
entityState := entityEvent.SetEntityState()
Expand All @@ -259,7 +267,7 @@ func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, c
attrs.PutStr("endpoint", endpoint.Target)
attrs.PutStr(observerNameAttr, observerID.Name())
attrs.PutStr(observerTypeAttr, observerID.Type().String())
for k, v := range correlations.Attrs(endpoint.ID) {
for k, v := range endpointAttrs {
attrs.PutStr(k, v)
}
attrs.PutStr(serviceTypeAttr, deduceServiceType(attrs))
Expand Down
48 changes: 35 additions & 13 deletions internal/receiver/discoveryreceiver/endpoint_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "pod")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "my-mysql")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand Down Expand Up @@ -92,6 +93,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "port")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "redis-cart")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand All @@ -114,6 +116,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "hostport")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "process.name")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand Down Expand Up @@ -143,6 +146,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "container")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "container.name")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand Down Expand Up @@ -174,15 +178,18 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "k8s.node")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "unknown")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
} {
test := tt
t.Run(test.name, func(t *testing.T) {
corr := newCorrelationStore(zap.NewNop(), time.Hour)
corr.(*store).attrs.Store(test.endpoint.ID, map[string]string{discovery.StatusAttr: "successful"})
events, failed, err := entityStateEvents(
component.MustNewIDWithName("observer_type", "observer.name"),
[]observer.Endpoint{test.endpoint}, newCorrelationStore(zap.NewNop(), time.Hour), t0,
[]observer.Endpoint{test.endpoint}, corr, t0,
)
require.NoError(t, err)
require.Zero(t, failed)
Expand Down Expand Up @@ -226,6 +233,7 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) {
attrs.PutStr("type", "empty.details.env")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "unknown")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand All @@ -248,6 +256,7 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) {
attrs.PutStr("type", "unexpected.env")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "unknown")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand Down Expand Up @@ -290,10 +299,11 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) {
} {
test := tt
t.Run(test.name, func(t *testing.T) {
// Validate entity_state event
corr := newCorrelationStore(zap.NewNop(), time.Hour)
corr.(*store).attrs.Store(test.endpoint.ID, map[string]string{discovery.StatusAttr: "successful"})
events, failed, err := entityStateEvents(
component.MustNewIDWithName("observer_type", "observer.name"),
[]observer.Endpoint{test.endpoint}, newCorrelationStore(zap.NewNop(), time.Hour), t0,
[]observer.Endpoint{test.endpoint}, corr, t0,
)
if test.expectedError != "" {
require.Error(t, err)
Expand Down Expand Up @@ -337,6 +347,8 @@ func FuzzEndpointToPlogs(f *testing.F) {
if err != nil {
observerTypeSanitized = discovery.NoType.Type()
}
corr := newCorrelationStore(zap.NewNop(), time.Hour)
corr.(*store).attrs.Store(observer.EndpointID(endpointID), map[string]string{discovery.StatusAttr: "successful"})
events, failed, err := entityStateEvents(
component.MustNewIDWithName(observerTypeSanitized.String(), observerName), []observer.Endpoint{
{
Expand All @@ -361,7 +373,7 @@ func FuzzEndpointToPlogs(f *testing.F) {
Transport: observer.Transport(transport),
},
},
}, newCorrelationStore(zap.NewNop(), time.Hour), t0,
}, corr, t0,
)

expectedLogs := expectedPLogs()
Expand All @@ -372,6 +384,7 @@ func FuzzEndpointToPlogs(f *testing.F) {
entityIDAttr.Map().PutInt("source.port", int64(port))
attrs := lr.Attributes().PutEmptyMap(discovery.OtelEntityAttributesAttr)
attrs.PutStr(discovery.EndpointIDAttr, endpointID)
attrs.PutStr(discovery.StatusAttr, "successful")
attrs.PutStr(observerNameAttr, observerName)
attrs.PutStr(observerTypeAttr, observerTypeSanitized.String())
attrs.PutStr("endpoint", target)
Expand Down Expand Up @@ -655,13 +668,14 @@ func TestEntityEmittingLifecycle(t *testing.T) {
ch := make(chan plog.Logs, 10)
obsID := component.MustNewIDWithName("fake_observer", "")
obs := &fakeObservable{}
corr := newCorrelationStore(logger, cfg.CorrelationTTL)
et := &endpointTracker{
config: cfg,
observables: map[component.ID]observer.Observable{obsID: obs},
logger: logger,
pLogs: ch,
correlations: newCorrelationStore(logger, cfg.CorrelationTTL),
emitInterval: 50 * time.Millisecond,
correlations: corr,
emitInterval: 20 * time.Millisecond,
stopCh: make(chan struct{}),
}
et.start()
Expand All @@ -676,13 +690,19 @@ func TestEntityEmittingLifecycle(t *testing.T) {

obs.onAdd([]observer.Endpoint{portEndpoint})

// Ensure that no entities are emitted without discovery.status attribute.
time.Sleep(30 * time.Millisecond)
require.Empty(t, ch)

// Once the status attribute is set, the entity should be emitted.
corr.(*store).attrs.Store(portEndpoint.ID, map[string]string{discovery.StatusAttr: "successful"})

// Wait for at least 2 entity events to be emitted to confirm periodic emitting is working.
require.Eventually(t, func() bool { return len(ch) >= 2 }, 1*time.Second, 50*time.Millisecond)
require.Eventually(t, func() bool { return len(ch) >= 2 }, 1*time.Second, 60*time.Millisecond)

gotLogs := <-ch
require.Equal(t, 1, gotLogs.LogRecordCount())
expectedEvents, failed, err := entityStateEvents(obsID, []observer.Endpoint{portEndpoint},
newCorrelationStore(logger, time.Hour), t0)
expectedEvents, failed, err := entityStateEvents(obsID, []observer.Endpoint{portEndpoint}, corr, t0)
require.NoError(t, err)
require.Zero(t, failed)
require.NoError(t, plogtest.CompareLogs(expectedEvents.ConvertAndMoveToLogs(), gotLogs, plogtest.IgnoreTimestamp()))
Expand Down Expand Up @@ -732,6 +752,7 @@ func TestEntityStateEvents(t *testing.T) {
cStore := newCorrelationStore(logger, cfg.CorrelationTTL)
cStore.UpdateAttrs(portEndpoint.ID, map[string]string{
discovery.ReceiverTypeAttr: "redis",
discovery.StatusAttr: "successful",
"attr1": "val1",
"attr2": "val2",
})
Expand All @@ -747,10 +768,11 @@ func TestEntityStateEvents(t *testing.T) {
assert.Equal(t, t0, event.Timestamp().AsTime())
assert.Equal(t, map[string]any{"k8s.pod.uid": "uid", "source.port": int64(1)}, event.ID().AsRaw())
assert.Equal(t, map[string]any{
observerNameAttr: "observer.name",
observerTypeAttr: "observer_type",
"endpoint": "port.target",
"name": "port.name",
observerNameAttr: "observer.name",
observerTypeAttr: "observer_type",
discovery.StatusAttr: "successful",
"endpoint": "port.target",
"name": "port.name",
"annotations": map[string]any{
"annotation.one": "value.one",
"annotation.two": "value.two",
Expand Down
11 changes: 0 additions & 11 deletions tests/receivers/discovery/host_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,6 @@ import (
"github.com/signalfx/splunk-otel-collector/tests/testutils"
)

func TestDiscoveryReceiverWithHostObserverProvidesEndpointLogs(t *testing.T) {
testutils.SkipIfNotContainerTest(t)
if testutils.CollectorImageIsForArm(t) {
t.Skip("host_observer missing process info on arm")
}
testutils.AssertAllLogsReceived(
t, "host_observer_endpoints.yaml",
"host_observer_endpoints_config.yaml", nil, nil,
)
}

func TestDiscoveryReceiverWithHostObserverAndSimplePrometheusReceiverProvideStatusLogs(t *testing.T) {
testutils.SkipIfNotContainerTest(t)
if testutils.CollectorImageIsForArm(t) {
Expand Down

This file was deleted.

This file was deleted.

Loading