Skip to content

Commit

Permalink
[fix][client] ExtNonPersistentTopics and prevent prefix match (apache…
Browse files Browse the repository at this point in the history
…#19065)

Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun committed Dec 27, 2022
1 parent d12a314 commit 70803bb
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 15 deletions.
3 changes: 2 additions & 1 deletion pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,8 @@
<location>org.apache.pulsar.broker.admin.v2.NonPersistentTopics</location>
<location>org.apache.pulsar.broker.admin.v2.PersistentTopics</location>
<!-- See https://github.com/apache/pulsar/issues/18947 -->
<!-- <location>org.apache.pulsar.broker.admin.v2.PersistentTopicsExt</location> -->
<!-- <location>org.apache.pulsar.broker.admin.v2.ExtPersistentTopics</location> -->
<!-- <location>org.apache.pulsar.broker.admin.v2.ExtNonPersistentTopics</location> -->
<location>org.apache.pulsar.broker.admin.v2.ResourceGroups</location>
<location>org.apache.pulsar.broker.admin.v2.ResourceQuotas</location>
<location>org.apache.pulsar.broker.admin.v2.SchemasResource</location>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.broker.admin.v2;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is for preventing docs conflict before we find a good way to fix
* <a href="https://github.com/apache/pulsar/issues/18947">ISSUE-18947</a>.
*/
@Path("/non-persistent")
@Produces(MediaType.APPLICATION_JSON)
@Api(value = "/non-persistent", description = "Non-Persistent topic admin apis", tags = "non-persistent topic")
public class ExtNonPersistentTopics extends PersistentTopicsBase {

@PUT
@Consumes(PartitionedTopicMetadata.MEDIA_TYPE)
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Create a partitioned topic.",
notes = "It needs to be called before creating a producer on a partitioned topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or namespace doesn't exist"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and"
+ " less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already exist"),
@ApiResponse(code = 412,
message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void createPartitionedTopic(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The metadata for the topic",
required = true, type = "PartitionedTopicMetadata") PartitionedTopicMetadata metadata,
@QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
try {
validateNamespaceName(tenant, namespace);
validateGlobalNamespaceOwnership();
validateTopicName(tenant, namespace, encodedTopic);
internalCreatePartitionedTopic(asyncResponse, metadata.partitions, createLocalTopicOnly,
metadata.properties);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}

private static final Logger log = LoggerFactory.getLogger(NonPersistentTopics.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
@Path("/persistent")
@Produces(MediaType.APPLICATION_JSON)
@Api(value = "/persistent", description = "Persistent topic admin apis", tags = "persistent topic")
public class PersistentTopicsExt extends PersistentTopicsBase {
public class ExtPersistentTopics extends PersistentTopicsBase {

@PUT
@Consumes(PartitionedTopicMetadata.MEDIA_TYPE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
import org.apache.pulsar.broker.admin.v2.PersistentTopics;
import org.apache.pulsar.broker.admin.v2.PersistentTopicsExt;
import org.apache.pulsar.broker.admin.v2.ExtPersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.namespace.NamespaceService;
Expand Down Expand Up @@ -106,7 +106,7 @@
public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {

private PersistentTopics persistentTopics;
private PersistentTopicsExt persistentTopicsExt;
private ExtPersistentTopics extPersistentTopics;
private final String testTenant = "my-tenant";
private final String testLocalCluster = "use";
private final String testNamespace = "my-namespace";
Expand Down Expand Up @@ -137,15 +137,15 @@ protected void setup() throws Exception {
doNothing().when(persistentTopics).validateAdminAccessForTenant(this.testTenant);
doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopics).clientAuthData();

persistentTopicsExt = spy(PersistentTopicsExt.class);
persistentTopicsExt.setServletContext(new MockServletContext());
persistentTopicsExt.setPulsar(pulsar);
doReturn(false).when(persistentTopicsExt).isRequestHttps();
doReturn(null).when(persistentTopicsExt).originalPrincipal();
doReturn("test").when(persistentTopicsExt).clientAppId();
doReturn(TopicDomain.persistent.value()).when(persistentTopicsExt).domain();
doNothing().when(persistentTopicsExt).validateAdminAccessForTenant(this.testTenant);
doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopicsExt).clientAuthData();
extPersistentTopics = spy(ExtPersistentTopics.class);
extPersistentTopics.setServletContext(new MockServletContext());
extPersistentTopics.setPulsar(pulsar);
doReturn(false).when(extPersistentTopics).isRequestHttps();
doReturn(null).when(extPersistentTopics).originalPrincipal();
doReturn("test").when(extPersistentTopics).clientAppId();
doReturn(TopicDomain.persistent.value()).when(extPersistentTopics).domain();
doNothing().when(extPersistentTopics).validateAdminAccessForTenant(this.testTenant);
doReturn(mock(AuthenticationDataHttps.class)).when(extPersistentTopics).clientAuthData();

nonPersistentTopic = spy(NonPersistentTopics.class);
nonPersistentTopic.setServletContext(new MockServletContext());
Expand Down Expand Up @@ -555,7 +555,7 @@ public void testCreatePartitionedTopic() {
Map<String, String> topicMetadata = new HashMap<>();
topicMetadata.put("key1", "value1");
PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(2, topicMetadata);
persistentTopicsExt.createPartitionedTopic(response2, testTenant, testNamespace, topicName2, metadata, true);
extPersistentTopics.createPartitionedTopic(response2, testTenant, testNamespace, topicName2, metadata, true);
Awaitility.await().untilAsserted(() -> {
persistentTopics.getPartitionedMetadata(response2,
testTenant, testNamespace, topicName2, true, false);
Expand Down Expand Up @@ -660,7 +660,7 @@ public void testUpdatePartitionedTopicHavingProperties() throws Exception {
ArgumentCaptor<PartitionedTopicMetadata> responseCaptor =
ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(2, topicMetadata);
persistentTopicsExt.createPartitionedTopic(response, tenant, namespace, topic, metadata, true);
extPersistentTopics.createPartitionedTopic(response, tenant, namespace, topic, metadata, true);
Awaitility.await().untilAsserted(() -> {
persistentTopics.getPartitionedMetadata(response,
tenant, namespace, topic, true, false);
Expand Down

0 comments on commit 70803bb

Please sign in to comment.