diff --git a/.github/workflows/pr-tests.yml b/.github/workflows/pr-tests.yml index 1ad98e03e0623..ac6f7082e2e45 100644 --- a/.github/workflows/pr-tests.yml +++ b/.github/workflows/pr-tests.yml @@ -25,7 +25,7 @@ jobs: run: mvn clean install -DskipTests - name: tests module - run: mvn test -DfailIfNoTests=false '-Dtest=!KafkaIntegration*Test,!DistributedClusterTest' -pl tests + run: mvn test -DfailIfNoTests=false '-Dtest=!KafkaIntegration*Test' -pl tests - name: package surefire artifacts if: failure() diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java index 62bfa7f54312a..127a7587b4ee0 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java @@ -23,6 +23,7 @@ import io.netty.channel.socket.SocketChannel; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupConfig; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator; +import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetAcker; import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig; import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionConfig; import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator; @@ -169,9 +170,9 @@ public void unLoad(NamespaceBundle bundle) { } groupCoordinator.handleGroupEmigration(name.getPartitionIndex()); } - // remove cache when unload - KafkaTopicManager.removeTopicManagerCache(name.toString()); + // deReference topic when unload KopBrokerLookupManager.removeTopicManagerCache(name.toString()); + KafkaTopicManager.deReference(name.toString()); } } else { log.error("Failed to get owned topic list for " @@ -349,6 +350,10 @@ public void close() { } KafkaTopicManager.LOOKUP_CACHE.clear(); KopBrokerLookupManager.clear(); + KafkaTopicManager.getConsumerTopicManagers().clear(); + KafkaTopicManager.getReferences().clear(); + KafkaTopicManager.getTopics().clear(); + OffsetAcker.CONSUMERS.clear(); statsProvider.stop(); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index ee41cc7a013a5..cec1ab46c1ab2 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -55,12 +55,17 @@ public class KafkaTopicManager { // consumerTopicManagers for consumers cache. @Getter - private final ConcurrentHashMap> consumerTopicManagers; + private static final ConcurrentHashMap> + consumerTopicManagers = new ConcurrentHashMap<>(); // cache for topics: , for removing producer - private final ConcurrentHashMap> topics; + @Getter + private static final ConcurrentHashMap> + topics = new ConcurrentHashMap<>(); // cache for references in PersistentTopic: - private final ConcurrentHashMap references; + @Getter + private static final ConcurrentHashMap + references = new ConcurrentHashMap<>(); private InternalServerCnx internalServerCnx; @@ -90,10 +95,6 @@ public class KafkaTopicManager { this.brokerService = pulsarService.getBrokerService(); this.internalServerCnx = new InternalServerCnx(requestHandler); - consumerTopicManagers = new ConcurrentHashMap<>(); - topics = new ConcurrentHashMap<>(); - references = new ConcurrentHashMap<>(); - this.rwLock = new ReentrantReadWriteLock(); this.closed = false; @@ -357,27 +358,27 @@ public Producer getReferenceProducer(String topicName) { return references.get(topicName); } - public void deReference(String topicName) { + public static void deReference(String topicName) { try { removeTopicManagerCache(topicName); if (consumerTopicManagers.containsKey(topicName)) { - CompletableFuture manager = consumerTopicManagers.get(topicName); - manager.get().close(); - consumerTopicManagers.remove(topicName); + consumerTopicManagers.remove(topicName).get().close(); } if (!topics.containsKey(topicName)) { return; } PersistentTopic persistentTopic = topics.get(topicName).get(); - if (persistentTopic != null) { - persistentTopic.removeProducer(references.get(topicName)); + Producer producer = references.get(topicName); + if (persistentTopic != null && producer != null) { + persistentTopic.removeProducer(producer); } topics.remove(topicName); + + OffsetAcker.removeOffsetAcker(topicName); } catch (Exception e) { - log.error("[{}] Failed to close reference for individual topic {}. exception:", - requestHandler.ctx.channel(), topicName, e); + log.error("Failed to close reference for individual topic {}. exception:", topicName, e); } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java index d161278f7a820..44035a596bb5d 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java @@ -123,7 +123,7 @@ public CompletableFuture handleFetch( tcm = pair.getValue().get(); if (tcm == null) { // remove null future cache from consumerTopicManagers - requestHandler.getTopicManager().getConsumerTopicManagers() + KafkaTopicManager.getConsumerTopicManagers() .remove(KopTopic.toString(pair.getKey())); throw new NullPointerException("topic not owned, and return null TCM in fetch."); } @@ -239,7 +239,7 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch, "cursor.readEntry fail. deleteCursor"); } else { // remove null future cache from consumerTopicManagers - requestHandler.getTopicManager().getConsumerTopicManagers() + KafkaTopicManager.getConsumerTopicManagers() .remove(KopTopic.toString(kafkaTopic)); log.warn("Cursor deleted while TCM close."); } @@ -329,8 +329,8 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch, cm.add(pair.getRight(), pair); } else { // remove null future cache from consumerTopicManagers - requestHandler.getTopicManager().getConsumerTopicManagers() - .remove(KopTopic.toString(kafkaPartition)); + KafkaTopicManager.getConsumerTopicManagers() + .remove(KopTopic.toString(kafkaPartition)); log.warn("Cursor deleted while TCM close, failed to add cursor back to TCM."); } }); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java index 6feb11bc5e9d2..933d434710009 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java @@ -45,7 +45,7 @@ @Slf4j public class OffsetAcker implements Closeable { - private static final Map>> EMPTY_CONSUMERS = new HashMap<>(); + private static final Map>> EMPTY_CONSUMERS = new HashMap<>(); private final ConsumerBuilder consumerBuilder; private final BrokerService brokerService; @@ -57,8 +57,8 @@ public class OffsetAcker implements Closeable { // value is the created future of consumer. // The consumer, whose subscription is the group id, is used for acknowledging message id cumulatively. // This behavior is equivalent to committing offsets in Kafka. - private final Map>>> - consumers = new ConcurrentHashMap<>(); + public static final Map>>> + CONSUMERS = new ConcurrentHashMap<>(); public OffsetAcker(PulsarClientImpl pulsarClient) { this.consumerBuilder = pulsarClient.newConsumer() @@ -138,8 +138,8 @@ public void ackOffsets(String groupId, Map of public void close(Set groupIds) { for (String groupId : groupIds) { - final Map>> - consumersToRemove = consumers.remove(groupId); + final Map>> + consumersToRemove = CONSUMERS.remove(groupId); if (consumersToRemove == null) { continue; } @@ -151,9 +151,7 @@ public void close(Set groupIds) { } final Consumer consumer = consumerFuture.getNow(null); if (consumer != null) { - if (log.isDebugEnabled()) { - log.debug("Try to close consumer of [group={}] [topic={}]", groupId, topicPartition.toString()); - } + log.debug("Try to close consumer of [group={}] [topic={}]", groupId, topicPartition); consumer.closeAsync(); } }); @@ -162,35 +160,41 @@ public void close(Set groupIds) { @Override public void close() { - log.info("close OffsetAcker with {} groupIds", consumers.size()); - close(consumers.keySet()); + log.info("close OffsetAcker with {} groupIds", CONSUMERS.size()); + close(CONSUMERS.keySet()); } @NonNull public CompletableFuture> getOrCreateConsumer(String groupId, TopicPartition topicPartition) { - Map>> group = consumers + Map>> group = CONSUMERS .computeIfAbsent(groupId, gid -> new ConcurrentHashMap<>()); + KopTopic kopTopic = new KopTopic(topicPartition.topic()); + String topicName = kopTopic.getPartitionName((topicPartition.partition())); return group.computeIfAbsent( - topicPartition, - partition -> createConsumer(groupId, partition)); + topicName, + name -> createConsumer(groupId, name)); } @NonNull - private CompletableFuture> createConsumer(String groupId, TopicPartition topicPartition) { - KopTopic kopTopic = new KopTopic(topicPartition.topic()); + private CompletableFuture> createConsumer(String groupId, String topicName) { return consumerBuilder.clone() - .topic(kopTopic.getPartitionName(topicPartition.partition())) + .topic(topicName) .subscriptionName(groupId) .subscribeAsync(); } public CompletableFuture> getConsumer(String groupId, TopicPartition topicPartition) { - return consumers.getOrDefault(groupId, EMPTY_CONSUMERS).get(topicPartition); + KopTopic kopTopic = new KopTopic(topicPartition.topic()); + String topicName = kopTopic.getPartitionName((topicPartition.partition())); + return CONSUMERS.getOrDefault(groupId, EMPTY_CONSUMERS).get(topicName); } public void removeConsumer(String groupId, TopicPartition topicPartition) { + KopTopic kopTopic = new KopTopic(topicPartition.topic()); + String topicName = kopTopic.getPartitionName((topicPartition.partition())); + final CompletableFuture> consumerFuture = - consumers.getOrDefault(groupId, EMPTY_CONSUMERS).remove(topicPartition); + CONSUMERS.getOrDefault(groupId, EMPTY_CONSUMERS).remove(topicName); if (consumerFuture != null) { consumerFuture.whenComplete((consumer, e) -> { if (e == null) { @@ -202,4 +206,18 @@ public void removeConsumer(String groupId, TopicPartition topicPartition) { }); } } + + public static void removeOffsetAcker(String topicName) { + CONSUMERS.forEach((groupId, group) -> { + CompletableFuture > consumerCompletableFuture = group.remove(topicName); + if (consumerCompletableFuture != null) { + consumerCompletableFuture.thenApply(Consumer::closeAsync).whenCompleteAsync((ignore, t) -> { + if (t != null) { + log.error("Failed to close offsetAcker consumer when remove partition {}.", + topicName); + } + }); + } + }); + } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java index dac5373a23492..2f4ead35de842 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java @@ -15,8 +15,8 @@ import static org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME; import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX; +import static org.junit.Assert.assertTrue; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; -import lombok.Cleanup; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerRecord; @@ -195,7 +194,7 @@ public void setup() throws Exception { } - @AfterMethod + @AfterMethod(timeOut = 30000) @Override public void cleanup() throws Exception { log.info("--- Shutting down ---"); @@ -298,19 +297,14 @@ public void testMutiBrokerAndCoordinator() throws Exception { // 1. produce message with Kafka producer. int totalMsgs = 50; String messageStrPrefix = "Message_Kop_KafkaProduceKafkaConsume_" + partitionNumber + "_"; - @Cleanup KProducer kProducer = new KProducer(kafkaTopicName, false, getKafkaBrokerPort(), true); kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix); // 2. create 4 kafka consumer from different consumer groups. // consume data and commit offsets for 4 consumer group. - @Cleanup KConsumer kConsumer1 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-1"); - @Cleanup KConsumer kConsumer2 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-2"); - @Cleanup KConsumer kConsumer3 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-3"); - @Cleanup KConsumer kConsumer4 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-4"); List topicPartitions = IntStream.range(0, partitionNumber) @@ -394,6 +388,12 @@ public void testMutiBrokerAndCoordinator() throws Exception { assertTrue(records.isEmpty()); records = kConsumer4.getConsumer().poll(Duration.ofMillis(200)); assertTrue(records.isEmpty()); + + kProducer.close(); + kConsumer1.close(); + kConsumer2.close(); + kConsumer3.close(); + kConsumer4.close(); } // Unit test for unload / reload user topic bundle, verify it works well. @@ -421,15 +421,12 @@ public void testMutiBrokerUnloadReload() throws Exception { // 2. produce consume message with Kafka producer. int totalMsgs = 50; String messageStrPrefix = "Message_" + kafkaTopicName + "_"; - @Cleanup KProducer kProducer = new KProducer(kafkaTopicName, false, getKafkaBrokerPort(), true); kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix); List topicPartitions = IntStream.range(0, partitionNumber) .mapToObj(i -> new TopicPartition(kafkaTopicName, i)).collect(Collectors.toList()); - @Cleanup KConsumer kConsumer1 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-1"); - @Cleanup KConsumer kConsumer2 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-2"); log.info("Partition size: {}, will consume and commitOffset for 2 consumers", topicPartitions.size()); @@ -445,6 +442,10 @@ public void testMutiBrokerUnloadReload() throws Exception { kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix); kafkaConsumeCommitMessage(kConsumer1, totalMsgs, messageStrPrefix, topicPartitions); kafkaConsumeCommitMessage(kConsumer2, totalMsgs, messageStrPrefix, topicPartitions); + + kProducer.close(); + kConsumer1.close(); + kConsumer2.close(); } @Test(timeOut = 30000) @@ -470,15 +471,12 @@ public void testOneBrokerShutdown() throws Exception { // 2. produce consume message with Kafka producer. int totalMsgs = 50; String messageStrPrefix = "Message_" + kafkaTopicName + "_"; - @Cleanup KProducer kProducer = new KProducer(kafkaTopicName, false, getKafkaBrokerPort(), true); kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix); List topicPartitions = IntStream.range(0, partitionNumber) .mapToObj(i -> new TopicPartition(kafkaTopicName, i)).collect(Collectors.toList()); - @Cleanup KConsumer kConsumer1 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-1"); - @Cleanup KConsumer kConsumer2 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-2"); log.info("Partition size: {}, will consume and commitOffset for 2 consumers", topicPartitions.size()); @@ -494,5 +492,9 @@ public void testOneBrokerShutdown() throws Exception { kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix); kafkaConsumeCommitMessage(kConsumer1, totalMsgs, messageStrPrefix, topicPartitions); kafkaConsumeCommitMessage(kConsumer2, totalMsgs, messageStrPrefix, topicPartitions); + + kProducer.close(); + kConsumer1.close(); + kConsumer2.close(); } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java index 66ef4d20a62f4..7054f40b3d881 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java @@ -338,6 +338,7 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception { doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory(); doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createConfigurationMetadataStore(); + doReturn(mockZooKeeper).when(pulsar).getZkClient(); Supplier namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar)); doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider(); @@ -463,7 +464,7 @@ public KProducer(String topic, Boolean isAsync, String host, props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoKafkaOnPulsarProducer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySer); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSer); - props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000); + props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000); if (retry) { props.put(ProducerConfig.RETRIES_CONFIG, 3);