Skip to content

Commit

Permalink
Merge branch 'main' into prometheus_exemplars
Browse files Browse the repository at this point in the history
  • Loading branch information
XSAM authored Apr 3, 2024
2 parents 525c53d + 6c6e1e7 commit db1801f
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 0 deletions.
87 changes: 87 additions & 0 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"container/ring"
"context"
"sync"
"time"
)

Expand Down Expand Up @@ -76,6 +78,91 @@ func (b *BatchingProcessor) ForceFlush(ctx context.Context) error {
return nil
}

// queue holds a queue of logging records.
//
// When the queue becomes full, the oldest records in the queue are
// overwritten.
type queue struct {
sync.Mutex

cap, len int
read, write *ring.Ring
}

func newQueue(size int) *queue {
r := ring.New(size)
return &queue{
cap: size,
read: r,
write: r,
}
}

// Enqueue adds r to the queue. The queue size, including the addition of r, is
// returned.
//
// If enqueueing r will exceed the capacity of q, the oldest Record held in q
// will be dropped and r retained.
func (q *queue) Enqueue(r Record) int {
q.Lock()
defer q.Unlock()

q.write.Value = r
q.write = q.write.Next()

q.len++
if q.len > q.cap {
// Overflow. Advance read to be the new "oldest".
q.len = q.cap
q.read = q.read.Next()
}
return q.len
}

// TryDequeue attempts to dequeue up to len(buf) Records. The available Records
// will be assigned into buf and passed to write. If write fails, returning
// false, the Records will not be removed from the queue. If write succeeds,
// returning true, the dequeued Records are removed from the queue. The number
// of Records remaining in the queue are returned.
//
// When write is called the lock of q is held. The write function must not call
// other methods of this q that acquire the lock.
func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int {
q.Lock()
defer q.Unlock()

origRead := q.read

n := min(len(buf), q.len)
for i := 0; i < n; i++ {
buf[i] = q.read.Value.(Record)
q.read = q.read.Next()
}

if write(buf[:n]) {
q.len -= n
} else {
q.read = origRead
}
return q.len
}

// Flush returns all the Records held in the queue and resets it to be
// empty.
func (q *queue) Flush() []Record {
q.Lock()
defer q.Unlock()

out := make([]Record, q.len)
for i := range out {
out[i] = q.read.Value.(Record)
q.read = q.read.Next()
}
q.len = 0

return out
}

type batchingConfig struct {
maxQSize setting[int]
expInterval setting[time.Duration]
Expand Down
113 changes: 113 additions & 0 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"slices"
"strconv"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/log"
)

func TestNewBatchingConfig(t *testing.T) {
Expand Down Expand Up @@ -126,3 +130,112 @@ func TestNewBatchingConfig(t *testing.T) {
})
}
}

func TestQueue(t *testing.T) {
var r Record
r.SetBody(log.BoolValue(true))

t.Run("newQueue", func(t *testing.T) {
const size = 1
q := newQueue(size)
assert.Equal(t, q.len, 0)
assert.Equal(t, size, q.cap, "capacity")
assert.Equal(t, size, q.read.Len(), "read ring")
assert.Same(t, q.read, q.write, "different rings")
})

t.Run("Enqueue", func(t *testing.T) {
const size = 2
q := newQueue(size)

var notR Record
notR.SetBody(log.IntValue(10))

assert.Equal(t, 1, q.Enqueue(notR), "incomplete batch")
assert.Equal(t, 1, q.len, "length")
assert.Equal(t, size, q.cap, "capacity")

assert.Equal(t, 2, q.Enqueue(r), "complete batch")
assert.Equal(t, 2, q.len, "length")
assert.Equal(t, size, q.cap, "capacity")

assert.Equal(t, 2, q.Enqueue(r), "overflow batch")
assert.Equal(t, 2, q.len, "length")
assert.Equal(t, size, q.cap, "capacity")

assert.Equal(t, []Record{r, r}, q.Flush(), "flushed Records")
})

t.Run("Flush", func(t *testing.T) {
const size = 2
q := newQueue(size)
q.write.Value = r
q.write = q.write.Next()
q.len = 1

assert.Equal(t, []Record{r}, q.Flush(), "flushed")
})

t.Run("TryFlush", func(t *testing.T) {
const size = 3
q := newQueue(size)
for i := 0; i < size-1; i++ {
q.write.Value = r
q.write = q.write.Next()
q.len++
}

buf := make([]Record, 1)
f := func([]Record) bool { return false }
assert.Equal(t, size-1, q.TryDequeue(buf, f), "not flushed")
require.Equal(t, size-1, q.len, "length")
require.NotSame(t, q.read, q.write, "read ring advanced")

var flushed []Record
f = func(r []Record) bool {
flushed = append(flushed, r...)
return true
}
if assert.Equal(t, size-2, q.TryDequeue(buf, f), "did not flush len(buf)") {
assert.Equal(t, []Record{r}, flushed, "Records")
}

buf = slices.Grow(buf, size)
flushed = flushed[:0]
if assert.Equal(t, 0, q.TryDequeue(buf, f), "did not flush len(queue)") {
assert.Equal(t, []Record{r}, flushed, "Records")
}
})

t.Run("ConcurrentSafe", func(t *testing.T) {
const goRoutines = 10

flushed := make(chan []Record, goRoutines)
out := make([]Record, 0, goRoutines)
done := make(chan struct{})
go func() {
defer close(done)
for recs := range flushed {
out = append(out, recs...)
}
}()

var wg sync.WaitGroup
wg.Add(goRoutines)

b := newQueue(goRoutines)
for i := 0; i < goRoutines; i++ {
go func() {
defer wg.Done()
b.Enqueue(Record{})
flushed <- b.Flush()
}()
}

wg.Wait()
close(flushed)
<-done

assert.Len(t, out, goRoutines, "flushed Records")
})
}

0 comments on commit db1801f

Please sign in to comment.