Skip to content

Commit

Permalink
Add logstash async unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
urso committed Feb 9, 2016
1 parent b73155c commit f45ad32
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 27 deletions.
10 changes: 7 additions & 3 deletions libbeat/outputs/logstash/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

type asyncClient struct {
Expand Down Expand Up @@ -66,7 +65,7 @@ func newAsyncLumberjackClient(
}

func (c *asyncClient) Connect(timeout time.Duration) error {
logp.Debug("logstash", "connect (async)")
debug("connect (async)")
err := c.TransportClient.Connect(timeout)
if err == nil {
c.startACK()
Expand All @@ -75,7 +74,7 @@ func (c *asyncClient) Connect(timeout time.Duration) error {
}

func (c *asyncClient) Close() error {
logp.Debug("logstash", "close (async) connection")
debug("close (async) connection")
c.stopACK()
return c.closeTransport()
}
Expand Down Expand Up @@ -251,6 +250,7 @@ func (c *asyncClient) startACK() {
}

func (c *asyncClient) stopACK() {
debug("stop ackLoop")
close(c.done)
c.wg.Wait()
close(c.ch)
Expand All @@ -261,6 +261,8 @@ func (c *asyncClient) ackLoop() {
defer c.wg.Done()
defer debug("finished ackLoop")

debug("start ackLoop")

for {
var err error
var msg ackMessage
Expand All @@ -272,6 +274,8 @@ func (c *asyncClient) ackLoop() {
return
}

debug("new ack message")

inPartial := false
switch msg.tag {
case tagComplete:
Expand Down
129 changes: 129 additions & 0 deletions libbeat/outputs/logstash/async_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package logstash

import (
"fmt"

"sync"
"testing"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/outputs/mode"
)

type testAsyncDriver struct {
client mode.AsyncProtocolClient
ch chan testDriverCommand
returns []testClientReturn
wg sync.WaitGroup
}

func TestAsyncSendZero(t *testing.T) {
testSendZero(t, makeAsyncTestClient)
}

func TestAsyncSimpleEvent(t *testing.T) {
testSimpleEvent(t, makeAsyncTestClient)
}

func TestAsyncStructuredEvent(t *testing.T) {
testStructuredEvent(t, makeAsyncTestClient)
}

func TestAsyncCloseAfterWindowSize(t *testing.T) {
testCloseAfterWindowSize(t, makeAsyncTestClient)
}

func TestAsyncMultiFailMaxTimeouts(t *testing.T) {
testMultiFailMaxTimeouts(t, makeAsyncTestClient)
}

func makeAsyncTestClient(conn TransportClient) testClientDriver {
return newAsyncTestDriver(newAsyncTestClient(conn))
}

func newAsyncTestClient(conn TransportClient) *asyncClient {
c, err := newAsyncLumberjackClient(conn, 3, testMaxWindowSize, 100*time.Millisecond)
if err != nil {
panic(err)
}
return c
}

func newAsyncTestDriver(client mode.AsyncProtocolClient) *testAsyncDriver {
driver := &testAsyncDriver{
client: client,
ch: make(chan testDriverCommand, 1),
returns: nil,
}

resp := make(chan testClientReturn, 1)

driver.wg.Add(1)
go func() {
defer driver.wg.Done()

for {
cmd, ok := <-driver.ch
if !ok {
return
}

switch cmd.code {
case driverCmdQuit:
return
case driverCmdConnect:
driver.client.Connect(1 * time.Second)
case driverCmdClose:
driver.client.Close()
case driverCmdPublish:
cb := func(events []common.MapStr, err error) {
fmt.Printf("response: batch=%v, err=%v\n", len(events), err)

n := len(cmd.events) - len(events)
ret := testClientReturn{n, err}
resp <- ret
}

fmt.Printf("publish events: batch=%v", len(cmd.events))
err := driver.client.AsyncPublishEvents(cb, cmd.events)
fmt.Println("async publish returned with: ", err)

if err != nil {
driver.returns = append(driver.returns, testClientReturn{0, err})
} else {
r := <-resp
driver.returns = append(driver.returns, r)
}
}
}
}()

return driver
}

func (t *testAsyncDriver) Close() {
t.ch <- testDriverCommand{code: driverCmdClose}
}

func (t *testAsyncDriver) Connect() {
t.ch <- testDriverCommand{code: driverCmdConnect}
}

func (t *testAsyncDriver) Stop() {
if t.ch != nil {
t.ch <- testDriverCommand{code: driverCmdQuit}
t.wg.Wait()
close(t.ch)
t.client.Close()
t.ch = nil
}
}

func (t *testAsyncDriver) Publish(events []common.MapStr) {
t.ch <- testDriverCommand{code: driverCmdPublish, events: events}
}

func (t *testAsyncDriver) Returns() []testClientReturn {
return t.returns
}
85 changes: 61 additions & 24 deletions libbeat/outputs/logstash/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@ import (

const (
driverCmdQuit = iota
driverCmdConnect
driverCmdClose
driverCmdPublish
)

type testClientDriver interface {
Connect()
Close()

Stop()
Publish(events []common.MapStr)
Returns() []testClientReturn
Expand Down Expand Up @@ -57,14 +62,19 @@ func testSendZero(t *testing.T, factory clientFactory) {
server := newMockServerTCP(t, 1*time.Second, "")
defer server.Close()

sock, transp, err := server.connectPair(1 * time.Second)
transp, err := server.transp()
if err != nil {
t.Fatalf("Failed to connect server and client: %v", err)
t.Fatalf("Failed to create transport client: %v", err)
}
defer transp.Close()

client := factory(transp)
defer client.Stop()

await := server.await()
client.Connect()
sock := <-await
defer sock.Close()
defer transp.Close()

client.Publish(make([]common.MapStr, 0))

Expand All @@ -81,16 +91,24 @@ func testSendZero(t *testing.T, factory clientFactory) {
func testSimpleEvent(t *testing.T, factory clientFactory) {
enableLogging([]string{"*"})
server := newMockServerTCP(t, 1*time.Second, "")
defer server.Close()

sock, transp, err := server.connectPair(1 * time.Second)
transp, err := server.transp()
if err != nil {
t.Fatalf("Failed to connect server and client: %v", err)
t.Fatalf("Failed to create transport client: %v", err)
}
client := factory(transp)
conn := &mockConn{sock, streambuf.New(nil)}
defer transp.Close()

client := factory(transp)
defer client.Stop()

await := server.await()
client.Connect()
sock := <-await
defer sock.Close()

conn := &mockConn{sock, streambuf.New(nil)}

event := common.MapStr{"name": "me", "line": 10}
client.Publish([]common.MapStr{event})

Expand Down Expand Up @@ -120,16 +138,24 @@ func testSimpleEvent(t *testing.T, factory clientFactory) {
func testStructuredEvent(t *testing.T, factory clientFactory) {
enableLogging([]string{"*"})
server := newMockServerTCP(t, 1*time.Second, "")
defer server.Close()

sock, transp, err := server.connectPair(1 * time.Second)
transp, err := server.transp()
if err != nil {
t.Fatalf("Failed to connect server and client: %v", err)
t.Fatalf("Failed to create transport client: %v", err)
}
client := factory(transp)
conn := &mockConn{sock, streambuf.New(nil)}
defer transp.Close()

client := factory(transp)
defer client.Stop()

await := server.await()
client.Connect()
sock := <-await
defer sock.Close()

conn := &mockConn{sock, streambuf.New(nil)}

event := common.MapStr{
"name": "test",
"struct": common.MapStr{
Expand Down Expand Up @@ -173,17 +199,24 @@ func testStructuredEvent(t *testing.T, factory clientFactory) {
func testCloseAfterWindowSize(t *testing.T, factory clientFactory) {
enableLogging([]string{"*"})
server := newMockServerTCP(t, 100*time.Millisecond, "")
defer server.Close()

sock, transp, err := server.connectPair(100 * time.Millisecond)
transp, err := server.transp()
if err != nil {
t.Fatalf("Failed to connect server and client: %v", err)
t.Fatalf("Failed to create transport client: %v", err)
}
client := factory(transp)
conn := &mockConn{sock, streambuf.New(nil)}
defer transp.Close()
defer sock.Close()

client := factory(transp)
defer client.Stop()

await := server.await()
client.Connect()
sock := <-await
defer sock.Close()

conn := &mockConn{sock, streambuf.New(nil)}

client.Publish([]common.MapStr{common.MapStr{
"message": "hello world",
}})
Expand All @@ -198,25 +231,29 @@ func testCloseAfterWindowSize(t *testing.T, factory clientFactory) {
func testMultiFailMaxTimeouts(t *testing.T, factory clientFactory) {
enableLogging([]string{"*"})

server := newMockServerTCP(t, 100*time.Millisecond, "")
N := 8

server := newMockServerTCP(t, 1*time.Second, "")
defer server.Close()

transp, err := server.transp()
if err != nil {
t.Fatalf("Failed to connect server and client: %v", err)
t.Fatalf("Failed to create transport client: %v", err)
}
defer transp.Close()

N := 8
client := factory(transp)
defer transp.Close()
defer client.Stop()

event := common.MapStr{"name": "me", "line": 10}

for i := 0; i < N; i++ {
await := server.await()
err = transp.Connect(100 * time.Millisecond)
if err != nil {
t.Fatalf("Transport client Failed to connect: %v", err)
if transp.IsConnected() {
t.Fatal("client should not be connected")
}

await := server.await()
client.Connect()
sock := <-await
conn := &mockConn{sock, streambuf.New(nil)}

Expand Down
12 changes: 12 additions & 0 deletions libbeat/outputs/logstash/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ func newClientTestDriver(client mode.ProtocolClient) *testSyncDriver {
switch cmd.code {
case driverCmdQuit:
return
case driverCmdConnect:
driver.client.Connect(1 * time.Second)
case driverCmdClose:
driver.client.Close()
case driverCmdPublish:
events, err := driver.client.PublishEvents(cmd.events)
n := len(cmd.events) - len(events)
Expand All @@ -96,6 +100,14 @@ func newClientTestDriver(client mode.ProtocolClient) *testSyncDriver {
return driver
}

func (t *testSyncDriver) Close() {
t.ch <- testDriverCommand{code: driverCmdClose}
}

func (t *testSyncDriver) Connect() {
t.ch <- testDriverCommand{code: driverCmdConnect}
}

func (t *testSyncDriver) Stop() {
if t.ch != nil {
t.ch <- testDriverCommand{code: driverCmdQuit}
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/logstash/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (c *tcpClient) Connect(timeout time.Duration) error {

c.conn = conn
c.connected = true
debug("connected")
return nil
}

Expand Down

0 comments on commit f45ad32

Please sign in to comment.