Skip to content

Commit

Permalink
Account for missing registration (fixes #124)
Browse files Browse the repository at this point in the history
  • Loading branch information
horkhe committed Nov 15, 2017
1 parent 43c9c47 commit b1a4a04
Show file tree
Hide file tree
Showing 9 changed files with 296 additions and 42 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ Fixed:
* [#120](https://github.com/mailgun/kafka-pixy/issues/120) Consumption from a
topic stopped for a group.
* [#123](https://github.com/mailgun/kafka-pixy/issues/123) Inexplicable offset
manager timeouts
manager timeouts.
* [#124](https://github.com/mailgun/kafka-pixy/issues/124) Subscription to a
topic fails indefinitely after ZooKeeper connection loss.

#### Version 0.14.0 (2017-09-11)

Expand Down
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
version = "1.0.1"

[[constraint]]
branch = "master"
branch = "maxim/develop"
name = "github.com/mailgun/kazoo-go"

[[constraint]]
Expand Down
14 changes: 9 additions & 5 deletions consumer/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,17 @@ func (ss *T) submitTopics(topics []string) error {
ss.registered = false
return nil
}
var err error

if ss.registered {
err = ss.groupMemberZNode.UpdateRegistration(topics)
} else {
err = ss.groupMemberZNode.Register(topics)
err := ss.groupMemberZNode.UpdateRegistration(topics)
if err != kazoo.ErrInstanceNotRegistered {
return errors.Wrap(err, "failed to update registration")
}
ss.registered = false
ss.actDesc.Log().Errorf("Registration disappeared")
}
for err != nil {

if err := ss.groupMemberZNode.Register(topics); err != nil {
return errors.Wrap(err, "failed to register")
}
ss.registered = true
Expand Down
32 changes: 32 additions & 0 deletions consumer/subscriber/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,38 @@ func (s *SubscriberSuite) TestRedundantUpdateBug(c *C) {
assertSubscription(c, ss1.Subscriptions(), membership, 3*time.Second)
}

func (s *SubscriberSuite) TestMissingSubscriptionBug(c *C) {
cfg1 := newConfig("m1")
ss1 := Spawn(s.ns.NewChild("m1"), "g1", cfg1, s.kazooClt)
defer ss1.Stop()
cfg2 := newConfig("m2")
ss2 := Spawn(s.ns.NewChild("m2"), "g1", cfg2, s.kazooClt)
defer ss2.Stop()

ss1.Topics() <- []string{"foo", "bar"}
ss2.Topics() <- []string{"foo", "bazz", "blah"}

membership := map[string][]string{
"m1": {"bar", "foo"},
"m2": {"bazz", "blah", "foo"}}
assertSubscription(c, ss1.Subscriptions(), membership, 3*time.Second)
assertSubscription(c, ss2.Subscriptions(), membership, 3*time.Second)

// Simulate subscription ephemeral node expiration by removing it manually.
gm := s.kazooClt.Consumergroup("g1").Instance("m1")
gm.Deregister()

// When
ss1.Topics() <- []string{"foo", "kaboom"}

// Then
membership = map[string][]string{
"m1": {"foo", "kaboom"},
"m2": {"bazz", "blah", "foo"}}
assertSubscription(c, ss1.Subscriptions(), membership, 3*time.Second)
assertSubscription(c, ss2.Subscriptions(), membership, 3*time.Second)
}

// When a group registrator claims a topic partitions it becomes its owner.
func (s *SubscriberSuite) TestClaimPartition(c *C) {
cfg := newConfig("m1")
Expand Down
3 changes: 2 additions & 1 deletion vendor/github.com/mailgun/kazoo-go/Makefile

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 23 additions & 10 deletions vendor/github.com/mailgun/kazoo-go/consumergroup.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 47 additions & 3 deletions vendor/github.com/mailgun/kazoo-go/kazoo.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b1a4a04

Please sign in to comment.