Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log warning when a trace attribute/event/link is discarded due to limits #5434

Merged
merged 8 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Fixed

- Log a warning to the OpenTelemetry internal logger when a `Record` in `go.opentelemetry.io/otel/sdk/log` drops an attribute due to a limit being reached. (#5376)
- Log a warning to the OpenTelemetry internal logger when a `Span` in `go.opentelemetry.io/otel/sdk/trace` drops an attribute, event, or link due to a limit being reached. (#5434)

## [1.27.0/0.49.0/0.3.0] 2024-05-21

Expand Down
31 changes: 26 additions & 5 deletions sdk/trace/evictedqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,38 @@

package trace // import "go.opentelemetry.io/otel/sdk/trace"

import (
"fmt"
"slices"
"sync"

"go.opentelemetry.io/otel/internal/global"
)

// evictedQueue is a FIFO queue with a configurable capacity.
type evictedQueue struct {
queue []interface{}
type evictedQueue[T any] struct {
queue []T
capacity int
droppedCount int
logDropped func()
}

func newEvictedQueue(capacity int) evictedQueue {
func newEvictedQueue[T any](capacity int) evictedQueue[T] {
var tVal T
msg := fmt.Sprintf("limit reached: dropping trace %T", tVal)
// Do not pre-allocate queue, do this lazily.
return evictedQueue{capacity: capacity}
return evictedQueue[T]{
capacity: capacity,
logDropped: sync.OnceFunc(func() { global.Warn(msg) }),
}
}

// add adds value to the evictedQueue eq. If eq is at capacity, the oldest
// queued value will be discarded and the drop count incremented.
func (eq *evictedQueue) add(value interface{}) {
func (eq *evictedQueue[T]) add(value T) {
if eq.capacity == 0 {
eq.droppedCount++
eq.logDropped()
return
}

Expand All @@ -28,6 +43,12 @@ func (eq *evictedQueue) add(value interface{}) {
copy(eq.queue[:eq.capacity-1], eq.queue[1:])
eq.queue = eq.queue[:eq.capacity-1]
eq.droppedCount++
eq.logDropped()
}
eq.queue = append(eq.queue, value)
}

// copy returns a copy of the evictedQueue.
func (eq *evictedQueue[T]) copy() []T {
return slices.Clone(eq.queue)
}
31 changes: 22 additions & 9 deletions sdk/trace/evictedqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,47 @@ package trace
import (
"reflect"
"testing"

"github.com/stretchr/testify/assert"
)

func init() {
}

func TestAdd(t *testing.T) {
q := newEvictedQueue(3)
q := newEvictedQueue[string](3)
q.add("value1")
q.add("value2")
if wantLen, gotLen := 2, len(q.queue); wantLen != gotLen {
t.Errorf("got queue length %d want %d", gotLen, wantLen)
}
}

func (eq *evictedQueue) queueToArray() []string {
arr := make([]string, 0)
for _, value := range eq.queue {
arr = append(arr, value.(string))
}
return arr
func TestCopy(t *testing.T) {
q := newEvictedQueue[string](3)
q.add("value1")
cp := q.copy()

q.add("value2")
assert.Equal(t, []string{"value1"}, cp, "queue update modified copy")

cp[0] = "value0"
assert.Equal(t, "value1", q.queue[0], "copy update modified queue")
}

func TestDropCount(t *testing.T) {
q := newEvictedQueue(3)
q := newEvictedQueue[string](3)
var called bool
q.logDropped = func() { called = true }

q.add("value1")
assert.False(t, called, `"value1" logged as dropped`)
q.add("value2")
assert.False(t, called, `"value2" logged as dropped`)
q.add("value3")
assert.False(t, called, `"value3" logged as dropped`)
q.add("value1")
assert.True(t, called, `"value2" not logged as dropped`)
q.add("value4")
if wantLen, gotLen := 3, len(q.queue); wantLen != gotLen {
t.Errorf("got queue length %d want %d", gotLen, wantLen)
Expand All @@ -42,7 +55,7 @@ func TestDropCount(t *testing.T) {
t.Errorf("got drop count %d want %d", gotDropCount, wantDropCount)
}
wantArr := []string{"value3", "value1", "value4"}
gotArr := q.queueToArray()
gotArr := q.copy()

if wantLen, gotLen := len(wantArr), len(gotArr); gotLen != wantLen {
t.Errorf("got array len %d want %d", gotLen, wantLen)
Expand Down
54 changes: 28 additions & 26 deletions sdk/trace/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/internal"
"go.opentelemetry.io/otel/sdk/resource"
Expand Down Expand Up @@ -137,12 +138,13 @@ type recordingSpan struct {
// ReadOnlySpan exported when the span ends.
attributes []attribute.KeyValue
droppedAttributes int
logDropAttrsOnce sync.Once

// events are stored in FIFO queue capped by configured limit.
events evictedQueue
events evictedQueue[Event]

// links are stored in FIFO queue capped by configured limit.
links evictedQueue
links evictedQueue[Link]

// executionTracerTaskEnd ends the execution tracer span.
executionTracerTaskEnd func()
Expand Down Expand Up @@ -219,7 +221,7 @@ func (s *recordingSpan) SetAttributes(attributes ...attribute.KeyValue) {
limit := s.tracer.provider.spanLimits.AttributeCountLimit
if limit == 0 {
// No attributes allowed.
s.droppedAttributes += len(attributes)
s.addDroppedAttr(len(attributes))
return
}

Expand All @@ -236,14 +238,30 @@ func (s *recordingSpan) SetAttributes(attributes ...attribute.KeyValue) {
for _, a := range attributes {
if !a.Valid() {
// Drop all invalid attributes.
s.droppedAttributes++
s.addDroppedAttr(1)
continue
}
a = truncateAttr(s.tracer.provider.spanLimits.AttributeValueLengthLimit, a)
s.attributes = append(s.attributes, a)
}
}

// Declared as a var so tests can override.
var logDropAttrs = func() {
global.Warn("limit reached: dropping trace Span attributes")
}

// addDroppedAttr adds incr to the count of dropped attributes.
//
// The first, and only the first, time this method is called a warning will be
// logged.
//
// This method assumes s.mu.Lock is held by the caller.
func (s *recordingSpan) addDroppedAttr(incr int) {
s.droppedAttributes += incr
s.logDropAttrsOnce.Do(logDropAttrs)
}

// addOverCapAttrs adds the attributes attrs to the span s while
// de-duplicating the attributes of s and attrs and dropping attributes that
// exceed the limit.
Expand Down Expand Up @@ -273,7 +291,7 @@ func (s *recordingSpan) addOverCapAttrs(limit int, attrs []attribute.KeyValue) {
for _, a := range attrs {
if !a.Valid() {
// Drop all invalid attributes.
s.droppedAttributes++
s.addDroppedAttr(1)
continue
}

Expand All @@ -286,7 +304,7 @@ func (s *recordingSpan) addOverCapAttrs(limit int, attrs []attribute.KeyValue) {
if len(s.attributes) >= limit {
// Do not just drop all of the remaining attributes, make sure
// updates are checked and performed.
s.droppedAttributes++
s.addDroppedAttr(1)
} else {
a = truncateAttr(s.tracer.provider.spanLimits.AttributeValueLengthLimit, a)
s.attributes = append(s.attributes, a)
Expand Down Expand Up @@ -585,7 +603,7 @@ func (s *recordingSpan) Links() []Link {
if len(s.links.queue) == 0 {
return []Link{}
}
return s.interfaceArrayToLinksArray()
return s.links.copy()
}

// Events returns the events of this span.
Expand All @@ -595,7 +613,7 @@ func (s *recordingSpan) Events() []Event {
if len(s.events.queue) == 0 {
return []Event{}
}
return s.interfaceArrayToEventArray()
return s.events.copy()
}

// Status returns the status of this span.
Expand Down Expand Up @@ -717,32 +735,16 @@ func (s *recordingSpan) snapshot() ReadOnlySpan {
}
sd.droppedAttributeCount = s.droppedAttributes
if len(s.events.queue) > 0 {
sd.events = s.interfaceArrayToEventArray()
sd.events = s.events.copy()
sd.droppedEventCount = s.events.droppedCount
}
if len(s.links.queue) > 0 {
sd.links = s.interfaceArrayToLinksArray()
sd.links = s.links.copy()
sd.droppedLinkCount = s.links.droppedCount
}
return &sd
}

func (s *recordingSpan) interfaceArrayToLinksArray() []Link {
linkArr := make([]Link, 0)
for _, value := range s.links.queue {
linkArr = append(linkArr, value.(Link))
}
return linkArr
}

func (s *recordingSpan) interfaceArrayToEventArray() []Event {
eventArr := make([]Event, 0)
for _, value := range s.events.queue {
eventArr = append(eventArr, value.(Event))
}
return eventArr
}

func (s *recordingSpan) addChild() {
if !s.IsRecording() {
return
Expand Down
16 changes: 16 additions & 0 deletions sdk/trace/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,22 @@ func TestTruncateAttr(t *testing.T) {
}
}

func TestLogDropAttrs(t *testing.T) {
orig := logDropAttrs
t.Cleanup(func() { logDropAttrs = orig })

var called bool
logDropAttrs = func() { called = true }

s := &recordingSpan{}
s.addDroppedAttr(1)
assert.True(t, called, "logDropAttrs not called")

called = false
s.addDroppedAttr(1)
assert.False(t, called, "logDropAttrs called multiple times for same Span")
}

func BenchmarkRecordingSpanSetAttributes(b *testing.B) {
var attrs []attribute.KeyValue
for i := 0; i < 100; i++ {
Expand Down
4 changes: 2 additions & 2 deletions sdk/trace/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ func (tr *tracer) newRecordingSpan(psc, sc trace.SpanContext, name string, sr Sa
spanKind: trace.ValidateSpanKind(config.SpanKind()),
name: name,
startTime: startTime,
events: newEvictedQueue(tr.provider.spanLimits.EventCountLimit),
links: newEvictedQueue(tr.provider.spanLimits.LinkCountLimit),
events: newEvictedQueue[Event](tr.provider.spanLimits.EventCountLimit),
links: newEvictedQueue[Link](tr.provider.spanLimits.LinkCountLimit),
tracer: tr,
}

Expand Down