Skip to content

Commit

Permalink
Merge pull request #86 from mailgun/maxim/develop
Browse files Browse the repository at this point in the history
Handle closing of the multiplexer input channel (fixes #85)
  • Loading branch information
horkhe authored Jan 7, 2017
2 parents bdbaa08 + 546ae68 commit 1d5974e
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 140 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

* [#81](https://github.com/mailgun/kafka-pixy/pull/81) Add capability to
proxy to multiple Kafka/ZooKeeper clusters.
* [#83](https://github.com/mailgun/kafka-pixy/pull/83) Panic in partition
multiplexer.
* [#85](https://github.com/mailgun/kafka-pixy/pull/85) Panic in partition
multiplexer.
* [#16](https://github.com/mailgun/kafka-pixy/issues/16) A YAML configuration
file can be passed to Kafka-Pixy with `--config` command line parameter. A
[default](https://github.com/mailgun/kafka-pixy/blob/master/default.yaml)
Expand Down
172 changes: 84 additions & 88 deletions consumer/multiplexer/multiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,21 @@ import (
"github.com/mailgun/kafka-pixy/actor"
"github.com/mailgun/kafka-pixy/consumer"
"github.com/mailgun/kafka-pixy/none"
"github.com/mailgun/log"
)

// T fetches messages from inputs and multiplexes them to the output, giving
// preferences to inputs with higher lag. Multiplexes assumes ownership over
// inputs in the sense that it decides when an new input instance needs to
// started, or the old one stopped.
type T struct {
actorID *actor.ID
spawnInF SpawnInF
fetchedInputs map[int32]*fetchedInput
output Out
isRunning bool
stopCh chan none.T
wg sync.WaitGroup
actorID *actor.ID
spawnInF SpawnInF
inputs map[int32]*input
output Out
isRunning bool
stopCh chan none.T
wg sync.WaitGroup
}

// In defines an interface of a multiplexer input.
Expand Down Expand Up @@ -52,18 +53,19 @@ type SpawnInF func(partition int32) In
// New creates a new multiplexer instance.
func New(namespace *actor.ID, spawnInF SpawnInF) *T {
return &T{
actorID: namespace.NewChild("mux"),
fetchedInputs: make(map[int32]*fetchedInput),
spawnInF: spawnInF,
stopCh: make(chan none.T),
actorID: namespace.NewChild("mux"),
inputs: make(map[int32]*input),
spawnInF: spawnInF,
stopCh: make(chan none.T),
}
}

// input represents a multiplexer input along with a message to be fetched from
// that input next.
type fetchedInput struct {
in In
nextMsg *consumer.Message
type input struct {
In
partition int32
nextMsg *consumer.Message
}

// IsRunning returns `true` if multiplexer is running pumping events from the
Expand All @@ -89,40 +91,39 @@ func (m *T) WireUp(output Out, assigned []int32) {
}
// If output is not provided, then stop all inputs and return.
if output == nil {
for partition, input := range m.fetchedInputs {
for p, in := range m.inputs {
wg.Add(1)
go func(fin *fetchedInput) {
go func(in *input) {
defer wg.Done()
fin.in.Stop()
}(input)
delete(m.fetchedInputs, partition)
in.Stop()
}(in)
delete(m.inputs, p)
}
wg.Wait()
return
}
// Stop inputs that are not assigned anymore.
for partition, fin := range m.fetchedInputs {
if !hasPartition(partition, assigned) {
for p, in := range m.inputs {
if !hasPartition(p, assigned) {
m.stopIfRunning()
wg.Add(1)
go func(fin *fetchedInput) {
go func(in *input) {
defer wg.Done()
fin.in.Stop()
}(fin)
delete(m.fetchedInputs, partition)
in.Stop()
}(in)
delete(m.inputs, p)
}
}
wg.Wait()
// Spawn newly assigned inputs, but stop multiplexer before spawning the
// first input.
for _, partition := range assigned {
if _, ok := m.fetchedInputs[partition]; !ok {
for _, p := range assigned {
if _, ok := m.inputs[p]; !ok {
m.stopIfRunning()
in := m.spawnInF(partition)
m.fetchedInputs[partition] = &fetchedInput{in, nil}
m.inputs[p] = &input{In: m.spawnInF(p), partition: p}
}
}
if !m.IsRunning() && len(m.fetchedInputs) > 0 {
if !m.IsRunning() && len(m.inputs) > 0 {
m.start()
}
}
Expand All @@ -146,66 +147,78 @@ func (m *T) stopIfRunning() {
}

func (m *T) run() {
sortedIns := makeSortedIns(m.fetchedInputs)
inputCount := len(sortedIns)
// Prepare a list of reflective select cases that is used when there are no
// messages available from any of the inputs and we need to wait on all
// of them for the first message to be fetched. Yes, reflection is slow but
// it is only used in a corner case.
reset:
inputCount := len(m.inputs)
if inputCount == 0 {
return
}
sortedIns := makeSortedIns(m.inputs)
// Prepare a list of reflective select cases. It is used when none of the
// inputs has fetched messages and we need to wait on all of them. Yes,
// reflection is slow, but it is only used when there is nothing to
// consume anyway.
selectCases := make([]reflect.SelectCase, inputCount+1)
for i, input := range sortedIns {
selectCases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(input.in.Messages())}
for i, in := range sortedIns {
selectCases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(in.Messages())}
}
selectCases[inputCount] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(m.stopCh)}

inputIdx := -1
for {
// Collect next messages from all inputs.
// Collect next messages from inputs that have them available.
isAtLeastOneAvailable := false
for _, input := range sortedIns {
if input.nextMsg != nil {
for _, in := range sortedIns {
if in.nextMsg != nil {
isAtLeastOneAvailable = true
continue
}
select {
case msg := <-input.in.Messages():
input.nextMsg = msg
case msg, ok := <-in.Messages():
// If a channel of an input is closed, then the input should be
// removed from the list of multiplexed inputs.
if !ok {
log.Infof("<%s> input channel closed: partition=%d", in.partition)
delete(m.inputs, in.partition)
goto reset
}
in.nextMsg = msg
isAtLeastOneAvailable = true
default:
}
}
// If none of the inputs has a message available, then wait until some
// of them does or a stop signal is received.
// If none of the inputs has a message available, then wait until
// a message is fetched on any of them or a stop signal is received.
if !isAtLeastOneAvailable {
selected, value, _ := reflect.Select(selectCases)
idx, value, _ := reflect.Select(selectCases)
// Check if it is a stop signal.
if selected == inputCount {
if idx == inputCount {
return
}
sortedIns[selected].nextMsg = value.Interface().(*consumer.Message)
sortedIns[idx].nextMsg = value.Interface().(*consumer.Message)
}
// At this point there is at least one next message available.
// At this point there is at least one message available.
inputIdx = selectInput(inputIdx, sortedIns)
// Wait for read or a stop signal.
// Block until the output reads the next message of the selected input
// or a stop signal is received.
select {
case <-m.stopCh:
return
case m.output.Messages() <- sortedIns[inputIdx].nextMsg:
sortedIns[inputIdx].in.Acks() <- sortedIns[inputIdx].nextMsg
sortedIns[inputIdx].Acks() <- sortedIns[inputIdx].nextMsg
sortedIns[inputIdx].nextMsg = nil
}
}
}

// makeSortedIns given a partition->input map returns a slice of all the inputs
// from the map sorted in ascending order of partition ids.
func makeSortedIns(inputs map[int32]*fetchedInput) []*fetchedInput {
func makeSortedIns(inputs map[int32]*input) []*input {
partitions := make([]int32, 0, len(inputs))
for p := range inputs {
partitions = append(partitions, p)
}
sort.Sort(Int32Slice(partitions))
sortedIns := make([]*fetchedInput, len(inputs))
sortedIns := make([]*input, len(inputs))
for i, p := range partitions {
sortedIns[i] = inputs[p]
}
Expand All @@ -221,49 +234,32 @@ func hasPartition(partition int32, partitions []int32) bool {
}

// selectInput picks an input that should be multiplexed next. It prefers the
// inputs with the largest lag. If there is more then one input with the largest
// lag then it picks the one that index is following the lastInputIdx.
func selectInput(lastInputIdx int, sortedIns []*fetchedInput) int {
maxLag, maxLagIdx, maxLagCount := findMaxLag(sortedIns)
if maxLagCount == 1 {
return maxLagIdx
}
inputCount := len(sortedIns)
for i := 1; i < inputCount; i++ {
maxLagIdx = (lastInputIdx + i) % inputCount
input := sortedIns[maxLagIdx]
// inputs with the largest lag. If there is more then one input with the same
// largest lag, then it picks the one that has index following prevSelectedIdx.
func selectInput(prevSelectedIdx int, sortedIns []*input) int {
maxLag := int64(-1)
selectedIdx := -1
for i, input := range sortedIns {
if input.nextMsg == nil {
continue
}
inputLag := input.nextMsg.HighWaterMark - input.nextMsg.Offset
if inputLag == maxLag {
break
lag := input.nextMsg.HighWaterMark - input.nextMsg.Offset
if lag > maxLag {
maxLag = lag
selectedIdx = i
continue
}
}
return maxLagIdx
}

// findMaxLag traverses though the specified messages ignoring nil ones and,
// returns the value of the max lag among them, along with the index of the
// first message with the max lag value and the total count of messages that
// have max lag.
func findMaxLag(sortedIns []*fetchedInput) (maxLag int64, maxLagIdx, maxLagCount int) {
maxLag = -1
maxLagIdx = -1
for i, input := range sortedIns {
if input.nextMsg == nil {
if lag < maxLag {
continue
}
if selectedIdx > prevSelectedIdx {
continue
}
inputLag := input.nextMsg.HighWaterMark - input.nextMsg.Offset
if inputLag > maxLag {
maxLagIdx = i
maxLag = inputLag
maxLagCount = 1
} else if inputLag == maxLag {
maxLagCount += 1
if i > prevSelectedIdx {
selectedIdx = i
}
}
return maxLag, maxLagIdx, maxLagCount
return selectedIdx
}

type Int32Slice []int32
Expand Down
Loading

0 comments on commit 1d5974e

Please sign in to comment.