Skip to content

Commit

Permalink
Synchronized producer cleanup during close operation
Browse files Browse the repository at this point in the history
Fixes apache#33: Intermittent test failure on BacklogQuotaManagerTest.testAheadProducerOnHoldTimeout
  • Loading branch information
merlimat committed Sep 22, 2016
1 parent 7d317bd commit d5a4f0d
Showing 1 changed file with 14 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,13 @@ public CompletableFuture<Void> closeAsync() {

if (!isConnected()) {
log.info("[{}] [{}] Closed Producer (not connected)", topic, producerName);
state.set(State.Closed);
client.cleanupProducer(this);
pendingMessages.forEach(msg -> msg.cmd.release());
synchronized (this) {
state.set(State.Closed);
client.cleanupProducer(this);
pendingMessages.forEach(msg -> msg.cmd.release());
pendingMessages.clear();
}

return CompletableFuture.completedFuture(null);
}

Expand All @@ -398,9 +402,13 @@ public CompletableFuture<Void> closeAsync() {
if (exception == null || !cnx.ctx().channel().isActive()) {
// Either we've received the success response for the close producer command from the broker, or the
// connection did break in the meantime. In any case, the producer is gone.
log.info("[{}] [{}] Closed Producer", topic, producerName);
state.set(State.Closed);
pendingMessages.forEach(msg -> msg.cmd.release());
synchronized (ProducerImpl.this) {
log.info("[{}] [{}] Closed Producer", topic, producerName);
state.set(State.Closed);
pendingMessages.forEach(msg -> msg.cmd.release());
pendingMessages.clear();
}

closeFuture.complete(null);
client.cleanupProducer(this);
} else {
Expand Down

0 comments on commit d5a4f0d

Please sign in to comment.