diff --git a/sdk/log/batch.go b/sdk/log/batch.go index 2e705c86a6c..6031d115482 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -4,7 +4,6 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( - "container/ring" "context" "errors" "slices" @@ -255,11 +254,11 @@ type queue struct { sync.Mutex cap, len int - read, write *ring.Ring + read, write *ring } func newQueue(size int) *queue { - r := ring.New(size) + r := newRing(size) return &queue{ cap: size, read: r, @@ -304,7 +303,7 @@ func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int { n := min(len(buf), q.len) for i := 0; i < n; i++ { - buf[i] = q.read.Value.(Record) + buf[i] = q.read.Value q.read = q.read.Next() } @@ -324,7 +323,7 @@ func (q *queue) Flush() []Record { out := make([]Record, q.len) for i := range out { - out[i] = q.read.Value.(Record) + out[i] = q.read.Value q.read = q.read.Next() } q.len = 0 diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go index 90630ac6be4..416a19e0094 100644 --- a/sdk/log/batch_test.go +++ b/sdk/log/batch_test.go @@ -10,6 +10,7 @@ import ( "sync" "testing" "time" + "unsafe" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -560,3 +561,31 @@ func TestQueue(t *testing.T) { assert.Len(t, out, goRoutines, "flushed Records") }) } + +func BenchmarkBatchProcessorOnEmit(b *testing.B) { + var r Record + body := log.BoolValue(true) + r.SetBody(body) + + rSize := unsafe.Sizeof(r) + unsafe.Sizeof(body) + ctx := context.Background() + bp := NewBatchProcessor( + defaultNoopExporter, + WithMaxQueueSize(b.N+1), + WithExportMaxBatchSize(b.N+1), + WithExportInterval(time.Hour), + WithExportTimeout(time.Hour), + ) + b.Cleanup(func() { _ = bp.Shutdown(ctx) }) + + b.SetBytes(int64(rSize)) + b.ReportAllocs() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + var err error + for pb.Next() { + err = bp.OnEmit(ctx, r) + } + _ = err + }) +} diff --git a/sdk/log/ring.go b/sdk/log/ring.go new file mode 100644 index 00000000000..5e84cb16455 --- /dev/null +++ b/sdk/log/ring.go @@ -0,0 +1,82 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package log // import "go.opentelemetry.io/otel/sdk/log" + +// A ring is an element of a circular list, or ring. Rings do not have a +// beginning or end; a pointer to any ring element serves as reference to the +// entire ring. Empty rings are represented as nil ring pointers. The zero +// value for a ring is a one-element ring with a nil Value. +// +// This is copied from the "container/ring" package. It uses a Record type for +// Value instead of any to avoid allocations. +type ring struct { + next, prev *ring + Value Record +} + +func (r *ring) init() *ring { + r.next = r + r.prev = r + return r +} + +// Next returns the next ring element. r must not be empty. +func (r *ring) Next() *ring { + if r.next == nil { + return r.init() + } + return r.next +} + +// Prev returns the previous ring element. r must not be empty. +func (r *ring) Prev() *ring { + if r.next == nil { + return r.init() + } + return r.prev +} + +// newRing creates a ring of n elements. +func newRing(n int) *ring { + if n <= 0 { + return nil + } + r := new(ring) + p := r + for i := 1; i < n; i++ { + p.next = &ring{prev: p} + p = p.next + } + p.next = r + r.prev = p + return r +} + +// Len computes the number of elements in ring r. It executes in time +// proportional to the number of elements. +func (r *ring) Len() int { + n := 0 + if r != nil { + n = 1 + for p := r.Next(); p != r; p = p.next { + n++ + } + } + return n +} + +// Do calls function f on each element of the ring, in forward order. The +// behavior of Do is undefined if f changes *r. +func (r *ring) Do(f func(Record)) { + if r != nil { + f(r.Value) + for p := r.Next(); p != r; p = p.next { + f(p.Value) + } + } +} diff --git a/sdk/log/ring_test.go b/sdk/log/ring_test.go new file mode 100644 index 00000000000..6e36777417d --- /dev/null +++ b/sdk/log/ring_test.go @@ -0,0 +1,86 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package log + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/otel/log" +) + +func verifyRing(t *testing.T, r *ring, N int, sum int) { + // Length. + assert.Equal(t, N, r.Len(), "r.Len()") + + // Iteration. + var n, s int + r.Do(func(v Record) { + n++ + body := v.Body() + if body.Kind() != log.KindEmpty { + s += int(body.AsInt64()) + } + }) + assert.Equal(t, N, n, "number of forward iterations") + if sum >= 0 { + assert.Equal(t, sum, s, "forward ring sum") + } + + if r == nil { + return + } + + // Connections. + if r.next != nil { + var p *ring // previous element. + for q := r; p == nil || q != r; q = q.next { + if p != nil { + assert.Equalf(t, p, q.prev, "prev = %p, expected q.prev = %p", p, q.prev) + } + p = q + } + assert.Equalf(t, p, r.prev, "prev = %p, expected r.prev = %p", p, r.prev) + } + + // Next, Prev. + assert.Equal(t, r.next, r.Next(), "r.Next() != r.next") + assert.Equal(t, r.prev, r.Prev(), "r.Prev() != r.prev") +} + +func TestNewRing(t *testing.T) { + for i := 0; i < 10; i++ { + // Empty value. + r := newRing(i) + verifyRing(t, r, i, -1) + } + + for n := 0; n < 10; n++ { + r := newRing(n) + for i := 1; i <= n; i++ { + var rec Record + rec.SetBody(log.IntValue(i)) + r.Value = rec + r = r.Next() + } + + sum := (n*n + n) / 2 + verifyRing(t, r, n, sum) + } +} + +func TestEmptyRing(t *testing.T) { + var rNext, rPrev ring + verifyRing(t, rNext.Next(), 1, 0) + verifyRing(t, rPrev.Prev(), 1, 0) + + var rLen, rDo *ring + assert.Equal(t, rLen.Len(), 0, "Len()") + rDo.Do(func(Record) { assert.Fail(t, "Do func arg called") }) +}