diff --git a/internal/confmapprovider/discovery/discoverer.go b/internal/confmapprovider/discovery/discoverer.go index c55db5e878..2a3edc3088 100644 --- a/internal/confmapprovider/discovery/discoverer.go +++ b/internal/confmapprovider/discovery/discoverer.go @@ -708,14 +708,8 @@ func (d *discoverer) ConsumeLogs(_ context.Context, ld plog.Logs) error { } } - entityIDAttr, ok := lr.Attributes().Get(discovery.OtelEntityIDAttr) - if !ok { - d.logger.Debug("invalid entity event without id", zap.Any("log record", lr)) - continue - } - endpointID := "unavailable" - if eid, k := entityIDAttr.Map().Get(discovery.EndpointIDAttr); k { + if eid, k := entityAttrs.Get(discovery.EndpointIDAttr); k { endpointID = eid.AsString() } diff --git a/internal/receiver/discoveryreceiver/endpoint_tracker.go b/internal/receiver/discoveryreceiver/endpoint_tracker.go index d725ff6702..dd5b8366f6 100644 --- a/internal/receiver/discoveryreceiver/endpoint_tracker.go +++ b/internal/receiver/discoveryreceiver/endpoint_tracker.go @@ -23,15 +23,25 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + semconv "go.opentelemetry.io/collector/semconv/v1.22.0" "go.uber.org/multierr" "go.uber.org/zap" "github.com/signalfx/splunk-otel-collector/internal/common/discovery" ) -var ( - _ observer.Notify = (*notify)(nil) -) +const sourcePortAttr = "source.port" + +// identifyingAttrKeys are the keys of attributes that are used to identify an entity. +var identifyingAttrKeys = []string{ + semconv.AttributeK8SPodUID, + semconv.AttributeContainerID, + semconv.AttributeK8SNodeUID, + semconv.AttributeHostID, + sourcePortAttr, +} + +var _ observer.Notify = (*notify)(nil) type endpointTracker struct { correlations correlationStore @@ -118,7 +128,7 @@ func (et *endpointTracker) emitEntityStateEvents(observerCID component.ID, endpo if et.pLogs != nil { entityEvents, numFailed, err := entityStateEvents(observerCID, endpoints, et.correlations, time.Now()) if err != nil { - et.logger.Warn(fmt.Sprintf("failed converting %v endpoints to log records", numFailed), zap.Error(err)) + et.logger.Warn(fmt.Sprintf("failed converting %v endpoints to entity state events", numFailed), zap.Error(err)) } if entityEvents.Len() > 0 { et.pLogs <- entityEvents.ConvertAndMoveToLogs() @@ -128,7 +138,10 @@ func (et *endpointTracker) emitEntityStateEvents(observerCID component.ID, endpo func (et *endpointTracker) emitEntityDeleteEvents(endpoints []observer.Endpoint) { if et.pLogs != nil { - entityEvents := entityDeleteEvents(endpoints, time.Now()) + entityEvents, numFailed, err := entityDeleteEvents(endpoints, time.Now()) + if err != nil { + et.logger.Warn(fmt.Sprintf("failed converting %v endpoints to entity delete events", numFailed), zap.Error(err)) + } if entityEvents.Len() > 0 { et.pLogs <- entityEvents.ConvertAndMoveToLogs() } @@ -214,40 +227,56 @@ func (n *notify) OnChange(changed []observer.Endpoint) { func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, correlations correlationStore, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) { entityEvents := experimentalmetricmetadata.NewEntityEventsSlice() for _, endpoint := range endpoints { + if endpoint.Details == nil { + failed++ + err = multierr.Combine(err, fmt.Errorf("endpoint %q has no details", endpoint.ID)) + continue + } + entityEvent := entityEvents.AppendEmpty() entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(ts)) - entityEvent.ID().PutStr(discovery.EndpointIDAttr, string(endpoint.ID)) entityState := entityEvent.SetEntityState() attrs := entityState.Attributes() - if endpoint.Details != nil { - if envAttrs, e := endpointEnvToAttrs(endpoint.Details.Type(), endpoint.Details.Env()); e != nil { - err = multierr.Combine(err, fmt.Errorf("failed determining attributes for %q: %w", endpoint.ID, e)) - failed++ - } else { - // this must be the first mutation of attrs since it's destructive - envAttrs.CopyTo(attrs) - } - attrs.PutStr("type", string(endpoint.Details.Type())) + if envAttrs, e := endpointEnvToAttrs(endpoint.Details.Type(), endpoint.Details.Env()); e != nil { + err = multierr.Combine(err, fmt.Errorf("failed determining attributes for %q: %w", endpoint.ID, e)) + failed++ + } else { + // this must be the first mutation of attrs since it's destructive + envAttrs.CopyTo(attrs) } + attrs.PutStr("type", string(endpoint.Details.Type())) + attrs.PutStr(discovery.EndpointIDAttr, string(endpoint.ID)) attrs.PutStr("endpoint", endpoint.Target) attrs.PutStr(observerNameAttr, observerID.Name()) attrs.PutStr(observerTypeAttr, observerID.Type().String()) for k, v := range correlations.Attrs(endpoint.ID) { attrs.PutStr(k, v) } + extractIdentifyingAttrs(attrs, entityEvent.ID()) } return entityEvents, failed, err } -func entityDeleteEvents(endpoints []observer.Endpoint, ts time.Time) experimentalmetricmetadata.EntityEventsSlice { +func entityDeleteEvents(endpoints []observer.Endpoint, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) { entityEvents := experimentalmetricmetadata.NewEntityEventsSlice() for _, endpoint := range endpoints { + if endpoint.Details == nil { + failed++ + err = multierr.Combine(err, fmt.Errorf("endpoint %q has no details", endpoint.ID)) + continue + } + entityEvent := entityEvents.AppendEmpty() entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(ts)) - entityEvent.ID().PutStr(discovery.EndpointIDAttr, string(endpoint.ID)) entityEvent.SetEntityDelete() + if envAttrs, e := endpointEnvToAttrs(endpoint.Details.Type(), endpoint.Details.Env()); e != nil { + err = multierr.Combine(err, fmt.Errorf("failed determining attributes for %q: %w", endpoint.ID, e)) + failed++ + } else { + extractIdentifyingAttrs(envAttrs, entityEvent.ID()) + } } - return entityEvents + return entityEvents, failed, err } func endpointEnvToAttrs(endpointType observer.EndpointType, endpointEnv observer.EndpointEnv) (pcommon.Map, error) { @@ -273,10 +302,34 @@ func endpointEnvToAttrs(endpointType observer.EndpointType, endpointEnv observer if e != nil { return attrs, fmt.Errorf("failed parsing %v pod attributes ", endpointType) } - podAttrs.CopyTo(attrs.PutEmptyMap(k)) + podAttrs.Range(func(k string, v pcommon.Value) bool { + v.CopyTo(attrs.PutEmpty(k)) + return true + }) } else { return attrs, fmt.Errorf("failed parsing %v pod env %#v", endpointType, v) } + + // rename keys according to the OTel Semantic Conventions + case k == "container_id": + attrs.PutEmpty(semconv.AttributeContainerID).FromRaw(v) + case k == "port": + attrs.PutEmpty(sourcePortAttr).FromRaw(v) + case endpointType == observer.PodType: + if k == "namespace" { + attrs.PutEmpty(semconv.AttributeK8SNamespaceName).FromRaw(v) + } else { + attrs.PutEmpty("k8s.pod." + k).FromRaw(v) + } + case endpointType == observer.K8sNodeType: + switch k { + case "name": + attrs.PutEmpty(semconv.AttributeK8SNodeName).FromRaw(v) + case "uid": + attrs.PutEmpty(semconv.AttributeK8SNodeUID).FromRaw(v) + default: + attrs.PutEmpty(k).FromRaw(v) + } default: switch vVal := v.(type) { case uint16: @@ -298,3 +351,12 @@ func shouldEmbedMap(endpointType observer.EndpointType, k string) bool { endpointType == observer.ContainerType || endpointType == observer.K8sNodeType)) } + +func extractIdentifyingAttrs(from pcommon.Map, to pcommon.Map) { + for _, k := range identifyingAttrKeys { + if v, ok := from.Get(k); ok { + v.CopyTo(to.PutEmpty(k)) + from.Remove(k) + } + } +} diff --git a/internal/receiver/discoveryreceiver/endpoint_tracker_test.go b/internal/receiver/discoveryreceiver/endpoint_tracker_test.go index 6f49aa469b..5f81ceb1aa 100644 --- a/internal/receiver/discoveryreceiver/endpoint_tracker_test.go +++ b/internal/receiver/discoveryreceiver/endpoint_tracker_test.go @@ -44,10 +44,13 @@ func TestEndpointToPLogsHappyPath(t *testing.T) { name: "pod", endpoint: podEndpoint, expectedPLogs: func() plog.Logs { - plogs := expectedPLogs("pod.endpoint.id") + plogs := expectedPLogs() lr := plogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) + entityIDAttr, _ := lr.Attributes().Get(discovery.OtelEntityIDAttr) + entityIDAttr.Map().PutStr("k8s.pod.uid", "uid") entityAttrsAttr, _ := lr.Attributes().Get(discovery.OtelEntityAttributesAttr) attrs := entityAttrsAttr.Map() + attrs.PutStr(discovery.EndpointIDAttr, "pod.endpoint.id") annotationsMap := attrs.PutEmptyMap("annotations") annotationsMap.PutStr("annotation.one", "value.one") annotationsMap.PutStr("annotation.two", "value.two") @@ -55,10 +58,9 @@ func TestEndpointToPLogsHappyPath(t *testing.T) { labelsMap := attrs.PutEmptyMap("labels") labelsMap.PutStr("label.one", "value.one") labelsMap.PutStr("label.two", "value.two") - attrs.PutStr("name", "pod.name") - attrs.PutStr("namespace", "namespace") + attrs.PutStr("k8s.pod.name", "pod.name") + attrs.PutStr("k8s.namespace.name", "namespace") attrs.PutStr("type", "pod") - attrs.PutStr("uid", "uid") return plogs }(), }, @@ -66,25 +68,24 @@ func TestEndpointToPLogsHappyPath(t *testing.T) { name: "port", endpoint: portEndpoint, expectedPLogs: func() plog.Logs { - plogs := expectedPLogs("port.endpoint.id") + plogs := expectedPLogs() lr := plogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) + entityIDAttr, _ := lr.Attributes().Get(discovery.OtelEntityIDAttr) + entityIDAttr.Map().PutStr("k8s.pod.uid", "uid") + entityIDAttr.Map().PutInt("source.port", 1) entityAttrsAttr, _ := lr.Attributes().Get(discovery.OtelEntityAttributesAttr) attrs := entityAttrsAttr.Map() + attrs.PutStr(discovery.EndpointIDAttr, "port.endpoint.id") attrs.PutStr("endpoint", "port.target") attrs.PutStr("name", "port.name") - - podEnvMap := attrs.PutEmptyMap("pod") - annotationsMap := podEnvMap.PutEmptyMap("annotations") + attrs.PutStr("k8s.pod.name", "pod.name") + attrs.PutStr("k8s.namespace.name", "namespace") + annotationsMap := attrs.PutEmptyMap("annotations") annotationsMap.PutStr("annotation.one", "value.one") annotationsMap.PutStr("annotation.two", "value.two") - labelsMap := podEnvMap.PutEmptyMap("labels") + labelsMap := attrs.PutEmptyMap("labels") labelsMap.PutStr("label.one", "value.one") labelsMap.PutStr("label.two", "value.two") - podEnvMap.PutStr("name", "pod.name") - podEnvMap.PutStr("namespace", "namespace") - podEnvMap.PutStr("uid", "uid") - - attrs.PutInt("port", 1) attrs.PutStr("transport", "transport") attrs.PutStr("type", "port") return plogs @@ -94,14 +95,16 @@ func TestEndpointToPLogsHappyPath(t *testing.T) { name: "hostport", endpoint: hostportEndpoint, expectedPLogs: func() plog.Logs { - plogs := expectedPLogs("hostport.endpoint.id") + plogs := expectedPLogs() lr := plogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) + entityIDAttr, _ := lr.Attributes().Get(discovery.OtelEntityIDAttr) + entityIDAttr.Map().PutInt("source.port", 1) entityAttrsAttr, _ := lr.Attributes().Get(discovery.OtelEntityAttributesAttr) attrs := entityAttrsAttr.Map() + attrs.PutStr(discovery.EndpointIDAttr, "hostport.endpoint.id") attrs.PutStr("command", "command") attrs.PutStr("endpoint", "hostport.target") attrs.PutBool("is_ipv6", true) - attrs.PutInt("port", 1) attrs.PutStr("process_name", "process.name") attrs.PutStr("transport", "transport") attrs.PutStr("type", "hostport") @@ -112,13 +115,16 @@ func TestEndpointToPLogsHappyPath(t *testing.T) { name: "container", endpoint: containerEndpoint, expectedPLogs: func() plog.Logs { - plogs := expectedPLogs("container.endpoint.id") + plogs := expectedPLogs() lr := plogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) + entityIDAttr, _ := lr.Attributes().Get(discovery.OtelEntityIDAttr) + entityIDAttr.Map().PutStr("container.id", "container.id") + entityIDAttr.Map().PutInt("source.port", 1) entityAttrsAttr, _ := lr.Attributes().Get(discovery.OtelEntityAttributesAttr) attrs := entityAttrsAttr.Map() + attrs.PutStr(discovery.EndpointIDAttr, "container.endpoint.id") attrs.PutInt("alternate_port", 2) attrs.PutStr("command", "command") - attrs.PutStr("container_id", "container.id") attrs.PutStr("endpoint", "container.target") attrs.PutStr("host", "host") attrs.PutStr("image", "image") @@ -126,7 +132,6 @@ func TestEndpointToPLogsHappyPath(t *testing.T) { labelsMap.PutStr("label.one", "value.one") labelsMap.PutStr("label.two", "value.two") attrs.PutStr("name", "container.name") - attrs.PutInt("port", 1) attrs.PutStr("tag", "tag") attrs.PutStr("transport", "transport") attrs.PutStr("type", "container") @@ -137,10 +142,13 @@ func TestEndpointToPLogsHappyPath(t *testing.T) { name: "k8s.node", endpoint: k8sNodeEndpoint, expectedPLogs: func() plog.Logs { - plogs := expectedPLogs("k8s.node.endpoint.id") + plogs := expectedPLogs() lr := plogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) + entityIDAttr, _ := lr.Attributes().Get(discovery.OtelEntityIDAttr) + entityIDAttr.Map().PutStr("k8s.node.uid", "uid") entityAttrsAttr, _ := lr.Attributes().Get(discovery.OtelEntityAttributesAttr) attrs := entityAttrsAttr.Map() + attrs.PutStr(discovery.EndpointIDAttr, "k8s.node.endpoint.id") annotationsMap := attrs.PutEmptyMap("annotations") annotationsMap.PutStr("annotation.one", "value.one") annotationsMap.PutStr("annotation.two", "value.two") @@ -154,9 +162,8 @@ func TestEndpointToPLogsHappyPath(t *testing.T) { labelsMap := attrs.PutEmptyMap("labels") labelsMap.PutStr("label.one", "value.one") labelsMap.PutStr("label.two", "value.two") - attrs.PutStr("name", "k8s.node.name") + attrs.PutStr("k8s.node.name", "k8s.node.name") attrs.PutStr("type", "k8s.node") - attrs.PutStr("uid", "uid") return plogs }(), }, @@ -190,14 +197,7 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) { Target: "endpoint.target", Details: nil, }, - expectedPLogs: func() plog.Logs { - plogs := expectedPLogs("endpoint.id") - lr := plogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) - entityAttrsAttr, _ := lr.Attributes().Get(discovery.OtelEntityAttributesAttr) - attrs := entityAttrsAttr.Map() - attrs.PutStr("endpoint", "endpoint.target") - return plogs - }(), + expectedError: `endpoint "endpoint.id" has no details`, }, { name: "empty details env", @@ -207,10 +207,11 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) { Details: emptyDetailsEnv{}, }, expectedPLogs: func() plog.Logs { - plogs := expectedPLogs("endpoint.id") + plogs := expectedPLogs() lr := plogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) entityAttrsAttr, _ := lr.Attributes().Get(discovery.OtelEntityAttributesAttr) attrs := entityAttrsAttr.Map() + attrs.PutStr(discovery.EndpointIDAttr, "endpoint.id") attrs.PutStr("endpoint", "endpoint.target") attrs.PutStr("type", "empty.details.env") return plogs @@ -224,10 +225,11 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) { Details: unexpectedLabelsAndAnnotations{t: observer.EndpointType("unexpected.env")}, }, expectedPLogs: func() plog.Logs { - plogs := expectedPLogs("endpoint.id") + plogs := expectedPLogs() lr := plogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) entityAttrsAttr, _ := lr.Attributes().Get(discovery.OtelEntityAttributesAttr) attrs := entityAttrsAttr.Map() + attrs.PutStr(discovery.EndpointIDAttr, "endpoint.id") attrs.PutBool("annotations", false) attrs.PutStr("endpoint", "endpoint.target") attrs.PutBool("labels", true) @@ -294,7 +296,7 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) { lr := expectedDeleteEvent.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) lr.Attributes().PutStr(discovery.OtelEntityEventTypeAttr, discovery.OtelEntityEventTypeDelete) lr.Attributes().Remove(discovery.OtelEntityAttributesAttr) - events = entityDeleteEvents([]observer.Endpoint{test.endpoint}, t0) + events, failed, err = entityDeleteEvents([]observer.Endpoint{test.endpoint}, t0) require.NoError(t, err) require.Zero(t, failed) require.Equal(t, 1, events.Len()) @@ -347,26 +349,27 @@ func FuzzEndpointToPlogs(f *testing.F) { }, newCorrelationStore(zap.NewNop(), time.Hour), t0, ) - expectedLogs := expectedPLogs(endpointID) + expectedLogs := expectedPLogs() lr := expectedLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) lr.SetTimestamp(pcommon.NewTimestampFromTime(t0)) + entityIDAttr, _ := lr.Attributes().Get(discovery.OtelEntityIDAttr) + entityIDAttr.Map().PutStr("k8s.pod.uid", uid) + entityIDAttr.Map().PutInt("source.port", int64(port)) attrs := lr.Attributes().PutEmptyMap(discovery.OtelEntityAttributesAttr) + attrs.PutStr(discovery.EndpointIDAttr, endpointID) attrs.PutStr(observerNameAttr, observerName) attrs.PutStr(observerTypeAttr, observerTypeSanitized.String()) attrs.PutStr("endpoint", target) attrs.PutStr("name", portName) - podEnvMap := attrs.PutEmptyMap("pod") - annotationsMap := podEnvMap.PutEmptyMap("annotations") + annotationsMap := attrs.PutEmptyMap("annotations") annotationsMap.PutStr(annotationOne, annotationValueOne) annotationsMap.PutStr(annotationTwo, annotationValueTwo) - labelsMap := podEnvMap.PutEmptyMap("labels") + labelsMap := attrs.PutEmptyMap("labels") labelsMap.PutStr(labelOne, labelValueOne) labelsMap.PutStr(labelTwo, labelValueTwo) - podEnvMap.PutStr("name", podName) - podEnvMap.PutStr("namespace", namespace) - podEnvMap.PutStr("uid", uid) - attrs.PutInt("port", int64(port)) + attrs.PutStr("k8s.pod.name", podName) + attrs.PutStr("k8s.namespace.name", namespace) attrs.PutStr("transport", transport) attrs.PutStr("type", "port") require.Equal(t, 1, events.Len()) @@ -478,13 +481,13 @@ var ( } ) -func expectedPLogs(endpointID string) plog.Logs { +func expectedPLogs() plog.Logs { plogs := plog.NewLogs() scopeLog := plogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty() scopeLog.Scope().Attributes().PutBool(discovery.OtelEntityEventAsLogAttr, true) lr := scopeLog.LogRecords().AppendEmpty() lr.Attributes().PutStr(discovery.OtelEntityEventTypeAttr, discovery.OtelEntityEventTypeState) - lr.Attributes().PutEmptyMap(discovery.OtelEntityIDAttr).PutStr(discovery.EndpointIDAttr, endpointID) + lr.Attributes().PutEmptyMap(discovery.OtelEntityIDAttr) attrs := lr.Attributes().PutEmptyMap(discovery.OtelEntityAttributesAttr) attrs.PutStr(observerNameAttr, "observer.name") attrs.PutStr(observerTypeAttr, "observer_type") @@ -670,7 +673,10 @@ func TestEntityEmittingLifecycle(t *testing.T) { obs.onRemove([]observer.Endpoint{portEndpoint}) // Wait for an entity delete event. - expectedLogs := entityDeleteEvents([]observer.Endpoint{portEndpoint}, t0).ConvertAndMoveToLogs() + expectedEvents, failed, err = entityDeleteEvents([]observer.Endpoint{portEndpoint}, t0) + require.NoError(t, err) + require.Zero(t, failed) + expectedLogs := expectedEvents.ConvertAndMoveToLogs() require.EventuallyWithT(t, func(c *assert.CollectT) { logs := <-ch assert.NoError(c, plogtest.CompareLogs(expectedLogs, logs, plogtest.IgnoreTimestamp())) @@ -720,39 +726,41 @@ func TestEntityStateEvents(t *testing.T) { event := events.At(0) assert.Equal(t, experimentalmetricmetadata.EventTypeState, event.EventType()) assert.Equal(t, t0, event.Timestamp().AsTime()) - assert.Equal(t, map[string]any{discovery.EndpointIDAttr: string(portEndpoint.ID)}, event.ID().AsRaw()) + assert.Equal(t, map[string]any{"k8s.pod.uid": "uid", "source.port": int64(1)}, event.ID().AsRaw()) assert.Equal(t, map[string]any{ observerNameAttr: "observer.name", observerTypeAttr: "observer_type", "endpoint": "port.target", "name": "port.name", - "port": int64(1), - "pod": map[string]any{ - "annotations": map[string]any{ - "annotation.one": "value.one", - "annotation.two": "value.two", - }, - "labels": map[string]any{ - "label.one": "value.one", - "label.two": "value.two", - }, - "name": "pod.name", - "namespace": "namespace", - "uid": "uid", + "annotations": map[string]any{ + "annotation.one": "value.one", + "annotation.two": "value.two", + }, + "labels": map[string]any{ + "label.one": "value.one", + "label.two": "value.two", }, - "transport": "transport", - "type": "port", - "attr1": "val1", - "attr2": "val2", + "discovery.endpoint.id": "port.endpoint.id", + "k8s.pod.name": "pod.name", + "k8s.namespace.name": "namespace", + "transport": "transport", + "type": "port", + "attr1": "val1", + "attr2": "val2", }, event.EntityStateDetails().Attributes().AsRaw()) } func TestEntityDeleteEvents(t *testing.T) { - events := entityDeleteEvents([]observer.Endpoint{portEndpoint}, t0) + events, failed, err := entityDeleteEvents([]observer.Endpoint{portEndpoint}, t0) + require.Zero(t, failed) + require.NoError(t, err) require.Equal(t, 1, events.Len()) event := events.At(0) assert.Equal(t, experimentalmetricmetadata.EventTypeDelete, event.EventType()) assert.Equal(t, t0, event.Timestamp().AsTime()) - assert.Equal(t, map[string]any{discovery.EndpointIDAttr: string(portEndpoint.ID)}, event.ID().AsRaw()) + assert.Equal(t, map[string]any{ + sourcePortAttr: int64(1), + "k8s.pod.uid": "uid", + }, event.ID().AsRaw()) } diff --git a/tests/receivers/discovery/testdata/resource_logs/host_observer_endpoints.yaml b/tests/receivers/discovery/testdata/resource_logs/host_observer_endpoints.yaml index b75cdc0def..c905d1cd5f 100644 --- a/tests/receivers/discovery/testdata/resource_logs/host_observer_endpoints.yaml +++ b/tests/receivers/discovery/testdata/resource_logs/host_observer_endpoints.yaml @@ -3,29 +3,29 @@ resource_logs: - logs: - attributes: otel.entity.id: - discovery.endpoint.id: (host_observer)127.0.0.1-55554-TCP-1 + source.port: 55554 otel.entity.event.type: entity_state otel.entity.attributes: + discovery.endpoint.id: (host_observer)127.0.0.1-55554-TCP-1 discovery.observer.type: host_observer discovery.observer.name: "" command: /otelcol --config /etc/config.yaml endpoint: 127.0.0.1:55554 is_ipv6: false - port: 55554 process_name: otelcol transport: TCP type: hostport - attributes: otel.entity.id: - discovery.endpoint.id: (host_observer)[::]-8888-TCP-1 + source.port: 8888 otel.entity.event.type: entity_state otel.entity.attributes: + discovery.endpoint.id: (host_observer)[::]-8888-TCP-1 discovery.observer.type: host_observer discovery.observer.name: "" command: /otelcol --config /etc/config.yaml endpoint: '[::]:8888' is_ipv6: true - port: 8888 process_name: otelcol transport: TCP type: hostport @@ -33,29 +33,29 @@ resource_logs: - logs: - attributes: otel.entity.id: - discovery.endpoint.id: (host_observer/with_name)127.0.0.1-55554-TCP-1 + source.port: 55554 otel.entity.event.type: entity_state otel.entity.attributes: + discovery.endpoint.id: (host_observer/with_name)127.0.0.1-55554-TCP-1 discovery.observer.type: host_observer discovery.observer.name: with_name command: /otelcol --config /etc/config.yaml endpoint: 127.0.0.1:55554 is_ipv6: false - port: 55554 process_name: otelcol transport: TCP type: hostport - attributes: otel.entity.id: - discovery.endpoint.id: (host_observer/with_name)[::]-8888-TCP-1 + source.port: 8888 otel.entity.event.type: entity_state otel.entity.attributes: + discovery.endpoint.id: (host_observer/with_name)[::]-8888-TCP-1 discovery.observer.type: host_observer discovery.observer.name: with_name command: /otelcol --config /etc/config.yaml endpoint: '[::]:8888' is_ipv6: true - port: 8888 process_name: otelcol transport: TCP type: hostport @@ -63,29 +63,29 @@ resource_logs: - logs: - attributes: otel.entity.id: - discovery.endpoint.id: (host_observer/with/another/name)127.0.0.1-55554-TCP-1 + source.port: 55554 otel.entity.event.type: entity_state otel.entity.attributes: + discovery.endpoint.id: (host_observer/with/another/name)127.0.0.1-55554-TCP-1 discovery.observer.type: host_observer discovery.observer.name: with/another/name command: /otelcol --config /etc/config.yaml endpoint: 127.0.0.1:55554 is_ipv6: false - port: 55554 process_name: otelcol transport: TCP type: hostport - attributes: otel.entity.id: - discovery.endpoint.id: (host_observer/with/another/name)[::]-8888-TCP-1 + source.port: 8888 otel.entity.event.type: entity_state otel.entity.attributes: + discovery.endpoint.id: (host_observer/with/another/name)[::]-8888-TCP-1 discovery.observer.type: host_observer discovery.observer.name: with/another/name command: /otelcol --config /etc/config.yaml endpoint: '[::]:8888' is_ipv6: true - port: 8888 process_name: otelcol transport: TCP type: hostport diff --git a/tests/receivers/discovery/testdata/resource_logs/host_observer_simple_prometheus_statuses.yaml b/tests/receivers/discovery/testdata/resource_logs/host_observer_simple_prometheus_statuses.yaml index 0c199e5f6f..6decae9836 100644 --- a/tests/receivers/discovery/testdata/resource_logs/host_observer_simple_prometheus_statuses.yaml +++ b/tests/receivers/discovery/testdata/resource_logs/host_observer_simple_prometheus_statuses.yaml @@ -3,9 +3,10 @@ resource_logs: - logs: - attributes: otel.entity.id: - discovery.endpoint.id: (host_observer)[::]-8888-TCP-1 + source.port: 8888 otel.entity.event.type: entity_state otel.entity.attributes: + discovery.endpoint.id: (host_observer)[::]-8888-TCP-1 discovery.event.type: metric.match discovery.observer.id: host_observer discovery.receiver.config: cmVjZWl2ZXJzOgogIHByb21ldGhldXNfc2ltcGxlOgogICAgY29uZmlnOiB7fQogICAgcmVzb3VyY2VfYXR0cmlidXRlczoKICAgICAgb25lLmtleTogb25lLnZhbHVlCiAgICAgIHR3by5rZXk6IHR3by52YWx1ZQogICAgcnVsZTogdHlwZSA9PSAiaG9zdHBvcnQiIGFuZCBjb21tYW5kIGNvbnRhaW5zICJvdGVsY29sIgp3YXRjaF9vYnNlcnZlcnM6Ci0gaG9zdF9vYnNlcnZlcgo= @@ -28,7 +29,6 @@ resource_logs: command: /otelcol --config /etc/config.yaml endpoint: '[::]:8888' is_ipv6: true - port: 8888 process_name: otelcol transport: TCP type: hostport @@ -36,9 +36,10 @@ resource_logs: - logs: - attributes: otel.entity.id: - discovery.endpoint.id: (host_observer)[::]-4318-TCP-1 + source.port: 4318 otel.entity.event.type: entity_state otel.entity.attributes: + discovery.endpoint.id: (host_observer)[::]-4318-TCP-1 discovery.event.type: statement.match discovery.observer.id: host_observer discovery.receiver.config: cmVjZWl2ZXJzOgogIHByb21ldGhldXNfc2ltcGxlOgogICAgY29uZmlnOiB7fQogICAgcmVzb3VyY2VfYXR0cmlidXRlczoKICAgICAgb25lLmtleTogb25lLnZhbHVlCiAgICAgIHR3by5rZXk6IHR3by52YWx1ZQogICAgcnVsZTogdHlwZSA9PSAiaG9zdHBvcnQiIGFuZCBjb21tYW5kIGNvbnRhaW5zICJvdGVsY29sIgp3YXRjaF9vYnNlcnZlcnM6Ci0gaG9zdF9vYnNlcnZlcgo= @@ -56,7 +57,6 @@ resource_logs: command: /otelcol --config /etc/config.yaml endpoint: '[::]:4318' is_ipv6: true - port: 4318 process_name: otelcol transport: TCP type: hostport