producer: ensure that the management message (fin) is never "leaked" #2182

Merged 11 commits on Mar 30, 2022

Changes from 5 commits
9 changes: 9 additions & 0 deletions async_producer.go

@@ -840,6 +840,15 @@ func (bp *brokerProducer) run() {
continue
}

if msg.flags&fin == fin {
// New broker producer that was caught up by the retry loop
bp.parent.retryMessage(msg, ErrShuttingDown)
delete(bp.currentRetries[msg.Topic], msg.Partition)
Contributor:

I'm not sure those 3 lines are necessary because the bp.currentRetries should be "empty" for that particular topic partition because bp.needsRetry(msg) returned nil above:
https://github.com/Shopify/sarama/blob/bad67e5b089437bc73f8034a95017e77be71e8b0/async_producer.go#L903-L909

Contributor Author:

It is true that bp.currentRetries[msg.Topic] is nil in this case (bp.currentRetries[msg.Topic][msg.Partition] is set to nil in the "syn" case above).
I decided to delete the key to ensure that it does not exist, avoiding the confusion between "key exists but nil" and "key does not exist", and to mimic the other block of code above that handles the fin message.

TBH I'm not entirely sure about the purpose of the following code in handling syn case:

				if bp.currentRetries[msg.Topic] == nil {
					bp.currentRetries[msg.Topic] = make(map[int32]error)
				}
				bp.currentRetries[msg.Topic][msg.Partition] = nil

I don't see where it might be useful (for now).

Contributor:

> TBH I'm not entirely sure about the purpose of the following code in handling syn case:
>
> 				if bp.currentRetries[msg.Topic] == nil {
> 					bp.currentRetries[msg.Topic] = make(map[int32]error)
> 				}
> 				bp.currentRetries[msg.Topic][msg.Partition] = nil
>
> I don't see where it might be useful (for now).

I believe when a syn message is seen the brokerProducer should:

  • initialize currentRetries to prevent a panic when accessing bp.currentRetries[topic][partition] (typically on first use of the brokerProducer).
  • clear any previous retry "state" for that topic partition.

Contributor Author:

Thanks @slaunay for the reply!
I don't see how this could lead to any panics 🤔
I've given it another thought and I think it does not really matter (as far as I can tell) whether the retries "metadata" structure is cleared or not (I was trying to imagine what would happen if the code changed in the future so that this branch executed even when bp.currentRetries[msg.Topic][msg.Partition] is not nil). So I will update the code to minimise the changes.

Logger.Printf("producer/broker/%d state change to [dying-%d] on %s/%d\n",
Contributor:

The dying state would make sense for the old "unhealthy" brokerProducer but might be confusing on a "healthy" one.
When looking at the application logs, a user might see state change to [dying-1] and think there is/was an issue.

The fix preserves the original retry logic of bubbling up the fin message, so I would lean towards removing that log statement; but if we want to know when we hit this (internal) condition we can use DebugLogger instead.

bp.broker.ID(), msg.retries, msg.Topic, msg.Partition)
continue
}

if bp.buffer.wouldOverflow(msg) {
Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
if err := bp.waitForSpace(msg, false); err != nil {
165 changes: 154 additions & 11 deletions async_producer_test.go

@@ -17,31 +17,67 @@ import (

const TestMessage = "ABC THE MESSAGE"

-func closeProducer(t *testing.T, p AsyncProducer) {
+func closeProducerWithTimeout(t *testing.T, p AsyncProducer, timeout time.Duration) {
var wg sync.WaitGroup
p.AsyncClose()

closer := make(chan struct{})
timer := time.AfterFunc(timeout, func() {
t.Error("timeout")
close(closer)
})
defer timer.Stop()

wg.Add(2)
go func() {
-		for range p.Successes() {
-			t.Error("Unexpected message on Successes()")
-		}
+		defer wg.Done()
+		for {
+			select {
+			case <-closer:
+				return
+			case _, ok := <-p.Successes():
+				if !ok {
+					return
+				}
+				t.Error("Unexpected message on Successes()")
+			}
+		}
-		wg.Done()
 	}()
go func() {
-		for msg := range p.Errors() {
-			t.Error(msg.Err)
-		}
+		defer wg.Done()
+		for {
+			select {
+			case <-closer:
+				return
+			case msg, ok := <-p.Errors():
+				if !ok {
+					return
+				}
+				t.Error(msg.Err)
+			}
+		}
-		wg.Done()
 	}()
wg.Wait()
}

-func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
+func closeProducer(t *testing.T, p AsyncProducer) {
+	closeProducerWithTimeout(t, p, 5*time.Minute)
+}
+
+func expectResultsWithTimeout(t *testing.T, p AsyncProducer, successes, errors int, timeout time.Duration) {
t.Helper()
expect := successes + errors
defer func() {
if successes != 0 || errors != 0 {
t.Error("Unexpected successes", successes, "or errors", errors)
}
}()
timer := time.NewTimer(timeout)
defer timer.Stop()
for expect > 0 {
select {
case <-timer.C:
return
case msg := <-p.Errors():
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
@@ -62,9 +98,10 @@ func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
}
}
}
-	if successes != 0 || errors != 0 {
-		t.Error("Unexpected successes", successes, "or errors", errors)
-	}
 }

+func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
+	expectResultsWithTimeout(t, p, successes, errors, 5*time.Minute)
+}

type testPartitioner chan *int32
@@ -693,6 +730,112 @@ func TestAsyncProducerMultipleRetriesWithConcurrentRequests(t *testing.T) {
closeProducer(t, producer)
}

func TestAsyncProducerBrokerRestart(t *testing.T) {
// Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

var leaderLock sync.Mutex
Contributor:

It seems like the leaderLock is used to synchronize access to leader.BrokerID(). You can probably create a variable instead:

    leaderID := leader.BrokerID()

as the same ID is used when the leader is closed and recreated.
You can even make it simpler by caching the metadataLeader response and just returning it from the handler:

    metadataLeader := new(MetadataResponse)
    ...
    seedBroker.setHandler(func(req *request) (res encoderWithHeader) {
      return metadataLeader
    })

Contributor Author:

It's used to guard the leader instance when accessing .Addr().

Contributor:

Got it, I thought that, like some other unit tests, the same address was reused with leader = NewMockBrokerAddr(t, 2, leaderAddr) to stop/start, but moving to a different port works too.


// The seed broker only handles Metadata request
seedBroker.setHandler(func(req *request) (res encoderWithHeader) {
leaderLock.Lock()
defer leaderLock.Unlock()
metadataLeader := new(MetadataResponse)
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
return metadataLeader
})

emptyValues := 0

produceRequestTest := func(req *request) {
preq := req.body.(*ProduceRequest)
if batch := preq.records["my_topic"][0].RecordBatch; batch != nil {
for _, record := range batch.Records {
if len(record.Value) == 0 {
emptyValues++
Contributor:

As produceRequestTest is invoked from different goroutines, incrementing emptyValues here and reading the value in the main test goroutine is probably not "thread" safe.

Accessing emptyValues with atomic.AddInt32/atomic.LoadInt32 would be "thread" safe.

Contributor Author:

Good point, I will update this.

}
}
}
if batch := preq.records["my_topic"][0].MsgSet; batch != nil {
for _, record := range batch.Messages {
if len(record.Msg.Value) == 0 {
emptyValues++
}
}
}
}

leader.setHandler(func(req *request) (res encoderWithHeader) {
produceRequestTest(req)
Contributor:

A more suitable name for that function might be:

Suggested change:
-	produceRequestTest(req)
+	countRecordsWithEmptyValue(req)


time.Sleep(50 * time.Millisecond)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
return prodSuccess
})

config := NewTestConfig()
config.Producer.Retry.Backoff = time.Second
Contributor:

Being a unit test, we can probably speed it up by using the default 250ms backoff.
It still fails consistently on main but succeeds after ~600ms vs ~2.2s (with the 1s backoff).

Contributor Author:

I didn't play much with this setting, I just set this value reasonably high to ensure that the test is not flaky.

config.Producer.Flush.MaxMessages = 1
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 10

producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

var wg sync.WaitGroup

pushMsg := func() {
defer wg.Done()
for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
time.Sleep(50 * time.Millisecond)
}
}

wg.Add(1)
go pushMsg()

for i := 0; i < 3; i++ {
time.Sleep(100 * time.Millisecond)
Contributor:

I can reproduce the failure even if the other time.Sleep calls in pushMsg and leader.setHandler are commented out, but not when I comment out this particular call.

👍 So this seems to be the crux of reproducing the race condition: sending records in bursts while restarting the (mock) broker.

Contributor Author:

Yes, the whole point is to have different "generations" of the retries.


wg.Add(1)
go pushMsg()
}

leader.Close()
leaderLock.Lock()
leader = NewMockBroker(t, 2)
leaderLock.Unlock()
leader.setHandler(func(req *request) (res encoderWithHeader) {
produceRequestTest(req)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
return prodSuccess
})

wg.Wait()

expectResultsWithTimeout(t, producer, 40, 00, 10*time.Second)
Contributor:

Suggested change:
-	expectResultsWithTimeout(t, producer, 40, 00, 10*time.Second)
+	expectResultsWithTimeout(t, producer, 40, 0, 10*time.Second)


seedBroker.Close()
leader.Close()

closeProducerWithTimeout(t, producer, 5*time.Second)

if emptyValues > 0 {
t.Fatalf("%d empty values", emptyValues)
}
}

func TestAsyncProducerOutOfRetries(t *testing.T) {
t.Skip("Enable once bug #294 is fixed.")
