Simpler offset management, fixed minor race #1127
Conversation
@eapache pls see comments below
offset_manager.go (Outdated)
om.pomsMu.Lock()
defer om.pomsMu.Unlock()

for _, topicManagers := range om.poms {
Should global commit errors be propagated to each individual POM or just to the first one?
🤔 all of them still, I think
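A minimal sketch of the "all of them" behaviour, using simplified stand-ins for the real structs (handleError here is an assumption for illustration, not necessarily the actual method name):

// Simplified sketch, not the actual sarama types.
type partitionOffsetManager struct {
    topic     string
    partition int32
    err       error // last commit error, for illustration only
}

func (pom *partitionOffsetManager) handleError(err error) {
    pom.err = err // hypothetical: a real POM would forward this to its Errors() channel
}

// Propagate a global commit error to every POM, not just the first one found.
func propagateError(poms map[string]map[int32]*partitionOffsetManager, err error) {
    for _, topicManagers := range poms {
        for _, pom := range topicManagers {
            pom.handleError(err)
        }
    }
}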
CoordinatorHost: "127.0.0.1",
CoordinatorPort: newCoordinator.Port(),
})
// No error, no need to refresh coordinator
I changed the logic to only RefreshCoordinator after errors and not on every commit attempt.
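Roughly, the new flow is "commit first, refresh only if that fails". A sketch of that idea, with coordinator(), handleError() and handleResponse() as assumed helper names:

func (om *offsetManager) flushToBroker() {
    // use the cached coordinator; no RefreshCoordinator on the happy path
    broker, err := om.coordinator()
    if err != nil {
        om.handleError(err)
        return
    }

    resp, err := broker.CommitOffset(om.constructRequest())
    if err != nil {
        om.handleError(err)
        // only now force a coordinator refresh, so the next attempt re-dispatches
        om.releaseCoordinator(broker)
        _ = broker.Close()
        return
    }

    om.handleResponse(broker, resp)
}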
pom.lock.Lock()
defer pom.lock.Unlock()

for pom.dirty {
So I think this has actually never worked properly, as you are creating a deadlock here. The state is blocked for as long as pom.dirty is set, but the flag will never actually change to false, nor will pom.clean ever fire, because pom.updateCommitted will never be able to acquire pom.lock.Lock().
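To make the deadlock concrete, a stripped-down illustration (clean as a channel is an assumption; the real field may differ):

type pomState struct {
    lock  sync.Mutex
    dirty bool
    clean chan struct{}
}

func (p *pomState) waitForClean() {
    p.lock.Lock()
    defer p.lock.Unlock() // the lock is held for the entire wait

    for p.dirty {
        <-p.clean // blocks while still holding p.lock ...
    }
}

func (p *pomState) updateCommitted() {
    p.lock.Lock() // ... so this Lock() never succeeds,
    defer p.lock.Unlock()

    p.dirty = false // dirty never becomes false,
    close(p.clean)  // and clean never fires
}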
🤦♂️ nice find
offset_manager.go (Outdated)
poms     map[string]map[int32]*partitionOffsetManager
boms     map[*Broker]*brokerOffsetManager
broker   *Broker
brokerMu sync.RWMutex
For code style consistency I'd prefer these be called brokerLock, pomsLock, etc.
offset_manager.go (Outdated)
delete(om.boms, bom.broker)
for _, topicManagers := range om.poms {
Just for organization I'd prefer handleResponse or something as a separate method to mirror constructRequest.
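Something along these lines, presumably (a sketch; pomsLock, the topic/partition/offset/metadata fields and the per-error handling are assumptions):

func (om *offsetManager) handleResponse(broker *Broker, resp *OffsetCommitResponse) {
    om.pomsLock.RLock()
    defer om.pomsLock.RUnlock()

    for _, topicManagers := range om.poms {
        for _, pom := range topicManagers {
            err, ok := resp.Errors[pom.topic][pom.partition]
            if !ok {
                continue // this POM was not part of the request
            }

            switch err {
            case ErrNoError:
                pom.updateCommitted(pom.offset, pom.metadata)
            case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
                ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
                // not a critical error, we just need to redispatch
                om.releaseCoordinator(broker)
            default:
                pom.handleError(err)
            }
        }
    }
}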
offset_manager.go (Outdated)
om.asyncClosePOMs()

// flush one last time
for retries := om.conf.Metadata.Retry.Max; true; {
I think it makes more sense to define a separate set of Retry configurations in Consumer.Offsets?
I'm not sure; the main reason this would fail is a coordinator change, which means we need to refresh metadata and try again. I can add more config options, but it seems like overkill.
Hmm, good point, that wasn't obvious. However, the metadata refresh itself will retry this many times internally within the client, so by using this value here you're technically allowing the square of it as retries. You're also not sleeping the way the Metadata.Retry struct might imply. I still think this might make the most sense as Consumer.Offsets.MaxShutdownAttempts or something; it's not the same thing as refreshing metadata.
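For illustration, such a dedicated setting could look roughly like this (these names are only the ones floated in the discussion, not an existing option at this point):

// Hypothetical sketch of a dedicated setting under Consumer.Offsets,
// separate from Metadata.Retry.
type OffsetsConfig struct {
    CommitInterval time.Duration

    // Retry budget for the final flush performed by OffsetManager.Close()
    // (or, alternatively, a single MaxShutdownAttempts field).
    Retry struct {
        Max int
    }
}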
select {
case <-om.ticker.C:
    om.flushToBroker()
    om.releasePOMs(false)
It took me a while to figure out what this was doing, and I'm not much of a fan of this pattern. An updateSubscriptions channel (as in the old BOM) seems easier to follow and less likely to result in races/deadlocks than polling a piece of state in all the POMs?
I thought about updateSubscriptions, but in my opinion it doesn't make things any simpler. The problem is that the eviction of POMs is closely linked to the flush cycles, and we still need to coordinate those on OffsetManager.Close, i.e. keep retrying the flush until either Retry.Max is reached or all POMs are clean. I don't see how to implement this with an updateSubscriptions channel; we would have to do something like:
func (om *offsetManager) Close() error {
    om.closeOnce.Do(func() {
        // exit the mainLoop
        close(om.closing)
        <-om.closed

        // mark all POMs as closed
        om.asyncClosePOMs()

        deadline := time.NewTimer(10 * time.Second) // just an example
        defer deadline.Stop()
        retry := time.NewTicker(100 * time.Millisecond) // just an example
        defer retry.Stop()

    FlushLoop:
        for {
            om.flushToBroker()

            select {
            case <-deadline.C:
                break FlushLoop
            case pom := <-om.updateSubscription:
                om.pomsLock.Lock()
                delete(om.poms[pom.topic], pom.partition)
                if len(om.poms[pom.topic]) == 0 {
                    delete(om.poms, pom.topic)
                }
                remaining := len(om.poms)
                om.pomsLock.Unlock()

                if remaining == 0 {
                    break FlushLoop
                }
            case <-retry.C:
            }
        }

        // TODO: abandon any remaining POMs
        om.brokerLock.Lock()
        om.broker = nil
        om.brokerLock.Unlock()
    })
    return nil
}
Even with a helper method, I don't think this is easier to follow than the polling.
For OffsetManager.Close I think you're overthinking things. Instead of exiting the mainloop and then manually looping the flush, just call pom.AsyncClose() on all of them (as you already do), set a flag, and then wait for a waitgroup. The mainloop can respect the flag to count down the number of remaining flushes.
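A rough sketch of that suggestion (closingFlag, closeWait and remainingPOMs are assumptions for illustration, not the PR's actual fields; closeWait.Add(1) is assumed to happen before mainLoop is started):

func (om *offsetManager) Close() error {
    om.closeOnce.Do(func() {
        om.asyncClosePOMs()                   // mark all POMs as closed, as before
        atomic.StoreInt32(&om.closingFlag, 1) // tell the mainloop we are shutting down
        om.closeWait.Wait()                   // wait until the mainloop has flushed everything
    })
    return nil
}

func (om *offsetManager) mainLoop() {
    defer om.closeWait.Done()

    for range om.ticker.C {
        om.flushToBroker()
        om.releasePOMs(false)

        // once Close() has set the flag, exit as soon as no POMs remain
        if atomic.LoadInt32(&om.closingFlag) == 1 && om.remainingPOMs() == 0 {
            return
        }
    }
}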
Letting mainLoop do the work after calling Close() could delay things significantly, depending on CommitInterval. I'm not a fan of this; I prefer exiting the mainLoop to attempt an immediate final flush (with instant retries). The current approach will also simplify the integration with ClusterClient.
OK. It's not clear to me how this will simplify the ClusterClient integration, but I don't feel too strongly about this.
case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
    ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
    // not a critical error, we just need to redispatch
    om.releaseCoordinator(broker)
In cases like this I wonder if we should be doing something to trigger another dispatch before the next timer tick? There's no point in waiting really.
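One possible shape for that (a hypothetical buffered retrigger channel, not something in this PR):

// Error path: ask for an immediate re-dispatch instead of waiting for the ticker.
func (om *offsetManager) scheduleFlush() {
    select {
    case om.retrigger <- struct{}{}: // retrigger is assumed to be a buffered chan struct{}
    default: // a flush is already pending, nothing to do
    }
}

// mainLoop would then select on both the ticker and the retrigger channel:
//
//   select {
//   case <-om.ticker.C:
//   case <-om.retrigger:
//   }
//   om.flushToBroker()
//   om.releasePOMs(false)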
I think it's not the end of the world if we miss a cycle and wait for mainLoop to trigger the next flush. At the same time, we currently already retry immediately if triggered by Close.
I'm pretty happy with this now, if it suits your needs. CI has detected a race condition though.
Nice, thanks!
This is a necessary prerequisite for #1099, please also see the inline comments in the diff.