
Commit

add kop address cache to prevent query zk each time (apache#257)
improvement for streamnative/kop#256
dockerzhang authored Dec 10, 2020
1 parent 024ea31 commit 752bfa1
Showing 3 changed files with 19 additions and 9 deletions.
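Taken together, the three changes add a small memoization layer in front of the ZooKeeper lookup: the handler first consults a static map keyed by the full topic name, stores the lookup future once a matching advertised address is found, and drops the entry whenever the topic's lookup cache is invalidated. The self-contained sketch below restates that pattern; only KOP_ADDRESS_CACHE and removeTopicManagerCache come from this commit, while the class name, getKopAddress, and lookupAdvertisedAddress are illustrative stand-ins for the surrounding KoP code.

    import java.util.Optional;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch of the caching pattern introduced by this commit (names other than
    // KOP_ADDRESS_CACHE and removeTopicManagerCache are illustrative).
    public class KopAddressCacheSketch {

        // One lookup future per full topic name, shared by every request on this broker.
        public static final ConcurrentHashMap<String, CompletableFuture<Optional<String>>>
                KOP_ADDRESS_CACHE = new ConcurrentHashMap<>();

        public static CompletableFuture<Optional<String>> getKopAddress(String topic) {
            // Cache hit: reuse the earlier result instead of querying ZooKeeper again.
            CompletableFuture<Optional<String>> cached = KOP_ADDRESS_CACHE.get(topic);
            if (cached != null) {
                return cached;
            }

            CompletableFuture<Optional<String>> result = new CompletableFuture<>();
            // Cache miss: resolve the advertised KoP listener (stubbed out below).
            lookupAdvertisedAddress(topic).thenAccept(address -> {
                if (address.isPresent()) {
                    // Only a successful lookup is cached, mirroring the commit.
                    KOP_ADDRESS_CACHE.put(topic, result);
                }
                result.complete(address);
            });
            return result;
        }

        // Invalidation path: called when a topic or bundle is unloaded, so a stale
        // broker address is never served from the cache.
        public static void removeTopicManagerCache(String topic) {
            KOP_ADDRESS_CACHE.remove(topic);
        }

        // Placeholder for the real ZooKeeper query under /loadbalance/brokers.
        private static CompletableFuture<Optional<String>> lookupAdvertisedAddress(String topic) {
            return CompletableFuture.completedFuture(Optional.empty());
        }
    }

Caching the CompletableFuture rather than the resolved string means a later caller gets an already-completed future back immediately, with no further ZooKeeper round trip until the entry is removed.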
@@ -98,7 +98,7 @@ public void onLoad(NamespaceBundle bundle) {
      if (Topic.GROUP_METADATA_TOPIC_NAME.equals(getKafkaTopicNameFromPulsarTopicname(name))) {
          checkState(name.isPartitioned(),
              "OffsetTopic should be partitioned in onLoad, but get " + name);
-         KafkaTopicManager.removeLookupCache(name.toString());
+         KafkaTopicManager.removeTopicManagerCache(name.toString());

          if (log.isDebugEnabled()) {
              log.debug("New offset partition load: {}, broker: {}",
@@ -129,7 +129,7 @@ public void unLoad(NamespaceBundle bundle) {
      if (Topic.GROUP_METADATA_TOPIC_NAME.equals(getKafkaTopicNameFromPulsarTopicname(name))) {
          checkState(name.isPartitioned(),
              "OffsetTopic should be partitioned in unLoad, but get " + name);
-         KafkaTopicManager.removeLookupCache(name.toString());
+         KafkaTopicManager.removeTopicManagerCache(name.toString());

          if (log.isDebugEnabled()) {
              log.debug("Offset partition unload: {}, broker: {}",
@@ -1224,7 +1224,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
          log.error("[{}] failed get pulsar address, returned null.", topic.toString());

          // getTopicBroker returns null. topic should be removed from LookupCache.
-         topicManager.removeLookupCache(topic.toString());
+         topicManager.removeTopicManagerCache(topic.toString());

          returnFuture.complete(Optional.empty());
          return returnFuture;
@@ -1235,6 +1235,10 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
              topic, pulsarAddress);
      }

+     // get kop address from cache to prevent query zk each time.
+     if (topicManager.KOP_ADDRESS_CACHE.containsKey(topic.toString())) {
+         return topicManager.KOP_ADDRESS_CACHE.get(topic.toString());
+     }
      // advertised data is write in /loadbalance/brokers/advertisedAddress:webServicePort
      // here we get the broker url, need to find related webServiceUrl.
      ZooKeeperCache zkCache = pulsarService.getLocalZkCache();
@@ -1290,6 +1294,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
      }

      if (lookupDataContainsAddress(data, hostAndPort)) {
+         topicManager.KOP_ADDRESS_CACHE.put(topic.toString(), returnFuture);
          returnFuture.complete(data.getProtocol(KafkaProtocolHandler.PROTOCOL_NAME));
          return;
      }
@@ -1358,7 +1363,7 @@ private CompletableFuture<PartitionMetadata> findBroker(TopicName topic) {
      // here we found topic broker: broker2, but this is in broker1,
      // how to clean the lookup cache?
      if (!localListeners.contains(kopBrokerUrl)) {
-         topicManager.removeLookupCache(topic.toString());
+         topicManager.removeTopicManagerCache(topic.toString());
      }

      if (!topicManager.topicExists(topic.toString())
@@ -17,6 +17,7 @@

  import java.net.InetSocketAddress;
  import java.util.Map;
+ import java.util.Optional;
  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ScheduledFuture;
@@ -70,6 +71,9 @@ public class KafkaTopicManager {
      public static final ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>>
          LOOKUP_CACHE = new ConcurrentHashMap<>();

+     public static final ConcurrentHashMap<String, CompletableFuture<Optional<String>>>
+         KOP_ADDRESS_CACHE = new ConcurrentHashMap<>();
+
      KafkaTopicManager(KafkaRequestHandler kafkaRequestHandler) {
          this.requestHandler = kafkaRequestHandler;
          this.pulsarService = kafkaRequestHandler.getPulsarService();
@@ -128,8 +132,9 @@ public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(Stri
          );
      }

-     public static void removeLookupCache(String topicName) {
+     public static void removeTopicManagerCache(String topicName) {
          LOOKUP_CACHE.remove(topicName);
+         KOP_ADDRESS_CACHE.remove(topicName);
      }

      // whether topic exists in cache.
@@ -273,7 +278,7 @@ public CompletableFuture<PersistentTopic> getTopic(String topicName) {

          // get topic broker returns null. topic should be removed from LookupCache.
          if (ignore == null) {
-             removeLookupCache(topicName);
+             removeTopicManagerCache(topicName);
          }

          topicCompletableFuture.complete(null);
@@ -290,7 +295,7 @@ public CompletableFuture<PersistentTopic> getTopic(String topicName) {
          log.error("[{}] Failed to getTopic {}. exception:",
              requestHandler.ctx.channel(), t, throwable);
          // failed to getTopic from current broker, remove cache, which added in getTopicBroker.
-         removeLookupCache(t);
+         removeTopicManagerCache(t);
          topicCompletableFuture.complete(null);
          return;
      }
@@ -351,7 +356,7 @@ public synchronized void close() {

      for (Map.Entry<String, CompletableFuture<PersistentTopic>> entry : topics.entrySet()) {
          String topicName = entry.getKey();
-         removeLookupCache(topicName);
+         removeTopicManagerCache(topicName);
          CompletableFuture<PersistentTopic> topicFuture = entry.getValue();
          if (log.isDebugEnabled()) {
              log.debug("[{}] remove producer {} for topic {} at close()",
@@ -376,7 +381,7 @@ public Producer getReferenceProducer(String topicName) {

      public void deReference(String topicName) {
          try {
-             removeLookupCache(topicName);
+             removeTopicManagerCache(topicName);

              if (consumerTopicManagers.containsKey(topicName)) {
                  CompletableFuture<KafkaTopicConsumerManager> manager = consumerTopicManagers.get(topicName);
