Skip to content

Commit

Permalink
Make the queue's public API accept interface{} instead of publisher.E…
Browse files Browse the repository at this point in the history
…vent (#31699)
  • Loading branch information
faec authored and chrisberkhout committed Jun 1, 2023
1 parent 6a992ac commit 90b5a55
Show file tree
Hide file tree
Showing 17 changed files with 101 additions and 120 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Wildcard fields no longer have a default ignore_above setting of 1024. {issue}30096[30096] {pull}30668[30668]
- Remove `common.MapStr` and use `mapstr.M` from `github.com/elastic/elastic-agent-libs` instead. {pull}31420[31420]
- Remove `queue.Consumer`. Queues can now be read via a `Get` call directly on the queue object. {pull}31502[31502]
- The `queue.Batch` API now provides access to individual events instead of an array. {pull}31699[31699]

==== Bugfixes

Expand Down
11 changes: 5 additions & 6 deletions libbeat/publisher/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sync"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

Expand All @@ -33,7 +32,7 @@ type testQueue struct {
}

type testProducer struct {
publish func(try bool, event publisher.Event) bool
publish func(try bool, event interface{}) bool
cancel func() int
}

Expand Down Expand Up @@ -69,14 +68,14 @@ func (q *testQueue) Get(sz int) (queue.Batch, error) {
return nil, nil
}

func (p *testProducer) Publish(event publisher.Event) bool {
func (p *testProducer) Publish(event interface{}) bool {
if p.publish != nil {
return p.publish(false, event)
}
return false
}

func (p *testProducer) TryPublish(event publisher.Event) bool {
func (p *testProducer) TryPublish(event interface{}) bool {
if p.publish != nil {
return p.publish(true, event)
}
Expand Down Expand Up @@ -115,7 +114,7 @@ func makeTestQueue() queue.Queue {
var producer *testProducer
p := blockingProducer(cfg)
producer = &testProducer{
publish: func(try bool, event publisher.Event) bool {
publish: func(try bool, event interface{}) bool {
if try {
return p.TryPublish(event)
}
Expand Down Expand Up @@ -147,7 +146,7 @@ func blockingProducer(_ queue.ProducerConfig) queue.Producer {
waiting := atomic.MakeInt(0)

return &testProducer{
publish: func(_ bool, _ publisher.Event) bool {
publish: func(_ bool, _ interface{}) bool {
waiting.Inc()
<-sig
return false
Expand Down
46 changes: 22 additions & 24 deletions libbeat/publisher/pipeline/ttl_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package pipeline

import (
"sync"

"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)
Expand All @@ -29,7 +27,9 @@ type retryer interface {
}

type ttlBatch struct {
original queue.Batch
// The callback to inform the queue (and possibly the producer)
// that this batch has been acknowledged.
ack func()

// The internal hook back to the eventConsumer, used to implement the
// publisher.Batch retry interface.
Expand All @@ -43,44 +43,42 @@ type ttlBatch struct {
events []publisher.Event
}

var batchPool = sync.Pool{
New: func() interface{} {
return &ttlBatch{}
},
}

func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch {
if original == nil {
panic("empty batch")
}

b := batchPool.Get().(*ttlBatch)
*b = ttlBatch{
original: original,
retryer: retryer,
ttl: ttl,
events: original.Events(),
count := original.Count()
events := make([]publisher.Event, 0, count)
for i := 0; i < count; i++ {
event, ok := original.Event(i).(publisher.Event)
if ok {
// In Beats this conversion will always succeed because only
// publisher.Event objects are inserted into the queue, but
// there's no harm in making sure.
events = append(events, event)
}
}
return b
}

func releaseBatch(b *ttlBatch) {
*b = ttlBatch{} // clear batch
batchPool.Put(b)
b := &ttlBatch{
ack: original.ACK,
retryer: retryer,
ttl: ttl,
events: events,
}
return b
}

func (b *ttlBatch) Events() []publisher.Event {
return b.events
}

func (b *ttlBatch) ACK() {
b.original.ACK()
releaseBatch(b)
b.ack()
}

func (b *ttlBatch) Drop() {
b.original.ACK()
releaseBatch(b)
b.ack()
}

func (b *ttlBatch) Retry() {
Expand Down
3 changes: 1 addition & 2 deletions libbeat/publisher/queue/diskqueue/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,8 @@ func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_siz
if err != nil {
return err
}
events := batch.Events()
batch.ACK()
received = received + len(events)
received = received + batch.Count()
if received == num_events {
break
}
Expand Down
13 changes: 6 additions & 7 deletions libbeat/publisher/queue/diskqueue/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package diskqueue
import (
"fmt"

"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

Expand Down Expand Up @@ -83,12 +82,12 @@ eventLoop:
// diskQueueBatch implementation of the queue.Batch interface
//

func (batch *diskQueueBatch) Events() []publisher.Event {
events := make([]publisher.Event, len(batch.frames))
for i, frame := range batch.frames {
events[i] = frame.event
}
return events
func (batch *diskQueueBatch) Count() int {
return len(batch.frames)
}

func (batch *diskQueueBatch) Event(i int) interface{} {
return batch.frames[i].event
}

func (batch *diskQueueBatch) ACK() {
Expand Down
9 changes: 4 additions & 5 deletions libbeat/publisher/queue/diskqueue/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package diskqueue

import (
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

Expand Down Expand Up @@ -50,21 +49,21 @@ type producerWriteRequest struct {
// diskQueueProducer implementation of the queue.Producer interface
//

func (producer *diskQueueProducer) Publish(event publisher.Event) bool {
func (producer *diskQueueProducer) Publish(event interface{}) bool {
return producer.publish(event, true)
}

func (producer *diskQueueProducer) TryPublish(event publisher.Event) bool {
func (producer *diskQueueProducer) TryPublish(event interface{}) bool {
return producer.publish(event, false)
}

func (producer *diskQueueProducer) publish(
event publisher.Event, shouldBlock bool,
event interface{}, shouldBlock bool,
) bool {
if producer.cancelled {
return false
}
serialized, err := producer.encoder.encode(&event)
serialized, err := producer.encoder.encode(event)
if err != nil {
producer.queue.logger.Errorf(
"Couldn't serialize incoming event: %v", err)
Expand Down
23 changes: 17 additions & 6 deletions libbeat/publisher/queue/diskqueue/serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package diskqueue

import (
"bytes"
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -82,7 +83,17 @@ func (e *eventEncoder) reset() {
e.folder = folder
}

func (e *eventEncoder) encode(event *publisher.Event) ([]byte, error) {
func (e *eventEncoder) encode(evt interface{}) ([]byte, error) {
event, ok := evt.(publisher.Event)
if !ok {
// In order to support events of varying type, the disk queue needs
// to know how to encode them. When we decide to do this, we'll need
// to add an encoder to the settings passed in when creating a disk
// queue. See https://github.com/elastic/elastic-agent-shipper/issues/41.
// For now, just return an error.
return nil, fmt.Errorf("disk queue only supports publisher.Event")
}

e.buf.Reset()

err := e.folder.Fold(entry{
Expand Down Expand Up @@ -131,12 +142,12 @@ func (d *eventDecoder) Buffer(n int) []byte {
}

func (d *eventDecoder) Decode() (publisher.Event, error) {
var (
to entry
err error
)
var to entry

d.unfolder.SetTarget(&to)
err := d.unfolder.SetTarget(&to)
if err != nil {
return publisher.Event{}, err
}
defer d.unfolder.Reset()

if d.useJSON {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/serialize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestSerialize(t *testing.T) {
},
},
}
serialized, err := encoder.encode(&event)
serialized, err := encoder.encode(event)
if err != nil {
t.Fatalf("[%v] Couldn't encode event: %v", test.name, err)
}
Expand Down
20 changes: 3 additions & 17 deletions libbeat/publisher/queue/memqueue/ackloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ type ackLoop struct {

func (l *ackLoop) run() {
var (
// log = l.broker.logger

// Buffer up event counter in ackCount. If ackCount > 0, acks will be set to
// the broker.acks channel for sending the ACKs while potentially receiving
// new batches from the broker event loop.
Expand All @@ -46,10 +44,11 @@ func (l *ackLoop) run() {
// loop, as the ack loop will not block on any channel
ackCount int
ackChan chan int
sig chan batchAckMsg
)

for {
nextBatchChan := l.ackChans.nextBatchChannel()

select {
case <-l.broker.done:
return
Expand All @@ -60,25 +59,12 @@ func (l *ackLoop) run() {
case chanList := <-l.broker.scheduledACKs:
l.ackChans.concat(&chanList)

case <-sig:
case <-nextBatchChan:
ackCount += l.handleBatchSig()
if ackCount > 0 {
ackChan = l.broker.ackChan
}
}

// log.Debug("ackloop INFO")
// log.Debug("ackloop: total events scheduled = ", l.totalSched)
// log.Debug("ackloop: total events ack = ", l.totalACK)
// log.Debug("ackloop: total batches scheduled = ", l.batchesSched)
// log.Debug("ackloop: total batches ack = ", l.batchesACKed)

sig = l.ackChans.channel()
// if l.sig == nil {
// log.Debug("ackloop: no ack scheduled")
// } else {
// log.Debug("ackloop: schedule ack: ", l.lst.head.seq)
// }
}
}

Expand Down
4 changes: 1 addition & 3 deletions libbeat/publisher/queue/memqueue/batchbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package memqueue

import "github.com/elastic/beats/v7/libbeat/publisher"

type queueEntry struct {
event interface{}
client clientState
Expand All @@ -36,7 +34,7 @@ func newBatchBuffer(sz int) *batchBuffer {
return b
}

func (b *batchBuffer) add(event *publisher.Event, st clientState) {
func (b *batchBuffer) add(event interface{}, st clientState) {
b.entries = append(b.entries, queueEntry{event, st})
}

Expand Down
Loading

0 comments on commit 90b5a55

Please sign in to comment.