Skip to content

Commit

Permalink
[improve][test] Optimize TransactionEndToEndTest (#18522)
Browse files Browse the repository at this point in the history
## Motivation
1. fix flaky test #18466 caused by txn async send method
2. decrease run time by optimizing receive method 
## Modification
1. fix flaky test
   * modify `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();` to `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).send();` 
This also can be resolved by #17836 and #18486 later.
2. decrease run time by optimizing receive method 
    * modify
 `    Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
                   Assert.assertNull(message);` to
                   `  Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
                            Assert.assertNull(message);`
   * modify `message = consumer.receive();` to `message = consumer.receive(5, TimeUnit.SECONDS);`
   * keep other `consumer.receive(x, y)` no change.
  • Loading branch information
liangyepianzhou authored Nov 24, 2022
1 parent 13d3e1d commit f3ac2e6
Showing 1 changed file with 36 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,32 +146,29 @@ private void produceCommitTest(boolean enableBatch) throws Exception {
int messageCnt = 1000;
for (int i = 0; i < messageCnt; i++) {
if (i % 5 == 0) {
producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();
producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).send();
} else {
producer.newMessage(txn2).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();
producer.newMessage(txn2).value(("Hello Txn - " + i).getBytes(UTF_8)).send();
}
txnMessageCnt++;
}

// Can't receive transaction messages before commit.
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);

txn1.commit().get();
txn2.commit().get();

int receiveCnt = 0;
for (int i = 0; i < txnMessageCnt; i++) {
message = consumer.receive();
message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(message);
receiveCnt ++;
}
Assert.assertEquals(txnMessageCnt, receiveCnt);

message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(message);

message = consumer.receive(5, TimeUnit.SECONDS);
message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);

// cleanup.
Expand Down Expand Up @@ -208,13 +205,13 @@ public void produceAbortTest() throws Exception {
Awaitility.await().until(consumer::isConnected);

// Can't receive transaction messages before abort.
Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);

txn.abort().get();

// Cant't receive transaction messages after abort.
message = consumer.receive(2, TimeUnit.SECONDS);
message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);
Awaitility.await().until(() -> {
boolean flag = true;
Expand Down Expand Up @@ -302,7 +299,7 @@ private void testAckWithTransactionReduceUnAckMessageCount(boolean enableBatch)
Transaction txn = getTxn();

for (int i = 0; i < messageCount / 2; i++) {
Message<byte[]> message = consumer.receive();
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message.getMessageId(), txn).get();
}

Expand Down Expand Up @@ -382,14 +379,14 @@ protected void txnAckTest(boolean batchEnable, int maxBatchSize,

// consume and ack messages with txn
for (int i = 0; i < messageCnt; i++) {
Message<byte[]> message = consumer.receive();
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(message);
log.info("receive msgId: {}, count : {}", message.getMessageId(), i);
consumer.acknowledgeAsync(message.getMessageId(), txn).get();
}

// the messages are pending ack state and can't be received
Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);

// 1) txn abort
Expand All @@ -408,7 +405,7 @@ protected void txnAckTest(boolean batchEnable, int maxBatchSize,
commitTxn.commit().get();

// after transaction commit, the messages can't be received
message = consumer.receive(2, TimeUnit.SECONDS);
message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);

Field field = TransactionImpl.class.getDeclaredField("state");
Expand Down Expand Up @@ -445,7 +442,7 @@ public void testAfterDeleteTopicOtherTopicCanRecover() throws Exception {
.topic(topicTwo).subscriptionName(sub).subscribe();
String content = "test";
producer.send(content);
assertEquals(consumer.receive().getValue(), content);
assertEquals(consumer.receive(3, TimeUnit.SECONDS).getValue(), content);

// cleanup.
producer.close();
Expand Down Expand Up @@ -484,7 +481,7 @@ public void txnMessageAckTest() throws Exception {
log.info("produce transaction messages finished");

// Can't receive transaction messages before commit.
Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);
log.info("transaction messages can't be received before transaction committed");

Expand All @@ -493,7 +490,7 @@ public void txnMessageAckTest() throws Exception {
int ackedMessageCount = 0;
int receiveCnt = 0;
for (int i = 0; i < messageCnt; i++) {
message = consumer.receive();
message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(message);
receiveCnt ++;
if (i % 2 == 0) {
Expand All @@ -503,7 +500,7 @@ public void txnMessageAckTest() throws Exception {
}
Assert.assertEquals(messageCnt, receiveCnt);

message = consumer.receive(2, TimeUnit.SECONDS);
message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);

String checkTopic = TopicName.get(topic).getPartition(0).toString();
Expand All @@ -522,7 +519,7 @@ public void txnMessageAckTest() throws Exception {
}
Assert.assertEquals(messageCnt - ackedMessageCount, receiveCnt);

message = consumer.receive(2, TimeUnit.SECONDS);
message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);

topic = TopicName.get(topic).getPartition(0).toString();
Expand Down Expand Up @@ -638,7 +635,7 @@ private void txnCumulativeAckTest(boolean batchEnable, int maxBatchSize, Subscri
}

// the messages are pending ack state and can't be received
message = consumer.receive(2, TimeUnit.SECONDS);
message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);

abortTxn.abort().get();
Expand Down Expand Up @@ -667,7 +664,7 @@ private void txnCumulativeAckTest(boolean batchEnable, int maxBatchSize, Subscri
Assert.assertTrue(reCommitError.getCause() instanceof TransactionNotFoundException);
}

message = consumer.receive(1, TimeUnit.SECONDS);
message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);
}

Expand Down Expand Up @@ -729,7 +726,7 @@ public void txnMetadataHandlerRecoverTest() throws Exception {
Awaitility.await().until(consumer::isConnected);

for (int i = 0; i < txnCnt * messageCnt; i++) {
Message<byte[]> message = consumer.receive();
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(message);
}

Expand Down Expand Up @@ -938,14 +935,14 @@ public void transactionTimeoutTest() throws Exception {
.withTransactionTimeout(3, TimeUnit.SECONDS)
.build().get();

Message<String> message = consumer.receive();
Message<String> message = consumer.receive(5, TimeUnit.SECONDS);

consumer.acknowledgeAsync(message.getMessageId(), consumeTimeoutTxn).get();

Message<String> reReceiveMessage = consumer.receive(2, TimeUnit.SECONDS);
Message<String> reReceiveMessage = consumer.receive(300, TimeUnit.MILLISECONDS);
assertNull(reReceiveMessage);

reReceiveMessage = consumer.receive(2, TimeUnit.SECONDS);
reReceiveMessage = consumer.receive(5, TimeUnit.SECONDS);

assertEquals(reReceiveMessage.getValue(), message.getValue());

Expand Down Expand Up @@ -992,9 +989,9 @@ public void txnTransactionRedeliverNullDispatcher(CommandAck.AckType ackType) th
}
Transaction txn = getTxn();
if (ackType == CommandAck.AckType.Individual) {
consumer.acknowledgeAsync(consumer.receive().getMessageId(), txn);
consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), txn);
} else {
consumer.acknowledgeCumulativeAsync(consumer.receive().getMessageId(), txn);
consumer.acknowledgeCumulativeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), txn);
}
topic = TopicName.get(topic).toString();
boolean exist = false;
Expand Down Expand Up @@ -1117,7 +1114,7 @@ public void testTxnTimeOutInClient() throws Exception{
.InvalidTxnStatusException);
}
try {
Message<String> message = consumer.receive();
Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
Assert.fail();
} catch (Exception e) {
Expand Down Expand Up @@ -1159,7 +1156,7 @@ public void testCumulativeAckRedeliverMessages() throws Exception {

Message<byte[]> message = null;
for (int i = 0; i < transactionCumulativeAck; i++) {
message = consumer.receive();
message = consumer.receive(5, TimeUnit.SECONDS);
}

// receive transaction in order
Expand All @@ -1182,7 +1179,7 @@ public void testCumulativeAckRedeliverMessages() throws Exception {

// receive the rest of the message
for (int i = 0; i < count; i++) {
message = consumer.receive();
message = consumer.receive(5, TimeUnit.SECONDS);
}

Transaction commitTransaction = getTxn();
Expand All @@ -1195,7 +1192,7 @@ public void testCumulativeAckRedeliverMessages() throws Exception {
commitTransaction.commit().get();

// then redeliver will not receive any message
message = consumer.receive(3, TimeUnit.SECONDS);
message = consumer.receive(300, TimeUnit.MILLISECONDS);
assertNull(message);

// cleanup.
Expand Down Expand Up @@ -1270,7 +1267,7 @@ public void testAckWithTransactionReduceUnackCountNotInPendingAcks() throws Exce

// receive the batch messages add to a list
for (int i = 0; i < 5; i++) {
messageIds.add(consumer.receive().getMessageId());
messageIds.add(consumer.receive(5, TimeUnit.SECONDS).getMessageId());
}

MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0);
Expand Down Expand Up @@ -1330,21 +1327,21 @@ public void testSendTxnAckMessageToDLQ() throws Exception {
.build().get();

// consumer receive the message the first time, redeliverCount = 0
consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), transaction).get();

transaction.abort().get();

transaction = pulsarClient.newTransaction().withTransactionTimeout(5, TimeUnit.MINUTES)
.build().get();

// consumer receive the message the second time, redeliverCount = 1, also can be received
consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), transaction).get();

transaction.abort().get();

// consumer receive the message the third time, redeliverCount = 2,
// the message will be sent to DLQ, can't receive
assertNull(consumer.receive(3, TimeUnit.SECONDS));
assertNull(consumer.receive(300, TimeUnit.MILLISECONDS));

assertEquals(((ConsumerImpl<?>) consumer).getAvailablePermits(), 3);

Expand Down Expand Up @@ -1394,7 +1391,7 @@ public void testSendTxnAckBatchMessageToDLQ() throws Exception {
Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES)
.build().get();

Message<byte[]> message = consumer.receive();
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
assertEquals(value1, new String(message.getValue()));
// consumer receive the batch message one the first time, redeliverCount = 0
consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
Expand All @@ -1404,7 +1401,7 @@ public void testSendTxnAckBatchMessageToDLQ() throws Exception {
// consumer will receive the batch message two and then receive
// the message one and message two again, redeliverCount = 1
for (int i = 0; i < 3; i ++) {
message = consumer.receive();
message = consumer.receive(5, TimeUnit.SECONDS);
}

transaction = pulsarClient.newTransaction().withTransactionTimeout(5, TimeUnit.MINUTES)
Expand All @@ -1418,7 +1415,7 @@ public void testSendTxnAckBatchMessageToDLQ() throws Exception {

// consumer receive the batch message the third time, redeliverCount = 2,
// the message will be sent to DLQ, can't receive
assertNull(consumer.receive(3, TimeUnit.SECONDS));
assertNull(consumer.receive(300, TimeUnit.MILLISECONDS));

assertEquals(((ConsumerImpl<?>) consumer).getAvailablePermits(), 6);

Expand Down Expand Up @@ -1473,7 +1470,7 @@ public void testDelayedTransactionMessages() throws Exception {

// Failover consumer will receive the messages immediately while
// the shared consumer will get them after the delay
Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
Message<String> msg = sharedConsumer.receive(300, TimeUnit.MILLISECONDS);
assertNull(msg);

for (int i = 0; i < 10; i++) {
Expand Down

0 comments on commit f3ac2e6

Please sign in to comment.