fix issue apache#252 bundle unload bug (apache#404)
Fix apache#252 

### Motivation
When a bundle unload is triggered, the `consumerTopicManagers` cache is not evicted, so the next fetch request is handled with the stale `KafkaTopicConsumerManager` instance. However, the bundle unload closes the producers, consumers, and managed ledgers of the topics in that bundle, so reading messages through the stale `KafkaTopicConsumerManager` fails with a `managedLedger has been closed` exception.
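
A minimal sketch of the failure mode (the `ConsumerManager` class, the `cache` map, and the topic string below are hypothetical stand-ins, not the actual KoP types): because the cached future outlives the unload, the next fetch reuses a manager whose managed ledger is already closed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class StaleTcmCacheSketch {

    // Stand-in for KafkaTopicConsumerManager: it only tracks whether its managed ledger is open.
    static final class ConsumerManager {
        volatile boolean ledgerClosed;

        void read() {
            if (ledgerClosed) {
                throw new IllegalStateException("managedLedger has been closed");
            }
        }
    }

    // Cache that is not evicted when the bundle is unloaded.
    static final ConcurrentHashMap<String, CompletableFuture<ConsumerManager>> cache =
            new ConcurrentHashMap<>();

    public static void main(String[] args) throws Exception {
        // First fetch populates the cache.
        ConsumerManager manager = cache
                .computeIfAbsent("persistent://public/default/topic-partition-0",
                        t -> CompletableFuture.completedFuture(new ConsumerManager()))
                .get();

        // Bundle unload closes the managed ledger but leaves the cache entry in place.
        manager.ledgerClosed = true;

        // The next fetch gets the same stale future back and fails.
        cache.get("persistent://public/default/topic-partition-0").get().read();
    }
}
```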

### Changes
1. Make the `consumerTopicManagers`, `topics`, and `references` maps static `ConcurrentHashMap` attributes.
2. Evict the related cache entries for topics whose bundle triggered an unload (see the sketch after this list).
3. Re-enable `DistributedClusterTest`.
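
A condensed sketch of the eviction pattern these changes introduce (class, field, and method names are simplified stand-ins, not the exact KoP signatures shown in the diff below): the caches become static so every request handler shares them, and the bundle-unload hook walks the topics of the unloaded bundle and de-references each one.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public final class TopicCacheEvictionSketch {

    // Static maps, shared by every request handler, so the unload hook can evict them.
    private static final ConcurrentHashMap<String, CompletableFuture<AutoCloseable>> CONSUMER_MANAGERS =
            new ConcurrentHashMap<>();
    private static final ConcurrentHashMap<String, CompletableFuture<Object>> TOPICS =
            new ConcurrentHashMap<>();

    // Mirrors the spirit of KafkaTopicManager.deReference: drop and close whatever was cached
    // for the topic, so the next fetch rebuilds its state against the bundle's new owner.
    static void deReference(String topicName) throws Exception {
        CompletableFuture<AutoCloseable> manager = CONSUMER_MANAGERS.remove(topicName);
        if (manager != null) {
            manager.get().close();
        }
        TOPICS.remove(topicName);
    }

    // Bundle-unload hook: evict every topic that belonged to the unloaded bundle.
    static void onBundleUnload(List<String> topicsInBundle) throws Exception {
        for (String topic : topicsInBundle) {
            deReference(topic);
        }
    }
}
```

Evicting on unload means the next fetch rebuilds the topic's consumer manager against the bundle's new owner instead of reusing a closed managed ledger.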
hangc0276 authored Apr 22, 2021
1 parent 5c91e8f commit 31e9295
Showing 7 changed files with 82 additions and 55 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr-tests.yml
@@ -25,7 +25,7 @@ jobs:
run: mvn clean install -DskipTests

- name: tests module
run: mvn test -DfailIfNoTests=false '-Dtest=!KafkaIntegration*Test,!DistributedClusterTest' -pl tests
run: mvn test -DfailIfNoTests=false '-Dtest=!KafkaIntegration*Test' -pl tests

- name: package surefire artifacts
if: failure()
@@ -23,6 +23,7 @@
import io.netty.channel.socket.SocketChannel;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetAcker;
import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator;
@@ -169,9 +170,9 @@ public void unLoad(NamespaceBundle bundle) {
}
groupCoordinator.handleGroupEmigration(name.getPartitionIndex());
}
// remove cache when unload
KafkaTopicManager.removeTopicManagerCache(name.toString());
// deReference topic when unload
KopBrokerLookupManager.removeTopicManagerCache(name.toString());
KafkaTopicManager.deReference(name.toString());
}
} else {
log.error("Failed to get owned topic list for "
@@ -349,6 +350,10 @@ public void close() {
}
KafkaTopicManager.LOOKUP_CACHE.clear();
KopBrokerLookupManager.clear();
KafkaTopicManager.getConsumerTopicManagers().clear();
KafkaTopicManager.getReferences().clear();
KafkaTopicManager.getTopics().clear();
OffsetAcker.CONSUMERS.clear();
statsProvider.stop();
}

@@ -55,12 +55,17 @@ public class KafkaTopicManager {

// consumerTopicManagers for consumers cache.
@Getter
private final ConcurrentHashMap<String, CompletableFuture<KafkaTopicConsumerManager>> consumerTopicManagers;
private static final ConcurrentHashMap<String, CompletableFuture<KafkaTopicConsumerManager>>
consumerTopicManagers = new ConcurrentHashMap<>();

// cache for topics: <topicName, persistentTopic>, for removing producer
private final ConcurrentHashMap<String, CompletableFuture<PersistentTopic>> topics;
@Getter
private static final ConcurrentHashMap<String, CompletableFuture<PersistentTopic>>
topics = new ConcurrentHashMap<>();
// cache for references in PersistentTopic: <topicName, producer>
private final ConcurrentHashMap<String, Producer> references;
@Getter
private static final ConcurrentHashMap<String, Producer>
references = new ConcurrentHashMap<>();

private InternalServerCnx internalServerCnx;

@@ -90,10 +95,6 @@ public class KafkaTopicManager {
this.brokerService = pulsarService.getBrokerService();
this.internalServerCnx = new InternalServerCnx(requestHandler);

consumerTopicManagers = new ConcurrentHashMap<>();
topics = new ConcurrentHashMap<>();
references = new ConcurrentHashMap<>();

this.rwLock = new ReentrantReadWriteLock();
this.closed = false;

@@ -357,27 +358,27 @@ public Producer getReferenceProducer(String topicName) {
return references.get(topicName);
}

public void deReference(String topicName) {
public static void deReference(String topicName) {
try {
removeTopicManagerCache(topicName);

if (consumerTopicManagers.containsKey(topicName)) {
CompletableFuture<KafkaTopicConsumerManager> manager = consumerTopicManagers.get(topicName);
manager.get().close();
consumerTopicManagers.remove(topicName);
consumerTopicManagers.remove(topicName).get().close();
}

if (!topics.containsKey(topicName)) {
return;
}
PersistentTopic persistentTopic = topics.get(topicName).get();
if (persistentTopic != null) {
persistentTopic.removeProducer(references.get(topicName));
Producer producer = references.get(topicName);
if (persistentTopic != null && producer != null) {
persistentTopic.removeProducer(producer);
}
topics.remove(topicName);

OffsetAcker.removeOffsetAcker(topicName);
} catch (Exception e) {
log.error("[{}] Failed to close reference for individual topic {}. exception:",
requestHandler.ctx.channel(), topicName, e);
log.error("Failed to close reference for individual topic {}. exception:", topicName, e);
}
}

@@ -123,7 +123,7 @@ public CompletableFuture<AbstractResponse> handleFetch(
tcm = pair.getValue().get();
if (tcm == null) {
// remove null future cache from consumerTopicManagers
requestHandler.getTopicManager().getConsumerTopicManagers()
KafkaTopicManager.getConsumerTopicManagers()
.remove(KopTopic.toString(pair.getKey()));
throw new NullPointerException("topic not owned, and return null TCM in fetch.");
}
@@ -239,7 +239,7 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
"cursor.readEntry fail. deleteCursor");
} else {
// remove null future cache from consumerTopicManagers
requestHandler.getTopicManager().getConsumerTopicManagers()
KafkaTopicManager.getConsumerTopicManagers()
.remove(KopTopic.toString(kafkaTopic));
log.warn("Cursor deleted while TCM close.");
}
@@ -329,8 +329,8 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
cm.add(pair.getRight(), pair);
} else {
// remove null future cache from consumerTopicManagers
requestHandler.getTopicManager().getConsumerTopicManagers()
.remove(KopTopic.toString(kafkaPartition));
KafkaTopicManager.getConsumerTopicManagers()
.remove(KopTopic.toString(kafkaPartition));
log.warn("Cursor deleted while TCM close, failed to add cursor back to TCM.");
}
});
@@ -45,7 +45,7 @@
@Slf4j
public class OffsetAcker implements Closeable {

private static final Map<TopicPartition, CompletableFuture<Consumer<byte[]>>> EMPTY_CONSUMERS = new HashMap<>();
private static final Map<String, CompletableFuture<Consumer<byte[]>>> EMPTY_CONSUMERS = new HashMap<>();

private final ConsumerBuilder<byte[]> consumerBuilder;
private final BrokerService brokerService;
@@ -57,8 +57,8 @@ public class OffsetAcker implements Closeable {
// value is the created future of consumer.
// The consumer, whose subscription is the group id, is used for acknowledging message id cumulatively.
// This behavior is equivalent to committing offsets in Kafka.
private final Map<String, Map<TopicPartition, CompletableFuture<Consumer<byte[]>>>>
consumers = new ConcurrentHashMap<>();
public static final Map<String, Map<String, CompletableFuture<Consumer<byte[]>>>>
CONSUMERS = new ConcurrentHashMap<>();

public OffsetAcker(PulsarClientImpl pulsarClient) {
this.consumerBuilder = pulsarClient.newConsumer()
@@ -138,8 +138,8 @@ public void ackOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> of

public void close(Set<String> groupIds) {
for (String groupId : groupIds) {
final Map<TopicPartition, CompletableFuture<Consumer<byte[]>>>
consumersToRemove = consumers.remove(groupId);
final Map<String, CompletableFuture<Consumer<byte[]>>>
consumersToRemove = CONSUMERS.remove(groupId);
if (consumersToRemove == null) {
continue;
}
@@ -151,9 +151,7 @@ public void close(Set<String> groupIds) {
}
final Consumer<byte[]> consumer = consumerFuture.getNow(null);
if (consumer != null) {
if (log.isDebugEnabled()) {
log.debug("Try to close consumer of [group={}] [topic={}]", groupId, topicPartition.toString());
}
log.debug("Try to close consumer of [group={}] [topic={}]", groupId, topicPartition);
consumer.closeAsync();
}
});
@@ -162,35 +160,41 @@ public void close(Set<String> groupIds) {

@Override
public void close() {
log.info("close OffsetAcker with {} groupIds", consumers.size());
close(consumers.keySet());
log.info("close OffsetAcker with {} groupIds", CONSUMERS.size());
close(CONSUMERS.keySet());
}

@NonNull
public CompletableFuture<Consumer<byte[]>> getOrCreateConsumer(String groupId, TopicPartition topicPartition) {
Map<TopicPartition, CompletableFuture<Consumer<byte[]>>> group = consumers
Map<String, CompletableFuture<Consumer<byte[]>>> group = CONSUMERS
.computeIfAbsent(groupId, gid -> new ConcurrentHashMap<>());
KopTopic kopTopic = new KopTopic(topicPartition.topic());
String topicName = kopTopic.getPartitionName((topicPartition.partition()));
return group.computeIfAbsent(
topicPartition,
partition -> createConsumer(groupId, partition));
topicName,
name -> createConsumer(groupId, name));
}

@NonNull
private CompletableFuture<Consumer<byte[]>> createConsumer(String groupId, TopicPartition topicPartition) {
KopTopic kopTopic = new KopTopic(topicPartition.topic());
private CompletableFuture<Consumer<byte[]>> createConsumer(String groupId, String topicName) {
return consumerBuilder.clone()
.topic(kopTopic.getPartitionName(topicPartition.partition()))
.topic(topicName)
.subscriptionName(groupId)
.subscribeAsync();
}

public CompletableFuture<Consumer<byte[]>> getConsumer(String groupId, TopicPartition topicPartition) {
return consumers.getOrDefault(groupId, EMPTY_CONSUMERS).get(topicPartition);
KopTopic kopTopic = new KopTopic(topicPartition.topic());
String topicName = kopTopic.getPartitionName((topicPartition.partition()));
return CONSUMERS.getOrDefault(groupId, EMPTY_CONSUMERS).get(topicName);
}

public void removeConsumer(String groupId, TopicPartition topicPartition) {
KopTopic kopTopic = new KopTopic(topicPartition.topic());
String topicName = kopTopic.getPartitionName((topicPartition.partition()));

final CompletableFuture<Consumer<byte[]>> consumerFuture =
consumers.getOrDefault(groupId, EMPTY_CONSUMERS).remove(topicPartition);
CONSUMERS.getOrDefault(groupId, EMPTY_CONSUMERS).remove(topicName);
if (consumerFuture != null) {
consumerFuture.whenComplete((consumer, e) -> {
if (e == null) {
@@ -202,4 +206,18 @@ public void removeConsumer(String groupId, TopicPartition topicPartition) {
});
}
}

public static void removeOffsetAcker(String topicName) {
CONSUMERS.forEach((groupId, group) -> {
CompletableFuture<Consumer<byte[]> > consumerCompletableFuture = group.remove(topicName);
if (consumerCompletableFuture != null) {
consumerCompletableFuture.thenApply(Consumer::closeAsync).whenCompleteAsync((ignore, t) -> {
if (t != null) {
log.error("Failed to close offsetAcker consumer when remove partition {}.",
topicName);
}
});
}
});
}
}
@@ -15,8 +15,8 @@

import static org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME;
import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
import static org.junit.Assert.assertTrue;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -31,7 +31,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Cleanup;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -195,7 +194,7 @@ public void setup() throws Exception {
}


@AfterMethod
@AfterMethod(timeOut = 30000)
@Override
public void cleanup() throws Exception {
log.info("--- Shutting down ---");
@@ -298,19 +297,14 @@ public void testMutiBrokerAndCoordinator() throws Exception {
// 1. produce message with Kafka producer.
int totalMsgs = 50;
String messageStrPrefix = "Message_Kop_KafkaProduceKafkaConsume_" + partitionNumber + "_";
@Cleanup
KProducer kProducer = new KProducer(kafkaTopicName, false, getKafkaBrokerPort(), true);
kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix);

// 2. create 4 kafka consumer from different consumer groups.
// consume data and commit offsets for 4 consumer group.
@Cleanup
KConsumer kConsumer1 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-1");
@Cleanup
KConsumer kConsumer2 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-2");
@Cleanup
KConsumer kConsumer3 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-3");
@Cleanup
KConsumer kConsumer4 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-4");

List<TopicPartition> topicPartitions = IntStream.range(0, partitionNumber)
@@ -394,6 +388,12 @@ public void testMutiBrokerAndCoordinator() throws Exception {
assertTrue(records.isEmpty());
records = kConsumer4.getConsumer().poll(Duration.ofMillis(200));
assertTrue(records.isEmpty());

kProducer.close();
kConsumer1.close();
kConsumer2.close();
kConsumer3.close();
kConsumer4.close();
}

// Unit test for unload / reload user topic bundle, verify it works well.
@@ -421,15 +421,12 @@ public void testMutiBrokerUnloadReload() throws Exception {
// 2. produce consume message with Kafka producer.
int totalMsgs = 50;
String messageStrPrefix = "Message_" + kafkaTopicName + "_";
@Cleanup
KProducer kProducer = new KProducer(kafkaTopicName, false, getKafkaBrokerPort(), true);
kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix);

List<TopicPartition> topicPartitions = IntStream.range(0, partitionNumber)
.mapToObj(i -> new TopicPartition(kafkaTopicName, i)).collect(Collectors.toList());
@Cleanup
KConsumer kConsumer1 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-1");
@Cleanup
KConsumer kConsumer2 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-2");
log.info("Partition size: {}, will consume and commitOffset for 2 consumers",
topicPartitions.size());
@@ -445,6 +442,10 @@
kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix);
kafkaConsumeCommitMessage(kConsumer1, totalMsgs, messageStrPrefix, topicPartitions);
kafkaConsumeCommitMessage(kConsumer2, totalMsgs, messageStrPrefix, topicPartitions);

kProducer.close();
kConsumer1.close();
kConsumer2.close();
}

@Test(timeOut = 30000)
@@ -470,15 +471,12 @@ public void testOneBrokerShutdown() throws Exception {
// 2. produce consume message with Kafka producer.
int totalMsgs = 50;
String messageStrPrefix = "Message_" + kafkaTopicName + "_";
@Cleanup
KProducer kProducer = new KProducer(kafkaTopicName, false, getKafkaBrokerPort(), true);
kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix);

List<TopicPartition> topicPartitions = IntStream.range(0, partitionNumber)
.mapToObj(i -> new TopicPartition(kafkaTopicName, i)).collect(Collectors.toList());
@Cleanup
KConsumer kConsumer1 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-1");
@Cleanup
KConsumer kConsumer2 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-2");
log.info("Partition size: {}, will consume and commitOffset for 2 consumers",
topicPartitions.size());
@@ -494,5 +492,9 @@
kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix);
kafkaConsumeCommitMessage(kConsumer1, totalMsgs, messageStrPrefix, topicPartitions);
kafkaConsumeCommitMessage(kConsumer2, totalMsgs, messageStrPrefix, topicPartitions);

kProducer.close();
kConsumer1.close();
kConsumer2.close();
}
}
@@ -338,6 +338,7 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore();
doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createConfigurationMetadataStore();
doReturn(mockZooKeeper).when(pulsar).getZkClient();

Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar));
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
@@ -463,7 +464,7 @@ public KProducer(String topic, Boolean isAsync, String host,
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoKafkaOnPulsarProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySer);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSer);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);

if (retry) {
props.put(ProducerConfig.RETRIES_CONFIG, 3);