From f45ad3258a3f5799daf6eab31fedff6cb9ba52d8 Mon Sep 17 00:00:00 2001 From: urso Date: Mon, 8 Feb 2016 19:26:41 +0100 Subject: [PATCH] Add logstash async unit tests --- libbeat/outputs/logstash/async.go | 10 +- libbeat/outputs/logstash/async_test.go | 129 ++++++++++++++++++++++++ libbeat/outputs/logstash/client_test.go | 85 +++++++++++----- libbeat/outputs/logstash/sync_test.go | 12 +++ libbeat/outputs/logstash/transport.go | 1 + 5 files changed, 210 insertions(+), 27 deletions(-) create mode 100644 libbeat/outputs/logstash/async_test.go diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index 456682a2acb..60bc0763324 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -6,7 +6,6 @@ import ( "time" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" ) type asyncClient struct { @@ -66,7 +65,7 @@ func newAsyncLumberjackClient( } func (c *asyncClient) Connect(timeout time.Duration) error { - logp.Debug("logstash", "connect (async)") + debug("connect (async)") err := c.TransportClient.Connect(timeout) if err == nil { c.startACK() @@ -75,7 +74,7 @@ func (c *asyncClient) Connect(timeout time.Duration) error { } func (c *asyncClient) Close() error { - logp.Debug("logstash", "close (async) connection") + debug("close (async) connection") c.stopACK() return c.closeTransport() } @@ -251,6 +250,7 @@ func (c *asyncClient) startACK() { } func (c *asyncClient) stopACK() { + debug("stop ackLoop") close(c.done) c.wg.Wait() close(c.ch) @@ -261,6 +261,8 @@ func (c *asyncClient) ackLoop() { defer c.wg.Done() defer debug("finished ackLoop") + debug("start ackLoop") + for { var err error var msg ackMessage @@ -272,6 +274,8 @@ func (c *asyncClient) ackLoop() { return } + debug("new ack message") + inPartial := false switch msg.tag { case tagComplete: diff --git a/libbeat/outputs/logstash/async_test.go b/libbeat/outputs/logstash/async_test.go new file mode 100644 index 00000000000..5ec0dac9718 --- /dev/null +++ b/libbeat/outputs/logstash/async_test.go @@ -0,0 +1,129 @@ +package logstash + +import ( + "fmt" + + "sync" + "testing" + "time" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/outputs/mode" +) + +type testAsyncDriver struct { + client mode.AsyncProtocolClient + ch chan testDriverCommand + returns []testClientReturn + wg sync.WaitGroup +} + +func TestAsyncSendZero(t *testing.T) { + testSendZero(t, makeAsyncTestClient) +} + +func TestAsyncSimpleEvent(t *testing.T) { + testSimpleEvent(t, makeAsyncTestClient) +} + +func TestAsyncStructuredEvent(t *testing.T) { + testStructuredEvent(t, makeAsyncTestClient) +} + +func TestAsyncCloseAfterWindowSize(t *testing.T) { + testCloseAfterWindowSize(t, makeAsyncTestClient) +} + +func TestAsyncMultiFailMaxTimeouts(t *testing.T) { + testMultiFailMaxTimeouts(t, makeAsyncTestClient) +} + +func makeAsyncTestClient(conn TransportClient) testClientDriver { + return newAsyncTestDriver(newAsyncTestClient(conn)) +} + +func newAsyncTestClient(conn TransportClient) *asyncClient { + c, err := newAsyncLumberjackClient(conn, 3, testMaxWindowSize, 100*time.Millisecond) + if err != nil { + panic(err) + } + return c +} + +func newAsyncTestDriver(client mode.AsyncProtocolClient) *testAsyncDriver { + driver := &testAsyncDriver{ + client: client, + ch: make(chan testDriverCommand, 1), + returns: nil, + } + + resp := make(chan testClientReturn, 1) + + driver.wg.Add(1) + go func() { + defer driver.wg.Done() + + for { + cmd, ok := <-driver.ch + if !ok { + return + } + + switch cmd.code { + case driverCmdQuit: + return + case driverCmdConnect: + driver.client.Connect(1 * time.Second) + case driverCmdClose: + driver.client.Close() + case driverCmdPublish: + cb := func(events []common.MapStr, err error) { + fmt.Printf("response: batch=%v, err=%v\n", len(events), err) + + n := len(cmd.events) - len(events) + ret := testClientReturn{n, err} + resp <- ret + } + + fmt.Printf("publish events: batch=%v", len(cmd.events)) + err := driver.client.AsyncPublishEvents(cb, cmd.events) + fmt.Println("async publish returned with: ", err) + + if err != nil { + driver.returns = append(driver.returns, testClientReturn{0, err}) + } else { + r := <-resp + driver.returns = append(driver.returns, r) + } + } + } + }() + + return driver +} + +func (t *testAsyncDriver) Close() { + t.ch <- testDriverCommand{code: driverCmdClose} +} + +func (t *testAsyncDriver) Connect() { + t.ch <- testDriverCommand{code: driverCmdConnect} +} + +func (t *testAsyncDriver) Stop() { + if t.ch != nil { + t.ch <- testDriverCommand{code: driverCmdQuit} + t.wg.Wait() + close(t.ch) + t.client.Close() + t.ch = nil + } +} + +func (t *testAsyncDriver) Publish(events []common.MapStr) { + t.ch <- testDriverCommand{code: driverCmdPublish, events: events} +} + +func (t *testAsyncDriver) Returns() []testClientReturn { + return t.returns +} diff --git a/libbeat/outputs/logstash/client_test.go b/libbeat/outputs/logstash/client_test.go index a619d012ed6..eabe9ce02af 100644 --- a/libbeat/outputs/logstash/client_test.go +++ b/libbeat/outputs/logstash/client_test.go @@ -14,10 +14,15 @@ import ( const ( driverCmdQuit = iota + driverCmdConnect + driverCmdClose driverCmdPublish ) type testClientDriver interface { + Connect() + Close() + Stop() Publish(events []common.MapStr) Returns() []testClientReturn @@ -57,14 +62,19 @@ func testSendZero(t *testing.T, factory clientFactory) { server := newMockServerTCP(t, 1*time.Second, "") defer server.Close() - sock, transp, err := server.connectPair(1 * time.Second) + transp, err := server.transp() if err != nil { - t.Fatalf("Failed to connect server and client: %v", err) + t.Fatalf("Failed to create transport client: %v", err) } + defer transp.Close() client := factory(transp) + defer client.Stop() + + await := server.await() + client.Connect() + sock := <-await defer sock.Close() - defer transp.Close() client.Publish(make([]common.MapStr, 0)) @@ -81,16 +91,24 @@ func testSendZero(t *testing.T, factory clientFactory) { func testSimpleEvent(t *testing.T, factory clientFactory) { enableLogging([]string{"*"}) server := newMockServerTCP(t, 1*time.Second, "") + defer server.Close() - sock, transp, err := server.connectPair(1 * time.Second) + transp, err := server.transp() if err != nil { - t.Fatalf("Failed to connect server and client: %v", err) + t.Fatalf("Failed to create transport client: %v", err) } - client := factory(transp) - conn := &mockConn{sock, streambuf.New(nil)} defer transp.Close() + + client := factory(transp) + defer client.Stop() + + await := server.await() + client.Connect() + sock := <-await defer sock.Close() + conn := &mockConn{sock, streambuf.New(nil)} + event := common.MapStr{"name": "me", "line": 10} client.Publish([]common.MapStr{event}) @@ -120,16 +138,24 @@ func testSimpleEvent(t *testing.T, factory clientFactory) { func testStructuredEvent(t *testing.T, factory clientFactory) { enableLogging([]string{"*"}) server := newMockServerTCP(t, 1*time.Second, "") + defer server.Close() - sock, transp, err := server.connectPair(1 * time.Second) + transp, err := server.transp() if err != nil { - t.Fatalf("Failed to connect server and client: %v", err) + t.Fatalf("Failed to create transport client: %v", err) } - client := factory(transp) - conn := &mockConn{sock, streambuf.New(nil)} defer transp.Close() + + client := factory(transp) + defer client.Stop() + + await := server.await() + client.Connect() + sock := <-await defer sock.Close() + conn := &mockConn{sock, streambuf.New(nil)} + event := common.MapStr{ "name": "test", "struct": common.MapStr{ @@ -173,17 +199,24 @@ func testStructuredEvent(t *testing.T, factory clientFactory) { func testCloseAfterWindowSize(t *testing.T, factory clientFactory) { enableLogging([]string{"*"}) server := newMockServerTCP(t, 100*time.Millisecond, "") + defer server.Close() - sock, transp, err := server.connectPair(100 * time.Millisecond) + transp, err := server.transp() if err != nil { - t.Fatalf("Failed to connect server and client: %v", err) + t.Fatalf("Failed to create transport client: %v", err) } - client := factory(transp) - conn := &mockConn{sock, streambuf.New(nil)} defer transp.Close() - defer sock.Close() + + client := factory(transp) defer client.Stop() + await := server.await() + client.Connect() + sock := <-await + defer sock.Close() + + conn := &mockConn{sock, streambuf.New(nil)} + client.Publish([]common.MapStr{common.MapStr{ "message": "hello world", }}) @@ -198,25 +231,29 @@ func testCloseAfterWindowSize(t *testing.T, factory clientFactory) { func testMultiFailMaxTimeouts(t *testing.T, factory clientFactory) { enableLogging([]string{"*"}) - server := newMockServerTCP(t, 100*time.Millisecond, "") + N := 8 + + server := newMockServerTCP(t, 1*time.Second, "") + defer server.Close() + transp, err := server.transp() if err != nil { - t.Fatalf("Failed to connect server and client: %v", err) + t.Fatalf("Failed to create transport client: %v", err) } + defer transp.Close() - N := 8 client := factory(transp) - defer transp.Close() defer client.Stop() event := common.MapStr{"name": "me", "line": 10} for i := 0; i < N; i++ { - await := server.await() - err = transp.Connect(100 * time.Millisecond) - if err != nil { - t.Fatalf("Transport client Failed to connect: %v", err) + if transp.IsConnected() { + t.Fatal("client should not be connected") } + + await := server.await() + client.Connect() sock := <-await conn := &mockConn{sock, streambuf.New(nil)} diff --git a/libbeat/outputs/logstash/sync_test.go b/libbeat/outputs/logstash/sync_test.go index 5ff5bffffd6..ed9a985cbc7 100644 --- a/libbeat/outputs/logstash/sync_test.go +++ b/libbeat/outputs/logstash/sync_test.go @@ -85,6 +85,10 @@ func newClientTestDriver(client mode.ProtocolClient) *testSyncDriver { switch cmd.code { case driverCmdQuit: return + case driverCmdConnect: + driver.client.Connect(1 * time.Second) + case driverCmdClose: + driver.client.Close() case driverCmdPublish: events, err := driver.client.PublishEvents(cmd.events) n := len(cmd.events) - len(events) @@ -96,6 +100,14 @@ func newClientTestDriver(client mode.ProtocolClient) *testSyncDriver { return driver } +func (t *testSyncDriver) Close() { + t.ch <- testDriverCommand{code: driverCmdClose} +} + +func (t *testSyncDriver) Connect() { + t.ch <- testDriverCommand{code: driverCmdConnect} +} + func (t *testSyncDriver) Stop() { if t.ch != nil { t.ch <- testDriverCommand{code: driverCmdQuit} diff --git a/libbeat/outputs/logstash/transport.go b/libbeat/outputs/logstash/transport.go index c095a37dbbd..1277dcf3a33 100644 --- a/libbeat/outputs/logstash/transport.go +++ b/libbeat/outputs/logstash/transport.go @@ -74,6 +74,7 @@ func (c *tcpClient) Connect(timeout time.Duration) error { c.conn = conn c.connected = true + debug("connected") return nil }