diff --git a/outputs/lumberjack/lumberjack.go b/outputs/lumberjack/lumberjack.go index e014c80057b..49b8aedbfcd 100644 --- a/outputs/lumberjack/lumberjack.go +++ b/outputs/lumberjack/lumberjack.go @@ -86,19 +86,20 @@ func (lj *lumberjack) init( return err } + sendRetries := defaultSendRetries + if config.Max_retries != nil { + sendRetries = *config.Max_retries + } + var mode ConnectionMode if len(clients) == 1 { - mode, err = newSingleConnectionMode(clients[0], waitRetry, timeout) + mode, err = newSingleConnectionMode(clients[0], sendRetries, waitRetry, timeout) } else { loadBalance := config.LoadBalance == nil || *config.LoadBalance if loadBalance { - sendRetries := defaultSendRetries - if config.Max_retries != nil { - sendRetries = *config.Max_retries - } mode, err = newLoadBalancerMode(clients, sendRetries, waitRetry, timeout) } else { - mode, err = newFailOverConnectionMode(clients, waitRetry, timeout) + mode, err = newFailOverConnectionMode(clients, sendRetries, waitRetry, timeout) } } if err != nil { diff --git a/outputs/lumberjack/modes.go b/outputs/lumberjack/modes.go index c0d9f84e8d2..c7a1fe7509a 100644 --- a/outputs/lumberjack/modes.go +++ b/outputs/lumberjack/modes.go @@ -67,6 +67,10 @@ type singleConnectionMode struct { timeout time.Duration // connection timeout waitRetry time.Duration // wait time until reconnect + + // maximum number of configured send attempts. If set to 0, publisher will + // block until event has been successfully published. + maxAttempts int } // failOverConnectionMode connects to at most one host by random and swap to @@ -80,6 +84,10 @@ type failOverConnectionMode struct { timeout time.Duration // connection timeout waitRetry time.Duration // wait time until trying a new connection + + // maximum number of configured send attempts. If set to 0, publisher will + // block until event has been successfully published. + maxAttempts int } // loadBalancerMode balances the sending of events between multiple connections. 
@@ -108,14 +116,23 @@ type loadBalancerMode struct { timeout time.Duration // send/retry timeout. Every timeout is a failed send attempt waitRetry time.Duration // duration to wait during re-connection attempts - maxAttempts int // maximum number of configured send attempts + // maximum number of configured send attempts. If set to 0, publisher will + // block until event has been successfully published. + maxAttempts int // waitGroup + signaling channel for handling shutdown wg sync.WaitGroup done chan struct{} - work chan eventsMessage // work channel with new events to be published - retries chan eventsMessage // work channel for fail send attempts being forwarded to other workers + // channels for forwarding work items to workers. + // The work channel is used by the publisher to insert new events + // into the load balancer. The work channel is synchronous: a send blocks + // until a worker is available or the timeout expires. + // The retries channel is used to forward failed send attempts to other workers. + // The retries channel is buffered to mitigate possible deadlocks when all + // workers become unresponsive. 
+ work chan eventsMessage + retries chan eventsMessage } type eventsMessage struct { @@ -133,12 +150,13 @@ var ( func newSingleConnectionMode( client ProtocolClient, - waitRetry time.Duration, - timeout time.Duration, + maxAttempts int, + waitRetry, timeout time.Duration, ) (*singleConnectionMode, error) { s := &singleConnectionMode{ - timeout: timeout, - conn: client, + timeout: timeout, + conn: client, + maxAttempts: maxAttempts, } _ = s.Connect() // try to connect, but ignore errors for now @@ -162,8 +180,10 @@ func (s *singleConnectionMode) PublishEvents( events []common.MapStr, ) error { published := 0 - for !s.closed { + fails := 0 + for !s.closed && (s.maxAttempts == 0 || fails < s.maxAttempts) { if err := s.Connect(); err != nil { + fails++ time.Sleep(s.waitRetry) continue } @@ -174,6 +194,7 @@ func (s *singleConnectionMode) PublishEvents( break } + fails = 0 published += n } @@ -181,6 +202,9 @@ func (s *singleConnectionMode) PublishEvents( outputs.SignalCompleted(trans) return nil } + + time.Sleep(s.waitRetry) + fails++ } outputs.SignalFailed(trans) @@ -189,13 +213,14 @@ func (s *singleConnectionMode) PublishEvents( func newFailOverConnectionMode( clients []ProtocolClient, - waitRetry time.Duration, - timeout time.Duration, + maxAttempts int, + waitRetry, timeout time.Duration, ) (*failOverConnectionMode, error) { f := &failOverConnectionMode{ - conns: clients, - timeout: timeout, - waitRetry: waitRetry, + conns: clients, + timeout: timeout, + waitRetry: waitRetry, + maxAttempts: maxAttempts, } // Try to connect preliminary, but ignore errors for now. 
@@ -264,8 +289,11 @@ func (f *failOverConnectionMode) PublishEvents( events []common.MapStr, ) error { published := 0 - for !f.closed { + fails := 0 + for !f.closed && (f.maxAttempts == 0 || fails < f.maxAttempts) { if err := f.Connect(f.active); err != nil { + fails++ + time.Sleep(f.waitRetry) continue } @@ -274,14 +302,7 @@ func (f *failOverConnectionMode) PublishEvents( conn := f.conns[f.active] n, err := conn.PublishEvents(events[published:]) if err != nil { - // TODO(sissel): Track how frequently we timeout and reconnect. If we're - // timing out too frequently, there's really no point in timing out since - // basically everything is slow or down. We'll want to ratchet up the - // timeout value slowly until things improve, then ratchet it down once - // things seem healthy. - time.Sleep(f.waitRetry) - - continue + break } published += n } @@ -290,6 +311,14 @@ func (f *failOverConnectionMode) PublishEvents( outputs.SignalCompleted(trans) return nil } + + // TODO(sissel): Track how frequently we timeout and reconnect. If we're + // timing out too frequently, there's really no point in timing out since + // basically everything is slow or down. We'll want to ratchet up the + // timeout value slowly until things improve, then ratchet it down once + // things seem healthy. 
+ time.Sleep(f.waitRetry) + fails++ } outputs.SignalFailed(trans) @@ -307,7 +336,7 @@ func newLoadBalancerMode( maxAttempts: maxAttempts, work: make(chan eventsMessage), - retries: make(chan eventsMessage), + retries: make(chan eventsMessage, len(clients)*2), done: make(chan struct{}), } m.start(clients) @@ -363,37 +392,29 @@ func (m *loadBalancerMode) start(clients []ProtocolClient) { func (m *loadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) { published := 0 events := msg.events + send := 0 for published < len(events) { n, err := client.PublishEvents(events[published:]) if err != nil { // retry only non-confirmed subset of events in batch msg.events = msg.events[published:] + + // reset attempt count if a subset of the message has been sent + if send > 0 { + msg.attemptsLeft = m.maxAttempts + 1 + } m.onFail(msg) return } published += n + send++ } outputs.SignalCompleted(msg.signaler) } func (m *loadBalancerMode) onFail(msg eventsMessage) { - for { - msg.attemptsLeft-- - if msg.attemptsLeft <= 0 { - outputs.SignalFailed(msg.signaler) - return - } - - select { - case <-m.done: - // shutting down -> mark message as failed and return - outputs.SignalFailed(msg.signaler) - return - case m.retries <- msg: // forward message to another worker - return - case <-time.After(m.timeout): - // another failed send - } + if ok := m.forwardEvent(m.retries, msg); !ok { + outputs.SignalFailed(msg.signaler) } } @@ -413,12 +434,33 @@ func (m *loadBalancerMode) PublishEvents( events: events, } - select { - case m.work <- msg: - case <-time.After(m.timeout): - // failed send attempt if no worker is available to pick up message - // within configured time limit. 
- m.onFail(msg) + if ok := m.forwardEvent(m.work, msg); !ok { + outputs.SignalFailed(msg.signaler) } return nil } + +func (m *loadBalancerMode) forwardEvent( + ch chan eventsMessage, + msg eventsMessage, +) bool { + if m.maxAttempts == 0 { + select { + case ch <- msg: + return true + case <-m.done: // shutdown + return false + } + } else { + for ; msg.attemptsLeft > 0; msg.attemptsLeft-- { + select { + case ch <- msg: + return true + case <-m.done: // shutdown + return false + case <-time.After(m.timeout): + } + } + } + return false +} diff --git a/outputs/lumberjack/modes_test.go b/outputs/lumberjack/modes_test.go index ba47de7929a..1a7c3ea675f 100644 --- a/outputs/lumberjack/modes_test.go +++ b/outputs/lumberjack/modes_test.go @@ -168,6 +168,7 @@ func TestSingleSend(t *testing.T) { connect: connectOK, publish: collectPublish(&collected), }, + 3, 0, 100*time.Millisecond, ) @@ -181,9 +182,10 @@ func TestSingleConnectFailConnect(t *testing.T) { &mockClient{ connected: false, close: closeOK, - connect: failConnect(5, errFail), + connect: failConnect(2, errFail), publish: collectPublish(&collected), }, + 3, 0, 100*time.Millisecond, ) @@ -207,6 +209,7 @@ func TestFailoverSingleSend(t *testing.T) { publish: collectPublish(&collected), }, }, + 3, 0, 100*time.Millisecond, ) @@ -233,6 +236,7 @@ func TestFailoverFlakyConnections(t *testing.T) { publish: publishTimeoutEvery(2, collectPublish(&collected)), }, }, + 3, 1*time.Millisecond, 100*time.Millisecond, )