From 3794711fe28318a85e48c9ecf48ce0ca4c3ad929 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 26 Dec 2022 22:56:44 +0800 Subject: [PATCH] [fix][client] ExtNonPersistentTopics and prevent prefix match Signed-off-by: tison --- pulsar-broker/pom.xml | 3 +- .../admin/v2/ExtNonPersistentTopics.java | 94 +++++++++++++++++++ ...opicsExt.java => ExtPersistentTopics.java} | 2 +- .../broker/admin/PersistentTopicsTest.java | 26 ++--- 4 files changed, 110 insertions(+), 15 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ExtNonPersistentTopics.java rename pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/{PersistentTopicsExt.java => ExtPersistentTopics.java} (98%) diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index e200d368027ee..ce20aed7bede6 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -706,7 +706,8 @@ org.apache.pulsar.broker.admin.v2.NonPersistentTopics org.apache.pulsar.broker.admin.v2.PersistentTopics - + + org.apache.pulsar.broker.admin.v2.ResourceGroups org.apache.pulsar.broker.admin.v2.ResourceQuotas org.apache.pulsar.broker.admin.v2.SchemasResource diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ExtNonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ExtNonPersistentTopics.java new file mode 100644 index 0000000000000..665e887fa6bea --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ExtNonPersistentTopics.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.broker.admin.v2; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.Encoded; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; +import javax.ws.rs.core.MediaType; +import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is for preventing docs conflict before we find a good way to fix + * ISSUE-18947. + */ +@Path("/non-persistent") +@Produces(MediaType.APPLICATION_JSON) +@Api(value = "/non-persistent", description = "Non-Persistent topic admin apis", tags = "non-persistent topic") +public class ExtNonPersistentTopics extends PersistentTopicsBase { + + @PUT + @Consumes(PartitionedTopicMetadata.MEDIA_TYPE) + @Path("/{tenant}/{namespace}/{topic}/partitions") + @ApiOperation(value = "Create a partitioned topic.", + notes = "It needs to be called before creating a producer on a partitioned topic.") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or namespace doesn't exist"), + @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and" + + " less than or equal to maxNumPartitionsPerPartitionedTopic"), + @ApiResponse(code = 409, message = "Partitioned topic already exist"), + @ApiResponse(code = 412, + message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"), + @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") + }) + public void createPartitionedTopic( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "The metadata for the topic", + required = true, type = "PartitionedTopicMetadata") PartitionedTopicMetadata metadata, + @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) { + try { + validateNamespaceName(tenant, namespace); + validateGlobalNamespaceOwnership(); + validateTopicName(tenant, namespace, encodedTopic); + internalCreatePartitionedTopic(asyncResponse, metadata.partitions, createLocalTopicOnly, + metadata.properties); + } catch (Exception e) { + log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); + resumeAsyncResponseExceptionally(asyncResponse, e); + } + } + + private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopicsExt.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ExtPersistentTopics.java similarity index 98% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopicsExt.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ExtPersistentTopics.java index 441ea1121cdf3..2c449cb501062 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopicsExt.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ExtPersistentTopics.java @@ -49,7 +49,7 @@ @Path("/persistent") @Produces(MediaType.APPLICATION_JSON) @Api(value = "/persistent", description = "Persistent topic admin apis", tags = "persistent topic") -public class PersistentTopicsExt extends PersistentTopicsBase { +public class ExtPersistentTopics extends PersistentTopicsBase { @PUT @Consumes(PartitionedTopicMetadata.MEDIA_TYPE) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index eb2c878b5500c..875bf629d4f86 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -53,7 +53,7 @@ import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.admin.v2.NonPersistentTopics; import org.apache.pulsar.broker.admin.v2.PersistentTopics; -import org.apache.pulsar.broker.admin.v2.PersistentTopicsExt; +import org.apache.pulsar.broker.admin.v2.ExtPersistentTopics; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; import org.apache.pulsar.broker.namespace.NamespaceService; @@ -106,7 +106,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { private PersistentTopics persistentTopics; - private PersistentTopicsExt persistentTopicsExt; + private ExtPersistentTopics extPersistentTopics; private final String testTenant = "my-tenant"; private final String testLocalCluster = "use"; private final String testNamespace = "my-namespace"; @@ -137,15 +137,15 @@ protected void setup() throws Exception { doNothing().when(persistentTopics).validateAdminAccessForTenant(this.testTenant); doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopics).clientAuthData(); - persistentTopicsExt = spy(PersistentTopicsExt.class); - persistentTopicsExt.setServletContext(new MockServletContext()); - persistentTopicsExt.setPulsar(pulsar); - doReturn(false).when(persistentTopicsExt).isRequestHttps(); - doReturn(null).when(persistentTopicsExt).originalPrincipal(); - doReturn("test").when(persistentTopicsExt).clientAppId(); - doReturn(TopicDomain.persistent.value()).when(persistentTopicsExt).domain(); - doNothing().when(persistentTopicsExt).validateAdminAccessForTenant(this.testTenant); - doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopicsExt).clientAuthData(); + extPersistentTopics = spy(ExtPersistentTopics.class); + extPersistentTopics.setServletContext(new MockServletContext()); + extPersistentTopics.setPulsar(pulsar); + doReturn(false).when(extPersistentTopics).isRequestHttps(); + doReturn(null).when(extPersistentTopics).originalPrincipal(); + doReturn("test").when(extPersistentTopics).clientAppId(); + doReturn(TopicDomain.persistent.value()).when(extPersistentTopics).domain(); + doNothing().when(extPersistentTopics).validateAdminAccessForTenant(this.testTenant); + doReturn(mock(AuthenticationDataHttps.class)).when(extPersistentTopics).clientAuthData(); nonPersistentTopic = spy(NonPersistentTopics.class); nonPersistentTopic.setServletContext(new MockServletContext()); @@ -555,7 +555,7 @@ public void testCreatePartitionedTopic() { Map topicMetadata = new HashMap<>(); topicMetadata.put("key1", "value1"); PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(2, topicMetadata); - persistentTopicsExt.createPartitionedTopic(response2, testTenant, testNamespace, topicName2, metadata, true); + extPersistentTopics.createPartitionedTopic(response2, testTenant, testNamespace, topicName2, metadata, true); Awaitility.await().untilAsserted(() -> { persistentTopics.getPartitionedMetadata(response2, testTenant, testNamespace, topicName2, true, false); @@ -660,7 +660,7 @@ public void testUpdatePartitionedTopicHavingProperties() throws Exception { ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class); PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(2, topicMetadata); - persistentTopicsExt.createPartitionedTopic(response, tenant, namespace, topic, metadata, true); + extPersistentTopics.createPartitionedTopic(response, tenant, namespace, topic, metadata, true); Awaitility.await().untilAsserted(() -> { persistentTopics.getPartitionedMetadata(response, tenant, namespace, topic, true, false);