Skip to content

Commit

Permalink
Merge pull request #84 from mailgun/maxim/develop
Browse files Browse the repository at this point in the history
Fix nil interface gotcha (fixes #83)
  • Loading branch information
horkhe authored Dec 21, 2016
2 parents 8ada027 + a3a9ba5 commit bdbaa08
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 34 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
[default](https://github.com/mailgun/kafka-pixy/blob/master/default.yaml)
configuration file is provided for reference.

Fixed:

* [#83](https://github.com/mailgun/kafka-pixy/issues/83) Panic in partition
multiplexer.

#### Version 0.11.1 (2016-08-11)

Bug fix release.
Expand Down
6 changes: 6 additions & 0 deletions consumer/groupcsm/groupcsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ func (gc *T) runRebalancing(actorID *actor.ID, topicConsumers map[string]*topicc
// rewireMuxAsync calls muxInputs in another goroutine.
func (gc *T) rewireMuxAsync(topic string, wg *sync.WaitGroup, mux *multiplexer.T, tc *topiccsm.T, assigned []int32) {
actor.Spawn(gc.supActorID.NewChild("rewire", topic), wg, func() {
if tc == nil {
// Parameter output is of interface type, therefore nil should be
// passed explicitly.
mux.WireUp(nil, nil)
return
}
mux.WireUp(tc, assigned)
})
}
Expand Down
68 changes: 37 additions & 31 deletions consumer/multiplexer/multiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import (
// inputs in the sense that it decides when an new input instance needs to
// started, or the old one stopped.
type T struct {
actorID *actor.ID
spawnInF SpawnInF
inputs map[int32]*fetchedInput
output Out
isRunning bool
stopCh chan none.T
wg sync.WaitGroup
actorID *actor.ID
spawnInF SpawnInF
fetchedInputs map[int32]*fetchedInput
output Out
isRunning bool
stopCh chan none.T
wg sync.WaitGroup
}

// In defines an interface of a multiplexer input.
Expand Down Expand Up @@ -52,10 +52,10 @@ type SpawnInF func(partition int32) In
// New creates a new multiplexer instance.
func New(namespace *actor.ID, spawnInF SpawnInF) *T {
return &T{
actorID: namespace.NewChild("mux"),
inputs: make(map[int32]*fetchedInput),
spawnInF: spawnInF,
stopCh: make(chan none.T),
actorID: namespace.NewChild("mux"),
fetchedInputs: make(map[int32]*fetchedInput),
spawnInF: spawnInF,
stopCh: make(chan none.T),
}
}

Expand All @@ -72,51 +72,57 @@ func (m *T) IsRunning() bool {
return m.isRunning
}

// WireUp ensures that inputs of all assigned partitions are spawned and
// multiplexed to the specified output. It stops inputs for partitions that are
// no longer assigned, spawns inputs for newly assigned partitions, and
// restarts the multiplexer, if either output or any of inputs has changed.
// WireUp ensures that assigned inputs are spawned and multiplexed to the
// specified output. It stops inputs for partitions that are no longer
// assigned, spawns inputs for newly assigned partitions, and restarts the
// multiplexer, if either output or any of inputs has changed.
//
// The multiplexer may be stopped if either output or all inputs are removed.
//
// WARNING: do not ever pass (*T)(nil) in output, that will cause panic.
func (m *T) WireUp(output Out, assigned []int32) {
var wg sync.WaitGroup

if m.output != output {
m.stopIfRunning()
m.output = output
}

// If output is not provided, then stop all inputs and return.
if output == nil {
for partition, input := range m.inputs {
for partition, input := range m.fetchedInputs {
wg.Add(1)
go func(input *fetchedInput) {
go func(fin *fetchedInput) {
defer wg.Done()
input.in.Stop()
fin.in.Stop()
}(input)
delete(m.inputs, partition)
delete(m.fetchedInputs, partition)
}
wg.Wait()
return
}

for partition, input := range m.inputs {
// Stop inputs that are not assigned anymore.
for partition, fin := range m.fetchedInputs {
if !hasPartition(partition, assigned) {
m.stopIfRunning()
wg.Add(1)
go func(input *fetchedInput) {
go func(fin *fetchedInput) {
defer wg.Done()
input.in.Stop()
}(input)
delete(m.inputs, partition)
fin.in.Stop()
}(fin)
delete(m.fetchedInputs, partition)
}
}
wg.Wait()

// Spawn newly assigned inputs, but stop multiplexer before spawning the
// first input.
for _, partition := range assigned {
if _, ok := m.inputs[partition]; !ok {
if _, ok := m.fetchedInputs[partition]; !ok {
m.stopIfRunning()
m.inputs[partition] = &fetchedInput{m.spawnInF(partition), nil}
in := m.spawnInF(partition)
m.fetchedInputs[partition] = &fetchedInput{in, nil}
}
}
if !m.IsRunning() && len(m.inputs) > 0 {
if !m.IsRunning() && len(m.fetchedInputs) > 0 {
m.start()
}
}
Expand All @@ -140,7 +146,7 @@ func (m *T) stopIfRunning() {
}

func (m *T) run() {
sortedIns := makeSortedIns(m.inputs)
sortedIns := makeSortedIns(m.fetchedInputs)
inputCount := len(sortedIns)
// Prepare a list of reflective select cases that is used when there are no
// messages available from any of the inputs and we need to wait on all
Expand Down
6 changes: 3 additions & 3 deletions consumer/partitioncsm/partitioncsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package partitioncsm

import (
"fmt"
"sync"

"github.com/Shopify/sarama"
"github.com/mailgun/kafka-pixy/actor"
"github.com/mailgun/kafka-pixy/config"
Expand All @@ -11,7 +13,6 @@ import (
"github.com/mailgun/kafka-pixy/consumer/offsetmgr"
"github.com/mailgun/kafka-pixy/none"
"github.com/mailgun/log"
"sync"
)

var (
Expand Down Expand Up @@ -122,8 +123,7 @@ func (pc *T) run() {
firstMessageFetched := false
for {
var msg *consumer.Message
// Wait for a fetched message to to provided by the controlled
// partition consumer.
// Wait for a fetched message to be provided by the message stream.
for {
select {
case msg = <-ms.Messages():
Expand Down

0 comments on commit bdbaa08

Please sign in to comment.