diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index cac70f6896e5f..50bfe6d3df54c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2704,12 +2704,9 @@ public CompletableFuture fetchPartitionedTopicMetadata if (pulsar.getNamespaceService() == null) { return FutureUtil.failedFuture(new NamingException("namespace service is not ready")); } - Optional policies = - pulsar.getPulsarResources().getNamespaceResources() - .getPoliciesIfCached(topicName.getNamespaceObject()); - return pulsar.getNamespaceService().checkTopicExists(topicName) - .thenCompose(topicExists -> { - return fetchPartitionedTopicMetadataAsync(topicName) + return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()) + .thenCompose(policies -> pulsar.getNamespaceService().checkTopicExists(topicName) + .thenCompose(topicExists -> fetchPartitionedTopicMetadataAsync(topicName) .thenCompose(metadata -> { CompletableFuture future = new CompletableFuture<>(); @@ -2751,8 +2748,8 @@ public CompletableFuture fetchPartitionedTopicMetadata }); return future; - }); - }); + })) + ); } @SuppressWarnings("deprecation") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index e6c12bbc5785f..579db85dc249b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -1157,7 +1157,7 @@ public void testConcurrentLoadTopicExceedLimitShouldNotBeAutoCreated() throws Ex } else { // check topic should not be created if disable autoCreateTopic. Assert.assertTrue(load.isDone()); - Assert.assertTrue(load.get().isPresent()); + Assert.assertFalse(load.get().isPresent()); } } } finally { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java new file mode 100644 index 0000000000000..5068a8ccb6b0b --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import lombok.Cleanup; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import java.util.List; +import java.util.UUID; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.metadata.api.MetadataCache; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker-impl") +@Slf4j +public class HierarchyTopicAutoCreationTest extends ProducerConsumerBase { + + @Override + @BeforeMethod + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @Override + @AfterMethod(alwaysRun = true) + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(invocationCount = 3) + @SneakyThrows + public void testPartitionedTopicAutoCreation() { + // Create namespace + final String namespace = "public/testPartitionedTopicAutoCreation"; + admin.namespaces().createNamespace(namespace); + // Set policies + final AutoTopicCreationOverride expectedPolicies = AutoTopicCreationOverride.builder() + .allowAutoTopicCreation(true) + .topicType("partitioned") + .defaultNumPartitions(1) + .build(); + admin.namespaces().setAutoTopicCreation(namespace, expectedPolicies); + // Double-check the policies + final AutoTopicCreationOverride nsAutoTopicCreationOverride = admin.namespaces() + .getAutoTopicCreation(namespace); + Assert.assertEquals(nsAutoTopicCreationOverride, expectedPolicies); + // Background invalidate cache + final MetadataCache nsCache = pulsar.getPulsarResources().getNamespaceResources().getCache(); + final Thread t1 = new Thread(() -> { + while (true) { + nsCache.invalidate("/admin/policies/" + namespace); + } + }); + t1.start(); + + // trigger auto-creation + final String topicName = "persistent://" + namespace + "/test-" + UUID.randomUUID(); + @Cleanup final Producer producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + final List topics = admin.topics().getList(namespace); + Assert.assertEquals(topics.size(), 1); // expect only one topic + Assert.assertEquals(topics.get(0), + TopicName.get(topicName).getPartition(0).toString()); // expect partitioned topic + + // double-check policies + final AutoTopicCreationOverride actualPolicies2 = admin.namespaces().getAutoTopicCreation(namespace); + Assert.assertEquals(actualPolicies2, expectedPolicies); + + t1.interrupt(); + } +} \ No newline at end of file