diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java index ad9ef4775c5b2..2012cd2b3170d 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java @@ -81,7 +81,7 @@ public void testSharedAckedNormalTopic() throws Exception { while (message != null) { String data = new String(message.getData()); log.info("Consumer received : " + data); - message = consumer.receive(10, TimeUnit.MILLISECONDS); + message = consumer.receive(100, TimeUnit.MILLISECONDS); } long size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); log.info(key + " Unacked Message Tracker size is " + size); @@ -102,7 +102,7 @@ public void testSharedAckedNormalTopic() throws Exception { String data = new String(message.getData()); log.info("Consumer received : " + data); consumer.acknowledge(message); - message = consumer.receive(10, TimeUnit.MILLISECONDS); + message = consumer.receive(100, TimeUnit.MILLISECONDS); } size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); log.info(key + " Unacked Message Tracker size is " + size); @@ -124,7 +124,7 @@ public void testSharedAckedNormalTopic() throws Exception { while (message != null) { String data = new String(message.getData()); log.info("Consumer received : " + data); - message = consumer.receive(10, TimeUnit.MILLISECONDS); + message = consumer.receive(100, TimeUnit.MILLISECONDS); } size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); log.info(key + " Unacked Message Tracker size is " + size); @@ -140,7 +140,7 @@ public void testSharedAckedNormalTopic() throws Exception { String data = new String(message.getData()); log.info("Consumer received : " + data); consumer.acknowledge(message); - message = consumer.receive(10, TimeUnit.MILLISECONDS); + message = consumer.receive(100, TimeUnit.MILLISECONDS); } assertEquals(redelivered, 5); size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); @@ -178,7 +178,7 @@ public void testExclusiveAckedNormalTopic() throws Exception { while (message != null) { String data = new String(message.getData()); log.info("Consumer received : " + data); - message = consumer.receive(10, TimeUnit.MILLISECONDS); + message = consumer.receive(100, TimeUnit.MILLISECONDS); } long size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); log.info(key + " Unacked Message Tracker size is " + size); @@ -199,7 +199,7 @@ public void testExclusiveAckedNormalTopic() throws Exception { String data = new String(message.getData()); log.info("Consumer received : " + data); consumer.acknowledge(message); - message = consumer.receive(10, TimeUnit.MILLISECONDS); + message = consumer.receive(100, TimeUnit.MILLISECONDS); } size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); log.info(key + " Unacked Message Tracker size is " + size); @@ -221,7 +221,7 @@ public void testExclusiveAckedNormalTopic() throws Exception { while (message != null) { String data = new String(message.getData()); log.info("Consumer received : " + data); - message = consumer.receive(10, TimeUnit.MILLISECONDS); + message = consumer.receive(100, TimeUnit.MILLISECONDS); } size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); log.info(key + " Unacked Message Tracker size is " + size); @@ -237,7 +237,7 @@ public void testExclusiveAckedNormalTopic() throws Exception { String data = new String(message.getData()); log.info("Consumer received : " + data); consumer.acknowledge(message); - message = consumer.receive(10, TimeUnit.MILLISECONDS); + message = consumer.receive(100, TimeUnit.MILLISECONDS); } assertEquals(redelivered, 10); size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); @@ -275,7 +275,7 @@ public void testFailoverAckedNormalTopic() throws Exception { while (message != null) { String data = new String(message.getData()); log.info("Consumer received : " + data); - message = consumer.receive(10, TimeUnit.MILLISECONDS); + message = consumer.receive(100, TimeUnit.MILLISECONDS); } long size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); log.info(key + " Unacked Message Tracker size is " + size); @@ -296,7 +296,7 @@ public void testFailoverAckedNormalTopic() throws Exception { String data = new String(message.getData()); log.info("Consumer received : " + data); consumer.acknowledge(message); - message = consumer.receive(10, TimeUnit.MILLISECONDS); + message = consumer.receive(100, TimeUnit.MILLISECONDS); } size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); log.info(key + " Unacked Message Tracker size is " + size); @@ -318,7 +318,7 @@ public void testFailoverAckedNormalTopic() throws Exception { while (message != null) { String data = new String(message.getData()); log.info("Consumer received : " + data); - message = consumer.receive(10, TimeUnit.MILLISECONDS); + message = consumer.receive(100, TimeUnit.MILLISECONDS); } size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); log.info(key + " Unacked Message Tracker size is " + size); @@ -334,7 +334,7 @@ public void testFailoverAckedNormalTopic() throws Exception { String data = new String(message.getData()); log.info("Consumer received : " + data); consumer.acknowledge(message); - message = consumer.receive(10, TimeUnit.MILLISECONDS); + message = consumer.receive(100, TimeUnit.MILLISECONDS); } assertEquals(redelivered, 10); size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); @@ -381,7 +381,7 @@ public void testSharedAckedPartitionedTopic() throws Exception { while (message != null) { String data = new String(message.getData()); log.info("Consumer received : " + data); - message = consumer.receive(10, TimeUnit.MILLISECONDS); + message = consumer.receive(100, TimeUnit.MILLISECONDS); } long size = getUnackedMessagesCountInPartitionedConsumer(consumer); @@ -403,7 +403,7 @@ public void testSharedAckedPartitionedTopic() throws Exception { String data = new String(message.getData()); log.info("Consumer received : " + data); consumer.acknowledge(message); - message = consumer.receive(10, TimeUnit.MILLISECONDS); + message = consumer.receive(100, TimeUnit.MILLISECONDS); } size = getUnackedMessagesCountInPartitionedConsumer(consumer); log.info(key + " Unacked Message Tracker size is " + size); @@ -425,7 +425,7 @@ public void testSharedAckedPartitionedTopic() throws Exception { while (message != null) { String data = new String(message.getData()); log.info("Consumer received : " + data); - message = consumer.receive(10, TimeUnit.MILLISECONDS); + message = consumer.receive(100, TimeUnit.MILLISECONDS); } size = getUnackedMessagesCountInPartitionedConsumer(consumer); log.info(key + " Unacked Message Tracker size is " + size); @@ -441,7 +441,7 @@ public void testSharedAckedPartitionedTopic() throws Exception { String data = new String(message.getData()); log.info("Consumer received : " + data); consumer.acknowledge(message); - message = consumer.receive(10, TimeUnit.MILLISECONDS); + message = consumer.receive(100, TimeUnit.MILLISECONDS); } assertEquals(redelivered, 5); size = getUnackedMessagesCountInPartitionedConsumer(consumer);