Commit
Merge pull request #1040 from mailgun/maxim/test
 Add full version matrix produce/consume test
eapache authored Feb 13, 2018
2 parents 511b1ff + 6ce9e92 commit 44e7121
Showing 22 changed files with 270 additions and 93 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -4,7 +4,7 @@ default: fmt vet errcheck test
test:
	echo "" > coverage.txt
	for d in `go list ./... | grep -v vendor`; do \
-		go test -v -timeout 60s -race -coverprofile=profile.out -covermode=atomic $$d; \
+		go test -p 1 -v -timeout 90s -race -coverprofile=profile.out -covermode=atomic $$d || exit 1; \
		if [ -f profile.out ]; then \
			cat profile.out >> coverage.txt; \
			rm profile.out; \
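The test target now runs packages one at a time (-p 1), presumably so that parallel package tests cannot interfere with the shared Kafka cluster used by the functional tests; the per-package timeout grows from 60s to 90s to make room for the new version matrix tests, and || exit 1 makes the loop abort on the first failing package instead of discarding its exit status.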
7 changes: 1 addition & 6 deletions alter_configs_request_test.go
@@ -31,11 +31,7 @@ var (
	'1', '0', '0', '0',
	2, // a topic
	0, 3, 'b', 'a', 'r', // topic name: bar
-	0, 0, 0, 2, // 2 configs
-	0, 10, // 10 chars
-	's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
-	0, 4,
-	'1', '0', '0', '0',
+	0, 0, 0, 1, // 1 config
	0, 12, // 12 chars
	'r', 'e', 't', 'e', 'n', 't', 'i', 'o', 'n', '.', 'm', 's',
	0, 4,
@@ -80,7 +76,6 @@ func TestAlterConfigsRequest(t *testing.T) {
			Type: TopicResource,
			Name: "bar",
			ConfigEntries: map[string]*string{
-				"segment.ms":   &configValue,
				"retention.ms": &configValue,
			},
		},
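Dropping segment.ms leaves ConfigEntries with a single entry, most likely because Go randomizes map iteration order: with two entries the encoded request bytes could come out in either order, making the byte-for-byte comparison in the test flaky.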
5 changes: 4 additions & 1 deletion async_producer_test.go
@@ -131,9 +131,12 @@ func TestAsyncProducer(t *testing.T) {
			if msg.Metadata.(int) != i {
				t.Error("Message metadata did not match")
			}
+		case <-time.After(time.Second):
+			t.Errorf("Timeout waiting for msg #%d", i)
+			goto done
		}
	}
-
+done:
	closeProducer(t, producer)
	leader.Close()
	seedBroker.Close()
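The new time.After arm keeps a lost success from blocking the receive forever; timing out on any single message jumps to done so the producer and the mock brokers still get shut down cleanly.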
2 changes: 1 addition & 1 deletion config.go
@@ -310,7 +310,7 @@ func NewConfig() *Config {

	c.ClientID = defaultClientID
	c.ChannelBufferSize = 256
-	c.Version = minVersion
+	c.Version = MinVersion
	c.MetricRegistry = metrics.NewRegistry()

	return c
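minVersion is now exported as MinVersion (a matching MaxVersion is used by the new versionRange helper below), so callers outside the package can pin a config to the oldest protocol version Sarama supports. A minimal sketch, assuming MinVersion maps to V0_8_2_0 as the old minVersion did:

	cfg := NewConfig()
	cfg.Version = MinVersion // oldest supported protocol version
	// equivalent to: cfg.Version = V0_8_2_0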
77 changes: 25 additions & 52 deletions consumer.go
@@ -482,82 +482,55 @@ feederLoop:

func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
	var messages []*ConsumerMessage
-	var incomplete bool
-	prelude := true
-
	for _, msgBlock := range msgSet.Messages {
		for _, msg := range msgBlock.Messages() {
			offset := msg.Offset
			if msg.Msg.Version >= 1 {
				baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
				offset += baseOffset
			}
-			if prelude && offset < child.offset {
+			if offset < child.offset {
				continue
			}
-			prelude = false
-
-			if offset >= child.offset {
-				messages = append(messages, &ConsumerMessage{
-					Topic:          child.topic,
-					Partition:      child.partition,
-					Key:            msg.Msg.Key,
-					Value:          msg.Msg.Value,
-					Offset:         offset,
-					Timestamp:      msg.Msg.Timestamp,
-					BlockTimestamp: msgBlock.Msg.Timestamp,
-				})
-				child.offset = offset + 1
-			} else {
-				incomplete = true
-			}
+			messages = append(messages, &ConsumerMessage{
+				Topic:          child.topic,
+				Partition:      child.partition,
+				Key:            msg.Msg.Key,
+				Value:          msg.Msg.Value,
+				Offset:         offset,
+				Timestamp:      msg.Msg.Timestamp,
+				BlockTimestamp: msgBlock.Msg.Timestamp,
+			})
+			child.offset = offset + 1
		}
	}

-	if incomplete || len(messages) == 0 {
+	if len(messages) == 0 {
		return nil, ErrIncompleteResponse
	}
	return messages, nil
}

func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
	var messages []*ConsumerMessage
-	var incomplete bool
-	prelude := true
	originalOffset := child.offset

	for _, rec := range batch.Records {
		offset := batch.FirstOffset + rec.OffsetDelta
-		if prelude && offset < child.offset {
+		if offset < child.offset {
			continue
		}
-		prelude = false
-
-		if offset >= child.offset {
-			messages = append(messages, &ConsumerMessage{
-				Topic:     child.topic,
-				Partition: child.partition,
-				Key:       rec.Key,
-				Value:     rec.Value,
-				Offset:    offset,
-				Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
-				Headers:   rec.Headers,
-			})
-			child.offset = offset + 1
-		} else {
-			incomplete = true
-		}
-	}
-
-	if incomplete {
+		messages = append(messages, &ConsumerMessage{
+			Topic:     child.topic,
+			Partition: child.partition,
+			Key:       rec.Key,
+			Value:     rec.Value,
+			Offset:    offset,
+			Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
+			Headers:   rec.Headers,
+		})
+		child.offset = offset + 1
+	}
+	if len(messages) == 0 {
		return nil, ErrIncompleteResponse
	}

	child.offset = batch.FirstOffset + int64(batch.LastOffsetDelta) + 1
	if child.offset <= originalOffset {
		return nil, ErrConsumerOffsetNotAdvanced
	}

	return messages, nil
}

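The rewrite drops the prelude/incomplete bookkeeping: a fetched (and possibly compressed) message set may legitimately begin before the requested offset, so each record below child.offset is simply skipped and everything at or past it is kept. A minimal sketch of the new skip rule over plain offsets (a hypothetical standalone helper, not part of Sarama):

	package main

	import "fmt"

	// keepFromOffset mirrors the simplified logic of parseMessages and
	// parseRecords: drop records that precede the requested offset, keep the rest.
	func keepFromOffset(offsets []int64, requested int64) []int64 {
		var kept []int64
		for _, off := range offsets {
			if off < requested {
				continue // delivered as part of the set, but before the requested offset
			}
			kept = append(kept, off)
		}
		return kept
	}

	func main() {
		// A compressed set fetched at offset 5 can start at its wrapper's base offset 3.
		fmt.Println(keepFromOffset([]int64{3, 4, 5, 6, 7}, 5)) // [5 6 7]
	}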
2 changes: 1 addition & 1 deletion fetch_request.go
@@ -149,7 +149,7 @@ func (r *FetchRequest) requiredVersion() KafkaVersion {
	case 4:
		return V0_11_0_0
	default:
-		return minVersion
+		return MinVersion
	}
}

2 changes: 1 addition & 1 deletion fetch_response.go
@@ -280,7 +280,7 @@ func (r *FetchResponse) requiredVersion() KafkaVersion {
	case 4:
		return V0_11_0_0
	default:
-		return minVersion
+		return MinVersion
	}
}

167 changes: 166 additions & 1 deletion functional_consumer_test.go
@@ -1,8 +1,13 @@
package sarama

import (
+	"fmt"
	"math"
+	"os"
+	"sort"
+	"sync"
	"testing"
+	"time"
)

func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
@@ -46,7 +51,7 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) {
	}
	defer safeClose(t, c)

-	pc, err := c.ConsumePartition("test.1", 0, OffsetOldest)
+	pc, err := c.ConsumePartition("test.1", 0, offset)
	if err != nil {
		t.Fatal(err)
	}
@@ -59,3 +64,163 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) {

safeClose(t, pc)
}

// Makes sure that messages produced with all supported client version and
// compression codec combinations (except LZ4) can be consumed by all
// supported consumer versions. It relies on the KAFKA_VERSION environment
// variable to provide the version of the test Kafka cluster.
//
// Note that the LZ4 codec was introduced in v0.10.0.0 and is therefore
// excluded from this test. A similar version matrix test below covers only
// the versions from v0.10.0.0 up to KAFKA_VERSION.
func TestVersionMatrix(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	// Produce lots of messages with all possible combinations of supported
	// protocol versions and compression codecs, except LZ4.
	testVersions := versionRange(V0_8_2_0)
	allCodecsButLZ4 := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy}
	producedMessages := produceMsgs(t, testVersions, allCodecsButLZ4, 17, 100)

	// When/Then
	consumeMsgs(t, testVersions, producedMessages)
}

// Support for the LZ4 codec was introduced in v0.10.0.0, so the version
// matrix for LZ4 starts there.
func TestVersionMatrixLZ4(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	// Produce lots of messages with all possible combinations of supported
	// protocol versions starting with v0.10.0.0 (the first to support LZ4)
	// and all compression codecs.
	testVersions := versionRange(V0_10_0_0)
	allCodecs := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy, CompressionLZ4}
	producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100)

	// When/Then
	consumeMsgs(t, testVersions, producedMessages)
}

func prodMsg2Str(prodMsg *ProducerMessage) string {
	return fmt.Sprintf("{offset: %d, value: %s}", prodMsg.Offset, string(prodMsg.Value.(StringEncoder)))
}

func consMsg2Str(consMsg *ConsumerMessage) string {
	return fmt.Sprintf("{offset: %d, value: %s}", consMsg.Offset, string(consMsg.Value))
}

func versionRange(lower KafkaVersion) []KafkaVersion {
	// Get the test cluster version from the environment. If it is not set,
	// assume the highest supported version.
	upper, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION"))
	if err != nil {
		upper = MaxVersion
	}

	versions := make([]KafkaVersion, 0, len(SupportedVersions))
	for _, v := range SupportedVersions {
		if !v.IsAtLeast(lower) {
			continue
		}
		if !upper.IsAtLeast(v) {
			return versions
		}
		versions = append(versions, v)
	}
	return versions
}
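For example, with KAFKA_VERSION=0.10.2.0 in the environment, versionRange(V0_8_2_0) returns every entry of SupportedVersions from V0_8_2_0 through V0_10_2_0, inclusive of both bounds. A minimal sketch (hypothetical values; it assumes SupportedVersions is sorted ascending, which the early return above requires):

	os.Setenv("KAFKA_VERSION", "0.10.2.0")
	for _, v := range versionRange(V0_8_2_0) {
		fmt.Printf("version: %s\n", v) // every supported version between the two bounds
	}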

func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int) []*ProducerMessage {
	var wg sync.WaitGroup
	var producedMessagesMu sync.Mutex
	var producedMessages []*ProducerMessage
	for _, prodVer := range clientVersions {
		for _, codec := range codecs {
			prodCfg := NewConfig()
			prodCfg.Version = prodVer
			prodCfg.Producer.Return.Successes = true
			prodCfg.Producer.Return.Errors = true
			prodCfg.Producer.Flush.MaxMessages = flush
			prodCfg.Producer.Compression = codec

			p, err := NewSyncProducer(kafkaBrokers, prodCfg)
			if err != nil {
				t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
				continue
			}
			defer safeClose(t, p)
			for i := 0; i < countPerVerCodec; i++ {
				msg := &ProducerMessage{
					Topic: "test.1",
					Value: StringEncoder(fmt.Sprintf("msg:%s:%s:%d", prodVer, codec, i)),
				}
				wg.Add(1)
				go func() {
					defer wg.Done()
					_, _, err := p.SendMessage(msg)
					if err != nil {
						t.Errorf("Failed to produce message: %s, err=%v", msg.Value, err)
					}
					producedMessagesMu.Lock()
					producedMessages = append(producedMessages, msg)
					producedMessagesMu.Unlock()
				}()
			}
		}
	}
	wg.Wait()

	// Sort produced messages in ascending offset order.
	sort.Slice(producedMessages, func(i, j int) bool {
		return producedMessages[i].Offset < producedMessages[j].Offset
	})
	t.Logf("*** Total produced %d, firstOffset=%d, lastOffset=%d\n",
		len(producedMessages), producedMessages[0].Offset, producedMessages[len(producedMessages)-1].Offset)
	return producedMessages
}
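Note that produceMsgs depends on the SyncProducer filling in each ProducerMessage's Offset on a successful send: both the sort above and the offset comparison in consumeMsgs below key off that field.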

func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage) {
	// Consume all produced messages with all client versions supported by the
	// cluster.
consumerVersionLoop:
	for _, consVer := range clientVersions {
		t.Logf("*** Consuming with client version %s\n", consVer)
		// Create a partition consumer that should start from the first
		// produced message.
		consCfg := NewConfig()
		consCfg.Version = consVer
		c, err := NewConsumer(kafkaBrokers, consCfg)
		if err != nil {
			t.Fatal(err)
		}
		defer safeClose(t, c)
		pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset)
		if err != nil {
			t.Fatal(err)
		}
		defer safeClose(t, pc)

		// Consume as many messages as were produced and make sure that order
		// is preserved.
		for i, prodMsg := range producedMessages {
			select {
			case consMsg := <-pc.Messages():
				if consMsg.Offset != prodMsg.Offset {
					t.Errorf("Consumed unexpected offset: version=%s, index=%d, want=%s, got=%s",
						consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
					continue consumerVersionLoop
				}
				if string(consMsg.Value) != string(prodMsg.Value.(StringEncoder)) {
					t.Errorf("Consumed unexpected msg: version=%s, index=%d, want=%s, got=%s",
						consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
					continue consumerVersionLoop
				}
			case <-time.After(3 * time.Second):
				t.Fatalf("Timeout waiting for: index=%d, offset=%d, msg=%s", i, prodMsg.Offset, prodMsg.Value)
			}
		}
	}
}
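On the first mismatch the labeled continue abandons the current consumer version and moves on to the next, so one misbehaving version yields a single error instead of a failure for every remaining message.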
9 changes: 9 additions & 0 deletions message.go
@@ -24,6 +24,15 @@ const (
	CompressionLZ4 CompressionCodec = 3
)

+func (cc CompressionCodec) String() string {
+	return []string{
+		"none",
+		"gzip",
+		"snappy",
+		"lz4",
+	}[int(cc)]
+}

// CompressionLevelDefault is the constant to use in CompressionLevel
// to have the default compression level for any codec. The value is picked
// so that it does not clash with any existing compression levels.
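The new String method makes a codec render as its Kafka-config name through any %s or %v verb, which the version matrix test relies on when composing payloads like msg:<version>:<codec>:<n>. A quick sketch:

	for _, cc := range []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy, CompressionLZ4} {
		fmt.Println(cc) // prints none, gzip, snappy and lz4, one per line
	}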
2 changes: 1 addition & 1 deletion metadata_request.go
@@ -48,5 +48,5 @@ func (r *MetadataRequest) version() int16 {
}

func (r *MetadataRequest) requiredVersion() KafkaVersion {
-	return minVersion
+	return MinVersion
}
2 changes: 1 addition & 1 deletion metadata_response.go
@@ -185,7 +185,7 @@ func (r *MetadataResponse) version() int16 {
}

func (r *MetadataResponse) requiredVersion() KafkaVersion {
-	return minVersion
+	return MinVersion
}

// testing API