Skip to content

Commit

Permalink
fix consumer interceptors
Browse files Browse the repository at this point in the history
  • Loading branch information
hanxiaolin committed Jun 7, 2021
1 parent fca0631 commit 4c37463
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,9 +468,7 @@ feederLoop:
}

for i, msg := range msgs {
for _, interceptor := range child.conf.Consumer.Interceptors {
msg.safelyApplyInterceptor(interceptor)
}
child.interceptors(msg)
messageSelect:
select {
case <-child.dying:
Expand All @@ -484,6 +482,7 @@ feederLoop:
child.broker.acks.Done()
remainingLoop:
for _, msg = range msgs[i:] {
child.interceptors(msg)
select {
case child.messages <- msg:
case <-child.dying:
Expand Down Expand Up @@ -715,6 +714,12 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
return messages, nil
}

func (child *partitionConsumer) interceptors(msg *ConsumerMessage) {
for _, interceptor := range child.conf.Consumer.Interceptors {
msg.safelyApplyInterceptor(interceptor)
}
}

type brokerConsumer struct {
consumer *consumer
broker *Broker
Expand Down

0 comments on commit 4c37463

Please sign in to comment.