Skip to content

Commit

Permalink
Merge pull request elastic#114 from urso/enh/lumberjack-failover
Browse files Browse the repository at this point in the history
enhance lumberjack send mode
  • Loading branch information
andrewkroh committed Sep 25, 2015
2 parents af5634f + 8be6bab commit 2db3fb2
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 53 deletions.
13 changes: 7 additions & 6 deletions outputs/lumberjack/lumberjack.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,20 @@ func (lj *lumberjack) init(
return err
}

sendRetries := defaultSendRetries
if config.Max_retries != nil {
sendRetries = *config.Max_retries
}

var mode ConnectionMode
if len(clients) == 1 {
mode, err = newSingleConnectionMode(clients[0], waitRetry, timeout)
mode, err = newSingleConnectionMode(clients[0], sendRetries, waitRetry, timeout)
} else {
loadBalance := config.LoadBalance == nil || *config.LoadBalance
if loadBalance {
sendRetries := defaultSendRetries
if config.Max_retries != nil {
sendRetries = *config.Max_retries
}
mode, err = newLoadBalancerMode(clients, sendRetries, waitRetry, timeout)
} else {
mode, err = newFailOverConnectionMode(clients, waitRetry, timeout)
mode, err = newFailOverConnectionMode(clients, sendRetries, waitRetry, timeout)
}
}
if err != nil {
Expand Down
134 changes: 88 additions & 46 deletions outputs/lumberjack/modes.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ type singleConnectionMode struct {

timeout time.Duration // connection timeout
waitRetry time.Duration // wait time until reconnect

// maximum number of configured send attempts. If set to 0, publisher will
// block until event has been successfully published.
maxAttempts int
}

// failOverConnectionMode connects to at most one host by random and swap to
Expand All @@ -80,6 +84,10 @@ type failOverConnectionMode struct {

timeout time.Duration // connection timeout
waitRetry time.Duration // wait time until trying a new connection

// maximum number of configured send attempts. If set to 0, publisher will
// block until event has been successfully published.
maxAttempts int
}

// loadBalancerMode balances the sending of events between multiple connections.
Expand Down Expand Up @@ -108,14 +116,23 @@ type loadBalancerMode struct {
timeout time.Duration // send/retry timeout. Every timeout is a failed send attempt
waitRetry time.Duration // duration to wait during re-connection attempts

maxAttempts int // maximum number of configured send attempts
// maximum number of configured send attempts. If set to 0, publisher will
// block until event has been successfully published.
maxAttempts int

// waitGroup + signaling channel for handling shutdown
wg sync.WaitGroup
done chan struct{}

work chan eventsMessage // work channel with new events to be published
retries chan eventsMessage // work channel for fail send attempts being forwarded to other workers
// channels for forwarding work items to workers.
// The work channel is used by publisher to insert new events
// into the load balancer. The work channel is synchronous blocking until timeout
// for one worker available.
// The retries channel is used to forward failed send attempts to other workers.
// The retries channel is buffered to mitigate possible deadlocks when all
// workers become unresponsive.
work chan eventsMessage
retries chan eventsMessage
}

type eventsMessage struct {
Expand All @@ -133,12 +150,13 @@ var (

func newSingleConnectionMode(
client ProtocolClient,
waitRetry time.Duration,
timeout time.Duration,
maxAttempts int,
waitRetry, timeout time.Duration,
) (*singleConnectionMode, error) {
s := &singleConnectionMode{
timeout: timeout,
conn: client,
timeout: timeout,
conn: client,
maxAttempts: maxAttempts,
}

_ = s.Connect() // try to connect, but ignore errors for now
Expand All @@ -162,8 +180,10 @@ func (s *singleConnectionMode) PublishEvents(
events []common.MapStr,
) error {
published := 0
for !s.closed {
fails := 0
for !s.closed && (s.maxAttempts == 0 || fails < s.maxAttempts) {
if err := s.Connect(); err != nil {
fails++
time.Sleep(s.waitRetry)
continue
}
Expand All @@ -174,13 +194,17 @@ func (s *singleConnectionMode) PublishEvents(
break
}

fails = 0
published += n
}

if published == len(events) {
outputs.SignalCompleted(trans)
return nil
}

time.Sleep(s.waitRetry)
fails++
}

outputs.SignalFailed(trans)
Expand All @@ -189,13 +213,14 @@ func (s *singleConnectionMode) PublishEvents(

func newFailOverConnectionMode(
clients []ProtocolClient,
waitRetry time.Duration,
timeout time.Duration,
maxAttempts int,
waitRetry, timeout time.Duration,
) (*failOverConnectionMode, error) {
f := &failOverConnectionMode{
conns: clients,
timeout: timeout,
waitRetry: waitRetry,
conns: clients,
timeout: timeout,
waitRetry: waitRetry,
maxAttempts: maxAttempts,
}

// Try to connect preliminary, but ignore errors for now.
Expand Down Expand Up @@ -264,8 +289,11 @@ func (f *failOverConnectionMode) PublishEvents(
events []common.MapStr,
) error {
published := 0
for !f.closed {
fails := 0
for !f.closed && (f.maxAttempts == 0 || fails < f.maxAttempts) {
if err := f.Connect(f.active); err != nil {
fails++
time.Sleep(f.waitRetry)
continue
}

Expand All @@ -274,14 +302,7 @@ func (f *failOverConnectionMode) PublishEvents(
conn := f.conns[f.active]
n, err := conn.PublishEvents(events[published:])
if err != nil {
// TODO(sissel): Track how frequently we timeout and reconnect. If we're
// timing out too frequently, there's really no point in timing out since
// basically everything is slow or down. We'll want to ratchet up the
// timeout value slowly until things improve, then ratchet it down once
// things seem healthy.
time.Sleep(f.waitRetry)

continue
break
}
published += n
}
Expand All @@ -290,6 +311,14 @@ func (f *failOverConnectionMode) PublishEvents(
outputs.SignalCompleted(trans)
return nil
}

// TODO(sissel): Track how frequently we timeout and reconnect. If we're
// timing out too frequently, there's really no point in timing out since
// basically everything is slow or down. We'll want to ratchet up the
// timeout value slowly until things improve, then ratchet it down once
// things seem healthy.
time.Sleep(f.waitRetry)
fails++
}

outputs.SignalFailed(trans)
Expand All @@ -307,7 +336,7 @@ func newLoadBalancerMode(
maxAttempts: maxAttempts,

work: make(chan eventsMessage),
retries: make(chan eventsMessage),
retries: make(chan eventsMessage, len(clients)*2),
done: make(chan struct{}),
}
m.start(clients)
Expand Down Expand Up @@ -363,37 +392,29 @@ func (m *loadBalancerMode) start(clients []ProtocolClient) {
func (m *loadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) {
published := 0
events := msg.events
send := 0
for published < len(events) {
n, err := client.PublishEvents(events[published:])
if err != nil {
// retry only non-confirmed subset of events in batch
msg.events = msg.events[published:]

// reset attempt count if subset of message has been send
if send > 0 {
msg.attemptsLeft = m.maxAttempts + 1
}
m.onFail(msg)
return
}
published += n
send++
}
outputs.SignalCompleted(msg.signaler)
}

func (m *loadBalancerMode) onFail(msg eventsMessage) {
for {
msg.attemptsLeft--
if msg.attemptsLeft <= 0 {
outputs.SignalFailed(msg.signaler)
return
}

select {
case <-m.done:
// shutting down -> mark message as failed and return
outputs.SignalFailed(msg.signaler)
return
case m.retries <- msg: // forward message to another worker
return
case <-time.After(m.timeout):
// another failed send
}
if ok := m.forwardEvent(m.retries, msg); !ok {
outputs.SignalFailed(msg.signaler)
}
}

Expand All @@ -413,12 +434,33 @@ func (m *loadBalancerMode) PublishEvents(
events: events,
}

select {
case m.work <- msg:
case <-time.After(m.timeout):
// failed send attempt if no worker is available to pick up message
// within configured time limit.
m.onFail(msg)
if ok := m.forwardEvent(m.work, msg); !ok {
outputs.SignalFailed(msg.signaler)
}
return nil
}

func (m *loadBalancerMode) forwardEvent(
ch chan eventsMessage,
msg eventsMessage,
) bool {
if m.maxAttempts == 0 {
select {
case ch <- msg:
return true
case <-m.done: // shutdown
return false
}
} else {
for ; msg.attemptsLeft > 0; msg.attemptsLeft-- {
select {
case ch <- msg:
return true
case <-m.done: // shutdown
return false
case <-time.After(m.timeout):
}
}
}
return false
}
6 changes: 5 additions & 1 deletion outputs/lumberjack/modes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func TestSingleSend(t *testing.T) {
connect: connectOK,
publish: collectPublish(&collected),
},
3,
0,
100*time.Millisecond,
)
Expand All @@ -181,9 +182,10 @@ func TestSingleConnectFailConnect(t *testing.T) {
&mockClient{
connected: false,
close: closeOK,
connect: failConnect(5, errFail),
connect: failConnect(2, errFail),
publish: collectPublish(&collected),
},
3,
0,
100*time.Millisecond,
)
Expand All @@ -207,6 +209,7 @@ func TestFailoverSingleSend(t *testing.T) {
publish: collectPublish(&collected),
},
},
3,
0,
100*time.Millisecond,
)
Expand All @@ -233,6 +236,7 @@ func TestFailoverFlakyConnections(t *testing.T) {
publish: publishTimeoutEvery(2, collectPublish(&collected)),
},
},
3,
1*time.Millisecond,
100*time.Millisecond,
)
Expand Down

0 comments on commit 2db3fb2

Please sign in to comment.