From dbb157793b90e203f01a630fb0f0abafbb337762 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Sat, 7 Oct 2023 18:01:02 +0800 Subject: [PATCH] [fix][test]Fix flaky test because the too short receive time (#21273) --- .../client/impl/TransactionEndToEndTest.java | 104 +++++++++--------- 1 file changed, 53 insertions(+), 51 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index 348fb04b7dd23..da8492e612f77 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -96,6 +96,8 @@ public class TransactionEndToEndTest extends TransactionTestBase { protected static final String TOPIC_OUTPUT = NAMESPACE1 + "/output"; protected static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test"; protected static final int NUM_PARTITIONS = 16; + private static final int waitTimeForCanReceiveMsgInSec = 5; + private static final int waitTimeForCannotReceiveMsgInSec = 5; @BeforeClass protected void setup() throws Exception { conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); @@ -173,7 +175,7 @@ private void testIndividualAckAbortFilterAckSetInPendingAckState() throws Except } // can't receive message anymore - assertNull(consumer.receive(2, TimeUnit.SECONDS)); + assertNull(consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS)); } @@ -240,14 +242,14 @@ private void testFilterMsgsInPendingAckStateWhenConsumerDisconnect(boolean enabl .enableBatchIndexAcknowledgment(true) .subscribe(); - Message message = consumer.receive(3, TimeUnit.SECONDS); + Message message = consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNull(message); // abort txn1 txn1.abort().get(); // after txn1 aborted, consumer will receive messages txn1 contains int receiveCounter = 0; - while((message = consumer.receive(3, TimeUnit.SECONDS)) != null) { + while((message = consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS)) != null) { Assert.assertEquals(message.getValue().intValue(), receiveCounter); receiveCounter ++; } @@ -347,7 +349,7 @@ private void produceCommitTest(boolean enableBatch) throws Exception { } // Can't receive transaction messages before commit. - Message message = consumer.receive(300, TimeUnit.MILLISECONDS); + Message message = consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNull(message); txn1.commit().get(); @@ -355,13 +357,13 @@ private void produceCommitTest(boolean enableBatch) throws Exception { int receiveCnt = 0; for (int i = 0; i < txnMessageCnt; i++) { - message = consumer.receive(5, TimeUnit.SECONDS); + message = consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNotNull(message); receiveCnt ++; } Assert.assertEquals(txnMessageCnt, receiveCnt); - message = consumer.receive(300, TimeUnit.MILLISECONDS); + message = consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNull(message); // cleanup. @@ -398,13 +400,13 @@ public void produceAbortTest() throws Exception { Awaitility.await().until(consumer::isConnected); // Can't receive transaction messages before abort. - Message message = consumer.receive(300, TimeUnit.MILLISECONDS); + Message message = consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNull(message); txn.abort().get(); // Cant't receive transaction messages after abort. - message = consumer.receive(300, TimeUnit.MILLISECONDS); + message = consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNull(message); Awaitility.await().until(() -> { boolean flag = true; @@ -492,7 +494,7 @@ private void testAckWithTransactionReduceUnAckMessageCount(boolean enableBatch) Transaction txn = getTxn(); for (int i = 0; i < messageCount / 2; i++) { - Message message = consumer.receive(5, TimeUnit.SECONDS); + Message message = consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); consumer.acknowledgeAsync(message.getMessageId(), txn).get(); } @@ -572,14 +574,14 @@ protected void txnAckTest(boolean batchEnable, int maxBatchSize, // consume and ack messages with txn for (int i = 0; i < messageCnt; i++) { - Message message = consumer.receive(5, TimeUnit.SECONDS); + Message message = consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNotNull(message); log.info("receive msgId: {}, count : {}", message.getMessageId(), i); consumer.acknowledgeAsync(message.getMessageId(), txn).get(); } // the messages are pending ack state and can't be received - Message message = consumer.receive(300, TimeUnit.MILLISECONDS); + Message message = consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNull(message); // 1) txn abort @@ -588,7 +590,7 @@ protected void txnAckTest(boolean batchEnable, int maxBatchSize, // after transaction abort, the messages could be received Transaction commitTxn = getTxn(); for (int i = 0; i < messageCnt; i++) { - message = consumer.receive(2, TimeUnit.SECONDS); + message = consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNotNull(message); consumer.acknowledgeAsync(message.getMessageId(), commitTxn).get(); log.info("receive msgId: {}, count: {}", message.getMessageId(), i); @@ -598,7 +600,7 @@ protected void txnAckTest(boolean batchEnable, int maxBatchSize, commitTxn.commit().get(); // after transaction commit, the messages can't be received - message = consumer.receive(300, TimeUnit.MILLISECONDS); + message = consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNull(message); Field field = TransactionImpl.class.getDeclaredField("state"); @@ -635,7 +637,7 @@ public void testAfterDeleteTopicOtherTopicCanRecover() throws Exception { .topic(topicTwo).subscriptionName(sub).subscribe(); String content = "test"; producer.send(content); - assertEquals(consumer.receive(3, TimeUnit.SECONDS).getValue(), content); + assertEquals(consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS).getValue(), content); // cleanup. producer.close(); @@ -674,7 +676,7 @@ public void txnMessageAckTest() throws Exception { log.info("produce transaction messages finished"); // Can't receive transaction messages before commit. - Message message = consumer.receive(300, TimeUnit.MILLISECONDS); + Message message = consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNull(message); log.info("transaction messages can't be received before transaction committed"); @@ -683,7 +685,7 @@ public void txnMessageAckTest() throws Exception { int ackedMessageCount = 0; int receiveCnt = 0; for (int i = 0; i < messageCnt; i++) { - message = consumer.receive(5, TimeUnit.SECONDS); + message = consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNotNull(message); receiveCnt ++; if (i % 2 == 0) { @@ -693,7 +695,7 @@ public void txnMessageAckTest() throws Exception { } Assert.assertEquals(messageCnt, receiveCnt); - message = consumer.receive(300, TimeUnit.MILLISECONDS); + message = consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNull(message); String checkTopic = TopicName.get(topic).getPartition(0).toString(); @@ -705,14 +707,14 @@ public void txnMessageAckTest() throws Exception { receiveCnt = 0; for (int i = 0; i < messageCnt - ackedMessageCount; i++) { - message = consumer.receive(2, TimeUnit.SECONDS); + message = consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNotNull(message); consumer.acknowledge(message); receiveCnt ++; } Assert.assertEquals(messageCnt - ackedMessageCount, receiveCnt); - message = consumer.receive(300, TimeUnit.MILLISECONDS); + message = consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNull(message); topic = TopicName.get(topic).getPartition(0).toString(); @@ -803,7 +805,7 @@ private void txnCumulativeAckTest(boolean batchEnable, int maxBatchSize, Subscri Message message = null; Thread.sleep(1000L); for (int i = 0; i < messageCnt; i++) { - message = consumer.receive(1, TimeUnit.SECONDS); + message = consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNotNull(message); if (i % 3 == 0) { consumer.acknowledgeCumulativeAsync(message.getMessageId(), abortTxn).get(); @@ -828,14 +830,14 @@ private void txnCumulativeAckTest(boolean batchEnable, int maxBatchSize, Subscri } // the messages are pending ack state and can't be received - message = consumer.receive(300, TimeUnit.MILLISECONDS); + message = consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNull(message); abortTxn.abort().get(); consumer.redeliverUnacknowledgedMessages(); Transaction commitTxn = getTxn(); for (int i = 0; i < messageCnt; i++) { - message = consumer.receive(1, TimeUnit.SECONDS); + message = consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNotNull(message); if (i % 3 == 0) { consumer.acknowledgeCumulativeAsync(message.getMessageId(), commitTxn).get(); @@ -857,7 +859,7 @@ private void txnCumulativeAckTest(boolean batchEnable, int maxBatchSize, Subscri Assert.assertTrue(reCommitError.getCause() instanceof TransactionNotFoundException); } - message = consumer.receive(300, TimeUnit.MILLISECONDS); + message = consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNull(message); } @@ -919,7 +921,7 @@ public void txnMetadataHandlerRecoverTest() throws Exception { Awaitility.await().until(consumer::isConnected); for (int i = 0; i < txnCnt * messageCnt; i++) { - Message message = consumer.receive(5, TimeUnit.SECONDS); + Message message = consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNotNull(message); } @@ -960,7 +962,7 @@ public void produceTxnMessageOrderTest() throws Exception { txn.commit().get(); for (int i = 0; i < 1000; i++) { - Message message = consumer.receive(5, TimeUnit.SECONDS); + Message message = consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNotNull(message); Assert.assertEquals(Integer.valueOf(new String(message.getData())), Integer.valueOf(i)); } @@ -1016,7 +1018,7 @@ public void produceAndConsumeCloseStateTxnTest() throws Exception { } - Message message = consumer.receive(5, TimeUnit.SECONDS); + Message message = consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); consumer.acknowledgeAsync(message.getMessageId(), consumeTxn).get(); consumeTxn.commit().get(); try { @@ -1062,7 +1064,7 @@ public void produceAndConsumeCloseStateTxnTest() throws Exception { constructor.setAccessible(true); TransactionImpl timeoutTxnSkipClientTimeout = constructor.newInstance(pulsarClient, 5, - timeoutTxn.getTxnID().getLeastSigBits(), timeoutTxn.getTxnID().getMostSigBits()); + timeoutTxn.getTxnID().getLeastSigBits(), timeoutTxn.getTxnID().getMostSigBits()); try { timeoutTxnSkipClientTimeout.commit().get(); @@ -1092,7 +1094,7 @@ public void testTxnTimeoutAtTransactionMetadataStore() throws Exception{ .newTransaction(new TransactionCoordinatorID(0), 1, null).get(); Awaitility.await().until(() -> { try { - getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txnID).get(); + getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txnID).get(); return false; } catch (Exception e) { return true; @@ -1125,17 +1127,17 @@ public void transactionTimeoutTest() throws Exception { Transaction consumeTimeoutTxn = pulsarClient .newTransaction() - .withTransactionTimeout(3, TimeUnit.SECONDS) + .withTransactionTimeout(7, TimeUnit.SECONDS) .build().get(); - Message message = consumer.receive(5, TimeUnit.SECONDS); + Message message = consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); consumer.acknowledgeAsync(message.getMessageId(), consumeTimeoutTxn).get(); - Message reReceiveMessage = consumer.receive(300, TimeUnit.MILLISECONDS); + Message reReceiveMessage = consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS); assertNull(reReceiveMessage); - reReceiveMessage = consumer.receive(5, TimeUnit.SECONDS); + reReceiveMessage = consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); assertEquals(reReceiveMessage.getValue(), message.getValue()); @@ -1182,9 +1184,9 @@ public void txnTransactionRedeliverNullDispatcher(CommandAck.AckType ackType) th } Transaction txn = getTxn(); if (ackType == CommandAck.AckType.Individual) { - consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), txn); + consumer.acknowledgeAsync(consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS).getMessageId(), txn); } else { - consumer.acknowledgeCumulativeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), txn); + consumer.acknowledgeCumulativeAsync(consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS).getMessageId(), txn); } topic = TopicName.get(topic).toString(); boolean exist = false; @@ -1307,7 +1309,7 @@ public void testTxnTimeOutInClient() throws Exception{ .InvalidTxnStatusException); } try { - Message message = consumer.receive(5, TimeUnit.SECONDS); + Message message = consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); consumer.acknowledgeAsync(message.getMessageId(), transaction).get(); Assert.fail(); } catch (Exception e) { @@ -1349,7 +1351,7 @@ public void testCumulativeAckRedeliverMessages() throws Exception { Message message = null; for (int i = 0; i < transactionCumulativeAck; i++) { - message = consumer.receive(5, TimeUnit.SECONDS); + message = consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); } // receive transaction in order @@ -1372,7 +1374,7 @@ public void testCumulativeAckRedeliverMessages() throws Exception { // receive the rest of the message for (int i = 0; i < count; i++) { - message = consumer.receive(5, TimeUnit.SECONDS); + message = consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); } Transaction commitTransaction = getTxn(); @@ -1385,7 +1387,7 @@ public void testCumulativeAckRedeliverMessages() throws Exception { commitTransaction.commit().get(); // then redeliver will not receive any message - message = consumer.receive(300, TimeUnit.MILLISECONDS); + message = consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS); assertNull(message); // cleanup. @@ -1460,7 +1462,7 @@ public void testAckWithTransactionReduceUnackCountNotInPendingAcks() throws Exce // receive the batch messages add to a list for (int i = 0; i < 5; i++) { - messageIds.add(consumer.receive(5, TimeUnit.SECONDS).getMessageId()); + messageIds.add(consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS).getMessageId()); } MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0); @@ -1520,7 +1522,7 @@ public void testSendTxnAckMessageToDLQ() throws Exception { .build().get(); // consumer receive the message the first time, redeliverCount = 0 - consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), transaction).get(); + consumer.acknowledgeAsync(consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS).getMessageId(), transaction).get(); transaction.abort().get(); @@ -1528,17 +1530,17 @@ public void testSendTxnAckMessageToDLQ() throws Exception { .build().get(); // consumer receive the message the second time, redeliverCount = 1, also can be received - consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), transaction).get(); + consumer.acknowledgeAsync(consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS).getMessageId(), transaction).get(); transaction.abort().get(); // consumer receive the message the third time, redeliverCount = 2, // the message will be sent to DLQ, can't receive - assertNull(consumer.receive(300, TimeUnit.MILLISECONDS)); + assertNull(consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS)); assertEquals(((ConsumerImpl) consumer).getAvailablePermits(), 3); - assertEquals(value, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue())); + assertEquals(value, new String(deadLetterConsumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS).getValue())); // cleanup. consumer.close(); @@ -1584,7 +1586,7 @@ public void testSendTxnAckBatchMessageToDLQ() throws Exception { Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES) .build().get(); - Message message = consumer.receive(5, TimeUnit.SECONDS); + Message message = consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); assertEquals(value1, new String(message.getValue())); // consumer receive the batch message one the first time, redeliverCount = 0 consumer.acknowledgeAsync(message.getMessageId(), transaction).get(); @@ -1594,7 +1596,7 @@ public void testSendTxnAckBatchMessageToDLQ() throws Exception { // consumer will receive the batch message two and then receive // the message one and message two again, redeliverCount = 1 for (int i = 0; i < 3; i ++) { - message = consumer.receive(5, TimeUnit.SECONDS); + message = consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); } transaction = pulsarClient.newTransaction().withTransactionTimeout(5, TimeUnit.MINUTES) @@ -1608,12 +1610,12 @@ public void testSendTxnAckBatchMessageToDLQ() throws Exception { // consumer receive the batch message the third time, redeliverCount = 2, // the message will be sent to DLQ, can't receive - assertNull(consumer.receive(300, TimeUnit.MILLISECONDS)); + assertNull(consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS)); assertEquals(((ConsumerImpl) consumer).getAvailablePermits(), 6); - assertEquals(value1, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue())); - assertEquals(value2, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue())); + assertEquals(value1, new String(deadLetterConsumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS).getValue())); + assertEquals(value2, new String(deadLetterConsumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS).getValue())); // cleanup. consumer.close(); @@ -1663,17 +1665,17 @@ public void testDelayedTransactionMessages() throws Exception { // Failover consumer will receive the messages immediately while // the shared consumer will get them after the delay - Message msg = sharedConsumer.receive(300, TimeUnit.MILLISECONDS); + Message msg = sharedConsumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS); assertNull(msg); for (int i = 0; i < 10; i++) { - msg = failoverConsumer.receive(100, TimeUnit.MILLISECONDS); + msg = failoverConsumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); assertEquals(msg.getValue(), "msg-" + i); } Set receivedMsgs = new TreeSet<>(); for (int i = 0; i < 10; i++) { - msg = sharedConsumer.receive(10, TimeUnit.SECONDS); + msg = sharedConsumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS); receivedMsgs.add(msg.getValue()); }