From 1e9fa9fa17b36bf6c7e35057725f412d5840a4fe Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Mon, 17 Oct 2016 14:46:24 -0700 Subject: [PATCH] Unack message tracker for pre-fetched messages (#68) --- .../api/SimpleProducerConsumerStatTest.java | 20 ++-- .../UnAcknowledgedMessagesTimeoutTest.java | 40 ++++---- .../pulsar/client/impl/ConsumerBase.java | 12 --- .../pulsar/client/impl/ConsumerImpl.java | 93 +++++++++---------- .../client/impl/PartitionedConsumerImpl.java | 34 +------ .../client/impl/UnAckedMessageTracker.java | 64 ++++++++++--- 6 files changed, 124 insertions(+), 139 deletions(-) diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerStatTest.java index 6b52558270594..1470b097ec271 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerStatTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerStatTest.java @@ -63,7 +63,7 @@ public Object[][] batchMessageDelayMsProvider() { @DataProvider(name = "batch_with_timeout") public Object[][] ackTimeoutSecProvider() { - return new Object[][] { { 0, 0 }, { 0, 1 }, { 1000, 0 }, { 1000, 1 } }; + return new Object[][] { { 0, 0 }, { 0, 2 }, { 1000, 0 }, { 1000, 2 } }; } @Test(dataProvider = "batch_with_timeout") @@ -117,8 +117,6 @@ public void testSyncProducerAndConsumer(int batchMessageDelayMs, int ackTimeoutS public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs, int ackTimeoutSec) throws Exception { log.info("-- Starting {} test --", methodName); ConsumerConfiguration conf = new ConsumerConfiguration(); - // Cumulative Ack-counter works if ackTimeOutTimer-task is enabled - boolean isAckTimeoutTaskEnabledForCumulativeAck = ackTimeoutSec > 0; conf.setSubscriptionType(SubscriptionType.Exclusive); if (ackTimeoutSec > 0) { conf.setAckTimeout(ackTimeoutSec, TimeUnit.SECONDS); @@ -137,7 +135,7 @@ public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs, int ackTimeout Producer producer = pulsarClient.createProducer("persistent://my-property/tp1/my-ns/my-topic2", producerConf); List> futures = Lists.newArrayList(); - int numMessages = 5001; + int numMessages = 50; // Asynchronously produce messages for (int i = 0; i < numMessages; i++) { final String message = "my-message-" + i; @@ -155,9 +153,6 @@ public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs, int ackTimeout for (int i = 0; i < numMessages; i++) { msg = consumer.receive(5, TimeUnit.SECONDS); String receivedMessage = new String(msg.getData()); - if (i % 1000 == 0) { - log.info("Received message: [{}]", receivedMessage); - } String expectedMessage = "my-message-" + i; testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); } @@ -169,7 +164,7 @@ public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs, int ackTimeout Thread.sleep(2000); consumer.close(); producer.close(); - validatingLogInfo(consumer, producer, isAckTimeoutTaskEnabledForCumulativeAck); + validatingLogInfo(consumer, producer, batchMessageDelayMs == 0 && ackTimeoutSec > 0); log.info("-- Exiting {} test --", methodName); } @@ -178,8 +173,6 @@ public void testAsyncProducerAndReceiveAsyncAndAsyncAck(int batchMessageDelayMs, throws Exception { log.info("-- Starting {} test --", methodName); ConsumerConfiguration conf = new ConsumerConfiguration(); - // Cumulative Ack-counter works if ackTimeOutTimer-task is enabled - boolean isAckTimeoutTaskEnabledForCumulativeAck = ackTimeoutSec > 0; conf.setSubscriptionType(SubscriptionType.Exclusive); if (ackTimeoutSec > 0) { conf.setAckTimeout(ackTimeoutSec, TimeUnit.SECONDS); @@ -230,7 +223,7 @@ public void testAsyncProducerAndReceiveAsyncAndAsyncAck(int batchMessageDelayMs, Thread.sleep(5000); consumer.close(); producer.close(); - validatingLogInfo(consumer, producer, isAckTimeoutTaskEnabledForCumulativeAck); + validatingLogInfo(consumer, producer, batchMessageDelayMs == 0 && ackTimeoutSec > 0); log.info("-- Exiting {} test --", methodName); } @@ -331,7 +324,8 @@ public void testSendTimeout(int batchMessageDelayMs) throws Exception { log.info("-- Exiting {} test --", methodName); } - public void validatingLogInfo(Consumer consumer, Producer producer, boolean verifyAckCount) throws InterruptedException { + public void validatingLogInfo(Consumer consumer, Producer producer, boolean verifyAckCount) + throws InterruptedException { // Waiting for recording last stat info Thread.sleep(1000); ConsumerStats cStat = consumer.getStats(); @@ -340,7 +334,7 @@ public void validatingLogInfo(Consumer consumer, Producer producer, boolean veri assertEquals(pStat.getTotalBytesSent(), cStat.getTotalBytesReceived()); assertEquals(pStat.getTotalMsgsSent(), pStat.getTotalAcksReceived()); if (verifyAckCount) { - assertEquals(cStat.getTotalMsgsReceived(), cStat.getTotalAcksSent()); + assertEquals(cStat.getTotalMsgsReceived(), cStat.getTotalAcksSent()); } } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java index 766db54cb6244..925371c3eaaf3 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java @@ -37,8 +37,6 @@ import com.yahoo.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; import com.yahoo.pulsar.client.api.PulsarClientException; import com.yahoo.pulsar.client.api.SubscriptionType; -import com.yahoo.pulsar.client.impl.ConsumerBase; -import com.yahoo.pulsar.client.impl.MessageIdImpl; public class UnAcknowledgedMessagesTimeoutTest extends BrokerTestBase { private static final long testTimeout = 90000; // 1.5 min @@ -87,7 +85,7 @@ public void testExclusiveSingleAckedNormalTopic() throws Exception { log.info("Consumer received : " + new String(message.getData())); message = consumer.receive(500, TimeUnit.MILLISECONDS); } - long size = ((ConsumerBase) consumer).getUnAckedMessageTracker().size(); + long size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); log.info(key + " Unacked Message Tracker size is " + size); assertEquals(size, totalMessages / 2); @@ -108,7 +106,7 @@ public void testExclusiveSingleAckedNormalTopic() throws Exception { message = consumer.receive(500, TimeUnit.MILLISECONDS); } while (message != null); - size = ((ConsumerBase) consumer).getUnAckedMessageTracker().size(); + size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); log.info(key + " Unacked Message Tracker size is " + size); assertEquals(size, 0); assertEquals(hSet.size(), totalMessages); @@ -148,12 +146,12 @@ public void testExclusiveCumulativeAckedNormalTopic() throws Exception { log.info("Message ID details " + ((MessageIdImpl) message.getMessageId()).toString()); message = consumer.receive(500, TimeUnit.MILLISECONDS); } - long size = ((ConsumerBase) consumer).getUnAckedMessageTracker().size(); + long size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); assertEquals(size, totalMessages); log.info("Comulative Ack sent for " + new String(lastMessage.getData())); log.info("Message ID details " + ((MessageIdImpl) lastMessage.getMessageId()).toString()); consumer.acknowledgeCumulative(lastMessage); - size = ((ConsumerBase) consumer).getUnAckedMessageTracker().size(); + size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); assertEquals(size, 0); message = consumer.receive((int) (2 * ackTimeOutMillis), TimeUnit.MILLISECONDS); assertEquals(message, null); @@ -362,7 +360,7 @@ public void testCheckUnAcknowledgedMessageTimer() throws PulsarClientException, ConsumerConfiguration conf = new ConsumerConfiguration(); conf.setReceiverQueueSize(7); conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); - ConsumerBase consumer = (ConsumerBase) pulsarClient.subscribe(topicName, subscriptionName, conf); + ConsumerImpl consumer = (ConsumerImpl) pulsarClient.subscribe(topicName, subscriptionName, conf); // 3. producer publish messages for (int i = 0; i < totalMessages; i++) { @@ -371,25 +369,21 @@ public void testCheckUnAcknowledgedMessageTimer() throws PulsarClientException, producer.send(message.getBytes()); } - // 4. Consumer receives the message - assertEquals(consumer.getUnAckedMessageTracker().size(), 0); - consumer.receive(); - assertEquals(consumer.getUnAckedMessageTracker().size(), 1); - Thread.sleep(ackTimeOutMillis); - consumer.receive(); - assertEquals(consumer.getUnAckedMessageTracker().size(), 2); Thread.sleep((long) (ackTimeOutMillis * 1.1)); - // Timeout should have been triggered here - // 5. Trying to receive messages again - assertEquals(consumer.getUnAckedMessageTracker().size(), 0); - Message message1 = consumer.receive(); + for (int i = 0; i < totalMessages - 1; i++) { + Message msg = consumer.receive(); + consumer.acknowledge(msg); + } + assertEquals(consumer.getUnAckedMessageTracker().size(), 1); - Thread.sleep(ackTimeOutMillis); - Message message2 = consumer.receive(); - assertEquals(consumer.getUnAckedMessageTracker().size(), 2); - consumer.acknowledge(message1); - consumer.acknowledge(message2); + + Message msg = consumer.receive(); + consumer.acknowledge(msg); + assertEquals(consumer.getUnAckedMessageTracker().size(), 0); + + Thread.sleep((long) (ackTimeOutMillis * 1.1)); + assertEquals(consumer.getUnAckedMessageTracker().size(), 0); } diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerBase.java index aa360f6df6195..21393b4d9cd49 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerBase.java @@ -52,7 +52,6 @@ enum ConsumerType { protected final ExecutorService listenerExecutor; final BlockingQueue incomingMessages; protected final ConcurrentLinkedQueue> pendingReceives; - protected final UnAckedMessageTracker unAckedMessageTracker; protected ConsumerBase(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf, ExecutorService listenerExecutor, CompletableFuture subscribeFuture, boolean useGrowableQueue) { @@ -72,17 +71,6 @@ protected ConsumerBase(PulsarClientImpl client, String topic, String subscriptio } this.listenerExecutor = listenerExecutor; this.pendingReceives = Queues.newConcurrentLinkedQueue(); - if (conf.getAckTimeoutMillis() != 0) { - this.unAckedMessageTracker = new UnAckedMessageTracker(); - this.unAckedMessageTracker.start(client, this, conf.getAckTimeoutMillis()); - } else { - this.unAckedMessageTracker = null; - } - - } - - public UnAckedMessageTracker getUnAckedMessageTracker() { - return unAckedMessageTracker; } @Override diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java index 6e3d8c2afcb7c..3c1f5718d5a16 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java @@ -76,9 +76,10 @@ public class ConsumerImpl extends ConsumerBase { private volatile boolean waitingOnReceiveForZeroQueueSize = false; private final ReadWriteLock lock = new ReentrantReadWriteLock(); - + private final ReadWriteLock zeroQueueLock; + private final UnAckedMessageTracker unAckedMessageTracker; private final ConcurrentSkipListMap batchMessageAckTracker; private final ConsumerStats stats; @@ -103,14 +104,26 @@ public class ConsumerImpl extends ConsumerBase { } else { stats = ConsumerStats.CONSUMER_STATS_DISABLED; } + if (conf.getReceiverQueueSize() <= 1) { zeroQueueLock = new ReentrantReadWriteLock(); } else { zeroQueueLock = null; } + + if (conf.getAckTimeoutMillis() != 0) { + this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis()); + } else { + this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED; + } + grabCnx(); } + public UnAckedMessageTracker getUnAckedMessageTracker() { + return unAckedMessageTracker; + } + @Override public CompletableFuture unsubscribeAsync() { if (state.get() == State.Closing || state.get() == State.Closed) { @@ -126,9 +139,8 @@ public CompletableFuture unsubscribeAsync() { cnx.sendRequestWithId(unsubscribe, requestId).thenRun(() -> { cnx.removeConsumer(consumerId); log.info("[{}][{}] Successfully unsubscribed from topic", topic, subscription); - if (unAckedMessageTracker != null) { - unAckedMessageTracker.close(); - } + batchMessageAckTracker.clear(); + unAckedMessageTracker.close(); unsubscribeFuture.complete(null); state.set(State.Closed); }).exceptionally(e -> { @@ -158,9 +170,6 @@ protected Message internalReceive() throws PulsarClientException { try { message = incomingMessages.take(); messageProcessed(message); - if (unAckedMessageTracker != null) { - unAckedMessageTracker.add((MessageIdImpl) message.getMessageId()); - } return message; } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -191,9 +200,6 @@ protected CompletableFuture internalReceiveAsync() { receiveMessages(cnx(), 1); } else if (message != null) { messageProcessed(message); - if (unAckedMessageTracker != null) { - unAckedMessageTracker.add((MessageIdImpl) message.getMessageId()); - } result.complete(message); } @@ -232,9 +238,6 @@ private Message fetchSingleMessageFromBroker() throws PulsarClientException { } } while (true); - if (unAckedMessageTracker != null) { - unAckedMessageTracker.add((MessageIdImpl) message.getMessageId()); - } stats.updateNumMsgsReceived(message); return message; } catch (InterruptedException e) { @@ -256,9 +259,6 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClien message = incomingMessages.poll(timeout, unit); if (message != null) { messageProcessed(message); - if (unAckedMessageTracker != null) { - unAckedMessageTracker.add((MessageIdImpl) message.getMessageId()); - } } return message; } catch (InterruptedException e) { @@ -274,7 +274,7 @@ private void ackMessagesInEarlierBatch(BatchMessageIdImpl batchMessageId, Messag // get entry before this message and ack that message on broker MessageIdImpl lowerKey = batchMessageAckTracker.lowerKey(message); if (lowerKey != null) { - NavigableMap entriesUpto = batchMessageAckTracker.headMap(lowerKey, true); + NavigableMap entriesUpto = batchMessageAckTracker.headMap(lowerKey, true); for (Object key : entriesUpto.keySet()) { entriesUpto.remove(key); } @@ -320,8 +320,8 @@ boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackTyp batchMessageAckTracker.keySet().removeIf(m -> (m.compareTo(message) <= 0)); } batchMessageAckTracker.remove(message); - // increment Acknowledge-msg counter with number of messages in batch only if AckType is Individual. - // CumulativeAckType is handled while sending ack to broker + // increment Acknowledge-msg counter with number of messages in batch only if AckType is Individual. + // CumulativeAckType is handled while sending ack to broker if (ackType == AckType.Individual) { stats.incrementNumAcksSent(batchSize); } @@ -348,7 +348,7 @@ private void updateBatchAckTracker(MessageIdImpl message, AckType ackType) { } MessageIdImpl lowerKey = batchMessageAckTracker.lowerKey(message); if (lowerKey != null) { - NavigableMap entriesUpto = batchMessageAckTracker.headMap(lowerKey, true); + NavigableMap entriesUpto = batchMessageAckTracker.headMap(lowerKey, true); for (Object key : entriesUpto.keySet()) { entriesUpto.remove(key); } @@ -361,12 +361,11 @@ private void updateBatchAckTracker(MessageIdImpl message, AckType ackType) { log.debug("[{}] [{}] no messages to clean up prior to message {}", subscription, consumerId, message); } } - } /** * helper method that returns current state of data structure used to track acks for batch messages - * + * * @return true if all batch messages have been acknowledged */ public boolean isBatchingAckTrackerEmpty() { @@ -392,7 +391,6 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack // other messages in batch are still pending ack. return CompletableFuture.completedFuture(null); } - } // if we got a cumulative ack on non batch message, check if any earlier batch messages need to be removed // from batch message tracker @@ -415,18 +413,13 @@ private CompletableFuture sendAcknowledge(MessageId messageId, AckType ack public void operationComplete(Future future) throws Exception { if (future.isSuccess()) { if (ackType == AckType.Individual) { - if (unAckedMessageTracker != null) { - unAckedMessageTracker.remove(msgId); - } + unAckedMessageTracker.remove(msgId); // increment counter by 1 for non-batch msg if (!(messageId instanceof BatchMessageIdImpl)) { stats.incrementNumAcksSent(1); } } else if (ackType == AckType.Cumulative) { - if (unAckedMessageTracker != null) { - int ackedMessages = unAckedMessageTracker.removeMessagesTill(msgId); - stats.incrementNumAcksSent(ackedMessages); - } + stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(msgId)); } ackFuture.complete(null); } else { @@ -457,9 +450,8 @@ void connectionOpened(final ClientCnx cnx) { requestId).thenRun(() -> { synchronized (ConsumerImpl.this) { incomingMessages.clear(); - if (unAckedMessageTracker != null) { - unAckedMessageTracker.clear(); - } + unAckedMessageTracker.clear(); + batchMessageAckTracker.clear(); if (changeToReadyState()) { log.info("[{}][{}] Subscribed to topic on {} -- consumer: {}", topic, subscription, cnx.channel().remoteAddress(), consumerId); @@ -542,9 +534,8 @@ void connectionFailed(PulsarClientException exception) { @Override public CompletableFuture closeAsync() { if (state.get() == State.Closing || state.get() == State.Closed) { - if (unAckedMessageTracker != null) { - unAckedMessageTracker.close(); - } + batchMessageAckTracker.clear(); + unAckedMessageTracker.close(); return CompletableFuture.completedFuture(null); } @@ -552,9 +543,7 @@ public CompletableFuture closeAsync() { log.info("[{}] [{}] Closed Consumer (not connected)", topic, subscription); state.set(State.Closed); batchMessageAckTracker.clear(); - if (unAckedMessageTracker != null) { - unAckedMessageTracker.close(); - } + unAckedMessageTracker.close(); client.cleanupConsumer(this); return CompletableFuture.completedFuture(null); } @@ -577,9 +566,7 @@ public CompletableFuture closeAsync() { log.info("[{}] [{}] Closed consumer", topic, subscription); state.set(State.Closed); batchMessageAckTracker.clear(); - if (unAckedMessageTracker != null) { - unAckedMessageTracker.close(); - } + unAckedMessageTracker.close(); closeFuture.complete(null); client.cleanupConsumer(this); } else { @@ -598,13 +585,13 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientC MessageMetadata msgMetadata = null; ByteBuf payload = headersAndPayload; - + if (!verifyChecksum(headersAndPayload, messageId)) { // discard message with checksum error discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch); return; } - + try { msgMetadata = Commands.parseMessageMetadata(payload); } catch (Throwable t) { @@ -631,6 +618,7 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientC // Enqueue the message so that it can be retrieved when application calls receive() // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it. // if asyncReceive is waiting then notify callback without adding to incomingMessages queue + unAckedMessageTracker.add((MessageIdImpl) message.getMessageId()); boolean asyncReceivedWaiting = !pendingReceives.isEmpty(); if ((conf.getReceiverQueueSize() != 0 || waitingOnReceiveForZeroQueueSize) && !asyncReceivedWaiting) { incomingMessages.add(message); @@ -701,10 +689,6 @@ void notifyPendingReceivedCallback(final MessageImpl message, Exception exceptio CompletableFuture receivedFuture = pendingReceives.poll(); if (exception == null) { checkNotNull(message, "received message can't be null"); - // add message to unAckedMessage tracker - if (unAckedMessageTracker != null) { - unAckedMessageTracker.add((MessageIdImpl) message.getMessageId()); - } if (receivedFuture != null) { if (conf.getReceiverQueueSize() == 0) { // return message to receivedCallback @@ -736,6 +720,7 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf unc batchMessage, bitSet.cardinality(), bitSet.length()); } batchMessageAckTracker.put(batchMessage, bitSet); + unAckedMessageTracker.add(batchMessage); try { for (int i = 0; i < batchSize; ++i) { if (log.isDebugEnabled()) { @@ -837,7 +822,7 @@ private boolean verifyChecksum(ByteBuf headersAndPayload, MessageIdData messageI topic, subscription, messageId.getLedgerId(), messageId.getEntryId(), Long.toHexString(checksum), Integer.toHexString(computedChecksum)); return false; - } + } } return true; @@ -880,10 +865,18 @@ public int numMessagesInQueue() { public void redeliverUnacknowledgedMessages() { ClientCnx cnx = cnx(); if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v2.getNumber()) { - if (unAckedMessageTracker != null) { + int currentSize = 0; + synchronized (this) { + currentSize = incomingMessages.size(); + incomingMessages.clear(); + availablePermits.set(0); unAckedMessageTracker.clear(); + batchMessageAckTracker.clear(); } cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise()); + if (currentSize > 0) { + receiveMessages(cnx, currentSize); + } return; } if (cnx == null || (state.get() == State.Connecting)) { diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java index e13a2ede724e3..dae3bf1411398 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java @@ -114,9 +114,6 @@ protected Message internalReceive() throws PulsarClientException { Message message; try { message = incomingMessages.take(); - if (unAckedMessageTracker != null) { - unAckedMessageTracker.add((MessageIdImpl) message.getMessageId()); - } return message; } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -129,9 +126,6 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClien Message message; try { message = incomingMessages.poll(timeout, unit); - if (unAckedMessageTracker != null && message != null) { - unAckedMessageTracker.add((MessageIdImpl) message.getMessageId()); - } return message; } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -150,9 +144,6 @@ protected CompletableFuture internalReceiveAsync() { if (message == null) { pendingReceives.add(result); } else { - if (unAckedMessageTracker != null) { - unAckedMessageTracker.add((MessageIdImpl) message.getMessageId()); - } result.complete(message); } } catch (InterruptedException e) { @@ -179,11 +170,7 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack } else { ConsumerImpl consumer = consumers.get(((MessageIdImpl) messageId).getPartitionIndex()); - return consumer.doAcknowledge(messageId, ackType).thenRun(() -> { - if (unAckedMessageTracker != null) { - unAckedMessageTracker.remove((MessageIdImpl) messageId); - } - }); + return consumer.doAcknowledge(messageId, ackType); } } @@ -208,9 +195,6 @@ public CompletableFuture unsubscribeAsync() { if (completed.decrementAndGet() == 0) { if (unsubscribeFail.get() == null) { state.set(State.Closed); - if (unAckedMessageTracker != null) { - unAckedMessageTracker.close(); - } unsubscribeFuture.complete(null); log.info("[{}] [{}] Unsubscribed Partitioned Consumer", topic, subscription); } else { @@ -234,9 +218,6 @@ public CompletableFuture unsubscribeAsync() { public CompletableFuture closeAsync() { if (state.get() == State.Closing || state.get() == State.Closed) { - if (unAckedMessageTracker != null) { - unAckedMessageTracker.close(); - } return CompletableFuture.completedFuture(null); } state.set(State.Closing); @@ -253,9 +234,6 @@ public CompletableFuture closeAsync() { if (completed.decrementAndGet() == 0) { if (closeFail.get() == null) { state.set(State.Closed); - if (unAckedMessageTracker != null) { - unAckedMessageTracker.close(); - } closeFuture.complete(null); log.info("[{}] [{}] Closed Partitioned Consumer", topic, subscription); client.cleanupConsumer(this); @@ -314,9 +292,6 @@ void messageReceived(Message message) { if (!pendingReceives.isEmpty()) { CompletableFuture receivedFuture = pendingReceives.poll(); listenerExecutor.execute(() -> receivedFuture.complete(message)); - if (unAckedMessageTracker != null) { - unAckedMessageTracker.add((MessageIdImpl) message.getMessageId()); - } // unlock if it is already locked if (shouldLock) { lock.readLock().unlock(); @@ -371,6 +346,9 @@ private ConsumerConfiguration getInternalConsumerConfig() { internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize()); internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType()); internalConsumerConfig.setConsumerName(consumerName); + if (conf.getAckTimeoutMillis() != 0) { + internalConsumerConfig.setAckTimeout(conf.getAckTimeoutMillis(), TimeUnit.MILLISECONDS); + } internalConsumerConfig.setMessageListener((consumer, msg) -> { if (msg != null) { messageReceived(msg); @@ -382,13 +360,9 @@ private ConsumerConfiguration getInternalConsumerConfig() { @Override public void redeliverUnacknowledgedMessages() { - if (unAckedMessageTracker != null) { - unAckedMessageTracker.clear(); - } for (ConsumerImpl c : consumers) { c.redeliverUnacknowledgedMessages(); } - } /** diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/UnAckedMessageTracker.java index 6c78669ef50fd..f41e2d86245a7 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/UnAckedMessageTracker.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/UnAckedMessageTracker.java @@ -30,12 +30,54 @@ public class UnAckedMessageTracker implements Closeable { private static final Logger log = LoggerFactory.getLogger(UnAckedMessageTracker.class); - private ConcurrentOpenHashSet currentSet = new ConcurrentOpenHashSet(); - private ConcurrentOpenHashSet oldOpenSet = new ConcurrentOpenHashSet(); - private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - private final Lock readLock = readWriteLock.readLock(); - private final Lock writeLock = readWriteLock.writeLock(); - private Timeout timeout = null; + private ConcurrentOpenHashSet currentSet; + private ConcurrentOpenHashSet oldOpenSet; + private final ReentrantReadWriteLock readWriteLock; + private final Lock readLock; + private final Lock writeLock; + private Timeout timeout; + + public static final UnAckedMessageTrackerDisabled UNACKED_MESSAGE_TRACKER_DISABLED = new UnAckedMessageTrackerDisabled(); + + private static class UnAckedMessageTrackerDisabled extends UnAckedMessageTracker { + @Override + public void clear() { + } + + @Override + public boolean add(MessageIdImpl m) { + return true; + } + + @Override + public boolean remove(MessageIdImpl m) { + return true; + } + + @Override + public int removeMessagesTill(MessageIdImpl msgId) { + return 0; + } + + @Override + public void close() { + } + } + + public UnAckedMessageTracker() { + readWriteLock = null; + readLock = null; + writeLock = null; + } + + public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase consumerBase, long ackTimeoutMillis) { + currentSet = new ConcurrentOpenHashSet(); + oldOpenSet = new ConcurrentOpenHashSet(); + readWriteLock = new ReentrantReadWriteLock(); + readLock = readWriteLock.readLock(); + writeLock = readWriteLock.writeLock(); + start(client, consumerBase, ackTimeoutMillis); + } public void start(PulsarClientImpl client, ConsumerBase consumerBase, long ackTimeoutMillis) { this.stop(); @@ -52,7 +94,7 @@ public void run(Timeout timeout) throws Exception { }, ackTimeoutMillis, TimeUnit.MILLISECONDS); } - protected void toggle() { + private void toggle() { writeLock.lock(); try { ConcurrentOpenHashSet temp = currentSet; @@ -84,7 +126,7 @@ public boolean add(MessageIdImpl m) { } - public boolean isEmpty() { + boolean isEmpty() { readLock.lock(); try { return currentSet.isEmpty() && oldOpenSet.isEmpty(); @@ -102,7 +144,7 @@ public boolean remove(MessageIdImpl m) { } } - public long size() { + long size() { readLock.lock(); try { return currentSet.size() + oldOpenSet.size(); @@ -111,7 +153,7 @@ public long size() { } } - public boolean isAckTimeout() { + private boolean isAckTimeout() { readLock.lock(); try { return !oldOpenSet.isEmpty(); @@ -135,7 +177,7 @@ public int removeMessagesTill(MessageIdImpl msgId) { } } - public void stop() { + private void stop() { writeLock.lock(); try { if (timeout != null && !timeout.isCancelled()) {