Skip to content

Commit

Permalink
[fix][broker] make closing producer thread-safe while updating recent…
Browse files Browse the repository at this point in the history
…ly closed producer (apache#21355)
  • Loading branch information
rdhabalia authored and vraulji committed Oct 16, 2023
1 parent 3cd07b4 commit 1be0afb
Showing 1 changed file with 3 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -56,6 +55,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -186,7 +186,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private final BrokerService service;
private final SchemaRegistryService schemaService;
private final String listenerName;
private final HashMap<Long, Long> recentlyClosedProducers;
private final Map<Long, Long> recentlyClosedProducers;
private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
private final boolean enableSubscriptionPatternEvaluation;
Expand Down Expand Up @@ -291,7 +291,7 @@ public ServerCnx(PulsarService pulsar, String listenerName) {
.expectedItems(8)
.concurrencyLevel(1)
.build();
this.recentlyClosedProducers = new HashMap<>();
this.recentlyClosedProducers = new ConcurrentHashMap<>();
this.replicatorPrefix = conf.getReplicatorPrefix();
this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection();
this.schemaValidationEnforced = conf.isSchemaValidationEnforced();
Expand Down Expand Up @@ -2984,7 +2984,6 @@ protected void interceptCommand(BaseCommand command) throws InterceptException {

@Override
public void closeProducer(Producer producer) {
assert ctx.executor().inEventLoop();
// removes producer-connection from map and send close command to producer
safelyRemoveProducer(producer);
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
Expand Down

0 comments on commit 1be0afb

Please sign in to comment.