Skip to content

Commit

Permalink
Add a sync client (#7934)
Browse files Browse the repository at this point in the history
In beat each data collector need to initialize his own beat.Client to
have access to the pipeline. The current pipeline implementation is
completely asynchronous, meaning when you publish something to the
queue, you don't know if it will be send or when it will be send.

Some system like aws lambda requires to be in sync, when the method
return we expect the events to be send. This PR allow to change the
behavior to have a sync publish that leverage the pipeline callbacks.

Notes: it also changes the client interface, since publish and publishAll can
return an error.

Usage:

```
sc, err := NewSyncClient(pipeline, beat.ClientConfig{})
if !assert.NoError(t, err) {
  return
}

err := sc.PublishAll()
if err != nil {
...
}

sc.Wait() // block until the publish is done.
defer sc.Close() // this call will also block
```
  • Loading branch information
ph committed Oct 24, 2018
1 parent 1bfaa7c commit b22945b
Show file tree
Hide file tree
Showing 2 changed files with 351 additions and 0 deletions.
132 changes: 132 additions & 0 deletions x-pack/beatless/core/sync_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package core

import (
"sync"

"github.com/elastic/beats/libbeat/beat"
)

// Client implements the interface used by all the beatless function, we only implement a synchronous
// client. This interface superseed the core beat.Client interface inside beatless because our publish
// and publishAll methods can return an error.
type Client interface {
// Publish accepts a unique events and will publish it to the pipeline.
Publish(beat.Event) error

// PublishAll accepts a list of multiple events and will publish them to the pipeline.
PublishAll([]beat.Event) error

// Close closes the current client, no events will be accepted, this method can block if we still
// need to ACK on events.
Close()

// Wait blocks until the publisher pipeline send the ACKS for all the events.
Wait()
}

// SyncClient wraps an existing beat.Client and provide a sync interface.
type SyncClient struct {
// Chain callbacks already defined in the original ClientConfig
ackCount func(int)
ackEvents func([]interface{})
ackLastEvent func(interface{})

client beat.Client
wg sync.WaitGroup
}

// NewSyncClient creates a new sync clients from the provided configuration, existing ACKs handlers
// defined in the configuration will be proxied by this object.
func NewSyncClient(pipeline beat.Pipeline, cfg beat.ClientConfig) (*SyncClient, error) {
s := &SyncClient{}

// Proxy any callbacks to the original client.
//
// Notes: it's not supported to have multiple callback defined, but to support any configuration
// we map all of them.
if cfg.ACKCount != nil {
s.ackCount = cfg.ACKCount
cfg.ACKCount = s.onACKCount
}

if cfg.ACKEvents != nil {
s.ackEvents = cfg.ACKEvents
cfg.ACKEvents = s.onACKEvents
}

if cfg.ACKLastEvent != nil {
s.ackLastEvent = cfg.ACKLastEvent
cfg.ACKLastEvent = nil
cfg.ACKEvents = s.onACKEvents
}

// No calls is defined on the target on the config but we still need to track
// the ack to unblock.
hasACK := cfg.ACKCount != nil || cfg.ACKEvents != nil || cfg.ACKLastEvent != nil
if !hasACK {
cfg.ACKCount = s.onACKCount
}

c, err := pipeline.ConnectWith(cfg)
if err != nil {
return nil, err
}

s.client = c

return s, nil
}

// Publish publishes one event to the pipeline and return.
func (s *SyncClient) Publish(event beat.Event) error {
s.wg.Add(1)
s.client.Publish(event)
return nil
}

// PublishAll publish a slice of events to the pipeline and return.
func (s *SyncClient) PublishAll(events []beat.Event) error {
s.wg.Add(len(events))
s.client.PublishAll(events)
return nil
}

// Close closes the wrapped beat.Client.
func (s *SyncClient) Close() error {
s.wg.Wait()
return s.client.Close()
}

// Wait waits until we received a ACK for every events that were sent, this is useful in the
// context of serverless, because when the handler return the execution of the process is suspended.
func (s *SyncClient) Wait() {
s.wg.Wait()
}

// AckEvents receives an array with all the event acked for this client.
func (s *SyncClient) onACKEvents(data []interface{}) {
count := len(data)
if count == 0 {
return
}

s.onACKCount(count)
if s.ackEvents != nil {
s.ackEvents(data)
}

if s.ackLastEvent != nil {
s.ackLastEvent(data[len(data)-1])
}
}

func (s *SyncClient) onACKCount(c int) {
s.wg.Add(c * -1)
if s.ackCount != nil {
s.ackCount(c)
}
}
219 changes: 219 additions & 0 deletions x-pack/beatless/core/sync_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package core

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
)

type dummyClient struct {
Received chan int
}

func newDummyClient() *dummyClient {
return &dummyClient{Received: make(chan int)}
}

func (c *dummyClient) Publish(event beat.Event) {
c.Received <- 1
}

func (c *dummyClient) PublishAll(events []beat.Event) {
c.Received <- len(events)
}

func (c *dummyClient) Close() error {
close(c.Received)
return nil
}

type dummyPipeline struct {
client beat.Client
}

func newDummyPipeline(client beat.Client) *dummyPipeline {
return &dummyPipeline{client: client}
}

func (d *dummyPipeline) Connect() (beat.Client, error) {
return d.client, nil
}

func (d *dummyPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
return d.client, nil
}

func (d *dummyPipeline) SetACKHandler(ackhandler beat.PipelineACKHandler) error {
return nil
}

func TestSyncClient(t *testing.T) {
receiver := func(c *dummyClient, sc *SyncClient) {
select {
case i := <-c.Received:
sc.onACKEvents(make([]interface{}, i))
return
}
}

t.Run("Publish", func(t *testing.T) {
c := newDummyClient()

pipeline := newDummyPipeline(c)
sc, err := NewSyncClient(pipeline, beat.ClientConfig{})
if !assert.NoError(t, err) {
return
}
defer sc.Close()

go receiver(c, sc)

err = sc.Publish(beat.Event{})
if !assert.NoError(t, err) {
return
}
sc.Wait()
})

t.Run("PublishAll single ACK", func(t *testing.T) {
c := newDummyClient()

pipeline := newDummyPipeline(c)
sc, err := NewSyncClient(pipeline, beat.ClientConfig{})
if !assert.NoError(t, err) {
return
}
defer sc.Close()

go receiver(c, sc)

err = sc.PublishAll(make([]beat.Event, 10))
if !assert.NoError(t, err) {
return
}
sc.Wait()
})

t.Run("PublishAll multiple independant ACKs", func(t *testing.T) {
c := newDummyClient()

pipeline := newDummyPipeline(c)
sc, err := NewSyncClient(pipeline, beat.ClientConfig{})
if !assert.NoError(t, err) {
return
}
defer sc.Close()

go func(c *dummyClient, sc *SyncClient) {
select {
case <-c.Received:
// simulate multiple acks
sc.onACKEvents(make([]interface{}, 5))
sc.onACKEvents(make([]interface{}, 5))
return
}
}(c, sc)

err = sc.PublishAll(make([]beat.Event, 10))
if !assert.NoError(t, err) {
return
}
sc.Wait()
})
}

func TestCallbacksPropagation(t *testing.T) {
testCallback := func(done <-chan struct{}, config beat.ClientConfig, events []beat.Event) {
c := newDummyClient()

pipeline := newDummyPipeline(c)
sc, err := NewSyncClient(pipeline, config)
if !assert.NoError(t, err) {
return
}
defer sc.Close()

go func(c *dummyClient, sc *SyncClient, events []beat.Event) {
select {
case <-c.Received:
elements := make([]interface{}, len(events))
for i, e := range events {
elements[i] = e.Private
}
sc.onACKEvents(elements)
return
}
}(c, sc, events)

err = sc.PublishAll(events)
if !assert.NoError(t, err) {
return
}

sc.Wait()
select {
case <-done:
}
}

t.Run("propagate ACKCount", func(t *testing.T) {
done := make(chan struct{})

callback := func(count int) {
assert.Equal(t, 2, count)
close(done)
}

clientConfig := beat.ClientConfig{
ACKCount: callback,
}

testCallback(done, clientConfig, make([]beat.Event, 2))
})

t.Run("propagate ACKEvents", func(t *testing.T) {
done := make(chan struct{})

callback := func(data []interface{}) {
assert.Equal(t, 2, len(data))
close(done)
}

clientConfig := beat.ClientConfig{
ACKEvents: callback,
}

testCallback(done, clientConfig, make([]beat.Event, 2))
})

t.Run("propagate ACKLastEvent", func(t *testing.T) {
done := make(chan struct{})

type s struct{ test string }

semaphore := &s{test: "hello"}

events := []beat.Event{
beat.Event{},
beat.Event{
Private: semaphore,
},
}
callback := func(data interface{}) {
assert.Equal(t, semaphore, data)
close(done)
}

clientConfig := beat.ClientConfig{
ACKLastEvent: callback,
}

testCallback(done, clientConfig, events)
})
}

0 comments on commit b22945b

Please sign in to comment.