Skip to content

Commit

Permalink
[receiver/discovery] Emit entity events for discovered endpoints
Browse files Browse the repository at this point in the history
Change log records output when `log_endpoints` is enabled from discovery log records to entity events. This will serve as a basis for emitting entity events on regular basis
  • Loading branch information
dmitryax committed Apr 17, 2024
1 parent a079b0c commit b3ad814
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 244 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### 🛑 Breaking changes 🛑

- (Splunk) `receiver/discovery`: Emit entity events for discovered endpoints with log_endpoints: true ([#4684](https://github.com/signalfx/splunk-otel-collector/pull/4684))

### 💡 Enhancements 💡

- (Splunk) Include [`splunk-otel-dotnet`](https://github.com/signalfx/splunk-otel-dotnet) in the `splunk-otel-auto-instrumentation` deb/rpm packages (x86_64/amd64 only) ([#4679](https://github.com/signalfx/splunk-otel-collector/pull/4679))
Expand Down
10 changes: 6 additions & 4 deletions internal/common/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ const (
StatusAttr = "discovery.status"
MessageAttr = "discovery.message"

OtelEntityAttributesAttr = "otel.entity.attributes"
OtelEntityIDAttr = "otel.entity.id"
OtelEntityEventTypeAttr = "otel.entity.event.type"
OtelEntityEventTypeState = "entity_state"
OtelEntityAttributesAttr = "otel.entity.attributes"
OtelEntityIDAttr = "otel.entity.id"
OtelEntityEventTypeAttr = "otel.entity.event.type"
OtelEntityEventTypeState = "entity_state"
OtelEntityEventTypeDelete = "entity_delete"
OtelEntityEventAsLogAttr = "otel.entity.event_as_log"

DiscoExtensionsKey = "extensions/splunk.discovery"
DiscoReceiversKey = "receivers/splunk.discovery"
Expand Down
84 changes: 22 additions & 62 deletions internal/receiver/discoveryreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ component-level log statements are similarly intercepted by a log evaluator, and
records based on the `status: statements` rules you define. The matching rules SHOULD NOT conflict with each other.
The first matching rule in the list will be used to determine the status of the receiver.

The receiver also allows you to emit log records for all
The receiver also allows you to emit entity events for all
[Endpoint](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/extension/observer/endpoints.go)
events from the specified `watch_observers`. This way you can report your environment as observed by platform-specific
observers in real time, with or without discovering receiver statuses:
Expand All @@ -42,77 +42,37 @@ service:
```
```
2022-07-27T19:00:32.305Z info LogsExporter {"kind": "exporter", "data_type": "logs", "name": "logging", "#logs": 3}
2022-07-27T19:00:32.306Z info ResourceLog #0
2024-04-17T19:53:24.285Z info LogsExporter {"kind": "exporter", "data_type": "logs", "name": "debug", "resource logs": 1, "log records": 2}
2024-04-17T19:53:24.286Z info ResourceLog #0
Resource SchemaURL:
Resource labels:
-> event.type: STRING(endpoint.added)
ScopeLogs #0
ScopeLogs SchemaURL:
InstrumentationScope
InstrumentationScope attributes:
-> otel.entity.event_as_log: Bool(true)
LogRecord #0
ObservedTimestamp: 2022-07-27 19:00:32.30572809 +0000 UTC
Timestamp: 2022-07-27 19:00:32.305727862 +0000 UTC
Severity: info
Body: {"alternate_port":5432,"command":"postgres -c shared_preload_libraries=pg_stat_statements -c wal_level=logical -c max_replication_slots=2","container_id":"58e71612910cd3e2a89d809f1b53fee779c4c81d9bbf36ef37f4c05b04037353","endpoint":"172.17.0.4:5432","host":"172.17.0.4","id":"58e71612910cd3e2a89d809f1b53fee779c4c81d9bbf36ef37f4c05b04037353:5432","image":"postgres","labels":{},"name":"naughty_heisenberg","port":5432,"tag":"latest","transport":"TCP","type":"container"}
ObservedTimestamp: 1970-01-01 00:00:00 +0000 UTC
Timestamp: 2024-04-17 19:53:24.28493287 +0000 UTC
SeverityText:
SeverityNumber: Unspecified(0)
Body: Empty()
Attributes:
-> type: STRING(endpoint)
-> image: STRING(postgres)
-> tag: STRING(latest)
-> port: STRING(5432)
-> alternate_port: STRING(5432)
-> host: STRING(172.17.0.4)
-> transport: STRING(TCP)
-> labels: STRING(map[])
-> endpoint: STRING(172.17.0.4:5432)
-> name: STRING(naughty_heisenberg)
-> command: STRING(postgres -c shared_preload_libraries=pg_stat_statements -c wal_level=logical -c max_replication_slots=2)
-> container_id: STRING(58e71612910cd3e2a89d809f1b53fee779c4c81d9bbf36ef37f4c05b04037353)
-> id: STRING(58e71612910cd3e2a89d809f1b53fee779c4c81d9bbf36ef37f4c05b04037353:5432)
-> otel.entity.id: Map({"discovery.endpoint.id":"k8s_observer/ed171efd-f5ab-4bab-923d-d30f3f221367/(9080)"})
-> otel.entity.event.type: Str(entity_state)
-> otel.entity.attributes: Map({"discovery.observer.name":"","discovery.observer.type":"k8s_observer","endpoint":"192.168.33.122:9080","name":"","pod":{"annotations":{"kubernetes.io/psp":"eks.privileged"},"labels":{"appv":"reviews","pod-template-hash":"7bff4f6574","version":"v1"},"name":"reviews-v1-7bff4f6574-fbkw9","namespace":"default","uid":"ed171efd-f5ab-4bab-923d-d30f3f221367"},"port":9080,"transport":"TCP","type":"port"})
Trace ID:
Span ID:
Flags: 0
LogRecord #1
ObservedTimestamp: 2022-07-27 19:00:32.305771365 +0000 UTC
Timestamp: 2022-07-27 19:00:32.305771316 +0000 UTC
Severity: info
Body: {"alternate_port":0,"command":"nginx -g daemon off;","container_id":"2567fdbc764706d29120b01efcc3a310d87e9e121ec9debbc977d66f5497cdda","endpoint":"172.17.0.2:80","host":"172.17.0.2","id":"2567fdbc764706d29120b01efcc3a310d87e9e121ec9debbc977d66f5497cdda:80","image":"nginx","labels":{"maintainer":"NGINX Docker Maintainers \u003cdocker-maint@nginx.com\u003e"},"name":"ecstatic_davinci","port":80,"tag":"latest","transport":"TCP","type":"container"}
Attributes:
-> type: STRING(endpoint)
-> labels: STRING(map[maintainer:NGINX Docker Maintainers <docker-maint@nginx.com>])
-> id: STRING(2567fdbc764706d29120b01efcc3a310d87e9e121ec9debbc977d66f5497cdda:80)
-> tag: STRING(latest)
-> port: STRING(80)
-> alternate_port: STRING(0)
-> command: STRING(nginx -g daemon off;)
-> container_id: STRING(2567fdbc764706d29120b01efcc3a310d87e9e121ec9debbc977d66f5497cdda)
-> transport: STRING(TCP)
-> name: STRING(ecstatic_davinci)
-> image: STRING(nginx)
-> host: STRING(172.17.0.2)
-> endpoint: STRING(172.17.0.2:80)
Trace ID:
Span ID:
Flags: 0
LogRecord #2
ObservedTimestamp: 2022-07-27 19:00:32.305784658 +0000 UTC
Timestamp: 2022-07-27 19:00:32.305784612 +0000 UTC
Severity: info
Body: {"alternate_port":0,"command":"redis-server","container_id":"5f7f5f007f798c59a60c765e566db5d22bff59c614268db8d1b9abbc3ee70bf7","endpoint":"172.17.0.3:6379","host":"172.17.0.3","id":"5f7f5f007f798c59a60c765e566db5d22bff59c614268db8d1b9abbc3ee70bf7:6379","image":"redis","labels":{},"name":"beautiful_clarke","port":6379,"tag":"latest","transport":"TCP","type":"container"}
ObservedTimestamp: 1970-01-01 00:00:00 +0000 UTC
Timestamp: 2024-04-17 19:53:24.28493287 +0000 UTC
SeverityText:
SeverityNumber: Unspecified(0)
Body: Empty()
Attributes:
-> type: STRING(endpoint)
-> command: STRING(redis-server)
-> transport: STRING(TCP)
-> endpoint: STRING(172.17.0.3:6379)
-> port: STRING(6379)
-> image: STRING(redis)
-> tag: STRING(latest)
-> alternate_port: STRING(0)
-> container_id: STRING(5f7f5f007f798c59a60c765e566db5d22bff59c614268db8d1b9abbc3ee70bf7)
-> host: STRING(172.17.0.3)
-> labels: STRING(map[])
-> id: STRING(5f7f5f007f798c59a60c765e566db5d22bff59c614268db8d1b9abbc3ee70bf7:6379)
-> name: STRING(beautiful_clarke)
-> otel.entity.id: Map({"discovery.endpoint.id":"k8s_observer/ea8ee4f5-31a7-48f0-a3c7-ec41e736ccad/jaeger-grpc(14250)"})
-> otel.entity.event.type: Str(entity_state)
-> otel.entity.attributes: Map({"discovery.observer.name":"","discovery.observer.type":"k8s_observer","endpoint":"192.168.57.181:14250","name":"jaeger-grpc","pod":{"annotations":{"checksum/config":"d6cc5d07fe24d77b0d0af827295879943d59e87013f0f1e34fa916b942c51336","kubectl.kubernetes.io/default-container":"otel-collector","kubernetes.io/psp":"eks.privileged"},"labels":{"app":"splunk-otel-collector","component":"otel-collector-agent","controller-revision-hash":"6cb7d5c864","pod-template-generation":"5","release":"my-splunk-otel-collector"},"name":"my-splunk-otel-collector-agent-cdv7s","namespace":"default","uid":"ea8ee4f5-31a7-48f0-a3c7-ec41e736ccad"},"port":14250,"transport":"TCP","type":"port"})
Trace ID:
Span ID:
Flags: 0
Expand Down Expand Up @@ -271,7 +231,7 @@ Flags: 0
| Name | Type | Default | Docs |
|------------------------------|---------------------------|------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `watch_observers` (required) | []string | <no value> | The array of Observer extensions to receive Endpoint events from |
| `log_endpoints` | bool | false | Whether to emit log records for Observer Endpoint events |
| `log_endpoints` | bool | false | Whether to emit all endpoint activity as entity events |
| `embed_receiver_config` | bool | false | Whether to embed a base64-encoded, minimal Receiver Creator config for the generated receiver as a reported metrics `discovery.receiver.rule` resource attribute value for status log record matches |
| `receivers` | map[string]ReceiverConfig | <no value> | The mapping of receiver names to their Receiver sub-config |
Expand Down
3 changes: 1 addition & 2 deletions internal/receiver/discoveryreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ type Config struct {
// The configured Observer extensions from which to receive Endpoint events.
// Must implement the observer.Observable interface.
WatchObservers []component.ID `mapstructure:"watch_observers"`
// Whether to emit log records for all endpoint activity, consisting of Endpoint
// content as record attributes.
// Whether to emit all endpoint activity as entity events.
LogEndpoints bool `mapstructure:"log_endpoints"`
// Whether to include the receiver config as a base64-encoded "discovery.receiver.config"
// resource attribute string value. Will also contain the configured observer that
Expand Down
54 changes: 27 additions & 27 deletions internal/receiver/discoveryreceiver/endpoint_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ import (
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/signalfx/splunk-otel-collector/internal/common/discovery"
)

type endpointState string
Expand Down Expand Up @@ -91,7 +94,7 @@ func (et *endpointTracker) stop() {

func (et *endpointTracker) emitEndpointLogs(observerCID component.ID, eventType endpointState, endpoints []observer.Endpoint, received time.Time) {
if et.config.LogEndpoints && et.pLogs != nil {
pLogs, numFailed, err := endpointToPLogs(observerCID, fmt.Sprintf("endpoint.%s", eventType), endpoints, received)
pLogs, numFailed, err := endpointToPLogs(observerCID, eventType, endpoints, received)
if err != nil {
et.logger.Warn(fmt.Sprintf("failed converting %v endpoints to log records", numFailed), zap.Error(err))
}
Expand Down Expand Up @@ -160,36 +163,33 @@ func (n *notify) OnChange(changed []observer.Endpoint) {
n.endpointTracker.updateEndpoints(changed, changedState, n.observerID)
}

func endpointToPLogs(observerID component.ID, eventType string, endpoints []observer.Endpoint, received time.Time) (pLogs plog.Logs, failed int, err error) {
pLogs = plog.NewLogs()
rlog := pLogs.ResourceLogs().AppendEmpty()
rAttrs := rlog.Resource().Attributes()
rAttrs.PutStr(eventTypeAttr, eventType)
rAttrs.PutStr(observerNameAttr, observerID.Name())
rAttrs.PutStr(observerTypeAttr, observerID.Type().String())
sl := rlog.ScopeLogs().AppendEmpty()
func endpointToPLogs(observerID component.ID, eventType endpointState, endpoints []observer.Endpoint, received time.Time) (pLogs plog.Logs, failed int, err error) {
entityEvents := experimentalmetricmetadata.NewEntityEventsSlice()
for _, endpoint := range endpoints {
logRecord := sl.LogRecords().AppendEmpty()
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(received))
logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now()))
attrs := logRecord.Attributes()
if endpoint.Details != nil {
logRecord.Body().SetStr(fmt.Sprintf("%s %s endpoint %s", eventType, endpoint.Details.Type(), endpoint.ID))
if envAttrs, e := endpointEnvToAttrs(endpoint.Details.Type(), endpoint.Details.Env()); e != nil {
err = multierr.Combine(err, fmt.Errorf("failed determining attributes for %q: %w", endpoint.ID, e))
failed++
} else {
// this must be the first mutation of attrs since it's destructive
envAttrs.CopyTo(attrs)
}
attrs.PutStr("type", string(endpoint.Details.Type()))
entityEvent := entityEvents.AppendEmpty()
entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(received))
entityEvent.ID().PutStr(discovery.EndpointIDAttr, string(endpoint.ID))
if eventType == removedState {
entityEvent.SetEntityDelete()
} else {
logRecord.Body().SetStr(fmt.Sprintf("%s endpoint %s", eventType, endpoint.ID))
entityState := entityEvent.SetEntityState()
attrs := entityState.Attributes()
if endpoint.Details != nil {
if envAttrs, e := endpointEnvToAttrs(endpoint.Details.Type(), endpoint.Details.Env()); e != nil {
err = multierr.Combine(err, fmt.Errorf("failed determining attributes for %q: %w", endpoint.ID, e))
failed++
} else {
// this must be the first mutation of attrs since it's destructive
envAttrs.CopyTo(attrs)
}
attrs.PutStr("type", string(endpoint.Details.Type()))
}
attrs.PutStr("endpoint", endpoint.Target)
attrs.PutStr(observerNameAttr, observerID.Name())
attrs.PutStr(observerTypeAttr, observerID.Type().String())
}
attrs.PutStr("endpoint", endpoint.Target)
attrs.PutStr("id", string(endpoint.ID))
}
return
return entityEvents.ConvertAndMoveToLogs(), failed, err
}

func endpointEnvToAttrs(endpointType observer.EndpointType, endpointEnv observer.EndpointEnv) (pcommon.Map, error) {
Expand Down
Loading

0 comments on commit b3ad814

Please sign in to comment.