Skip to content

Commit

Permalink
Fix issues with output reloading (#17381)
Browse files Browse the repository at this point in the history
* Refactoring: extracting common fields into worker struct

* More refactoring

* Address goroutine leak in publisher

* Workaround: add Connection: close header to prevent FD leaks

* Adding CHANGELOG entry

* Adding IdleConnTimeout setting

* Close idle connections when ES client is closed

* When closing worker, make sure to cancel in-flight batches

* Cancel batch + guard

* [WIP] Adding output reload test

* More WIP

* Update test

* Try to get test passing for client first

* Make workqueue shared

* Making tests pass

* Clean up

* Moving SeedFlag var to correct place

* Clarifying comments

* Reducing the number of quick iterations

* Reducing quick iterations even more

* Trying just 1 iteration

* Setting out to nil after sending batch if paused

* Restoring old order of operations in Set()

* proposal

* Do not copy mutex

* Remove debugging statements

* Bumping up testing/quick max count on TestOutputReload

* Removing commented out helper function

* Simplifying retryer now that workqueue is used across output loads

* Renaming parameter

* Removing debugging variable

* Reducing lower bound of random # of batches

* Removing sigUnWait from controller

* Removing unused field

Co-authored-by: urso <steffen.siering@elastic.co>
  • Loading branch information
ycombinator and urso authored Apr 21, 2020
1 parent 96e8e12 commit 5cc5654
Show file tree
Hide file tree
Showing 10 changed files with 540 additions and 257 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix bug with `monitoring.cluster_uuid` setting not always being exposed via GET /state Beats API. {issue}16732[16732] {pull}17420[17420]
- Fix building on FreeBSD by removing build flags from `add_cloudfoundry_metadata` processor. {pull}17486[17486]
- Do not rotate log files on startup when interval is configured and rotateonstartup is disabled. {pull}17613[17613]
- Fix goroutine leak and Elasticsearch output file descriptor leak when output reloading is in use. {issue}10491[10491] {pull}17381[17381]

*Auditbeat*

Expand Down
17 changes: 16 additions & 1 deletion libbeat/esleg/eslegclient/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,15 @@ type ConnectionSettings struct {
Parameters map[string]string
CompressionLevel int
EscapeHTML bool
Timeout time.Duration

Timeout time.Duration
IdleConnTimeout time.Duration
}

// NewConnection returns a new Elasticsearch client
func NewConnection(s ConnectionSettings) (*Connection, error) {
s = settingsWithDefaults(s)

u, err := url.Parse(s.URL)
if err != nil {
return nil, fmt.Errorf("failed to parse elasticsearch URL: %v", err)
Expand Down Expand Up @@ -124,6 +128,7 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {
DialTLS: tlsDialer.Dial,
TLSClientConfig: s.TLS.ToConfig(),
Proxy: proxy,
IdleConnTimeout: s.IdleConnTimeout,
},
Timeout: s.Timeout,
},
Expand All @@ -132,6 +137,15 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {
}, nil
}

// settingsWithDefaults returns a copy of the given connection settings
// with zero-valued fields replaced by their defaults. Currently only
// IdleConnTimeout is defaulted, to one minute.
func settingsWithDefaults(s ConnectionSettings) ConnectionSettings {
	// s is received by value, so mutating it here cannot affect the caller.
	if s.IdleConnTimeout == 0 {
		s.IdleConnTimeout = time.Minute
	}
	return s
}

// NewClients returns a list of Elasticsearch clients based on the given
// configuration. It accepts the same configuration parameters as the Elasticsearch
// output, except for the output specific configuration options. If multiple hosts
Expand Down Expand Up @@ -266,6 +280,7 @@ func (conn *Connection) Ping() (string, error) {

// Close closes a connection. It tears down the transport's idle
// (keep-alive) HTTP connections so their file descriptors are released;
// without this, reloading the output leaks descriptors over time.
func (conn *Connection) Close() error {
	conn.HTTP.CloseIdleConnections()
	return nil
}

Expand Down
70 changes: 54 additions & 16 deletions libbeat/publisher/pipeline/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@ import (
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

type Batch struct {
type Batch interface {
publisher.Batch

reduceTTL() bool
}

type batch struct {
original queue.Batch
ctx *batchContext
ttl int
Expand All @@ -38,17 +44,17 @@ type batchContext struct {

var batchPool = sync.Pool{
New: func() interface{} {
return &Batch{}
return &batch{}
},
}

func newBatch(ctx *batchContext, original queue.Batch, ttl int) *Batch {
func newBatch(ctx *batchContext, original queue.Batch, ttl int) *batch {
if original == nil {
panic("empty batch")
}

b := batchPool.Get().(*Batch)
*b = Batch{
b := batchPool.Get().(*batch)
*b = batch{
original: original,
ctx: ctx,
ttl: ttl,
Expand All @@ -57,45 +63,47 @@ func newBatch(ctx *batchContext, original queue.Batch, ttl int) *Batch {
return b
}

func releaseBatch(b *Batch) {
*b = Batch{} // clear batch
func releaseBatch(b *batch) {
*b = batch{} // clear batch
batchPool.Put(b)
}

func (b *Batch) Events() []publisher.Event {
func (b *batch) Events() []publisher.Event {
return b.events
}

func (b *Batch) ACK() {
b.ctx.observer.outBatchACKed(len(b.events))
func (b *batch) ACK() {
if b.ctx != nil {
b.ctx.observer.outBatchACKed(len(b.events))
}
b.original.ACK()
releaseBatch(b)
}

func (b *Batch) Drop() {
func (b *batch) Drop() {
b.original.ACK()
releaseBatch(b)
}

func (b *Batch) Retry() {
func (b *batch) Retry() {
b.ctx.retryer.retry(b)
}

func (b *Batch) Cancelled() {
func (b *batch) Cancelled() {
b.ctx.retryer.cancelled(b)
}

func (b *Batch) RetryEvents(events []publisher.Event) {
func (b *batch) RetryEvents(events []publisher.Event) {
b.updEvents(events)
b.Retry()
}

func (b *Batch) CancelledEvents(events []publisher.Event) {
func (b *batch) CancelledEvents(events []publisher.Event) {
b.updEvents(events)
b.Cancelled()
}

func (b *Batch) updEvents(events []publisher.Event) {
func (b *batch) updEvents(events []publisher.Event) {
l1 := len(b.events)
l2 := len(events)
if l1 > l2 {
Expand All @@ -105,3 +113,33 @@ func (b *Batch) updEvents(events []publisher.Event) {

b.events = events
}

// reduceTTL reduces the time to live for all events that have no
// 'guaranteed' sending requirements. It returns true if the batch is
// still alive and should be retried, false once every remaining event
// may be dropped.
func (b *batch) reduceTTL() bool {
	if b.ttl <= 0 {
		// A non-positive TTL means unlimited retries.
		return true
	}

	if b.ttl--; b.ttl > 0 {
		// Retry budget not yet exhausted.
		return true
	}

	// TTL expired: keep only the events flagged as guaranteed,
	// filtering in place to reuse the backing array.
	kept := b.events[:0]
	for _, ev := range b.events {
		if ev.Guaranteed() {
			kept = append(kept, ev)
		}
	}
	b.events = kept

	if len(kept) == 0 {
		// Every event may be dropped; the batch is dead.
		return false
	}

	b.ttl = -1 // guaranteed events left: switch to infinite retry
	return true
}
7 changes: 5 additions & 2 deletions libbeat/publisher/pipeline/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (c *eventConsumer) loop(consumer queue.Consumer) {

var (
out workQueue
batch *Batch
batch Batch
paused = true
)

Expand All @@ -154,7 +154,7 @@ func (c *eventConsumer) loop(consumer queue.Consumer) {
}

paused = c.paused()
if !paused && c.out != nil && batch != nil {
if c.out != nil && batch != nil {
out = c.out.workQueue
} else {
out = nil
Expand Down Expand Up @@ -195,6 +195,9 @@ func (c *eventConsumer) loop(consumer queue.Consumer) {
handleSignal(sig)
case out <- batch:
batch = nil
if paused {
out = nil
}
}
}
}
Expand Down
33 changes: 17 additions & 16 deletions libbeat/publisher/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

Expand All @@ -34,7 +35,8 @@ type outputController struct {
monitors Monitors
observer outputObserver

queue queue.Queue
queue queue.Queue
workQueue workQueue

retryer *retryer
consumer *eventConsumer
Expand All @@ -50,7 +52,7 @@ type outputGroup struct {
timeToLive int // event lifetime
}

type workQueue chan *Batch
type workQueue chan publisher.Batch

// outputWorker instances pass events from the shared workQueue to the outputs.Client
// instances.
Expand All @@ -62,18 +64,19 @@ func newOutputController(
beat beat.Info,
monitors Monitors,
observer outputObserver,
b queue.Queue,
queue queue.Queue,
) *outputController {
c := &outputController{
beat: beat,
monitors: monitors,
observer: observer,
queue: b,
beat: beat,
monitors: monitors,
observer: observer,
queue: queue,
workQueue: makeWorkQueue(),
}

ctx := &batchContext{}
c.consumer = newEventConsumer(monitors.Logger, b, ctx)
c.retryer = newRetryer(monitors.Logger, observer, nil, c.consumer)
c.consumer = newEventConsumer(monitors.Logger, queue, ctx)
c.retryer = newRetryer(monitors.Logger, observer, c.workQueue, c.consumer)
ctx.observer = observer
ctx.retryer = c.retryer

Expand All @@ -86,27 +89,26 @@ func (c *outputController) Close() error {
c.consumer.sigPause()
c.consumer.close()
c.retryer.close()
close(c.workQueue)

if c.out != nil {
for _, out := range c.out.outputs {
out.Close()
}
close(c.out.workQueue)
}

return nil
}

func (c *outputController) Set(outGrp outputs.Group) {
// create new outputGroup with shared work queue
// create new output group with the shared work queue
clients := outGrp.Clients
queue := makeWorkQueue()
worker := make([]outputWorker, len(clients))
for i, client := range clients {
worker[i] = makeClientWorker(c.observer, queue, client)
worker[i] = makeClientWorker(c.observer, c.workQueue, client)
}
grp := &outputGroup{
workQueue: queue,
workQueue: c.workQueue,
outputs: worker,
timeToLive: outGrp.Retry + 1,
batchSize: outGrp.BatchSize,
Expand All @@ -119,7 +121,6 @@ func (c *outputController) Set(outGrp outputs.Group) {
c.retryer.sigOutputRemoved()
}
}
c.retryer.updOutput(queue)
for range clients {
c.retryer.sigOutputAdded()
}
Expand All @@ -141,7 +142,7 @@ func (c *outputController) Set(outGrp outputs.Group) {
}

func makeWorkQueue() workQueue {
return workQueue(make(chan *Batch, 0))
return workQueue(make(chan publisher.Batch, 0))
}

// Reload the output
Expand Down
Loading

0 comments on commit 5cc5654

Please sign in to comment.