Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/discovery] Emit entity events for discovered endpoints #4684

Merged
merged 1 commit into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### 🛑 Breaking changes 🛑

- (Splunk) `receiver/discovery`: Emit entity events for discovered endpoints with log_endpoints: true ([#4684](https://github.com/signalfx/splunk-otel-collector/pull/4684))

### 💡 Enhancements 💡

- (Splunk) Include [`splunk-otel-dotnet`](https://github.com/signalfx/splunk-otel-dotnet) in the `splunk-otel-auto-instrumentation` deb/rpm packages (x86_64/amd64 only) ([#4679](https://github.com/signalfx/splunk-otel-collector/pull/4679))
Expand Down
10 changes: 6 additions & 4 deletions internal/common/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ const (
StatusAttr = "discovery.status"
MessageAttr = "discovery.message"

OtelEntityAttributesAttr = "otel.entity.attributes"
OtelEntityIDAttr = "otel.entity.id"
OtelEntityEventTypeAttr = "otel.entity.event.type"
OtelEntityEventTypeState = "entity_state"
OtelEntityAttributesAttr = "otel.entity.attributes"
OtelEntityIDAttr = "otel.entity.id"
OtelEntityEventTypeAttr = "otel.entity.event.type"
OtelEntityEventTypeState = "entity_state"
OtelEntityEventTypeDelete = "entity_delete"
OtelEntityEventAsLogAttr = "otel.entity.event_as_log"

DiscoExtensionsKey = "extensions/splunk.discovery"
DiscoReceiversKey = "receivers/splunk.discovery"
Expand Down
84 changes: 22 additions & 62 deletions internal/receiver/discoveryreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ component-level log statements are similarly intercepted by a log evaluator, and
records based on the `status: statements` rules you define. The matching rules SHOULD NOT conflict with each other.
The first matching rule in the list will be used to determine the status of the receiver.

The receiver also allows you to emit log records for all
The receiver also allows you to emit entity events for all
[Endpoint](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/extension/observer/endpoints.go)
events from the specified `watch_observers`. This way you can report your environment as observed by platform-specific
observers in real time, with or without discovering receiver statuses:
Expand All @@ -42,77 +42,37 @@ service:
```

```
2022-07-27T19:00:32.305Z info LogsExporter {"kind": "exporter", "data_type": "logs", "name": "logging", "#logs": 3}
2022-07-27T19:00:32.306Z info ResourceLog #0
2024-04-17T19:53:24.285Z info LogsExporter {"kind": "exporter", "data_type": "logs", "name": "debug", "resource logs": 1, "log records": 2}
2024-04-17T19:53:24.286Z info ResourceLog #0
Resource SchemaURL:
Resource labels:
-> event.type: STRING(endpoint.added)
ScopeLogs #0
ScopeLogs SchemaURL:
InstrumentationScope
InstrumentationScope attributes:
-> otel.entity.event_as_log: Bool(true)
LogRecord #0
ObservedTimestamp: 2022-07-27 19:00:32.30572809 +0000 UTC
Timestamp: 2022-07-27 19:00:32.305727862 +0000 UTC
Severity: info
Body: {"alternate_port":5432,"command":"postgres -c shared_preload_libraries=pg_stat_statements -c wal_level=logical -c max_replication_slots=2","container_id":"58e71612910cd3e2a89d809f1b53fee779c4c81d9bbf36ef37f4c05b04037353","endpoint":"172.17.0.4:5432","host":"172.17.0.4","id":"58e71612910cd3e2a89d809f1b53fee779c4c81d9bbf36ef37f4c05b04037353:5432","image":"postgres","labels":{},"name":"naughty_heisenberg","port":5432,"tag":"latest","transport":"TCP","type":"container"}
ObservedTimestamp: 1970-01-01 00:00:00 +0000 UTC
Timestamp: 2024-04-17 19:53:24.28493287 +0000 UTC
SeverityText:
SeverityNumber: Unspecified(0)
Body: Empty()
Attributes:
-> type: STRING(endpoint)
-> image: STRING(postgres)
-> tag: STRING(latest)
-> port: STRING(5432)
-> alternate_port: STRING(5432)
-> host: STRING(172.17.0.4)
-> transport: STRING(TCP)
-> labels: STRING(map[])
-> endpoint: STRING(172.17.0.4:5432)
-> name: STRING(naughty_heisenberg)
-> command: STRING(postgres -c shared_preload_libraries=pg_stat_statements -c wal_level=logical -c max_replication_slots=2)
-> container_id: STRING(58e71612910cd3e2a89d809f1b53fee779c4c81d9bbf36ef37f4c05b04037353)
-> id: STRING(58e71612910cd3e2a89d809f1b53fee779c4c81d9bbf36ef37f4c05b04037353:5432)
-> otel.entity.id: Map({"discovery.endpoint.id":"k8s_observer/ed171efd-f5ab-4bab-923d-d30f3f221367/(9080)"})
-> otel.entity.event.type: Str(entity_state)
-> otel.entity.attributes: Map({"discovery.observer.name":"","discovery.observer.type":"k8s_observer","endpoint":"192.168.33.122:9080","name":"","pod":{"annotations":{"kubernetes.io/psp":"eks.privileged"},"labels":{"appv":"reviews","pod-template-hash":"7bff4f6574","version":"v1"},"name":"reviews-v1-7bff4f6574-fbkw9","namespace":"default","uid":"ed171efd-f5ab-4bab-923d-d30f3f221367"},"port":9080,"transport":"TCP","type":"port"})
Trace ID:
Span ID:
Flags: 0
LogRecord #1
ObservedTimestamp: 2022-07-27 19:00:32.305771365 +0000 UTC
Timestamp: 2022-07-27 19:00:32.305771316 +0000 UTC
Severity: info
Body: {"alternate_port":0,"command":"nginx -g daemon off;","container_id":"2567fdbc764706d29120b01efcc3a310d87e9e121ec9debbc977d66f5497cdda","endpoint":"172.17.0.2:80","host":"172.17.0.2","id":"2567fdbc764706d29120b01efcc3a310d87e9e121ec9debbc977d66f5497cdda:80","image":"nginx","labels":{"maintainer":"NGINX Docker Maintainers \u003cdocker-maint@nginx.com\u003e"},"name":"ecstatic_davinci","port":80,"tag":"latest","transport":"TCP","type":"container"}
Attributes:
-> type: STRING(endpoint)
-> labels: STRING(map[maintainer:NGINX Docker Maintainers <docker-maint@nginx.com>])
-> id: STRING(2567fdbc764706d29120b01efcc3a310d87e9e121ec9debbc977d66f5497cdda:80)
-> tag: STRING(latest)
-> port: STRING(80)
-> alternate_port: STRING(0)
-> command: STRING(nginx -g daemon off;)
-> container_id: STRING(2567fdbc764706d29120b01efcc3a310d87e9e121ec9debbc977d66f5497cdda)
-> transport: STRING(TCP)
-> name: STRING(ecstatic_davinci)
-> image: STRING(nginx)
-> host: STRING(172.17.0.2)
-> endpoint: STRING(172.17.0.2:80)
Trace ID:
Span ID:
Flags: 0
LogRecord #2
ObservedTimestamp: 2022-07-27 19:00:32.305784658 +0000 UTC
Timestamp: 2022-07-27 19:00:32.305784612 +0000 UTC
Severity: info
Body: {"alternate_port":0,"command":"redis-server","container_id":"5f7f5f007f798c59a60c765e566db5d22bff59c614268db8d1b9abbc3ee70bf7","endpoint":"172.17.0.3:6379","host":"172.17.0.3","id":"5f7f5f007f798c59a60c765e566db5d22bff59c614268db8d1b9abbc3ee70bf7:6379","image":"redis","labels":{},"name":"beautiful_clarke","port":6379,"tag":"latest","transport":"TCP","type":"container"}
ObservedTimestamp: 1970-01-01 00:00:00 +0000 UTC
Timestamp: 2024-04-17 19:53:24.28493287 +0000 UTC
SeverityText:
SeverityNumber: Unspecified(0)
Body: Empty()
Attributes:
-> type: STRING(endpoint)
-> command: STRING(redis-server)
-> transport: STRING(TCP)
-> endpoint: STRING(172.17.0.3:6379)
-> port: STRING(6379)
-> image: STRING(redis)
-> tag: STRING(latest)
-> alternate_port: STRING(0)
-> container_id: STRING(5f7f5f007f798c59a60c765e566db5d22bff59c614268db8d1b9abbc3ee70bf7)
-> host: STRING(172.17.0.3)
-> labels: STRING(map[])
-> id: STRING(5f7f5f007f798c59a60c765e566db5d22bff59c614268db8d1b9abbc3ee70bf7:6379)
-> name: STRING(beautiful_clarke)
-> otel.entity.id: Map({"discovery.endpoint.id":"k8s_observer/ea8ee4f5-31a7-48f0-a3c7-ec41e736ccad/jaeger-grpc(14250)"})
-> otel.entity.event.type: Str(entity_state)
-> otel.entity.attributes: Map({"discovery.observer.name":"","discovery.observer.type":"k8s_observer","endpoint":"192.168.57.181:14250","name":"jaeger-grpc","pod":{"annotations":{"checksum/config":"d6cc5d07fe24d77b0d0af827295879943d59e87013f0f1e34fa916b942c51336","kubectl.kubernetes.io/default-container":"otel-collector","kubernetes.io/psp":"eks.privileged"},"labels":{"app":"splunk-otel-collector","component":"otel-collector-agent","controller-revision-hash":"6cb7d5c864","pod-template-generation":"5","release":"my-splunk-otel-collector"},"name":"my-splunk-otel-collector-agent-cdv7s","namespace":"default","uid":"ea8ee4f5-31a7-48f0-a3c7-ec41e736ccad"},"port":14250,"transport":"TCP","type":"port"})
Trace ID:
Span ID:
Flags: 0
Expand Down Expand Up @@ -271,7 +231,7 @@ Flags: 0
| Name | Type | Default | Docs |
|------------------------------|---------------------------|------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `watch_observers` (required) | []string | <no value> | The array of Observer extensions to receive Endpoint events from |
| `log_endpoints` | bool | false | Whether to emit log records for Observer Endpoint events |
| `log_endpoints` | bool | false | Whether to emit all endpoint activity as entity events |
| `embed_receiver_config` | bool | false | Whether to embed a base64-encoded, minimal Receiver Creator config for the generated receiver as a reported metrics `discovery.receiver.rule` resource attribute value for status log record matches |
| `receivers` | map[string]ReceiverConfig | <no value> | The mapping of receiver names to their Receiver sub-config |

Expand Down
3 changes: 1 addition & 2 deletions internal/receiver/discoveryreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ type Config struct {
// The configured Observer extensions from which to receive Endpoint events.
// Must implement the observer.Observable interface.
WatchObservers []component.ID `mapstructure:"watch_observers"`
// Whether to emit log records for all endpoint activity, consisting of Endpoint
// content as record attributes.
// Whether to emit all endpoint activity as entity events.
LogEndpoints bool `mapstructure:"log_endpoints"`
// Whether to include the receiver config as a base64-encoded "discovery.receiver.config"
// resource attribute string value. Will also contain the configured observer that
Expand Down
54 changes: 27 additions & 27 deletions internal/receiver/discoveryreceiver/endpoint_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ import (
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/signalfx/splunk-otel-collector/internal/common/discovery"
)

type endpointState string
Expand Down Expand Up @@ -91,7 +94,7 @@ func (et *endpointTracker) stop() {

func (et *endpointTracker) emitEndpointLogs(observerCID component.ID, eventType endpointState, endpoints []observer.Endpoint, received time.Time) {
if et.config.LogEndpoints && et.pLogs != nil {
pLogs, numFailed, err := endpointToPLogs(observerCID, fmt.Sprintf("endpoint.%s", eventType), endpoints, received)
pLogs, numFailed, err := endpointToPLogs(observerCID, eventType, endpoints, received)
if err != nil {
et.logger.Warn(fmt.Sprintf("failed converting %v endpoints to log records", numFailed), zap.Error(err))
}
Expand Down Expand Up @@ -160,36 +163,33 @@ func (n *notify) OnChange(changed []observer.Endpoint) {
n.endpointTracker.updateEndpoints(changed, changedState, n.observerID)
}

func endpointToPLogs(observerID component.ID, eventType string, endpoints []observer.Endpoint, received time.Time) (pLogs plog.Logs, failed int, err error) {
pLogs = plog.NewLogs()
rlog := pLogs.ResourceLogs().AppendEmpty()
rAttrs := rlog.Resource().Attributes()
rAttrs.PutStr(eventTypeAttr, eventType)
rAttrs.PutStr(observerNameAttr, observerID.Name())
rAttrs.PutStr(observerTypeAttr, observerID.Type().String())
sl := rlog.ScopeLogs().AppendEmpty()
func endpointToPLogs(observerID component.ID, eventType endpointState, endpoints []observer.Endpoint, received time.Time) (pLogs plog.Logs, failed int, err error) {
entityEvents := experimentalmetricmetadata.NewEntityEventsSlice()
for _, endpoint := range endpoints {
logRecord := sl.LogRecords().AppendEmpty()
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(received))
logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now()))
attrs := logRecord.Attributes()
if endpoint.Details != nil {
logRecord.Body().SetStr(fmt.Sprintf("%s %s endpoint %s", eventType, endpoint.Details.Type(), endpoint.ID))
if envAttrs, e := endpointEnvToAttrs(endpoint.Details.Type(), endpoint.Details.Env()); e != nil {
err = multierr.Combine(err, fmt.Errorf("failed determining attributes for %q: %w", endpoint.ID, e))
failed++
} else {
// this must be the first mutation of attrs since it's destructive
envAttrs.CopyTo(attrs)
}
attrs.PutStr("type", string(endpoint.Details.Type()))
entityEvent := entityEvents.AppendEmpty()
entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(received))
entityEvent.ID().PutStr(discovery.EndpointIDAttr, string(endpoint.ID))
if eventType == removedState {
entityEvent.SetEntityDelete()
} else {
logRecord.Body().SetStr(fmt.Sprintf("%s endpoint %s", eventType, endpoint.ID))
entityState := entityEvent.SetEntityState()
attrs := entityState.Attributes()
if endpoint.Details != nil {
if envAttrs, e := endpointEnvToAttrs(endpoint.Details.Type(), endpoint.Details.Env()); e != nil {
err = multierr.Combine(err, fmt.Errorf("failed determining attributes for %q: %w", endpoint.ID, e))
failed++
} else {
// this must be the first mutation of attrs since it's destructive
envAttrs.CopyTo(attrs)
}
attrs.PutStr("type", string(endpoint.Details.Type()))
}
attrs.PutStr("endpoint", endpoint.Target)
attrs.PutStr(observerNameAttr, observerID.Name())
attrs.PutStr(observerTypeAttr, observerID.Type().String())
}
attrs.PutStr("endpoint", endpoint.Target)
attrs.PutStr("id", string(endpoint.ID))
}
return
return entityEvents.ConvertAndMoveToLogs(), failed, err
}

func endpointEnvToAttrs(endpointType observer.EndpointType, endpointEnv observer.EndpointEnv) (pcommon.Map, error) {
Expand Down
Loading
Loading