diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java index 6bd11de5a2f88..5a48253ebf006 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java @@ -37,7 +37,6 @@ import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; -import org.awaitility.Awaitility; import org.mockito.AdditionalAnswers; import org.mockito.Mockito; import org.slf4j.Logger; @@ -193,42 +192,4 @@ public void testShouldMaintainOrderForIndividualTopicInMultiTopicsConsumer() } Assert.assertEquals(numPartitions * numMessages, receivedCount); } - - @Test - public void testBatchReceiveAckTimeout() - throws PulsarAdminException, PulsarClientException { - String topicName = newTopicName(); - int numPartitions = 2; - int numMessages = 100000; - admin.topics().createPartitionedTopic(topicName, numPartitions); - - @Cleanup - Producer producer = pulsarClient.newProducer(Schema.INT64) - .topic(topicName) - .enableBatching(false) - .blockIfQueueFull(true) - .create(); - - @Cleanup - Consumer consumer = pulsarClient - .newConsumer(Schema.INT64) - .topic(topicName) - .receiverQueueSize(numMessages) - .batchReceivePolicy( - BatchReceivePolicy.builder().maxNumMessages(1).timeout(2, TimeUnit.SECONDS).build() - ).ackTimeout(1000, TimeUnit.MILLISECONDS) - .subscriptionName(methodName) - .subscribe(); - - producer.newMessage() - .value(1l) - .send(); - - // first batch receive - Assert.assertEquals(consumer.batchReceive().size(), 1); - // Not ack, trigger redelivery this message. - Awaitility.await().untilAsserted(() -> { - Assert.assertEquals(consumer.batchReceive().size(), 1); - }); - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java index da84afc567176..a972173b5c746 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import java.lang.reflect.Field; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; @@ -32,17 +33,17 @@ import lombok.Cleanup; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Messages; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.naming.TopicName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -225,7 +226,6 @@ public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException final String topic = "testDoNotRedeliveryMarkDeleteMessages"; final String subName = "my-sub"; - @Cleanup Consumer consumer = pulsarClient.newConsumer() .topic(topic) .subscriptionName(subName) @@ -233,7 +233,6 @@ public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException .ackTimeout(1, TimeUnit.SECONDS) .subscribe(); - @Cleanup Producer producer = pulsarClient.newProducer() .topic(topic) .enableBatching(false) @@ -262,15 +261,12 @@ public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{ final String topic = "testRedeliveryAddEpoch"; final String subName = "my-sub"; - - @Cleanup - ConsumerImpl consumer = ((ConsumerImpl) pulsarClient.newConsumer(Schema.STRING) + ConsumerBase consumer = ((ConsumerBase) pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName(subName) .subscriptionType(SubscriptionType.Failover) .subscribe()); - @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING) .topic(topic) .enableBatching(enableBatch) @@ -279,9 +275,14 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{ String test1 = "Pulsar1"; String test2 = "Pulsar2"; String test3 = "Pulsar3"; + producer.send(test1); + + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopics() + .get(TopicName.get("persistent://public/default/" + topic).toString()).get().get(); + PersistentDispatcherSingleActiveConsumer persistentDispatcherSingleActiveConsumer = + (PersistentDispatcherSingleActiveConsumer) persistentTopic.getSubscription(subName).getDispatcher(); consumer.setConsumerEpoch(1); - producer.send(test1); Message message = consumer.receive(3, TimeUnit.SECONDS); assertNull(message); consumer.redeliverUnacknowledgedMessages(); @@ -308,113 +309,18 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{ message = consumer.receive(3, TimeUnit.SECONDS); assertNull(message); - ConnectionHandler connectionHandler = consumer.getConnectionHandler(); - connectionHandler.cnx().channel().close(); - - consumer.grabCnx(); - - message = consumer.receive(3, TimeUnit.SECONDS); - assertNotNull(message); - assertEquals(message.getValue(), test3); - } - - @Test(dataProvider = "enableBatch") - public void testRedeliveryAddEpochAndPermits(boolean enableBatch) throws Exception { - final String topic = "testRedeliveryAddEpochAndPermits"; - final String subName = "my-sub"; - // set receive queue size is 4, and first send 4 messages, - // then call redeliver messages, assert receive msg num. - int receiveQueueSize = 4; - - @Cleanup - ConsumerImpl consumer = ((ConsumerImpl) pulsarClient.newConsumer(Schema.STRING) - .topic(topic) - .receiverQueueSize(receiveQueueSize) - .autoScaledReceiverQueueSizeEnabled(false) - .subscriptionName(subName) - .subscriptionType(SubscriptionType.Failover) - .subscribe()); - - @Cleanup - Producer producer = pulsarClient.newProducer(Schema.STRING) - .topic(topic) - .enableBatching(enableBatch) - .create(); - - consumer.setConsumerEpoch(1); - for (int i = 0; i < receiveQueueSize; i++) { - producer.send("pulsar" + i); - } - assertNull(consumer.receive(1, TimeUnit.SECONDS)); - - consumer.redeliverUnacknowledgedMessages(); - for (int i = 0; i < receiveQueueSize; i++) { - Message msg = consumer.receive(); - assertEquals("pulsar" + i, msg.getValue()); - } - } - - @Test(dataProvider = "enableBatch") - public void testBatchReceiveRedeliveryAddEpoch(boolean enableBatch) throws Exception{ - final String topic = "testBatchReceiveRedeliveryAddEpoch"; - final String subName = "my-sub"; - - @Cleanup - ConsumerImpl consumer = ((ConsumerImpl) pulsarClient.newConsumer(Schema.STRING) - .topic(topic) - .subscriptionName(subName) - .batchReceivePolicy(BatchReceivePolicy.builder().timeout(1000, TimeUnit.MILLISECONDS).build()) - .subscriptionType(SubscriptionType.Failover) - .subscribe()); - - @Cleanup - Producer producer = pulsarClient.newProducer(Schema.STRING) - .topic(topic) - .enableBatching(enableBatch) - .create(); - - String test1 = "Pulsar1"; - String test2 = "Pulsar2"; - String test3 = "Pulsar3"; - - consumer.setConsumerEpoch(1); - producer.send(test1); - - Messages messages; - Message message; - - messages = consumer.batchReceive(); - assertEquals(messages.size(), 0); - consumer.redeliverUnacknowledgedMessages(); - messages = consumer.batchReceive(); - assertEquals(messages.size(), 1); - message = messages.iterator().next(); - consumer.acknowledgeCumulativeAsync(message).get(); - assertEquals(message.getValue(), test1); - - consumer.setConsumerEpoch(3); - producer.send(test2); - messages = consumer.batchReceive(); - assertEquals(messages.size(), 0); - consumer.redeliverUnacknowledgedMessages(); - messages = consumer.batchReceive(); - assertEquals(messages.size(), 1); - message = messages.iterator().next(); - assertEquals(message.getValue(), test2); - consumer.acknowledgeCumulativeAsync(message).get(); + Field field = consumer.getClass().getDeclaredField("connectionHandler"); + field.setAccessible(true); + ConnectionHandler connectionHandler = (ConnectionHandler) field.get(consumer); - consumer.setConsumerEpoch(6); - producer.send(test3); - messages = consumer.batchReceive(); - assertEquals(messages.size(), 0); + field = connectionHandler.getClass().getDeclaredField("CLIENT_CNX_UPDATER"); + field.setAccessible(true); - ConnectionHandler connectionHandler = consumer.getConnectionHandler(); connectionHandler.cnx().channel().close(); - consumer.grabCnx(); - messages = consumer.batchReceive(); - assertEquals(messages.size(), 1); - message = messages.iterator().next(); + ((ConsumerImpl) consumer).grabCnx(); + message = consumer.receive(3, TimeUnit.SECONDS); + assertNotNull(message); assertEquals(message.getValue(), test3); } @@ -423,21 +329,19 @@ public static Object[][] enableBatch() { return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; } + @Test(dataProvider = "enableBatch") public void testMultiConsumerRedeliveryAddEpoch(boolean enableBatch) throws Exception{ final String topic = "testMultiConsumerRedeliveryAddEpoch"; final String subName = "my-sub"; admin.topics().createPartitionedTopic(topic, 5); final int messageNumber = 50; - - @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName(subName) .subscriptionType(SubscriptionType.Failover) .subscribe(); - @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING) .topic(topic) .enableBatching(enableBatch) @@ -478,66 +382,4 @@ public void testMultiConsumerRedeliveryAddEpoch(boolean enableBatch) throws Exce message = consumer.receive(5, TimeUnit.SECONDS); assertNull(message); } - - @Test(dataProvider = "enableBatch", invocationCount = 10) - public void testMultiConsumerBatchRedeliveryAddEpoch(boolean enableBatch) throws Exception{ - - final String topic = "testMultiConsumerBatchRedeliveryAddEpoch"; - final String subName = "my-sub"; - admin.topics().createPartitionedTopic(topic, 5); - final int messageNumber = 50; - - @Cleanup - Consumer consumer = pulsarClient.newConsumer(Schema.STRING) - .topic(topic) - .batchReceivePolicy(BatchReceivePolicy.builder().timeout(2, TimeUnit.SECONDS).build()) - .subscriptionName(subName) - .subscriptionType(SubscriptionType.Failover) - .subscribe(); - - @Cleanup - Producer producer = pulsarClient.newProducer(Schema.STRING) - .topic(topic) - .enableBatching(enableBatch) - .create(); - - for (int i = 0; i < messageNumber; i++) { - producer.send("" + i); - } - - int receiveNum = 0; - while (receiveNum < messageNumber) { - receiveNum += consumer.batchReceive().size(); - } - - // redeliverUnacknowledgedMessages once - consumer.redeliverUnacknowledgedMessages(); - - receiveNum = 0; - while (receiveNum < messageNumber) { - Messages messages = consumer.batchReceive(); - receiveNum += messages.size(); - for (Message message : messages) { - assertEquals((((MessageImpl)((TopicMessageImpl) message).getMessage())).getConsumerEpoch(), 1); - } - } - - // can't receive message again - assertEquals(consumer.batchReceive().size(), 0); - - // redeliverUnacknowledgedMessages twice - consumer.redeliverUnacknowledgedMessages(); - - receiveNum = 0; - while (receiveNum < messageNumber) { - Messages messages = consumer.batchReceive(); - receiveNum += messages.size(); - for (Message message : messages) { - assertEquals((((MessageImpl)((TopicMessageImpl) message).getMessage())).getConsumerEpoch(), 2); - } - } - - // can't receive message again - assertEquals(consumer.batchReceive().size(), 0); - } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index ec4051a359f49..7bc1e3d1a185d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -907,7 +907,7 @@ protected void notifyPendingBatchReceivedCallBack() { if (opBatchReceive == null) { return; } - notifyPendingBatchReceivedCallBack(opBatchReceive.future); + notifyPendingBatchReceivedCallBack(opBatchReceive); } private boolean hasNextBatchReceive() { @@ -932,7 +932,7 @@ private OpBatchReceive nextBatchReceive() { return opBatchReceive; } - protected final void notifyPendingBatchReceivedCallBack(CompletableFuture> batchReceiveFuture) { + protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive opBatchReceive) { MessagesImpl messages = getNewMessagesImpl(); Message msgPeeked = incomingMessages.peek(); while (msgPeeked != null && messages.canAdd(msgPeeked)) { @@ -944,7 +944,8 @@ protected final void notifyPendingBatchReceivedCallBack(CompletableFuture> future, Messages messages) { @@ -1172,7 +1173,7 @@ protected boolean isValidConsumerEpoch(MessageImpl message) { || getSubType() == CommandSubscribe.SubType.Exclusive) && message.getConsumerEpoch() != DEFAULT_CONSUMER_EPOCH && message.getConsumerEpoch() < CONSUMER_EPOCH.get(this)) { - log.info("Consumer filter old epoch message, topic : [{}], messageId : [{}], messageConsumerEpoch : [{}], " + log.warn("Consumer filter old epoch message, topic : [{}], messageId : [{}], messageConsumerEpoch : [{}], " + "consumerEpoch : [{}]", topic, message.getMessageId(), message.getConsumerEpoch(), consumerEpoch); message.release(); message.recycle(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 3063426ea69af..e95633da6040a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -64,7 +64,6 @@ import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.Getter; -import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; @@ -454,6 +453,9 @@ protected Message internalReceive() throws PulsarClientException { } message = incomingMessages.take(); messageProcessed(message); + if (!isValidConsumerEpoch(message)) { + return internalReceive(); + } return beforeConsume(message); } catch (InterruptedException e) { stats.incrementNumReceiveFailed(); @@ -461,6 +463,10 @@ protected Message internalReceive() throws PulsarClientException { } } + private boolean isValidConsumerEpoch(Message message) { + return isValidConsumerEpoch((MessageImpl) message); + } + @Override protected CompletableFuture> internalReceiveAsync() { CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler(); @@ -473,6 +479,11 @@ protected CompletableFuture> internalReceiveAsync() { cancellationHandler.setCancelAction(() -> pendingReceives.remove(result)); } else { messageProcessed(message); + if (!isValidConsumerEpoch(message)) { + pendingReceives.add(result); + cancellationHandler.setCancelAction(() -> pendingReceives.remove(result)); + return; + } result.complete(beforeConsume(message)); } }); @@ -483,6 +494,7 @@ protected CompletableFuture> internalReceiveAsync() { @Override protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarClientException { Message message; + long callTime = System.nanoTime(); try { if (incomingMessages.isEmpty()) { expectMoreIncomingMessages(); @@ -492,6 +504,15 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC return null; } messageProcessed(message); + if (!isValidConsumerEpoch(message)) { + long executionTime = System.nanoTime() - callTime; + long timeoutInNanos = unit.toNanos(timeout); + if (executionTime >= timeoutInNanos) { + return null; + } else { + return internalReceive(timeoutInNanos - executionTime, TimeUnit.NANOSECONDS); + } + } return beforeConsume(message); } catch (InterruptedException e) { State state = getState(); @@ -525,7 +546,22 @@ protected CompletableFuture> internalBatchReceiveAsync() { CompletableFuture> result = cancellationHandler.createFuture(); internalPinnedExecutor.execute(() -> { if (hasEnoughMessagesForBatchReceive()) { - notifyPendingBatchReceivedCallBack(result); + MessagesImpl messages = getNewMessagesImpl(); + Message msgPeeked = incomingMessages.peek(); + while (msgPeeked != null && messages.canAdd(msgPeeked)) { + Message msg = incomingMessages.poll(); + if (msg != null) { + messageProcessed(msg); + if (!isValidConsumerEpoch(msg)) { + msgPeeked = incomingMessages.peek(); + continue; + } + Message interceptMsg = beforeConsume(msg); + messages.add(interceptMsg); + } + msgPeeked = incomingMessages.peek(); + } + result.complete(messages); } else { expectMoreIncomingMessages(); OpBatchReceive opBatchReceive = OpBatchReceive.of(result); @@ -1189,10 +1225,6 @@ private void executeNotifyCallback(final MessageImpl message) { // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it. // if asyncReceive is waiting then notify callback without adding to incomingMessages queue internalPinnedExecutor.execute(() -> { - if (!isValidConsumerEpoch(message)) { - increaseAvailablePermits(cnx()); - return; - } if (hasNextPendingReceive()) { notifyPendingReceivedCallback(message, null); } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) { @@ -1861,66 +1893,64 @@ public int numMessagesInQueue() { return incomingMessages.size(); } - public CompletableFuture internalRedeliverUnacknowledgedMessages() { - return CompletableFuture.runAsync(() -> { - // First : synchronized in order to handle consumer reconnect produce race condition, when broker receive - // redeliverUnacknowledgedMessages and consumer have not be created and then receive reconnect epoch - // change the broker is smaller than the client epoch, this will cause client epoch smaller - // than broker epoch forever. client will not receive message anymore. - // Second : we should synchronized `ClientCnx cnx = cnx()` to prevent use old cnx to - // send redeliverUnacknowledgedMessages to a old broker - synchronized (ConsumerImpl.this) { - ClientCnx cnx = cnx(); - // V1 don't support redeliverUnacknowledgedMessages - if (cnx != null && cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v2.getValue()) { - if ((getState() == State.Connecting)) { - log.warn("[{}] Client Connection needs to be established " - + "for redelivery of unacknowledged messages", this); - } else { - log.warn("[{}] Reconnecting the client to redeliver the messages.", this); - cnx.ctx().close(); - } - - return; + @Override + public void redeliverUnacknowledgedMessages() { + // First : synchronized in order to handle consumer reconnect produce race condition, when broker receive + // redeliverUnacknowledgedMessages and consumer have not be created and + // then receive reconnect epoch change the broker is smaller than the client epoch, this will cause client epoch + // smaller than broker epoch forever. client will not receive message anymore. + // Second : we should synchronized `ClientCnx cnx = cnx()` to + // prevent use old cnx to send redeliverUnacknowledgedMessages to a old broker + synchronized (ConsumerImpl.this) { + ClientCnx cnx = cnx(); + // V1 don't support redeliverUnacknowledgedMessages + if (cnx != null && cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v2.getValue()) { + if ((getState() == State.Connecting)) { + log.warn("[{}] Client Connection needs to be established " + + "for redelivery of unacknowledged messages", this); + } else { + log.warn("[{}] Reconnecting the client to redeliver the messages.", this); + cnx.ctx().close(); } - // we should increase epoch every time, because MultiTopicsConsumerImpl also increase it, - // we need to keep both epochs the same - if (conf.getSubscriptionType() == SubscriptionType.Failover - || conf.getSubscriptionType() == SubscriptionType.Exclusive) { - CONSUMER_EPOCH.incrementAndGet(this); + return; + } + + // clear local message + int currentSize = 0; + currentSize = incomingMessages.size(); + clearIncomingMessages(); + unAckedMessageTracker.clear(); + + // we should increase epoch every time, because MultiTopicsConsumerImpl also increase it, + // we need to keep both epochs the same + if (conf.getSubscriptionType() == SubscriptionType.Failover + || conf.getSubscriptionType() == SubscriptionType.Exclusive) { + CONSUMER_EPOCH.incrementAndGet(this); + } + // is channel is connected, we should send redeliver command to broker + if (cnx != null && isConnected(cnx)) { + cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages( + consumerId, CONSUMER_EPOCH.get(this)), cnx.ctx().voidPromise()); + if (currentSize > 0) { + increaseAvailablePermits(cnx, currentSize); } - // clear local message - int currentSize = incomingMessages.size(); - clearIncomingMessages(); - unAckedMessageTracker.clear(); - // is channel is connected, we should send redeliver command to broker - if (cnx != null && isConnected(cnx)) { - cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages( - consumerId, CONSUMER_EPOCH.get(this)), cnx.ctx().voidPromise()); - if (currentSize > 0) { - increaseAvailablePermits(cnx, currentSize); - } - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", subscription, topic, - consumerName, currentSize); - } - } else { - log.warn("[{}] Send redeliver messages command but the client is reconnect or close, " - + "so don't need to send redeliver command to broker", this); + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", subscription, topic, + consumerName, currentSize); } + } else { + log.warn("[{}] Send redeliver messages command but the client is reconnect or close, " + + "so don't need to send redeliver command to broker", this); } - }, internalPinnedExecutor); + } } - @SneakyThrows - @Override - public void redeliverUnacknowledgedMessages() { - try { - internalRedeliverUnacknowledgedMessages().get(); - } catch (ExecutionException e) { - throw e.getCause(); - } + public int clearIncomingMessagesAndGetMessageNumber() { + int messagesNumber = incomingMessages.size(); + clearIncomingMessages(); + unAckedMessageTracker.clear(); + return messagesNumber; } @Override @@ -1980,7 +2010,7 @@ protected void updateAutoScaleReceiverQueueHint() { @Override protected void completeOpBatchReceive(OpBatchReceive op) { - notifyPendingBatchReceivedCallBack(op.future); + notifyPendingBatchReceivedCallBack(op); } private CompletableFuture> getRedeliveryMessageIdData(List messageIds) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 3c50d85db4cac..1853be55f961f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -48,7 +48,6 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; -import lombok.SneakyThrows; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.Consumer; @@ -342,6 +341,14 @@ private void resumeReceivingFromPausedConsumersIfNeeded() { } } + // If message consumer epoch is smaller than consumer epoch present that + // it has been sent to the client before the user calls redeliverUnacknowledgedMessages, this message is invalid. + // so we should release this message and receive again + private boolean isValidConsumerEpoch(Message message) { + return isValidConsumerEpoch(((MessageImpl) (((TopicMessageImpl) message)) + .getMessage())); + } + @Override public int minReceiverQueueSize() { int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize); @@ -364,6 +371,11 @@ protected Message internalReceive() throws PulsarClientException { message = incomingMessages.take(); decreaseIncomingMessageSize(message); checkState(message instanceof TopicMessageImpl); + if (!isValidConsumerEpoch(message)) { + resumeReceivingFromPausedConsumersIfNeeded(); + message.release(); + return internalReceive(); + } unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount()); resumeReceivingFromPausedConsumersIfNeeded(); return message; @@ -375,6 +387,8 @@ protected Message internalReceive() throws PulsarClientException { @Override protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarClientException { Message message; + + long callTime = System.nanoTime(); try { if (incomingMessages.isEmpty()) { expectMoreIncomingMessages(); @@ -383,6 +397,16 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC if (message != null) { decreaseIncomingMessageSize(message); checkArgument(message instanceof TopicMessageImpl); + if (!isValidConsumerEpoch(message)) { + long executionTime = System.nanoTime() - callTime; + long timeoutInNanos = unit.toNanos(timeout); + if (executionTime >= timeoutInNanos) { + return null; + } else { + resumeReceivingFromPausedConsumersIfNeeded(); + return internalReceive(timeoutInNanos - executionTime, TimeUnit.NANOSECONDS); + } + } unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount()); } resumeReceivingFromPausedConsumersIfNeeded(); @@ -413,7 +437,22 @@ protected CompletableFuture> internalBatchReceiveAsync() { CompletableFuture> result = cancellationHandler.createFuture(); internalPinnedExecutor.execute(() -> { if (hasEnoughMessagesForBatchReceive()) { - notifyPendingBatchReceivedCallBack(result); + MessagesImpl messages = getNewMessagesImpl(); + Message msgPeeked = incomingMessages.peek(); + while (msgPeeked != null && messages.canAdd(msgPeeked)) { + Message msg = incomingMessages.poll(); + if (msg != null) { + decreaseIncomingMessageSize(msg); + if (!isValidConsumerEpoch(msg)) { + msgPeeked = incomingMessages.peek(); + continue; + } + Message interceptMsg = beforeConsume(msg); + messages.add(interceptMsg); + } + msgPeeked = incomingMessages.peek(); + } + result.complete(messages); } else { expectMoreIncomingMessages(); OpBatchReceive opBatchReceive = OpBatchReceive.of(result); @@ -663,24 +702,17 @@ private ConsumerConfigurationData getInternalConsumerConfig() { return internalConsumerConfig; } - @SneakyThrows @Override public void redeliverUnacknowledgedMessages() { - List> futures = new ArrayList<>(consumers.size()); internalPinnedExecutor.execute(() -> { CONSUMER_EPOCH.incrementAndGet(this); consumers.values().stream().forEach(consumer -> { - futures.add(consumer.internalRedeliverUnacknowledgedMessages()); + consumer.redeliverUnacknowledgedMessages(); consumer.unAckedChunkedMessageIdSequenceMap.clear(); }); clearIncomingMessages(); unAckedMessageTracker.clear(); }); - try { - FutureUtil.waitForAll(futures).get(); - } catch (ExecutionException e) { - throw e.getCause(); - } resumeReceivingFromPausedConsumersIfNeeded(); } @@ -715,7 +747,7 @@ protected void updateAutoScaleReceiverQueueHint() { @Override protected void completeOpBatchReceive(OpBatchReceive op) { - notifyPendingBatchReceivedCallBack(op.future); + notifyPendingBatchReceivedCallBack(op); resumeReceivingFromPausedConsumersIfNeeded(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageRedeliveryTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageRedeliveryTracker.java index 64248da921f96..1405e279f8fe5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageRedeliveryTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageRedeliveryTracker.java @@ -60,7 +60,6 @@ public UnAckedMessageRedeliveryTracker(PulsarClientImpl client, ConsumerBase @Override public void run(Timeout t) throws Exception { writeLock.lock(); - Set messageIds = null; try { HashSet headPartition = redeliveryTimePartitions.removeFirst(); if (!headPartition.isEmpty()) { @@ -72,13 +71,9 @@ public void run(Timeout t) throws Exception { } headPartition.clear(); redeliveryTimePartitions.addLast(headPartition); - messageIds = getRedeliveryMessages(consumerBase); + triggerRedelivery(consumerBase); } finally { writeLock.unlock(); - if (messageIds != null && !messageIds.isEmpty()) { - consumerBase.onAckTimeoutSend(messageIds); - consumerBase.redeliverUnacknowledgedMessages(messageIds); - } timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS); } } @@ -98,29 +93,35 @@ private void addAckTimeoutMessages(UnackMessageIdWrapper messageIdWrapper) { } } - private Set getRedeliveryMessages(ConsumerBase consumerBase) { + private void triggerRedelivery(ConsumerBase consumerBase) { if (ackTimeoutMessages.isEmpty()) { - return null; + return; } Set messageIds = TL_MESSAGE_IDS_SET.get(); messageIds.clear(); - long now = System.currentTimeMillis(); - ackTimeoutMessages.forEach((messageId, timestamp) -> { - if (timestamp <= now) { - addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase); - messageIds.add(messageId); + try { + long now = System.currentTimeMillis(); + ackTimeoutMessages.forEach((messageId, timestamp) -> { + if (timestamp <= now) { + addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase); + messageIds.add(messageId); + } + }); + if (!messageIds.isEmpty()) { + log.info("[{}] {} messages will be re-delivered", consumerBase, messageIds.size()); + Iterator iterator = messageIds.iterator(); + while (iterator.hasNext()) { + MessageId messageId = iterator.next(); + ackTimeoutMessages.remove(messageId); + } } - }); - if (!messageIds.isEmpty()) { - log.info("[{}] {} messages will be re-delivered", consumerBase, messageIds.size()); - Iterator iterator = messageIds.iterator(); - while (iterator.hasNext()) { - MessageId messageId = iterator.next(); - ackTimeoutMessages.remove(messageId); + } finally { + if (messageIds.size() > 0) { + consumerBase.onAckTimeoutSend(messageIds); + consumerBase.redeliverUnacknowledgedMessages(messageIds); } } - return messageIds; } @Override