Skip to content

Commit

Permalink
Allow Bus to buffer events in case listeners are not configured
Browse files Browse the repository at this point in the history
  • Loading branch information
vjsamuel committed Oct 4, 2018
1 parent 4247bc3 commit b5a2df8
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Support for Kafka 2.0.0 in kafka output {pull}8399[8399]
- Add setting `setup.kibana.space.id` to support Kibana Spaces {pull}7942[7942]
- Add Beats Central Management {pull}8559[8559]
- Allow Bus to buffer events in case listeners are not configured. {pull}8527[8527]

*Auditbeat*

Expand Down
32 changes: 32 additions & 0 deletions libbeat/common/bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type bus struct {
sync.RWMutex
name string
listeners []*listener
store chan Event
}

type listener struct {
Expand All @@ -65,11 +66,42 @@ func New(name string) Bus {
}
}

// NewBusWithStore allows to create a buffered bus when producers send data without
// listeners being subscribed to them. size determines the size of the buffer.
func NewBusWithStore(name string, size int) Bus {
return &bus{
name: name,
listeners: make([]*listener, 0),
store: make(chan Event, size),
}
}

func (b *bus) Publish(e Event) {
b.RLock()
defer b.RUnlock()

logp.Debug("bus", "%s: %+v", b.name, e)
if len(b.listeners) == 0 && b.store != nil {
b.store <- e
return
}

if b.store != nil && len(b.store) != 0 {
doBreak := false
for !doBreak {
select {
case eve := <-b.store:
for _, listener := range b.listeners {
if listener.interested(eve) {
listener.channel <- eve
}
}
default:
doBreak = true
}
}
}

for _, listener := range b.listeners {
if listener.interested(e) {
listener.channel <- e
Expand Down
16 changes: 16 additions & 0 deletions libbeat/common/bus/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,19 @@ func TestListenerClose(t *testing.T) {
event = <-listener.Events()
assert.Equal(t, event, Event(nil))
}

func TestUnsubscribedBus(t *testing.T) {
bus := NewBusWithStore("name", 2)
bus.Publish(Event{"first": "event"})

listener := bus.Subscribe()
bus.Publish(Event{"second": "event"})
event := <-listener.Events()
event1 := <-listener.Events()
assert.Equal(t, event, Event{"first": "event"})
assert.Equal(t, event1, Event{"second": "event"})

bus.Publish(Event{"a": 1, "b": 2})
event2 := <-listener.Events()
assert.Equal(t, event2, Event{"a": 1, "b": 2})
}

0 comments on commit b5a2df8

Please sign in to comment.