[improve] [broker] Improve cache handling for partitioned topic metadata when doing lookup (apache#21063)
Motivation: If we set `allowAutoTopicCreationType` to `PARTITIONED`, topic creation flows as follows (see the client-side sketch after this list):
1. `Client-side`: Look up the topic to get the partitioned topic metadata, in order to create a producer.
2. `Broker-side`: Create the partitioned topic metadata.
3. `Broker-side`: Respond with `{"partitions":3}`.
4. `Client-side`: Create a separate connection for each partition of the topic.
5. `Broker-side`: Receive 3 connect requests and create 3 partition topics.
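
For reference, a minimal client-side sketch of steps 1 and 4; the service URL and topic name are examples, not taken from this PR:

```java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class AutoCreateProducerExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // example URL
                .build();
        // Step 1: creating the producer triggers the partitioned topic
        // metadata lookup; with allowAutoTopicCreationType=PARTITIONED the
        // broker creates the metadata (steps 2-3) and returns {"partitions":3}.
        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://public/default/my-topic")
                .create();
        // Step 4: the client internally opens one producer connection per
        // partition (my-topic-partition-0 .. -2); the broker then creates
        // the three partition topics (step 5).
        producer.close();
        client.close();
    }
}
```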

In step 2 above, the broker proceeds as follows (a condensed sketch follows this list):
1. Check the topic auto-creation policy (here, `{allowAutoTopicCreationType=PARTITIONED, defaultNumPartitions=3}`).
2. Check whether the partitioned topic metadata already exists.
3. Try to create the partitioned topic metadata if it does not exist.
4. If creation fails because the partitioned topic metadata already exists (perhaps another broker is creating it concurrently), read the partitioned topic metadata from the metadata store and respond to the client.
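
A condensed, illustrative fragment of that broker-side flow; the helper names mirror those in the diff below, but this is a sketch rather than the actual `BrokerService` code:

```java
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.metadata.api.MetadataStoreException;

// Fragment of a hypothetical broker service class; partitionedTopicResources
// and fetchPartitionedTopicMetadataAsync stand in for the real helpers.
CompletableFuture<PartitionedTopicMetadata> autoCreatePartitionedMetadata(
        TopicName topicName, int defaultNumPartitions) {
    return partitionedTopicResources.getPartitionedTopicMetadataAsync(topicName)
            .thenCompose(existing -> {
                // Step 2: the metadata already exists, use it.
                if (existing.isPresent()) {
                    return CompletableFuture.completedFuture(existing.get());
                }
                // Step 3: try to create it.
                PartitionedTopicMetadata metadata =
                        new PartitionedTopicMetadata(defaultNumPartitions);
                return partitionedTopicResources
                        .createPartitionedTopicAsync(topicName, metadata)
                        .thenApply(ignore -> metadata)
                        .exceptionallyCompose(ex -> {
                            // Step 4: another broker won the race; read what it wrote.
                            if (ex.getCause()
                                    instanceof MetadataStoreException.AlreadyExistsException) {
                                return fetchPartitionedTopicMetadataAsync(topicName);
                            }
                            return CompletableFuture.failedFuture(ex);
                        });
            });
}
```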

There is a race condition that makes the client get non-partitioned metadata for the topic:

| time | `broker-1` | `broker-2` |
| --- | --- | --- |
| 1 | Get policy: `PARTITIONED, 3` | Get policy: `PARTITIONED, 3` |
| 2 | Check whether the partitioned topic metadata already exists | Check whether the partitioned topic metadata already exists |
| 3 | It does not exist, so the metadata cache caches an empty optional for the path | It does not exist, so the metadata cache caches an empty optional for the path |
| 4 | | Succeeds in creating the partitioned topic metadata |
| 5 | Receives a ZK node changed event to invalidate the cache of the partitioned topic metadata | |
| 6 | Creation fails because the metadata already exists | |
| 7 | Reads the partitioned topic metadata again | |

If step 5 executes after step 7, `broker-1` will get a stale empty optional from the cache of the partitioned topic metadata and respond to the client with non-partitioned metadata.
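
To see why a cached empty result is dangerous, here is a deliberately simplified negative-cache sketch; this is hypothetical and not Pulsar's actual `MetadataCacheImpl`:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical negative cache: it remembers "not found" results.
class NegativeCache {
    private final Map<String, Optional<String>> cache = new ConcurrentHashMap<>();

    Optional<String> get(String path) {
        // Step 3 of the table: a miss caches Optional.empty() for the path.
        return cache.computeIfAbsent(path, this::readFromStore);
    }

    // Step 5: invalidation driven by a watch event. If this arrives *after*
    // the re-read in step 7, get() has already returned the stale empty
    // Optional and the caller sees "non-partitioned".
    void invalidate(String path) {
        cache.remove(path);
    }

    private Optional<String> readFromStore(String path) {
        return Optional.empty(); // placeholder for a ZK read
    }
}
```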

**What could make step 5 execute after step 7?**
One scenario: the issue fixed by PR apache#20303, which caused ZK operations and ZK node changed notifications to be executed on different threads (the main thread of the ZK client and the metadata store thread), so nothing orders the two relative to each other.
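
A toy illustration of that reordering (assumed executor setup, not Pulsar's actual threading): when operation results and watch notifications are delivered by different single-threaded executors, one queue can lag arbitrarily behind the other.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ReorderingToy {
    public static void main(String[] args) {
        // "opExecutor" delivers ZK operation results; "watchExecutor" delivers
        // node-changed notifications. Nothing orders the two queues relative
        // to each other, so the cache invalidation (step 5) may run after the
        // retry read (step 7).
        ExecutorService opExecutor = Executors.newSingleThreadExecutor();
        ExecutorService watchExecutor = Executors.newSingleThreadExecutor();
        opExecutor.submit(() -> System.out.println("step 6: create failed, AlreadyExists"));
        opExecutor.submit(() -> System.out.println("step 7: re-read -> stale empty Optional"));
        watchExecutor.submit(() -> System.out.println("step 5: invalidate cache entry"));
        opExecutor.shutdown();
        watchExecutor.shutdown();
    }
}
```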

Therefore, the partitioned topic metadata lookup mechanism is fragile, and we need to make it more robust.

Modifications: Before reading the partitioned topic metadata again, refresh the cache first.
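
In caller terms, the retry path now forces a cache refresh before re-reading; a minimal sketch using the new overload introduced in the diff below:

```java
// On a create conflict, re-fetch with refreshCacheAndGet=true so the stale
// cached Optional.empty() is discarded and the value is re-read from the
// metadata store.
pulsar.getBrokerService()
        .fetchPartitionedTopicMetadataAsync(topicName, true)
        .thenAccept(metadata ->
                log.info("[{}] partitions: {}", topicName, metadata.partitions));
```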
(cherry picked from commit d099ac4)
(cherry picked from commit 2e534d2)
poorbarcode authored and mukesh-ctds committed Apr 19, 2024
1 parent db4ecfe commit 802aff5
Showing 2 changed files with 116 additions and 2 deletions.
@@ -3151,8 +3151,11 @@ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata
                         if (ex.getCause()
                                 instanceof MetadataStoreException
                                         .AlreadyExistsException) {
+                            log.info("[{}] The partitioned topic is already"
+                                    + " created, try to refresh the cache and read"
+                                    + " again.", topicName);
                             // The partitioned topic might be created concurrently
-                            fetchPartitionedTopicMetadataAsync(topicName)
+                            fetchPartitionedTopicMetadataAsync(topicName, true)
                                     .whenComplete((metadata2, ex2) -> {
                                         if (ex2 == null) {
                                             future.complete(metadata2);
@@ -3161,6 +3164,9 @@ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata
                                         }
                                     });
                         } else {
+                            log.error("[{}] operation of creating partitioned"
+                                    + " topic metadata failed",
+                                    topicName, ex);
                             future.completeExceptionally(ex);
                         }
                         return null;
@@ -3206,9 +3212,14 @@ private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopi
     }
 
     public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(TopicName topicName) {
+        return fetchPartitionedTopicMetadataAsync(topicName, false);
+    }
+
+    public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(TopicName topicName,
+                                                                                          boolean refreshCacheAndGet) {
         // gets the number of partitions from the configuration cache
         return pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
-                .getPartitionedTopicMetadataAsync(topicName).thenApply(metadata -> {
+                .getPartitionedTopicMetadataAsync(topicName, refreshCacheAndGet).thenApply(metadata -> {
             // if the partitioned topic is not found in metadata, then the topic is not partitioned
             return metadata.orElseGet(() -> new PartitionedTopicMetadata());
         });
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertEquals;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker")
public class BrokerServiceChaosTest extends CanReconnectZKClientPulsarServiceBaseTest {

    @Override
    @BeforeClass(alwaysRun = true, timeOut = 300000)
    public void setup() throws Exception {
        super.setup();
    }

    @Override
    @AfterClass(alwaysRun = true, timeOut = 300000)
    public void cleanup() throws Exception {
        super.cleanup();
    }

    @Test
    public void testFetchPartitionedTopicMetadataWithCacheRefresh() throws Exception {
        final String configMetadataStoreConnectString =
                WhiteboxImpl.getInternalState(pulsar.getConfigurationMetadataStore(), "zkConnectString");
        final ZooKeeper anotherZKCli = new ZooKeeper(configMetadataStoreConnectString, 5000, null);
        // Set the auto topic creation policy to PARTITIONED.
        final String ns = defaultTenant + "/ns_" + UUID.randomUUID().toString().replaceAll("-", "");
        final TopicName topicName1 = TopicName.get("persistent://" + ns + "/tp1");
        final TopicName topicName2 = TopicName.get("persistent://" + ns + "/tp2");
        admin.namespaces().createNamespace(ns);
        AutoTopicCreationOverride autoTopicCreationOverride =
                new AutoTopicCreationOverrideImpl.AutoTopicCreationOverrideImplBuilder().allowAutoTopicCreation(true)
                        .topicType(TopicType.PARTITIONED.toString())
                        .defaultNumPartitions(3).build();
        admin.namespaces().setAutoTopicCreationAsync(ns, autoTopicCreationOverride);
        // Make sure the cache of the namespace policy is populated.
        admin.namespaces().getAutoSubscriptionCreation(ns);
        // Trigger creation of the ZK node "/admin/partitioned-topics/{namespace}/persistent".
        admin.topics().createPartitionedTopic(topicName1.toString(), 2);
        admin.topics().deletePartitionedTopic(topicName1.toString());

        // Since no partitioned metadata has been created yet, the partition count in the metadata will be 0.
        PartitionedTopicMetadata partitionedTopicMetadata1 =
                pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName2).get();
        assertEquals(partitionedTopicMetadata1.partitions, 0);

        // Create the partitioned metadata through another ZK client.
        // Inject an error so that the cache cannot be updated.
        makeLocalMetadataStoreKeepReconnect();
        anotherZKCli.create("/admin/partitioned-topics/" + ns + "/persistent/" + topicName2.getLocalName(),
                "{\"partitions\":3}".getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        stopLocalMetadataStoreAlwaysReconnect();

        // Get the partitioned metadata from the cache; there is a 90% chance that the partition count is still 0.
        PartitionedTopicMetadata partitionedTopicMetadata2 =
                pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName2).get();
        // Note: to reproduce the issue, enable the assertion on the next line.
        // assertEquals(partitionedTopicMetadata2.partitions, 0);

        // Verify that the new method returns the correct result.
        PartitionedTopicMetadata partitionedTopicMetadata3 =
                pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName2, true).get();
        assertEquals(partitionedTopicMetadata3.partitions, 3);

        // Cleanup.
        admin.topics().deletePartitionedTopic(topicName2.toString());
        anotherZKCli.close();
    }
}
