diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 4f64d5aab869c..b9a8e74b9a48d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1075,10 +1075,22 @@ public CompletableFuture> getTopic(final TopicName topicName, bo return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies); } - return CompletableFuture.completedFuture(Optional.empty()); + final String errorMsg = + String.format("Illegal topic partition name %s with max allowed " + + "%d partitions", topicName, metadata.partitions); + log.warn(errorMsg); + return FutureUtil + .failedFuture(new BrokerServiceException.NotAllowedException(errorMsg)); }); } return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies); + }).thenCompose(optionalTopic -> { + if (!optionalTopic.isPresent() && createIfMissing) { + log.warn("[{}] Try to recreate the topic with createIfMissing=true " + + "but the returned topic is empty", topicName); + return getTopic(topicName, createIfMissing, properties); + } + return CompletableFuture.completedFuture(optionalTopic); }); }); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 5abb0e02e588b..2a49c14e35583 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -3193,7 +3193,7 @@ public void testFailedUpdatePartitionedTopic() throws Exception { admin.topics().createSubscription(partitionedTopicName + "-partition-" + startPartitions, subName1, MessageId.earliest); fail("Unexpected behaviour"); - } catch (PulsarAdminException.PreconditionFailedException ex) { + } catch (PulsarAdminException.ConflictException ex) { // OK } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java index 590edc2d3f3bb..c9138beee52d1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java @@ -27,7 +27,10 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -40,6 +43,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.TopicType; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -55,6 +59,7 @@ protected void setup() throws Exception { conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); conf.setAllowAutoTopicCreation(true); conf.setDefaultNumPartitions(3); + conf.setForceDeleteNamespaceAllowed(true); super.internalSetup(); super.producerBaseSetup(); } @@ -186,4 +191,56 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion() } } + + @Test + public void testClientWithAutoCreationGotNotFoundException() throws PulsarAdminException, PulsarClientException { + final String namespace = "public/test_1"; + final String topicName = "persistent://public/test_1/test_auto_creation_got_not_found" + + System.currentTimeMillis(); + final int retryTimes = 30; + admin.namespaces().createNamespace(namespace); + admin.namespaces().setAutoTopicCreation(namespace, AutoTopicCreationOverride.builder() + .allowAutoTopicCreation(true) + .topicType("non-partitioned") + .build()); + + @Cleanup("shutdown") + final ExecutorService executor1 = Executors.newSingleThreadExecutor(); + + @Cleanup("shutdown") + final ExecutorService executor2 = Executors.newSingleThreadExecutor(); + + for (int i = 0; i < retryTimes; i++) { + final CompletableFuture adminListSub = CompletableFuture.runAsync(() -> { + try { + admin.topics().getSubscriptions(topicName); + } catch (PulsarAdminException e) { + throw new RuntimeException(e); + } + }, executor1); + + final CompletableFuture> consumerSub = CompletableFuture.supplyAsync(() -> { + try { + return pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName("sub-1") + .subscribe(); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } + }, executor2); + + try { + adminListSub.join(); + } catch (Throwable ex) { + // we don't care the exception. + } + + consumerSub.join().close(); + admin.topics().delete(topicName, true); + } + + admin.namespaces().deleteNamespace(namespace, true); + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 9995b6a28a903..6f60a13fd4894 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -457,8 +457,7 @@ public void testCreateNonExistentPartitions() throws PulsarAdminException, Pulsa .topic(partition.toString()) .create(); fail("unexpected behaviour"); - } catch (PulsarClientException.TopicDoesNotExistException ignored) { - + } catch (PulsarClientException.NotAllowedException ex) { } Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 4); }