diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java index e7f86cfa2fe49..8b460262f5b1e 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java @@ -371,9 +371,13 @@ public CompletableFuture closeAsync() { if (!isConnected()) { log.info("[{}] [{}] Closed Producer (not connected)", topic, producerName); - state.set(State.Closed); - client.cleanupProducer(this); - pendingMessages.forEach(msg -> msg.cmd.release()); + synchronized (this) { + state.set(State.Closed); + client.cleanupProducer(this); + pendingMessages.forEach(msg -> msg.cmd.release()); + pendingMessages.clear(); + } + return CompletableFuture.completedFuture(null); } @@ -398,9 +402,13 @@ public CompletableFuture closeAsync() { if (exception == null || !cnx.ctx().channel().isActive()) { // Either we've received the success response for the close producer command from the broker, or the // connection did break in the meantime. In any case, the producer is gone. - log.info("[{}] [{}] Closed Producer", topic, producerName); - state.set(State.Closed); - pendingMessages.forEach(msg -> msg.cmd.release()); + synchronized (ProducerImpl.this) { + log.info("[{}] [{}] Closed Producer", topic, producerName); + state.set(State.Closed); + pendingMessages.forEach(msg -> msg.cmd.release()); + pendingMessages.clear(); + } + closeFuture.complete(null); client.cleanupProducer(this); } else {