diff --git a/async_producer_test.go b/async_producer_test.go index 93690b2bf..403456839 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -285,7 +285,10 @@ func TestAsyncProducerFailureRetry(t *testing.T) { closeProducer(t, producer) } +// If a Kafka broker becomes unavailable and then returns back in service, then +// producer reconnects to it and continues sending messages. func TestAsyncProducerBrokerBounce(t *testing.T) { + // Given seedBroker := newMockBroker(t, 1) leader := newMockBroker(t, 2) leaderAddr := leader.Addr() @@ -295,30 +298,34 @@ func TestAsyncProducerBrokerBounce(t *testing.T) { metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) + prodSuccess := new(ProduceResponse) + prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + config := NewConfig() - config.Producer.Flush.Messages = 10 + config.Producer.Flush.Messages = 1 config.Producer.Return.Successes = true config.Producer.Retry.Backoff = 0 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} + leader.Returns(prodSuccess) + expectResults(t, producer, 1, 0) - for i := 0; i < 10; i++ { - producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} - } + // When: a broker connection gets reset by a broker (network glitch, restart, you name it). leader.Close() // producer should get EOF leader = newMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again - prodSuccess := new(ProduceResponse) - prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + // Then: a produced message goes through the new broker connection. + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} leader.Returns(prodSuccess) - expectResults(t, producer, 10, 0) - seedBroker.Close() - leader.Close() + expectResults(t, producer, 1, 0) closeProducer(t, producer) + seedBroker.Close() + leader.Close() } func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) { diff --git a/consumer_test.go b/consumer_test.go index 840a26367..cad709b53 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -447,13 +447,13 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) { } //redirect partition 1 back to main leader - fetchResponse := new(FetchResponse) - fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition) - tmp.Returns(fetchResponse) metadataResponse = new(MetadataResponse) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) + fetchResponse := new(FetchResponse) + fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition) + tmp.Returns(fetchResponse) time.Sleep(5 * time.Millisecond) // now send one message to each partition to make sure everything is primed @@ -493,10 +493,6 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) { case <-c0.Errors(): case <-c1.Errors(): } - // send it back to the same broker - seedBroker.Returns(metadataResponse) - - time.Sleep(5 * time.Millisecond) select { case <-c0.Messages(): diff --git a/mockbroker_test.go b/mockbroker_test.go index d131cb0c7..2c670b798 100644 --- a/mockbroker_test.go +++ b/mockbroker_test.go @@ -1,39 +1,66 @@ package sarama import ( + "bytes" "encoding/binary" - "errors" + "fmt" "io" "net" "strconv" + "sync" "testing" "time" + + "github.com/davecgh/go-spew/spew" +) + +const ( + expectationTimeout = 250 * time.Millisecond ) -// mockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that -// accepts a single connection. It reads Kafka requests from that connection and returns each response -// from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response -// the server sleeps for 250ms instead of reading a request). +type requestHandlerFunc func(req *request) (res encoder) + +// mockBroker is a mock Kafka broker. It consists of a TCP server on a +// kernel-selected localhost port that can accept many connections. It reads +// Kafka requests from that connection and passes them to the user specified +// handler function (see SetHandler) that generates respective responses. If +// the handler has not been explicitly specified then the broker returns +// responses set by the Returns function in the exact order they were provided. +// (if a response has a len of 0, nothing is sent, and the client request will +// timeout in this case). // -// When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs -// waiting for a response, the test panics. +// When running tests with one of these, it is strongly recommended to specify +// a timeout to `go test` so that if the broker hangs waiting for a response, +// the test panics. // -// It is not necessary to prefix message length or correlation ID to your response bytes, the server does that -// automatically as a convenience. +// It is not necessary to prefix message length or correlation ID to your +// response bytes, the server does that automatically as a convenience. type mockBroker struct { brokerID int32 port int32 - stopper chan bool + closing chan none + stopper chan none expectations chan encoder listener net.Listener t *testing.T latency time.Duration + handler requestHandlerFunc + handlerMux sync.Mutex } func (b *mockBroker) SetLatency(latency time.Duration) { b.latency = latency } +// SetHandler sets the specified function as the request handler. Whenever +// a mock broker reads a request from the wire it passes the request to the +// function and sends back whatever the handler function returns. +func (b *mockBroker) SetHandler(handler requestHandlerFunc) { + b.handlerMux.Lock() + b.handler = handler + b.handlerMux.Unlock() +} + func (b *mockBroker) BrokerID() int32 { return b.brokerID } @@ -47,80 +74,129 @@ func (b *mockBroker) Addr() string { } func (b *mockBroker) Close() { + close(b.expectations) if len(b.expectations) > 0 { - b.t.Errorf("Not all expectations were satisfied in mockBroker with ID=%d! Still waiting on %d", b.BrokerID(), len(b.expectations)) + buf := bytes.NewBufferString(fmt.Sprintf("mockbroker/%d: not all expectations were satisfied! Still waiting on:\n", b.BrokerID())) + for e := range b.expectations { + _, _ = buf.WriteString(spew.Sdump(e)) + } + b.t.Error(buf.String()) } - close(b.expectations) + close(b.closing) <-b.stopper } -func (b *mockBroker) serverLoop() (ok bool) { - var ( - err error - conn net.Conn - ) - +func (b *mockBroker) serverLoop() { defer close(b.stopper) - if conn, err = b.listener.Accept(); err != nil { - return b.serverError(err, conn) + var err error + var conn net.Conn + + go func() { + <-b.closing + safeClose(b.t, b.listener) + }() + + wg := &sync.WaitGroup{} + i := 0 + for conn, err = b.listener.Accept(); err == nil; conn, err = b.listener.Accept() { + wg.Add(1) + go b.handleRequests(conn, i, wg) + i++ } - reqHeader := make([]byte, 4) + wg.Wait() + Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), err) +} + +func (b *mockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) { + defer wg.Done() + defer func() { + _ = conn.Close() + }() + Logger.Printf("*** mockbroker/%d/%d: connection opened", b.BrokerID(), idx) + var err error + + abort := make(chan none) + defer close(abort) + go func() { + select { + case <-b.closing: + _ = conn.Close() + case <-abort: + } + }() + resHeader := make([]byte, 8) - for expectation := range b.expectations { - _, err = io.ReadFull(conn, reqHeader) + for { + req, err := decodeRequest(conn) if err != nil { - return b.serverError(err, conn) - } - body := make([]byte, binary.BigEndian.Uint32(reqHeader)) - if len(body) < 10 { - return b.serverError(errors.New("Kafka request too short."), conn) - } - if _, err = io.ReadFull(conn, body); err != nil { - return b.serverError(err, conn) + Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req)) + b.serverError(err) + break } if b.latency > 0 { time.Sleep(b.latency) } - response, err := encode(expectation) + res := b.requestHandler()(req) + Logger.Printf("*** mockbroker/%d/%d: served %+v -> %+v", b.brokerID, idx, req, res) + + encodedRes, err := encode(res) if err != nil { - return false + b.serverError(err) + break } - if len(response) == 0 { + if len(encodedRes) == 0 { continue } - binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4)) - binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:])) + binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4)) + binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID)) if _, err = conn.Write(resHeader); err != nil { - return b.serverError(err, conn) + b.serverError(err) + break } - if _, err = conn.Write(response); err != nil { - return b.serverError(err, conn) + if _, err = conn.Write(encodedRes); err != nil { + b.serverError(err) + break } } - if err = conn.Close(); err != nil { - return b.serverError(err, nil) - } - if err = b.listener.Close(); err != nil { - b.t.Error(err) - return false - } - return true + Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err) } -func (b *mockBroker) serverError(err error, conn net.Conn) bool { - b.t.Error(err) - if conn != nil { - if err := conn.Close(); err != nil { - b.t.Error(err) +func (b *mockBroker) requestHandler() requestHandlerFunc { + b.handlerMux.Lock() + defer b.handlerMux.Unlock() + return b.handler +} + +func (b *mockBroker) defaultRequestHandler(req *request) (res encoder) { + select { + case res, ok := <-b.expectations: + if !ok { + return nil } + return res + case <-time.After(expectationTimeout): + return nil } - if err := b.listener.Close(); err != nil { - b.t.Error(err) +} + +func (b *mockBroker) serverError(err error) { + isConnectionClosedError := false + if _, ok := err.(*net.OpError); ok { + isConnectionClosedError = true + } else if err == io.EOF { + isConnectionClosedError = true + } else if err.Error() == "use of closed network connection" { + isConnectionClosedError = true } - return false + + if isConnectionClosedError { + return + } + + b.t.Errorf(err.Error()) } // newMockBroker launches a fake Kafka broker. It takes a *testing.T as provided by the @@ -136,17 +212,19 @@ func newMockBrokerAddr(t *testing.T, brokerID int32, addr string) *mockBroker { var err error broker := &mockBroker{ - stopper: make(chan bool), + closing: make(chan none), + stopper: make(chan none), t: t, brokerID: brokerID, expectations: make(chan encoder, 512), } + broker.handler = broker.defaultRequestHandler broker.listener, err = net.Listen("tcp", addr) if err != nil { t.Fatal(err) } - Logger.Printf("mockbroker/%d listening on %s\n", brokerID, broker.listener.Addr().String()) + Logger.Printf("*** mockbroker/%d listening on %s\n", brokerID, broker.listener.Addr().String()) _, portStr, err := net.SplitHostPort(broker.listener.Addr().String()) if err != nil { t.Fatal(err)