Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Nov 16, 2022
1 parent f1a7cf2 commit 5af3256
Showing 1 changed file with 61 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -36,6 +38,8 @@
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.awaitility.Awaitility;
import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -439,4 +443,61 @@ public void run() {
Assert.assertEquals(count, 9);
Assert.assertEquals(0, datas.size());
}

/**
* see https://github.com/apache/pulsar/pull/18491
*/
@Test
public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Exception {
final String topic = BrokerTestUtil.newUniqueName("my-topic");
admin.topics().createPartitionedTopic(topic, 2);

final int receiverQueueSize = 10;

@Cleanup
MultiTopicsConsumerImpl<Integer> consumer =
(MultiTopicsConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("sub")
.receiverQueueSize(receiverQueueSize)
.subscribe();

@Cleanup
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(false)
.create();

for (int i = 0; i < receiverQueueSize; i++){
producer.send(i);
}

Awaitility.await().until(() -> consumer.incomingMessages.size() == receiverQueueSize);

consumer.redeliverUnacknowledgedMessages();
waitMultiTopicConsumerRedeliverFinish(consumer);

Set<Integer> receivedMsgs = new HashSet<>();
for (;;){
Message<Integer> msg = consumer.receive(2, TimeUnit.SECONDS);
if (msg == null){
break;
}
receivedMsgs.add(msg.getValue());
}
Assert.assertEquals(receivedMsgs.size(), 10);
}

/**
* If the task after "redeliver" finish, means task-redeliver finish.
*/
private void waitMultiTopicConsumerRedeliverFinish(MultiTopicsConsumerImpl consumer){
ExecutorService internalPinnedExecutor =
WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor");
CompletableFuture<Void> taskAfterRedeliver = new CompletableFuture<>();
internalPinnedExecutor.execute(() -> {
taskAfterRedeliver.complete(null);
});
taskAfterRedeliver.join();
}
}

0 comments on commit 5af3256

Please sign in to comment.