Skip to content

Commit

Permalink
Fix offset_manager for Kafka 0.9.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
horkhe committed Dec 12, 2015
1 parent 87ec8d7 commit 4c213c2
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 12 deletions.
10 changes: 0 additions & 10 deletions functional_offset_manager_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
package sarama

import (
"os"
"testing"
)

func TestFuncOffsetManager(t *testing.T) {
checkKafkaVersion(t, "0.8.2")
if os.Getenv("KAFKA_VERSION") == "0.9.0.0" {
t.Skip("Offset manager is broken with kafka 0.9 at the moment.")
}

setupFunctionalTest(t)
defer teardownFunctionalTest(t)

Expand All @@ -24,10 +18,6 @@ func TestFuncOffsetManager(t *testing.T) {
t.Fatal(err)
}

if _, err := offsetManager.ManagePartition("does_not_exist", 123); err != ErrUnknownTopicOrPartition {
t.Fatal("Expected ErrUnknownTopicOrPartition when starting a partition offset manager for a partition that does not exist, got:", err)
}

pom1, err := offsetManager.ManagePartition("test.1", 0)
if err != nil {
t.Fatal(err)
Expand Down
5 changes: 3 additions & 2 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,8 +476,9 @@ func (bom *brokerOffsetManager) flushToBroker() {

func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
r := &OffsetCommitRequest{
Version: 1,
ConsumerGroup: bom.parent.group,
Version: 1,
ConsumerGroup: bom.parent.group,
ConsumerGroupGeneration: -1,
}

for s := range bom.subscriptions {
Expand Down

0 comments on commit 4c213c2

Please sign in to comment.