From e61acc12b0b8a26b3be685e40cb0c5baec6bff05 Mon Sep 17 00:00:00 2001 From: Rajan Date: Fri, 20 Jan 2017 15:00:42 -0800 Subject: [PATCH] Fix consumer redelivery should not clear availablePermits (#170) --- .../api/SimpleProducerConsumerTest.java | 83 +++++++++++++++++++ .../pulsar/client/impl/ConsumerImpl.java | 3 +- 2 files changed, 84 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java index 04264a24fd887..c8bcf69e80a7c 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java @@ -1773,4 +1773,87 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile } } + @Test + public void testRedeliveryFailOverConsumer() throws Exception { + log.info("-- Starting {} test --", methodName); + + final int receiverQueueSize = 10; + + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setReceiverQueueSize(receiverQueueSize); + conf.setSubscriptionType(SubscriptionType.Failover); + // Only subscribe consumer + ConsumerImpl consumer = (ConsumerImpl) pulsarClient + .subscribe("persistent://my-property/use/my-ns/unacked-topic", "subscriber-1", conf); + + ProducerConfiguration producerConf = new ProducerConfiguration(); + + Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic", + producerConf); + + // (1) First round to produce-consume messages + int consumeMsgInParts = 4; + for (int i = 0; i < receiverQueueSize; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + Thread.sleep(10); + } + // (1.a) consume first consumeMsgInParts msgs and trigger redeliver + Message msg = null; + List messages1 = Lists.newArrayList(); + for (int i = 0; i < consumeMsgInParts; i++) { + msg = consumer.receive(1, TimeUnit.SECONDS); + if (msg != null) { + messages1.add(msg); + consumer.acknowledge(msg); + log.info("Received message: " + new String(msg.getData())); + } else { + break; + } + } + assertEquals(messages1.size(), consumeMsgInParts); + consumer.redeliverUnacknowledgedMessages(); + + // (1.b) consume second consumeMsgInParts msgs and trigger redeliver + messages1.clear(); + for (int i = 0; i < consumeMsgInParts; i++) { + msg = consumer.receive(1, TimeUnit.SECONDS); + if (msg != null) { + messages1.add(msg); + consumer.acknowledge(msg); + log.info("Received message: " + new String(msg.getData())); + } else { + break; + } + } + assertEquals(messages1.size(), consumeMsgInParts); + consumer.redeliverUnacknowledgedMessages(); + + // (2) Second round to produce-consume messages + for (int i = 0; i < receiverQueueSize; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + Thread.sleep(100); + } + + int remainingMsgs = (2 * receiverQueueSize) - (2 * consumeMsgInParts); + messages1.clear(); + for (int i = 0; i < remainingMsgs; i++) { + msg = consumer.receive(1, TimeUnit.SECONDS); + if (msg != null) { + messages1.add(msg); + consumer.acknowledge(msg); + log.info("Received message: " + new String(msg.getData())); + } else { + break; + } + } + assertEquals(messages1.size(), remainingMsgs); + + producer.close(); + consumer.close(); + log.info("-- Exiting {} test --", methodName); + + } + } diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java index ded3d7b5251d5..2b18bb8dcc71d 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java @@ -902,13 +902,12 @@ public void redeliverUnacknowledgedMessages() { synchronized (this) { currentSize = incomingMessages.size(); incomingMessages.clear(); - availablePermits.set(0); unAckedMessageTracker.clear(); batchMessageAckTracker.clear(); } cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise()); if (currentSize > 0) { - sendFlowPermitsToBroker(cnx, currentSize); + increaseAvailablePermits(cnx, currentSize); } if (log.isDebugEnabled()) { log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", subscription, topic,