Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the queue's public API accept interface{} instead of publisher.Event #31699

Merged
merged 12 commits into from
May 26, 2022
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Wildcard fields no longer have a default ignore_above setting of 1024. {issue}30096[30096] {pull}30668[30668]
- Remove `common.MapStr` and use `mapstr.M` from `github.com/elastic/elastic-agent-libs` instead. {pull}31420[31420]
- Remove `queue.Consumer`. Queues can now be read via a `Get` call directly on the queue object. {pull}31502[31502]
- The `queue.Batch` API now provides access to individual events instead of an array. {pull}31699[31699]

==== Bugfixes

Expand Down
11 changes: 5 additions & 6 deletions libbeat/publisher/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sync"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

Expand All @@ -33,7 +32,7 @@ type testQueue struct {
}

type testProducer struct {
publish func(try bool, event publisher.Event) bool
publish func(try bool, event interface{}) bool
cancel func() int
}

Expand Down Expand Up @@ -69,14 +68,14 @@ func (q *testQueue) Get(sz int) (queue.Batch, error) {
return nil, nil
}

func (p *testProducer) Publish(event publisher.Event) bool {
func (p *testProducer) Publish(event interface{}) bool {
if p.publish != nil {
return p.publish(false, event)
}
return false
}

func (p *testProducer) TryPublish(event publisher.Event) bool {
func (p *testProducer) TryPublish(event interface{}) bool {
if p.publish != nil {
return p.publish(true, event)
}
Expand Down Expand Up @@ -115,7 +114,7 @@ func makeTestQueue() queue.Queue {
var producer *testProducer
p := blockingProducer(cfg)
producer = &testProducer{
publish: func(try bool, event publisher.Event) bool {
publish: func(try bool, event interface{}) bool {
if try {
return p.TryPublish(event)
}
Expand Down Expand Up @@ -147,7 +146,7 @@ func blockingProducer(_ queue.ProducerConfig) queue.Producer {
waiting := atomic.MakeInt(0)

return &testProducer{
publish: func(_ bool, _ publisher.Event) bool {
publish: func(_ bool, _ interface{}) bool {
waiting.Inc()
<-sig
return false
Expand Down
46 changes: 22 additions & 24 deletions libbeat/publisher/pipeline/ttl_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package pipeline

import (
"sync"

"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)
Expand All @@ -29,7 +27,9 @@ type retryer interface {
}

type ttlBatch struct {
original queue.Batch
// The callback to inform the queue (and possibly the producer)
// that this batch has been acknowledged.
ack func()

// The internal hook back to the eventConsumer, used to implement the
// publisher.Batch retry interface.
Expand All @@ -43,44 +43,42 @@ type ttlBatch struct {
events []publisher.Event
}

var batchPool = sync.Pool{
New: func() interface{} {
return &ttlBatch{}
},
}

func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch {
if original == nil {
panic("empty batch")
}

b := batchPool.Get().(*ttlBatch)
*b = ttlBatch{
original: original,
retryer: retryer,
ttl: ttl,
events: original.Events(),
count := original.Count()
events := make([]publisher.Event, 0, count)
for i := 0; i < count; i++ {
event, ok := original.Event(i).(publisher.Event)
if ok {
// In Beats this conversion will always succeed because only
// publisher.Event objects are inserted into the queue, but
// there's no harm in making sure.
events = append(events, event)
}
}
return b
}

func releaseBatch(b *ttlBatch) {
*b = ttlBatch{} // clear batch
batchPool.Put(b)
b := &ttlBatch{
ack: original.ACK,
retryer: retryer,
ttl: ttl,
events: events,
}
return b
}

func (b *ttlBatch) Events() []publisher.Event {
return b.events
}

func (b *ttlBatch) ACK() {
b.original.ACK()
releaseBatch(b)
b.ack()
}

func (b *ttlBatch) Drop() {
b.original.ACK()
releaseBatch(b)
b.ack()
}

func (b *ttlBatch) Retry() {
Expand Down
3 changes: 1 addition & 2 deletions libbeat/publisher/queue/diskqueue/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,8 @@ func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_siz
if err != nil {
return err
}
events := batch.Events()
batch.ACK()
received = received + len(events)
received = received + batch.Count()
if received == num_events {
break
}
Expand Down
13 changes: 6 additions & 7 deletions libbeat/publisher/queue/diskqueue/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package diskqueue
import (
"fmt"

"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

Expand Down Expand Up @@ -83,12 +82,12 @@ eventLoop:
// diskQueueBatch implementation of the queue.Batch interface
//

func (batch *diskQueueBatch) Events() []publisher.Event {
events := make([]publisher.Event, len(batch.frames))
for i, frame := range batch.frames {
events[i] = frame.event
}
return events
func (batch *diskQueueBatch) Count() int {
return len(batch.frames)
}

func (batch *diskQueueBatch) Event(i int) interface{} {
return batch.frames[i].event
}

func (batch *diskQueueBatch) ACK() {
Expand Down
9 changes: 4 additions & 5 deletions libbeat/publisher/queue/diskqueue/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package diskqueue

import (
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

Expand Down Expand Up @@ -50,21 +49,21 @@ type producerWriteRequest struct {
// diskQueueProducer implementation of the queue.Producer interface
//

func (producer *diskQueueProducer) Publish(event publisher.Event) bool {
func (producer *diskQueueProducer) Publish(event interface{}) bool {
return producer.publish(event, true)
}

func (producer *diskQueueProducer) TryPublish(event publisher.Event) bool {
func (producer *diskQueueProducer) TryPublish(event interface{}) bool {
return producer.publish(event, false)
}

func (producer *diskQueueProducer) publish(
event publisher.Event, shouldBlock bool,
event interface{}, shouldBlock bool,
) bool {
if producer.cancelled {
return false
}
serialized, err := producer.encoder.encode(&event)
serialized, err := producer.encoder.encode(event)
if err != nil {
producer.queue.logger.Errorf(
"Couldn't serialize incoming event: %v", err)
Expand Down
23 changes: 17 additions & 6 deletions libbeat/publisher/queue/diskqueue/serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package diskqueue

import (
"bytes"
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -82,7 +83,17 @@ func (e *eventEncoder) reset() {
e.folder = folder
}

func (e *eventEncoder) encode(event *publisher.Event) ([]byte, error) {
func (e *eventEncoder) encode(evt interface{}) ([]byte, error) {
event, ok := evt.(publisher.Event)
if !ok {
// In order to support events of varying type, the disk queue needs
// to know how to encode them. When we decide to do this, we'll need
// to add an encoder to the settings passed in when creating a disk
// queue. See https://github.com/elastic/elastic-agent-shipper/issues/41.
// For now, just return an error.
return nil, fmt.Errorf("disk queue only supports publisher.Event")
cmacknz marked this conversation as resolved.
Show resolved Hide resolved
}

e.buf.Reset()

err := e.folder.Fold(entry{
Expand Down Expand Up @@ -131,12 +142,12 @@ func (d *eventDecoder) Buffer(n int) []byte {
}

func (d *eventDecoder) Decode() (publisher.Event, error) {
var (
to entry
err error
)
var to entry

d.unfolder.SetTarget(&to)
err := d.unfolder.SetTarget(&to)
if err != nil {
return publisher.Event{}, err
}
defer d.unfolder.Reset()

if d.useJSON {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/serialize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestSerialize(t *testing.T) {
},
},
}
serialized, err := encoder.encode(&event)
serialized, err := encoder.encode(event)
if err != nil {
t.Fatalf("[%v] Couldn't encode event: %v", test.name, err)
}
Expand Down
20 changes: 3 additions & 17 deletions libbeat/publisher/queue/memqueue/ackloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ type ackLoop struct {

func (l *ackLoop) run() {
var (
// log = l.broker.logger

// Buffer up event counter in ackCount. If ackCount > 0, acks will be set to
// the broker.acks channel for sending the ACKs while potentially receiving
// new batches from the broker event loop.
Expand All @@ -46,10 +44,11 @@ func (l *ackLoop) run() {
// loop, as the ack loop will not block on any channel
ackCount int
ackChan chan int
sig chan batchAckMsg
)

for {
nextBatchChan := l.ackChans.nextBatchChannel()

select {
case <-l.broker.done:
return
Expand All @@ -60,25 +59,12 @@ func (l *ackLoop) run() {
case chanList := <-l.broker.scheduledACKs:
l.ackChans.concat(&chanList)

case <-sig:
case <-nextBatchChan:
ackCount += l.handleBatchSig()
if ackCount > 0 {
ackChan = l.broker.ackChan
}
}

// log.Debug("ackloop INFO")
// log.Debug("ackloop: total events scheduled = ", l.totalSched)
// log.Debug("ackloop: total events ack = ", l.totalACK)
// log.Debug("ackloop: total batches scheduled = ", l.batchesSched)
// log.Debug("ackloop: total batches ack = ", l.batchesACKed)

sig = l.ackChans.channel()
// if l.sig == nil {
// log.Debug("ackloop: no ack scheduled")
// } else {
// log.Debug("ackloop: schedule ack: ", l.lst.head.seq)
// }
}
}

Expand Down
4 changes: 1 addition & 3 deletions libbeat/publisher/queue/memqueue/batchbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package memqueue

import "github.com/elastic/beats/v7/libbeat/publisher"

type queueEntry struct {
event interface{}
client clientState
Expand All @@ -36,7 +34,7 @@ func newBatchBuffer(sz int) *batchBuffer {
return b
}

func (b *batchBuffer) add(event *publisher.Event, st clientState) {
func (b *batchBuffer) add(event interface{}, st clientState) {
b.entries = append(b.entries, queueEntry{event, st})
}

Expand Down
Loading