Skip to content

Commit

Permalink
On multi-topic consumer, we shouldn't keep checking the partitioned m…
Browse files Browse the repository at this point in the history
…etadata (apache#10708)

* On multi-topic consumer, we shouldn't keep checking the partitioned metadata

* Added NON_PARTITIONED constant

* Removed assertion that is now invalid

* Fixed handling of deleted partitioned topic

* Fixed re-subscribing same topic
  • Loading branch information
merlimat authored and ciaocloud committed Oct 16, 2021
1 parent dc7a0cf commit 94c7301
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,20 +183,20 @@ public void testBinaryProtoToGetTopicsOfNamespacePersistent() throws Exception {

// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics();
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();

assertEquals(topics.size(), 6);
assertEquals(consumers.size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2);

topics.forEach(topic -> log.debug("topic: {}", topic));
consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));

IntStream.range(0, topics.size()).forEach(index ->
assertEquals(consumers.get(index).getTopic(), topics.get(index)));

((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));

// 5. produce data
for (int i = 0; i < totalMessages / 3; i++) {
Expand Down Expand Up @@ -286,20 +286,20 @@ public void testBinaryProtoToGetTopicsOfNamespaceNonPersistent() throws Exceptio

// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics();
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();

assertEquals(topics.size(), 1);
assertEquals(consumers.size(), 1);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 1);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 0);

topics.forEach(topic -> log.debug("topic: {}", topic));
consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));

IntStream.range(0, topics.size()).forEach(index ->
assertEquals(consumers.get(index).getTopic(), topics.get(index)));

((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));

// 5. produce data
for (int i = 0; i < totalMessages / 4; i++) {
Expand Down Expand Up @@ -377,20 +377,20 @@ public void testBinaryProtoToGetTopicsOfNamespaceAll() throws Exception {

// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics();
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();

assertEquals(topics.size(), 7);
assertEquals(consumers.size(), 7);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 4);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2);

topics.forEach(topic -> log.debug("topic: {}", topic));
consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));

IntStream.range(0, topics.size()).forEach(index ->
assertEquals(consumers.get(index).getTopic(), topics.get(index)));

((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));

// 5. produce data
for (int i = 0; i < totalMessages / 4; i++) {
Expand Down Expand Up @@ -508,9 +508,9 @@ public void testStartEmptyPatternConsumer() throws Exception {

// 3. verify consumer get methods, to get 5 number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 5);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 5);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 5);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 2);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2);

// 4. create producer
String messagePredicate = "my-message-" + key + "-";
Expand All @@ -537,9 +537,9 @@ public void testStartEmptyPatternConsumer() throws Exception {

// 6. verify consumer get methods, to get number of partitions and topics, value 6=1+2+3.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2);


// 7. produce data
Expand Down Expand Up @@ -614,9 +614,9 @@ public void testAutoSubscribePatternConsumer() throws Exception {

// 4. verify consumer get methods, to get 6 number of partitions and topics: 6=1+2+3
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2);

// 5. produce data to topic 1,2,3; verify should receive all the message
for (int i = 0; i < totalMessages / 3; i++) {
Expand Down Expand Up @@ -649,9 +649,9 @@ public void testAutoSubscribePatternConsumer() throws Exception {
PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
consumer1.run(consumer1.getRecheckPatternTimeout());
Thread.sleep(100);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 10);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 10);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 10);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 4);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 3);

// 8. produce data to topic3 and topic4, verify should receive all the message
for (int i = 0; i < totalMessages / 2; i++) {
Expand Down Expand Up @@ -723,9 +723,9 @@ public void testAutoUnsubscribePatternConsumer() throws Exception {

// 4. verify consumer get methods, to get 0 number of partitions and topics: 6=1+2+3
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2);

// 5. produce data to topic 1,2,3; verify should receive all the message
for (int i = 0; i < totalMessages / 3; i++) {
Expand Down Expand Up @@ -757,9 +757,9 @@ public void testAutoUnsubscribePatternConsumer() throws Exception {
PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
consumer1.run(consumer1.getRecheckPatternTimeout());
Thread.sleep(100);
assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics().size(), 2);
assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getPartitions().size(), 2);
assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().size(), 2);
assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size(), 1);
assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics().size(), 1);

// 8. produce data to topic2, verify should receive all the message
for (int i = 0; i < totalMessages; i++) {
Expand Down Expand Up @@ -808,7 +808,7 @@ public void testTopicDeletion() throws Exception {

// 4. verify consumer get methods
assertSame(consumerImpl.getPattern(), pattern);
assertEquals(consumerImpl.getTopics().size(), 2);
assertEquals(consumerImpl.getPartitionedTopics().size(), 0);

producer1.send("msg-1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void testGetConsumersAndGetTopics() throws Exception {
assertTrue(consumer instanceof MultiTopicsConsumerImpl);
assertTrue(consumer.getTopic().startsWith(MultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));

List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics();
List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitions();
List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers();

topics.forEach(topic -> log.info("topic: {}", topic));
Expand All @@ -157,7 +157,7 @@ public void testGetConsumersAndGetTopics() throws Exception {
IntStream.range(0, 6).forEach(index ->
assertEquals(consumers.get(index).getTopic(), topics.get(index)));

assertEquals(((MultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size(), 3);
assertEquals(((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics().size(), 2);

consumer.unsubscribe();
consumer.close();
Expand Down Expand Up @@ -563,12 +563,12 @@ public void testSubscribeUnsubscribeSingleTopic() throws Exception {
assertEquals(messageSet, totalMessages * 2 / 3);

// 7. use getter to verify internal topics number after un-subscribe topic3
List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics();
List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitions();
List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers();

assertEquals(topics.size(), 3);
assertEquals(consumers.size(), 3);
assertEquals(((MultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size(), 2);
assertEquals(((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics().size(), 1);

// 8. re-subscribe topic3
CompletableFuture<Void> subFuture = ((MultiTopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3, true);
Expand All @@ -594,12 +594,12 @@ public void testSubscribeUnsubscribeSingleTopic() throws Exception {
assertEquals(messageSet, totalMessages);

// 11. use getter to verify internal topics number after subscribe topic3
topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics();
topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitions();
consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers();

assertEquals(topics.size(), 6);
assertEquals(consumers.size(), 6);
assertEquals(((MultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size(), 3);
assertEquals(((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics().size(), 2);

consumer.unsubscribe();
consumer.close();
Expand Down Expand Up @@ -1181,20 +1181,20 @@ public void testAutoDiscoverMultiTopicsPartitions() throws Exception {
.subscribe();

Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 3);
Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 3);
Assert.assertEquals(consumer.getConsumers().size(), 3);

admin.topics().deletePartitionedTopic(topicName, true);
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 0);
Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 0);
Assert.assertEquals(consumer.getConsumers().size(), 0);
});

admin.topics().createPartitionedTopic(topicName, 7);
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 7);
Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 7);
Assert.assertEquals(consumer.getConsumers().size(), 7);
});
}

Expand Down
Loading

0 comments on commit 94c7301

Please sign in to comment.