From bb357161d4f091aa17dc4f32f9a16f72324a9d0b Mon Sep 17 00:00:00 2001 From: Rajan Date: Fri, 7 Oct 2016 10:17:25 -0700 Subject: [PATCH] Accept received-msg-ack from different consumer than received-consumer on shared-subscription (#52) --- .../yahoo/pulsar/broker/service/Consumer.java | 21 +++++- .../api/SimpleProducerConsumerTest.java | 69 +++++++++++++++++++ 2 files changed, 89 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java index 18fd7af525d07..425c3484d5242 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java @@ -261,7 +261,7 @@ void messageAcked(CommandAck ack) { checkArgument(ack.getAckType() == AckType.Individual); // Only ack a single message - pendingAcks.remove(position); + removePendingAcks(position); subscription.acknowledgeMessage(position, AckType.Individual); } else { subscription.acknowledgeMessage(position, ack.getAckType()); @@ -328,6 +328,25 @@ public int hashCode() { return consumerName.hashCode() + 31 * cnx.hashCode(); } + /** + * first try to remove ack-position from the current_consumer's pendingAcks. + * if ack-message doesn't present into current_consumer's pendingAcks + * a. try to remove from other connected subscribed consumers (It happens when client + * tries to acknowledge message through different consumer under the same subscription) + * + * + * @param position + */ + private void removePendingAcks(PositionImpl position) { + if (!pendingAcks.remove(position)) { + for (Consumer consumer : subscription.getConsumers()) { + if (!consumer.equals(this) && consumer.getPendingAcks().remove(position)) { + break; + } + } + } + } + public ConcurrentOpenHashSet getPendingAcks() { return pendingAcks; } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java index 0c213bfd4fa60..cf4d73dc1127f 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java @@ -860,6 +860,75 @@ public void testSendCallBack() throws Exception { assertEquals(message.getBytes().length, msgLength.get()); } } + + /** + * consume message from consumer1 and send acknowledgement from different consumer subscribed under same + * subscription-name + * + * @throws Exception + */ + @Test + public void testSharedConsumerAckDifferentConsumer() throws Exception { + log.info("-- Starting {} test --", methodName); + + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setReceiverQueueSize(5); + conf.setSubscriptionType(SubscriptionType.Shared); + Consumer consumer1 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name", + conf); + Consumer consumer2 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name", + conf); + + ProducerConfiguration producerConf = new ProducerConfiguration(); + + Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf); + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + Message msg = null; + Set consumerMsgSet1 = Sets.newHashSet(); + Set consumerMsgSet2 = Sets.newHashSet(); + for (int i = 0; i < 5; i++) { + msg = consumer1.receive(1, TimeUnit.SECONDS); + consumerMsgSet1.add(msg); + + } + for (int i = 0; i < 5; i++) { + msg = consumer2.receive(1, TimeUnit.SECONDS); + consumerMsgSet2.add(msg); + + } + consumerMsgSet1.stream().forEach(m -> { + try { + consumer2.acknowledge(m); + } catch (PulsarClientException e) { + fail(); + } + }); + consumerMsgSet2.stream().forEach(m -> { + try { + consumer1.acknowledge(m); + } catch (PulsarClientException e) { + fail(); + } + }); + + consumer1.redeliverUnacknowledgedMessages(); + consumer2.redeliverUnacknowledgedMessages(); + + try { + if (consumer1.receive(1, TimeUnit.SECONDS) != null || consumer2.receive(1, TimeUnit.SECONDS) != null) { + fail(); + } + } finally { + consumer1.close(); + consumer2.close(); + } + + log.info("-- Exiting {} test --", methodName); + } private void receiveAsync(Consumer consumer, int totalMessage, int currentMessage, CountDownLatch latch, final Set consumeMsg, ExecutorService executor) throws PulsarClientException {