From aa544ea4eb992097df0fd77f8ef3bcb9492c86ba Mon Sep 17 00:00:00 2001 From: urso Date: Fri, 25 Sep 2015 16:54:24 +0200 Subject: [PATCH 1/3] singleConnection mode drop after max send attempts Add support to drop message after failing multiple times to singleConnectionMode. The send failure is reported back to the publisher, for the publisher to decide to start a new attempt or fully drop the message. --- outputs/lumberjack/lumberjack.go | 11 ++++++----- outputs/lumberjack/modes.go | 18 +++++++++++------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/outputs/lumberjack/lumberjack.go b/outputs/lumberjack/lumberjack.go index e014c80057b..4d8e0d93842 100644 --- a/outputs/lumberjack/lumberjack.go +++ b/outputs/lumberjack/lumberjack.go @@ -86,16 +86,17 @@ func (lj *lumberjack) init( return err } + sendRetries := defaultSendRetries + if config.Max_retries != nil { + sendRetries = *config.Max_retries + } + var mode ConnectionMode if len(clients) == 1 { - mode, err = newSingleConnectionMode(clients[0], waitRetry, timeout) + mode, err = newSingleConnectionMode(clients[0], sendRetries, waitRetry, timeout) } else { loadBalance := config.LoadBalance == nil || *config.LoadBalance if loadBalance { - sendRetries := defaultSendRetries - if config.Max_retries != nil { - sendRetries = *config.Max_retries - } mode, err = newLoadBalancerMode(clients, sendRetries, waitRetry, timeout) } else { mode, err = newFailOverConnectionMode(clients, waitRetry, timeout) diff --git a/outputs/lumberjack/modes.go b/outputs/lumberjack/modes.go index c0d9f84e8d2..c987988a83f 100644 --- a/outputs/lumberjack/modes.go +++ b/outputs/lumberjack/modes.go @@ -65,8 +65,9 @@ type singleConnectionMode struct { closed bool // mode closed flag to break publisher loop - timeout time.Duration // connection timeout - waitRetry time.Duration // wait time until reconnect + timeout time.Duration // connection timeout + waitRetry time.Duration // wait time until reconnect + maxAttempts int // maximum number of configured send attempts } // failOverConnectionMode connects to at most one host by random and swap to @@ -133,12 +134,13 @@ var ( func newSingleConnectionMode( client ProtocolClient, - waitRetry time.Duration, - timeout time.Duration, + maxAttempts int, + waitRetry, timeout time.Duration, ) (*singleConnectionMode, error) { s := &singleConnectionMode{ - timeout: timeout, - conn: client, + timeout: timeout, + conn: client, + maxAttempts: maxAttempts, } _ = s.Connect() // try to connect, but ignore errors for now @@ -162,7 +164,8 @@ func (s *singleConnectionMode) PublishEvents( events []common.MapStr, ) error { published := 0 - for !s.closed { + fails := 0 + for !s.closed && (s.maxAttempts == 0 || fails < s.maxAttempts) { if err := s.Connect(); err != nil { time.Sleep(s.waitRetry) continue @@ -174,6 +177,7 @@ func (s *singleConnectionMode) PublishEvents( break } + fails = 0 published += n } From 32eb4a1db203e2e2124466ffba468c6f91c4e777 Mon Sep 17 00:00:00 2001 From: urso Date: Fri, 25 Sep 2015 17:30:16 +0200 Subject: [PATCH 2/3] add limited send attempts to fail-over mode --- outputs/lumberjack/lumberjack.go | 2 +- outputs/lumberjack/modes.go | 42 ++++++++++++++++++++------------ 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/outputs/lumberjack/lumberjack.go b/outputs/lumberjack/lumberjack.go index 4d8e0d93842..49b8aedbfcd 100644 --- a/outputs/lumberjack/lumberjack.go +++ b/outputs/lumberjack/lumberjack.go @@ -99,7 +99,7 @@ func (lj *lumberjack) init( if loadBalance { mode, err = newLoadBalancerMode(clients, sendRetries, waitRetry, timeout) } else { - mode, err = newFailOverConnectionMode(clients, waitRetry, timeout) + mode, err = newFailOverConnectionMode(clients, sendRetries, waitRetry, timeout) } } if err != nil { diff --git a/outputs/lumberjack/modes.go b/outputs/lumberjack/modes.go index c987988a83f..204b5870074 100644 --- a/outputs/lumberjack/modes.go +++ b/outputs/lumberjack/modes.go @@ -79,8 +79,9 @@ type failOverConnectionMode struct { closed bool // mode closed flag to break publisher loop - timeout time.Duration // connection timeout - waitRetry time.Duration // wait time until trying a new connection + timeout time.Duration // connection timeout + waitRetry time.Duration // wait time until trying a new connection + maxAttempts int // maximum number of configured send attempts } // loadBalancerMode balances the sending of events between multiple connections. @@ -167,6 +168,7 @@ func (s *singleConnectionMode) PublishEvents( fails := 0 for !s.closed && (s.maxAttempts == 0 || fails < s.maxAttempts) { if err := s.Connect(); err != nil { + fails++ time.Sleep(s.waitRetry) continue } @@ -185,6 +187,9 @@ func (s *singleConnectionMode) PublishEvents( outputs.SignalCompleted(trans) return nil } + + time.Sleep(s.waitRetry) + fails++ } outputs.SignalFailed(trans) @@ -193,13 +198,14 @@ func (s *singleConnectionMode) PublishEvents( func newFailOverConnectionMode( clients []ProtocolClient, - waitRetry time.Duration, - timeout time.Duration, + maxAttempts int, + waitRetry, timeout time.Duration, ) (*failOverConnectionMode, error) { f := &failOverConnectionMode{ - conns: clients, - timeout: timeout, - waitRetry: waitRetry, + conns: clients, + timeout: timeout, + waitRetry: waitRetry, + maxAttempts: maxAttempts, } // Try to connect preliminary, but ignore errors for now. @@ -268,8 +274,11 @@ func (f *failOverConnectionMode) PublishEvents( events []common.MapStr, ) error { published := 0 - for !f.closed { + fails := 0 + for !f.closed && (f.maxAttempts == 0 || fails < f.maxAttempts) { if err := f.Connect(f.active); err != nil { + fails++ + time.Sleep(f.waitRetry) continue } @@ -278,14 +287,7 @@ func (f *failOverConnectionMode) PublishEvents( conn := f.conns[f.active] n, err := conn.PublishEvents(events[published:]) if err != nil { - // TODO(sissel): Track how frequently we timeout and reconnect. If we're - // timing out too frequently, there's really no point in timing out since - // basically everything is slow or down. We'll want to ratchet up the - // timeout value slowly until things improve, then ratchet it down once - // things seem healthy. - time.Sleep(f.waitRetry) - - continue + break } published += n } @@ -294,6 +296,14 @@ func (f *failOverConnectionMode) PublishEvents( outputs.SignalCompleted(trans) return nil } + + // TODO(sissel): Track how frequently we timeout and reconnect. If we're + // timing out too frequently, there's really no point in timing out since + // basically everything is slow or down. We'll want to ratchet up the + // timeout value slowly until things improve, then ratchet it down once + // things seem healthy. + time.Sleep(f.waitRetry) + fails++ } outputs.SignalFailed(trans) From 8be6bab7fe5e66329c37cd402ddaee73b68bdd2a Mon Sep 17 00:00:00 2001 From: urso Date: Fri, 25 Sep 2015 18:28:30 +0200 Subject: [PATCH 3/3] streamline lumberjack mode behavior on send failure have all send modes behave the same on failed send attempts: - drop message if maximum number of attempts reached - block infinitely (until closed) if max attempts is set to 0 Fix possible deadlock in load balancer when all workers start failing at same time by buffering retry messages. --- outputs/lumberjack/modes.go | 94 +++++++++++++++++++++----------- outputs/lumberjack/modes_test.go | 6 +- 2 files changed, 66 insertions(+), 34 deletions(-) diff --git a/outputs/lumberjack/modes.go b/outputs/lumberjack/modes.go index 204b5870074..c7a1fe7509a 100644 --- a/outputs/lumberjack/modes.go +++ b/outputs/lumberjack/modes.go @@ -65,9 +65,12 @@ type singleConnectionMode struct { closed bool // mode closed flag to break publisher loop - timeout time.Duration // connection timeout - waitRetry time.Duration // wait time until reconnect - maxAttempts int // maximum number of configured send attempts + timeout time.Duration // connection timeout + waitRetry time.Duration // wait time until reconnect + + // maximum number of configured send attempts. If set to 0, publisher will + // block until event has been successfully published. + maxAttempts int } // failOverConnectionMode connects to at most one host by random and swap to @@ -79,9 +82,12 @@ type failOverConnectionMode struct { closed bool // mode closed flag to break publisher loop - timeout time.Duration // connection timeout - waitRetry time.Duration // wait time until trying a new connection - maxAttempts int // maximum number of configured send attempts + timeout time.Duration // connection timeout + waitRetry time.Duration // wait time until trying a new connection + + // maximum number of configured send attempts. If set to 0, publisher will + // block until event has been successfully published. + maxAttempts int } // loadBalancerMode balances the sending of events between multiple connections. @@ -110,14 +116,23 @@ type loadBalancerMode struct { timeout time.Duration // send/retry timeout. Every timeout is a failed send attempt waitRetry time.Duration // duration to wait during re-connection attempts - maxAttempts int // maximum number of configured send attempts + // maximum number of configured send attempts. If set to 0, publisher will + // block until event has been successfully published. + maxAttempts int // waitGroup + signaling channel for handling shutdown wg sync.WaitGroup done chan struct{} - work chan eventsMessage // work channel with new events to be published - retries chan eventsMessage // work channel for fail send attempts being forwarded to other workers + // channels for forwarding work items to workers. + // The work channel is used by publisher to insert new events + // into the load balancer. The work channel is synchronous blocking until timeout + // for one worker available. + // The retries channel is used to forward failed send attempts to other workers. + // The retries channel is buffered to mitigate possible deadlocks when all + // workers become unresponsive. + work chan eventsMessage + retries chan eventsMessage } type eventsMessage struct { @@ -321,7 +336,7 @@ func newLoadBalancerMode( maxAttempts: maxAttempts, work: make(chan eventsMessage), - retries: make(chan eventsMessage), + retries: make(chan eventsMessage, len(clients)*2), done: make(chan struct{}), } m.start(clients) @@ -377,37 +392,29 @@ func (m *loadBalancerMode) start(clients []ProtocolClient) { func (m *loadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) { published := 0 events := msg.events + send := 0 for published < len(events) { n, err := client.PublishEvents(events[published:]) if err != nil { // retry only non-confirmed subset of events in batch msg.events = msg.events[published:] + + // reset attempt count if subset of message has been send + if send > 0 { + msg.attemptsLeft = m.maxAttempts + 1 + } m.onFail(msg) return } published += n + send++ } outputs.SignalCompleted(msg.signaler) } func (m *loadBalancerMode) onFail(msg eventsMessage) { - for { - msg.attemptsLeft-- - if msg.attemptsLeft <= 0 { - outputs.SignalFailed(msg.signaler) - return - } - - select { - case <-m.done: - // shutting down -> mark message as failed and return - outputs.SignalFailed(msg.signaler) - return - case m.retries <- msg: // forward message to another worker - return - case <-time.After(m.timeout): - // another failed send - } + if ok := m.forwardEvent(m.retries, msg); !ok { + outputs.SignalFailed(msg.signaler) } } @@ -427,12 +434,33 @@ func (m *loadBalancerMode) PublishEvents( events: events, } - select { - case m.work <- msg: - case <-time.After(m.timeout): - // failed send attempt if no worker is available to pick up message - // within configured time limit. - m.onFail(msg) + if ok := m.forwardEvent(m.work, msg); !ok { + outputs.SignalFailed(msg.signaler) } return nil } + +func (m *loadBalancerMode) forwardEvent( + ch chan eventsMessage, + msg eventsMessage, +) bool { + if m.maxAttempts == 0 { + select { + case ch <- msg: + return true + case <-m.done: // shutdown + return false + } + } else { + for ; msg.attemptsLeft > 0; msg.attemptsLeft-- { + select { + case ch <- msg: + return true + case <-m.done: // shutdown + return false + case <-time.After(m.timeout): + } + } + } + return false +} diff --git a/outputs/lumberjack/modes_test.go b/outputs/lumberjack/modes_test.go index ba47de7929a..1a7c3ea675f 100644 --- a/outputs/lumberjack/modes_test.go +++ b/outputs/lumberjack/modes_test.go @@ -168,6 +168,7 @@ func TestSingleSend(t *testing.T) { connect: connectOK, publish: collectPublish(&collected), }, + 3, 0, 100*time.Millisecond, ) @@ -181,9 +182,10 @@ func TestSingleConnectFailConnect(t *testing.T) { &mockClient{ connected: false, close: closeOK, - connect: failConnect(5, errFail), + connect: failConnect(2, errFail), publish: collectPublish(&collected), }, + 3, 0, 100*time.Millisecond, ) @@ -207,6 +209,7 @@ func TestFailoverSingleSend(t *testing.T) { publish: collectPublish(&collected), }, }, + 3, 0, 100*time.Millisecond, ) @@ -233,6 +236,7 @@ func TestFailoverFlakyConnections(t *testing.T) { publish: publishTimeoutEvery(2, collectPublish(&collected)), }, }, + 3, 1*time.Millisecond, 100*time.Millisecond, )