diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 030d74a014f29..ed4e70bfd2953 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -20,12 +20,12 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; @@ -126,7 +126,7 @@ public MessageDupUnknownException() { private final int maxNumberOfProducers; // Map used to track the inactive producer along with the timestamp of their last activity - private final Map inactiveProducers = new HashMap<>(); + private final Map inactiveProducers = new ConcurrentHashMap<>(); private final String replicatorPrefix; @@ -436,7 +436,7 @@ private boolean isDeduplicationEnabled() { /** * Topic will call this method whenever a producer connects. */ - public synchronized void producerAdded(String producerName) { + public void producerAdded(String producerName) { // Producer is no-longer inactive inactiveProducers.remove(producerName); } @@ -444,7 +444,7 @@ public synchronized void producerAdded(String producerName) { /** * Topic will call this method whenever a producer disconnects. */ - public synchronized void producerRemoved(String producerName) { + public void producerRemoved(String producerName) { // Producer is no-longer active inactiveProducers.put(producerName, System.currentTimeMillis()); } @@ -456,7 +456,7 @@ public synchronized void purgeInactiveProducers() { long minimumActiveTimestamp = System.currentTimeMillis() - TimeUnit.MINUTES .toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes()); - Iterator> mapIterator = inactiveProducers.entrySet().iterator(); + Iterator> mapIterator = inactiveProducers.entrySet().iterator(); boolean hasInactive = false; while (mapIterator.hasNext()) { java.util.Map.Entry entry = mapIterator.next(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java index b4152b143ab6c..19583a4455ead 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java @@ -36,6 +36,7 @@ import io.netty.channel.EventLoopGroup; import java.lang.reflect.Field; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -174,7 +175,7 @@ public void testInactiveProducerRemove() throws Exception { Field field = MessageDeduplication.class.getDeclaredField("inactiveProducers"); field.setAccessible(true); - Map inactiveProducers = (Map) field.get(messageDeduplication); + Map inactiveProducers = (ConcurrentHashMap) field.get(messageDeduplication); String producerName1 = "test1"; when(publishContext.getHighestSequenceId()).thenReturn(2L);