Skip to content

Commit

Permalink
[receiver/discovery] Do not emit endpoint that are not evaluated yet
Browse files Browse the repository at this point in the history
This change fixes the issue when "unknown" service can be shown in the inventory service right at the collector startup. We don't want to send entity events for endpoints that are not evaluated yet
  • Loading branch information
dmitryax committed Jun 27, 2024
1 parent 30515a9 commit 2b406f2
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 13 deletions.
10 changes: 9 additions & 1 deletion internal/receiver/discoveryreceiver/endpoint_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ func (n *notify) OnChange(changed []observer.Endpoint) {
n.endpointTracker.updateEndpoints(changed, n.observerID)
}

// entityStateEvents converts observer endpoints to entity state events excluding those
// that don't have a discovery status attribute yet.
func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, correlations correlationStore, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) {
entityEvents := experimentalmetricmetadata.NewEntityEventsSlice()
for _, endpoint := range endpoints {
Expand All @@ -242,6 +244,12 @@ func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, c
continue
}

endpointAttrs := correlations.Attrs(endpoint.ID)
if _, ok := endpointAttrs[discovery.StatusAttr]; !ok {
// If the endpoint doesn't have a status attribute, it's not ready to be emitted.
continue
}

entityEvent := entityEvents.AppendEmpty()
entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(ts))
entityState := entityEvent.SetEntityState()
Expand All @@ -259,7 +267,7 @@ func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, c
attrs.PutStr("endpoint", endpoint.Target)
attrs.PutStr(observerNameAttr, observerID.Name())
attrs.PutStr(observerTypeAttr, observerID.Type().String())
for k, v := range correlations.Attrs(endpoint.ID) {
for k, v := range endpointAttrs {
attrs.PutStr(k, v)
}
attrs.PutStr(serviceTypeAttr, deduceServiceType(attrs))
Expand Down
40 changes: 28 additions & 12 deletions internal/receiver/discoveryreceiver/endpoint_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "pod")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "my-mysql")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand Down Expand Up @@ -92,6 +93,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "port")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "redis-cart")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand All @@ -114,6 +116,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "hostport")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "process.name")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand Down Expand Up @@ -143,6 +146,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "container")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "container.name")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand Down Expand Up @@ -174,15 +178,18 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
attrs.PutStr("type", "k8s.node")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "unknown")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
} {
test := tt
t.Run(test.name, func(t *testing.T) {
corr := newCorrelationStore(zap.NewNop(), time.Hour)
corr.(*store).attrs.Store(test.endpoint.ID, map[string]string{discovery.StatusAttr: "successful"})
events, failed, err := entityStateEvents(
component.MustNewIDWithName("observer_type", "observer.name"),
[]observer.Endpoint{test.endpoint}, newCorrelationStore(zap.NewNop(), time.Hour), t0,
[]observer.Endpoint{test.endpoint}, corr, t0,
)
require.NoError(t, err)
require.Zero(t, failed)
Expand Down Expand Up @@ -226,6 +233,7 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) {
attrs.PutStr("type", "empty.details.env")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "unknown")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand All @@ -248,6 +256,7 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) {
attrs.PutStr("type", "unexpected.env")
attrs.PutStr("service.type", "unknown")
attrs.PutStr("service.name", "unknown")
attrs.PutStr(discovery.StatusAttr, "successful")
return plogs
}(),
},
Expand Down Expand Up @@ -290,10 +299,11 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) {
} {
test := tt
t.Run(test.name, func(t *testing.T) {
// Validate entity_state event
corr := newCorrelationStore(zap.NewNop(), time.Hour)
corr.(*store).attrs.Store(test.endpoint.ID, map[string]string{discovery.StatusAttr: "successful"})
events, failed, err := entityStateEvents(
component.MustNewIDWithName("observer_type", "observer.name"),
[]observer.Endpoint{test.endpoint}, newCorrelationStore(zap.NewNop(), time.Hour), t0,
[]observer.Endpoint{test.endpoint}, corr, t0,
)
if test.expectedError != "" {
require.Error(t, err)
Expand Down Expand Up @@ -337,6 +347,8 @@ func FuzzEndpointToPlogs(f *testing.F) {
if err != nil {
observerTypeSanitized = discovery.NoType.Type()
}
corr := newCorrelationStore(zap.NewNop(), time.Hour)
corr.(*store).attrs.Store(observer.EndpointID(endpointID), map[string]string{discovery.StatusAttr: "successful"})
events, failed, err := entityStateEvents(
component.MustNewIDWithName(observerTypeSanitized.String(), observerName), []observer.Endpoint{
{
Expand All @@ -361,7 +373,7 @@ func FuzzEndpointToPlogs(f *testing.F) {
Transport: observer.Transport(transport),
},
},
}, newCorrelationStore(zap.NewNop(), time.Hour), t0,
}, corr, t0,
)

expectedLogs := expectedPLogs()
Expand All @@ -372,6 +384,7 @@ func FuzzEndpointToPlogs(f *testing.F) {
entityIDAttr.Map().PutInt("source.port", int64(port))
attrs := lr.Attributes().PutEmptyMap(discovery.OtelEntityAttributesAttr)
attrs.PutStr(discovery.EndpointIDAttr, endpointID)
attrs.PutStr(discovery.StatusAttr, "successful")
attrs.PutStr(observerNameAttr, observerName)
attrs.PutStr(observerTypeAttr, observerTypeSanitized.String())
attrs.PutStr("endpoint", target)
Expand Down Expand Up @@ -655,12 +668,14 @@ func TestEntityEmittingLifecycle(t *testing.T) {
ch := make(chan plog.Logs, 10)
obsID := component.MustNewIDWithName("fake_observer", "")
obs := &fakeObservable{}
corr := newCorrelationStore(logger, cfg.CorrelationTTL)
corr.(*store).attrs.Store(portEndpoint.ID, map[string]string{discovery.StatusAttr: "successful"})
et := &endpointTracker{
config: cfg,
observables: map[component.ID]observer.Observable{obsID: obs},
logger: logger,
pLogs: ch,
correlations: newCorrelationStore(logger, cfg.CorrelationTTL),
correlations: corr,
emitInterval: 50 * time.Millisecond,
stopCh: make(chan struct{}),
}
Expand All @@ -677,12 +692,11 @@ func TestEntityEmittingLifecycle(t *testing.T) {
obs.onAdd([]observer.Endpoint{portEndpoint})

// Wait for at least 2 entity events to be emitted to confirm periodic emitting is working.
require.Eventually(t, func() bool { return len(ch) >= 2 }, 1*time.Second, 50*time.Millisecond)
require.Eventually(t, func() bool { return len(ch) >= 2 }, 1*time.Second, 100*time.Millisecond)

gotLogs := <-ch
require.Equal(t, 1, gotLogs.LogRecordCount())
expectedEvents, failed, err := entityStateEvents(obsID, []observer.Endpoint{portEndpoint},
newCorrelationStore(logger, time.Hour), t0)
expectedEvents, failed, err := entityStateEvents(obsID, []observer.Endpoint{portEndpoint}, corr, t0)
require.NoError(t, err)
require.Zero(t, failed)
require.NoError(t, plogtest.CompareLogs(expectedEvents.ConvertAndMoveToLogs(), gotLogs, plogtest.IgnoreTimestamp()))
Expand Down Expand Up @@ -732,6 +746,7 @@ func TestEntityStateEvents(t *testing.T) {
cStore := newCorrelationStore(logger, cfg.CorrelationTTL)
cStore.UpdateAttrs(portEndpoint.ID, map[string]string{
discovery.ReceiverTypeAttr: "redis",
discovery.StatusAttr: "successful",
"attr1": "val1",
"attr2": "val2",
})
Expand All @@ -747,10 +762,11 @@ func TestEntityStateEvents(t *testing.T) {
assert.Equal(t, t0, event.Timestamp().AsTime())
assert.Equal(t, map[string]any{"k8s.pod.uid": "uid", "source.port": int64(1)}, event.ID().AsRaw())
assert.Equal(t, map[string]any{
observerNameAttr: "observer.name",
observerTypeAttr: "observer_type",
"endpoint": "port.target",
"name": "port.name",
observerNameAttr: "observer.name",
observerTypeAttr: "observer_type",
discovery.StatusAttr: "successful",
"endpoint": "port.target",
"name": "port.name",
"annotations": map[string]any{
"annotation.one": "value.one",
"annotation.two": "value.two",
Expand Down

0 comments on commit 2b406f2

Please sign in to comment.