diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java index 6c768a01af240..4e6de472bf85e 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java @@ -32,6 +32,7 @@ import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; +import javax.ws.rs.Encoded; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.PUT; @@ -86,6 +87,7 @@ import com.yahoo.pulsar.common.policies.data.Policies; import com.yahoo.pulsar.common.util.Codec; import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer; +import static com.yahoo.pulsar.common.util.Codec.decode; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; @@ -159,7 +161,8 @@ public List getList(@PathParam("property") String property, @PathParam(" @ApiResponse(code = 404, message = "Namespace doesn't exist") }) public Map> getPermissionsOnDestination(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, - @PathParam("destination") String destination) { + @PathParam("destination") @Encoded String destination) { + destination = decode(destination); // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessOnProperty(property); @@ -217,7 +220,8 @@ protected void validateAdminOperationOnDestination(DestinationName fqdn, boolean @ApiResponse(code = 409, message = "Concurrent modification") }) public void grantPermissionsOnDestination(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, - @PathParam("destination") String destination, @PathParam("role") String role, Set actions) { + @PathParam("destination") @Encoded String destination, @PathParam("role") String role, Set actions) { + destination = decode(destination); // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessOnProperty(property); validatePoliciesReadOnlyAccess(); @@ -264,7 +268,8 @@ public void grantPermissionsOnDestination(@PathParam("property") String property @ApiResponse(code = 412, message = "Permissions are not set at the destination level") }) public void revokePermissionsOnDestination(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, - @PathParam("destination") String destination, @PathParam("role") String role) { + @PathParam("destination") @Encoded String destination, @PathParam("role") String role) { + destination = decode(destination); // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessOnProperty(property); validatePoliciesReadOnlyAccess(); @@ -317,8 +322,9 @@ public void revokePermissionsOnDestination(@PathParam("property") String propert @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 409, message = "Partitioned topic already exist") }) public void createPartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("destination") String destination, int numPartitions, + @PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination, int numPartitions, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + destination = decode(destination); DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); validateAdminAccessOnProperty(dn.getProperty()); if (numPartitions <= 1) { @@ -345,41 +351,12 @@ public void createPartitionedTopic(@PathParam("property") String property, @Path @Path("/{property}/{cluster}/{namespace}/{destination}/partitions") @ApiOperation(value = "Get partitioned topic metadata.") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") }) - public PartitionedTopicMetadata getPartitionedTopicMetadata(@PathParam("property") String property, + public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, - @PathParam("destination") String destination, + @PathParam("destination") @Encoded String destination, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); - validateClusterOwnership(dn.getCluster()); - - try { - checkConnect(dn); - } catch (RestException e) { - validateAdminAccessOnProperty(dn.getProperty()); - } - - String path = path(PARTITIONED_TOPIC_PATH_ZNODE, property, cluster, namespace, domain(), - dn.getEncodedLocalName()); - PartitionedTopicMetadata partitionMetadata; - try { - // gets the number of partitions from the zk cache - partitionMetadata = globalZkCache().getData(path, new Deserializer() { - - @Override - public PartitionedTopicMetadata deserialize(String key, byte[] content) throws Exception { - return jsonMapper().readValue(content, PartitionedTopicMetadata.class); - } - }); - } catch (Exception e) { - // if the partitioned topic is not found in zk cache, it returns zero - partitionMetadata = new PartitionedTopicMetadata(); - } - - if (log.isDebugEnabled()) { - log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId(), dn, - partitionMetadata.partitions); - } - return partitionMetadata; + destination = decode(destination); + return getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative); } @DELETE @@ -388,8 +365,9 @@ public PartitionedTopicMetadata deserialize(String key, byte[] content) throws E @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Partitioned topic does not exist") }) public void deletePartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("destination") String destination, + @PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + destination = decode(destination); DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); validateAdminAccessOnProperty(dn.getProperty()); PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace, @@ -461,8 +439,9 @@ public void deletePartitionedTopic(@PathParam("property") String property, @Path @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 412, message = "Topic has active producers/subscriptions") }) public void deleteTopic(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("destination") String destination, + @PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + destination = decode(destination); DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); validateAdminOperationOnDestination(dn, authoritative); PersistentTopic topic = getTopicReference(dn); @@ -491,8 +470,9 @@ public void deleteTopic(@PathParam("property") String property, @PathParam("clus @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist") }) public List getSubscriptions(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("destination") String destination, + @PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + destination = decode(destination); DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); List subscriptions = Lists.newArrayList(); PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace, @@ -527,8 +507,9 @@ public List getSubscriptions(@PathParam("property") String property, @Pa @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist") }) public PersistentTopicStats getStats(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("destination") String destination, + @PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + destination = decode(destination); DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); validateAdminOperationOnDestination(dn, authoritative); PersistentTopic topic = getTopicReference(dn); @@ -542,8 +523,9 @@ public PersistentTopicStats getStats(@PathParam("property") String property, @Pa @ApiResponse(code = 404, message = "Topic does not exist") }) public PersistentTopicInternalStats getInternalStats(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, - @PathParam("destination") String destination, + @PathParam("destination") @Encoded String destination, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + destination = decode(destination); DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); validateAdminOperationOnDestination(dn, authoritative); PersistentTopic topic = getTopicReference(dn); @@ -557,8 +539,9 @@ public PersistentTopicInternalStats getInternalStats(@PathParam("property") Stri @ApiResponse(code = 404, message = "Topic does not exist") }) public PartitionedTopicStats getPartitionedStats(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, - @PathParam("destination") String destination, + @PathParam("destination") @Encoded String destination, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + destination = decode(destination); DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative); @@ -586,10 +569,10 @@ public PartitionedTopicStats getPartitionedStats(@PathParam("property") String p @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 412, message = "Subscription has active consumers") }) public void deleteSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("destination") String destination, + @PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination, @PathParam("subName") String subName, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - + destination = decode(destination); DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative); @@ -631,9 +614,10 @@ public void deleteSubscription(@PathParam("property") String property, @PathPara @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic or subscription does not exist") }) public void skipAllMessages(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("destination") String destination, + @PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination, @PathParam("subName") String subName, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + destination = decode(destination); DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative); @@ -677,9 +661,10 @@ public void skipAllMessages(@PathParam("property") String property, @PathParam(" @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic or subscription does not exist") }) public void skipMessages(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("destination") String destination, + @PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination, @PathParam("subName") String subName, @PathParam("numMessages") int numMessages, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + destination = decode(destination); DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative); @@ -713,48 +698,12 @@ public void skipMessages(@PathParam("property") String property, @PathParam("clu @ApiOperation(value = "Expire messages on a topic subscription.") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic or subscription does not exist") }) - public void expireMessages(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("destination") String destination, + public void expireTopicMessages(@PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination, @PathParam("subName") String subName, @PathParam("expireTimeInSeconds") int expireTimeInSeconds, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace, - destination, authoritative); - if (partitionMetadata.partitions > 0) { - // expire messages for each partition destination - try { - for (int i = 0; i < partitionMetadata.partitions; i++) { - pulsar().getAdminClient().persistentTopics().expireMessages(dn.getPartition(i).toString(), subName, - expireTimeInSeconds); - } - } catch (Exception e) { - throw new RestException(e); - } - } else { - // validate ownership and redirect if current broker is not owner - validateAdminOperationOnDestination(dn, authoritative); - PersistentTopic topic = getTopicReference(dn); - try { - if (subName.startsWith(topic.replicatorPrefix)) { - String remoteCluster = PersistentReplicator.getRemoteCluster(subName); - PersistentReplicator repl = topic.getPersistentReplicator(remoteCluster); - checkNotNull(repl); - repl.expireMessages(expireTimeInSeconds); - } else { - PersistentSubscription sub = topic.getPersistentSubscription(subName); - checkNotNull(sub); - sub.expireMessages(expireTimeInSeconds); - } - log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), expireTimeInSeconds, dn, - subName); - } catch (NullPointerException npe) { - throw new RestException(Status.NOT_FOUND, "Subscription not found"); - } catch (Exception exception) { - log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}", clientAppId(), - expireTimeInSeconds, dn, subName, exception); - throw new RestException(exception); - } - } + destination = decode(destination); + expireMessages(property, cluster, namespace, destination, subName, expireTimeInSeconds, authoritative); } @POST @@ -764,8 +713,9 @@ public void expireMessages(@PathParam("property") String property, @PathParam("c @ApiResponse(code = 404, message = "Topic or subscription does not exist") }) public void expireMessagesForAllSubscriptions(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, - @PathParam("destination") String destination, @PathParam("expireTimeInSeconds") int expireTimeInSeconds, + @PathParam("destination") @Encoded String destinationName, @PathParam("expireTimeInSeconds") int expireTimeInSeconds, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + final String destination = decode(destinationName); DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative); @@ -802,10 +752,10 @@ public void expireMessagesForAllSubscriptions(@PathParam("property") String prop @ApiResponse(code = 405, message = "Not supported for global topics"), @ApiResponse(code = 412, message = "Subscription has active consumers") }) public void resetCursor(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("destination") String destination, + @PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination, @PathParam("subName") String subName, @PathParam("timestamp") long timestamp, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - + destination = decode(destination); DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative); @@ -878,9 +828,10 @@ public void resetCursor(@PathParam("property") String property, @PathParam("clus @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic, subscription or the message position does not exist") }) public Response peekNthMessage(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("destination") String destination, + @PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination, @PathParam("subName") String subName, @PathParam("messagePosition") int messagePosition, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + destination = decode(destination); DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative); @@ -960,8 +911,9 @@ public void write(OutputStream output) throws IOException, WebApplicationExcepti @ApiResponse(code = 404, message = "Namespace does not exist") }) public PersistentOfflineTopicStats getBacklog(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, - @PathParam("destination") String destination, + @PathParam("destination") @Encoded String destination, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + destination = decode(destination); validateAdminAccessOnProperty(property); // Validate that namespace exists, throw 404 if it doesn't exist // note that we do not want to load the topic and hence skip validateAdminOperationOnDestination() @@ -999,6 +951,83 @@ public PersistentOfflineTopicStats getBacklog(@PathParam("property") String prop return offlineTopicStats; } + public PartitionedTopicMetadata getPartitionedTopicMetadata(String property, String cluster, String namespace, + String destination, boolean authoritative) { + DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); + validateClusterOwnership(dn.getCluster()); + + try { + checkConnect(dn); + } catch (RestException e) { + validateAdminAccessOnProperty(dn.getProperty()); + } + + String path = path(PARTITIONED_TOPIC_PATH_ZNODE, property, cluster, namespace, domain(), + dn.getEncodedLocalName()); + PartitionedTopicMetadata partitionMetadata; + try { + // gets the number of partitions from the zk cache + partitionMetadata = globalZkCache().getData(path, new Deserializer() { + + @Override + public PartitionedTopicMetadata deserialize(String key, byte[] content) throws Exception { + return jsonMapper().readValue(content, PartitionedTopicMetadata.class); + } + }); + } catch (Exception e) { + // if the partitioned topic is not found in zk cache, it returns zero + partitionMetadata = new PartitionedTopicMetadata(); + } + + if (log.isDebugEnabled()) { + log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId(), dn, + partitionMetadata.partitions); + } + return partitionMetadata; + } + + public void expireMessages(String property, String cluster, String namespace, String destination, String subName, + int expireTimeInSeconds, boolean authoritative) { + DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace, + destination, authoritative); + if (partitionMetadata.partitions > 0) { + // expire messages for each partition destination + try { + for (int i = 0; i < partitionMetadata.partitions; i++) { + pulsar().getAdminClient().persistentTopics().expireMessages(dn.getPartition(i).toString(), subName, + expireTimeInSeconds); + } + } catch (Exception e) { + throw new RestException(e); + } + } else { + // validate ownership and redirect if current broker is not owner + validateAdminOperationOnDestination(dn, authoritative); + PersistentTopic topic = getTopicReference(dn); + try { + if (subName.startsWith(topic.replicatorPrefix)) { + String remoteCluster = PersistentReplicator.getRemoteCluster(subName); + PersistentReplicator repl = topic.getPersistentReplicator(remoteCluster); + checkNotNull(repl); + repl.expireMessages(expireTimeInSeconds); + } else { + PersistentSubscription sub = topic.getPersistentSubscription(subName); + checkNotNull(sub); + sub.expireMessages(expireTimeInSeconds); + } + log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), expireTimeInSeconds, dn, + subName); + } catch (NullPointerException npe) { + throw new RestException(Status.NOT_FOUND, "Subscription not found"); + } catch (Exception exception) { + log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}", clientAppId(), + expireTimeInSeconds, dn, subName, exception); + throw new RestException(exception); + } + } + } + /** * Get the Topic object reference from the Pulsar broker */ diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/lookup/DestinationLookup.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/lookup/DestinationLookup.java index d93651d9a8945..2e0cc41e5c1b2 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/lookup/DestinationLookup.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/lookup/DestinationLookup.java @@ -21,6 +21,7 @@ import java.net.URISyntaxException; import javax.ws.rs.DefaultValue; +import javax.ws.rs.Encoded; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.PathParam; @@ -36,6 +37,7 @@ import com.yahoo.pulsar.common.lookup.data.LookupData; import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.broker.web.NoSwaggerDocumentation; +import com.yahoo.pulsar.common.util.Codec; @Path("/v2/destination/") @NoSwaggerDocumentation @@ -45,8 +47,9 @@ public class DestinationLookup extends LookupResource { @Path("persistent/{property}/{cluster}/{namespace}/{dest}") @Produces(MediaType.APPLICATION_JSON) public LookupData lookupDestination(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("dest") String dest, + @PathParam("namespace") String namespace, @PathParam("dest") @Encoded String dest, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) throws URISyntaxException { + dest = Codec.decode(dest); DestinationName fqdn = DestinationName.get("persistent", property, cluster, namespace, dest); return lookupFQDN(fqdn, authoritative); } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java index dbf2770cb9145..043814bdfc6ee 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java @@ -20,6 +20,7 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import java.lang.reflect.Field; import java.net.URL; import java.util.ArrayList; import java.util.EnumSet; @@ -30,6 +31,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import javax.ws.rs.client.InvocationCallback; +import javax.ws.rs.client.WebTarget; import org.apache.bookkeeper.test.PortManager; import org.apache.zookeeper.KeeperException.NoNodeException; import org.slf4j.Logger; @@ -59,6 +62,8 @@ import com.yahoo.pulsar.client.admin.PulsarAdminException.NotFoundException; import com.yahoo.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import com.yahoo.pulsar.client.admin.internal.PropertiesImpl; +import com.yahoo.pulsar.client.admin.internal.LookupImpl; +import com.yahoo.pulsar.client.admin.internal.PersistentTopicsImpl; import com.yahoo.pulsar.client.api.Authentication; import com.yahoo.pulsar.client.api.ClientConfiguration; import com.yahoo.pulsar.client.api.Consumer; @@ -69,11 +74,13 @@ import com.yahoo.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; import com.yahoo.pulsar.client.api.PulsarClient; import com.yahoo.pulsar.client.api.SubscriptionType; +import com.yahoo.pulsar.common.lookup.data.LookupData; import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.common.naming.NamespaceBundle; import com.yahoo.pulsar.common.naming.NamespaceBundleFactory; import com.yahoo.pulsar.common.naming.NamespaceBundles; import com.yahoo.pulsar.common.naming.NamespaceName; +import com.yahoo.pulsar.common.partition.PartitionedTopicMetadata; import com.yahoo.pulsar.common.policies.data.AuthAction; import com.yahoo.pulsar.common.policies.data.AutoFailoverPolicyData; import com.yahoo.pulsar.common.policies.data.AutoFailoverPolicyType; @@ -91,6 +98,7 @@ import com.yahoo.pulsar.common.policies.data.Policies; import com.yahoo.pulsar.common.policies.data.PropertyAdmin; import com.yahoo.pulsar.common.policies.data.RetentionPolicies; +import com.yahoo.pulsar.common.util.Codec; import com.yahoo.pulsar.common.util.ObjectMapperFactory; public class AdminApiTest extends MockedPulsarServiceBaseTest { @@ -153,6 +161,11 @@ public static Object[][] bundling() { return new Object[][] { { 0 }, { 4 } }; } + @DataProvider(name = "topicName") + public Object[][] topicNamesProvider() { + return new Object[][] { { "topic_+&*%{}() \\/$@#^%" }, { "simple-topicName" } }; + } + @Test public void clusters() throws Exception { admin.clusters().createCluster("usw", @@ -488,14 +501,15 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc // otheradmin.namespaces().unload("prop-xyz/use/ns2"); } - @Test(enabled = true) - public void persistentTopics() throws Exception { + @Test(dataProvider = "topicName") + public void persistentTopics(String topicName) throws Exception { assertEquals(admin.persistentTopics().getList("prop-xyz/use/ns1"), Lists.newArrayList()); + final String persistentTopicName = "persistent://prop-xyz/use/ns1/" + topicName; // Force to create a destination - publishMessagesOnPersistentTopic("persistent://prop-xyz/use/ns1/ds2", 0); + publishMessagesOnPersistentTopic("persistent://prop-xyz/use/ns1/" + topicName, 0); assertEquals(admin.persistentTopics().getList("prop-xyz/use/ns1"), - Lists.newArrayList("persistent://prop-xyz/use/ns1/ds2")); + Lists.newArrayList("persistent://prop-xyz/use/ns1/" + topicName)); // create consumer and subscription URL pulsarUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT); @@ -504,80 +518,75 @@ public void persistentTopics() throws Exception { PulsarClient client = PulsarClient.create(pulsarUrl.toString(), clientConf); ConsumerConfiguration conf = new ConsumerConfiguration(); conf.setSubscriptionType(SubscriptionType.Exclusive); - Consumer consumer = client.subscribe("persistent://prop-xyz/use/ns1/ds2", "my-sub", conf); + Consumer consumer = client.subscribe(persistentTopicName, "my-sub", conf); - assertEquals(admin.persistentTopics().getSubscriptions("persistent://prop-xyz/use/ns1/ds2"), - Lists.newArrayList("my-sub")); + assertEquals(admin.persistentTopics().getSubscriptions(persistentTopicName), Lists.newArrayList("my-sub")); - publishMessagesOnPersistentTopic("persistent://prop-xyz/use/ns1/ds2", 10); + publishMessagesOnPersistentTopic("persistent://prop-xyz/use/ns1/" + topicName, 10); - PersistentTopicStats topicStats = admin.persistentTopics().getStats("persistent://prop-xyz/use/ns1/ds2"); + PersistentTopicStats topicStats = admin.persistentTopics().getStats(persistentTopicName); assertEquals(topicStats.subscriptions.keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub"))); assertEquals(topicStats.subscriptions.get("my-sub").consumers.size(), 1); assertEquals(topicStats.subscriptions.get("my-sub").msgBacklog, 10); assertEquals(topicStats.publishers.size(), 0); - PersistentTopicInternalStats internalStats = admin.persistentTopics() - .getInternalStats("persistent://prop-xyz/use/ns1/ds2"); + PersistentTopicInternalStats internalStats = admin.persistentTopics().getInternalStats(persistentTopicName); assertEquals(internalStats.cursors.keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub"))); - List messages = admin.persistentTopics().peekMessages("persistent://prop-xyz/use/ns1/ds2", "my-sub", - 3); + List messages = admin.persistentTopics().peekMessages(persistentTopicName, "my-sub", 3); assertEquals(messages.size(), 3); for (int i = 0; i < 3; i++) { String expectedMessage = "message-" + i; assertEquals(messages.get(i).getData(), expectedMessage.getBytes()); } - messages = admin.persistentTopics().peekMessages("persistent://prop-xyz/use/ns1/ds2", "my-sub", 15); + messages = admin.persistentTopics().peekMessages(persistentTopicName, "my-sub", 15); assertEquals(messages.size(), 10); for (int i = 0; i < 10; i++) { String expectedMessage = "message-" + i; assertEquals(messages.get(i).getData(), expectedMessage.getBytes()); } - admin.persistentTopics().skipMessages("persistent://prop-xyz/use/ns1/ds2", "my-sub", 5); - topicStats = admin.persistentTopics().getStats("persistent://prop-xyz/use/ns1/ds2"); + admin.persistentTopics().skipMessages(persistentTopicName, "my-sub", 5); + topicStats = admin.persistentTopics().getStats(persistentTopicName); assertEquals(topicStats.subscriptions.get("my-sub").msgBacklog, 5); - admin.persistentTopics().skipAllMessages("persistent://prop-xyz/use/ns1/ds2", "my-sub"); - topicStats = admin.persistentTopics().getStats("persistent://prop-xyz/use/ns1/ds2"); + admin.persistentTopics().skipAllMessages(persistentTopicName, "my-sub"); + topicStats = admin.persistentTopics().getStats(persistentTopicName); assertEquals(topicStats.subscriptions.get("my-sub").msgBacklog, 0); consumer.close(); client.close(); - admin.persistentTopics().deleteSubscription("persistent://prop-xyz/use/ns1/ds2", "my-sub"); + admin.persistentTopics().deleteSubscription(persistentTopicName, "my-sub"); - assertEquals(admin.persistentTopics().getSubscriptions("persistent://prop-xyz/use/ns1/ds2"), - Lists.newArrayList()); - topicStats = admin.persistentTopics().getStats("persistent://prop-xyz/use/ns1/ds2"); + assertEquals(admin.persistentTopics().getSubscriptions(persistentTopicName), Lists.newArrayList()); + topicStats = admin.persistentTopics().getStats(persistentTopicName); assertEquals(topicStats.subscriptions.keySet(), Sets.newTreeSet()); assertEquals(topicStats.publishers.size(), 0); try { - admin.persistentTopics().skipAllMessages("persistent://prop-xyz/use/ns1/ds2", "my-sub"); + admin.persistentTopics().skipAllMessages(persistentTopicName, "my-sub"); } catch (NotFoundException e) { } - admin.persistentTopics().delete("persistent://prop-xyz/use/ns1/ds2"); + admin.persistentTopics().delete(persistentTopicName); try { - admin.persistentTopics().delete("persistent://prop-xyz/use/ns1/ds1"); + admin.persistentTopics().delete(persistentTopicName); fail("Should have received 404"); } catch (NotFoundException e) { } assertEquals(admin.persistentTopics().getList("prop-xyz/use/ns1"), Lists.newArrayList()); } + + @Test(dataProvider = "topicName") + public void partitionedTopics(String topicName) throws Exception { + final String partitionedTopicName = "persistent://prop-xyz/use/ns1/" + topicName; + admin.persistentTopics().createPartitionedTopic(partitionedTopicName, 4); - @Test - public void partitionedTopics() throws Exception { - admin.persistentTopics().createPartitionedTopic("persistent://prop-xyz/use/ns1/ds1", 4); - - assertEquals( - admin.persistentTopics().getPartitionedTopicMetadata("persistent://prop-xyz/use/ns1/ds1").partitions, - 4); + assertEquals(admin.persistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 4); // check if the virtual topic doesn't get created List destinations = admin.persistentTopics().getList("prop-xyz/use/ns1"); @@ -594,37 +603,34 @@ public void partitionedTopics() throws Exception { PulsarClient client = PulsarClient.create(pulsarUrl.toString(), clientConf); ConsumerConfiguration conf = new ConsumerConfiguration(); conf.setSubscriptionType(SubscriptionType.Exclusive); - Consumer consumer = client.subscribe("persistent://prop-xyz/use/ns1/ds1", "my-sub", conf); + Consumer consumer = client.subscribe(partitionedTopicName, "my-sub", conf); - assertEquals(admin.persistentTopics().getSubscriptions("persistent://prop-xyz/use/ns1/ds1"), - Lists.newArrayList("my-sub")); + assertEquals(admin.persistentTopics().getSubscriptions(partitionedTopicName), Lists.newArrayList("my-sub")); - Consumer consumer1 = client.subscribe("persistent://prop-xyz/use/ns1/ds1", "my-sub-1", conf); + Consumer consumer1 = client.subscribe(partitionedTopicName, "my-sub-1", conf); - assertEquals(Sets.newHashSet(admin.persistentTopics().getSubscriptions("persistent://prop-xyz/use/ns1/ds1")), + assertEquals(Sets.newHashSet(admin.persistentTopics().getSubscriptions(partitionedTopicName)), Sets.newHashSet("my-sub", "my-sub-1")); consumer1.close(); - admin.persistentTopics().deleteSubscription("persistent://prop-xyz/use/ns1/ds1", "my-sub-1"); - assertEquals(admin.persistentTopics().getSubscriptions("persistent://prop-xyz/use/ns1/ds1"), - Lists.newArrayList("my-sub")); + admin.persistentTopics().deleteSubscription(partitionedTopicName, "my-sub-1"); + assertEquals(admin.persistentTopics().getSubscriptions(partitionedTopicName), Lists.newArrayList("my-sub")); ProducerConfiguration prodConf = new ProducerConfiguration(); prodConf.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); - Producer producer = client.createProducer("persistent://prop-xyz/use/ns1/ds1", prodConf); + Producer producer = client.createProducer(partitionedTopicName, prodConf); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer.send(message.getBytes()); } - assertEquals(Sets.newHashSet(admin.persistentTopics().getList("prop-xyz/use/ns1")), Sets.newHashSet( - "persistent://prop-xyz/use/ns1/ds1-partition-0", "persistent://prop-xyz/use/ns1/ds1-partition-1", - "persistent://prop-xyz/use/ns1/ds1-partition-2", "persistent://prop-xyz/use/ns1/ds1-partition-3")); + assertEquals(Sets.newHashSet(admin.persistentTopics().getList("prop-xyz/use/ns1")), + Sets.newHashSet(partitionedTopicName + "-partition-0", partitionedTopicName + "-partition-1", + partitionedTopicName + "-partition-2", partitionedTopicName + "-partition-3")); // test cumulative stats for partitioned topic - PartitionedTopicStats topicStats = admin.persistentTopics() - .getPartitionedStats("persistent://prop-xyz/use/ns1/ds1", false); + PartitionedTopicStats topicStats = admin.persistentTopics().getPartitionedStats(partitionedTopicName, false); assertEquals(topicStats.subscriptions.keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub"))); assertEquals(topicStats.subscriptions.get("my-sub").consumers.size(), 1); assertEquals(topicStats.subscriptions.get("my-sub").msgBacklog, 10); @@ -632,49 +638,47 @@ public void partitionedTopics() throws Exception { assertEquals(topicStats.partitions, Maps.newHashMap()); // test per partition stats for partitioned topic - topicStats = admin.persistentTopics().getPartitionedStats("persistent://prop-xyz/use/ns1/ds1", true); + topicStats = admin.persistentTopics().getPartitionedStats(partitionedTopicName, true); assertEquals(topicStats.metadata.partitions, 4); - assertEquals(topicStats.partitions.keySet(), Sets.newHashSet("persistent://prop-xyz/use/ns1/ds1-partition-0", - "persistent://prop-xyz/use/ns1/ds1-partition-1", "persistent://prop-xyz/use/ns1/ds1-partition-2", - "persistent://prop-xyz/use/ns1/ds1-partition-3")); - PersistentTopicStats partitionStats = topicStats.partitions - .get("persistent://prop-xyz/use/ns1/ds1-partition-0"); + assertEquals(topicStats.partitions.keySet(), + Sets.newHashSet(partitionedTopicName + "-partition-0", partitionedTopicName + "-partition-1", + partitionedTopicName + "-partition-2", partitionedTopicName + "-partition-3")); + PersistentTopicStats partitionStats = topicStats.partitions.get(partitionedTopicName + "-partition-0"); assertEquals(partitionStats.publishers.size(), 1); assertEquals(partitionStats.subscriptions.get("my-sub").consumers.size(), 1); assertEquals(partitionStats.subscriptions.get("my-sub").msgBacklog, 3, 1); try { - admin.persistentTopics().skipMessages("persistent://prop-xyz/use/ns1/ds1", "my-sub", 5); + admin.persistentTopics().skipMessages(partitionedTopicName, "my-sub", 5); fail("skip messages for partitioned topics should fail"); } catch (Exception e) { // ok } - admin.persistentTopics().skipAllMessages("persistent://prop-xyz/use/ns1/ds1", "my-sub"); - topicStats = admin.persistentTopics().getPartitionedStats("persistent://prop-xyz/use/ns1/ds1", false); + admin.persistentTopics().skipAllMessages(partitionedTopicName, "my-sub"); + topicStats = admin.persistentTopics().getPartitionedStats(partitionedTopicName, false); assertEquals(topicStats.subscriptions.get("my-sub").msgBacklog, 0); producer.close(); consumer.close(); - admin.persistentTopics().deleteSubscription("persistent://prop-xyz/use/ns1/ds1", "my-sub"); + admin.persistentTopics().deleteSubscription(partitionedTopicName, "my-sub"); - assertEquals(admin.persistentTopics().getSubscriptions("persistent://prop-xyz/use/ns1/ds1"), - Lists.newArrayList()); + assertEquals(admin.persistentTopics().getSubscriptions(partitionedTopicName), Lists.newArrayList()); try { - admin.persistentTopics().createPartitionedTopic("persistent://prop-xyz/use/ns1/ds1", 32); + admin.persistentTopics().createPartitionedTopic(partitionedTopicName, 32); fail("Should have failed as the partitioned topic already exists"); } catch (ConflictException ce) { } - producer = client.createProducer("persistent://prop-xyz/use/ns1/ds1"); + producer = client.createProducer(partitionedTopicName); destinations = admin.persistentTopics().getList("prop-xyz/use/ns1"); assertEquals(destinations.size(), 4); try { - admin.persistentTopics().deletePartitionedTopic("persistent://prop-xyz/use/ns1/ds1"); + admin.persistentTopics().deletePartitionedTopic(partitionedTopicName); fail("The topic is busy"); } catch (PreconditionFailedException pfe) { // ok @@ -683,17 +687,13 @@ public void partitionedTopics() throws Exception { producer.close(); client.close(); - admin.persistentTopics().deletePartitionedTopic("persistent://prop-xyz/use/ns1/ds1"); + admin.persistentTopics().deletePartitionedTopic(partitionedTopicName); - assertEquals( - admin.persistentTopics().getPartitionedTopicMetadata("persistent://prop-xyz/use/ns1/ds1").partitions, - 0); + assertEquals(admin.persistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 0); - admin.persistentTopics().createPartitionedTopic("persistent://prop-xyz/use/ns1/ds1", 32); + admin.persistentTopics().createPartitionedTopic(partitionedTopicName, 32); - assertEquals( - admin.persistentTopics().getPartitionedTopicMetadata("persistent://prop-xyz/use/ns1/ds1").partitions, - 32); + assertEquals(admin.persistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 32); try { admin.persistentTopics().deletePartitionedTopic("persistent://prop-xyz/use/ns1/ds2"); @@ -701,13 +701,13 @@ public void partitionedTopics() throws Exception { } catch (NotFoundException nfe) { } - admin.persistentTopics().deletePartitionedTopic("persistent://prop-xyz/use/ns1/ds1"); + admin.persistentTopics().deletePartitionedTopic(partitionedTopicName); // delete a partitioned topic in a global namespace - admin.persistentTopics().createPartitionedTopic("persistent://prop-xyz/global/ns1/ds1", 4); - admin.persistentTopics().deletePartitionedTopic("persistent://prop-xyz/global/ns1/ds1"); + admin.persistentTopics().createPartitionedTopic(partitionedTopicName, 4); + admin.persistentTopics().deletePartitionedTopic(partitionedTopicName); } - + @Test(dataProvider = "numBundles") public void testDeleteNamespaceBundle(Integer numBundles) throws Exception { admin.namespaces().deleteNamespace("prop-xyz/use/ns1"); @@ -1085,13 +1085,13 @@ public void testBackwardCompatiblity() throws Exception { assertEquals(admin.properties().getProperties(), Lists.newArrayList()); } - @Test - public void persistentTopicsCursorReset() throws Exception { + @Test(dataProvider = "topicName") + public void persistentTopicsCursorReset(String topicName) throws Exception { admin.namespaces().setRetention("prop-xyz/use/ns1", new RetentionPolicies(10, 10)); assertEquals(admin.persistentTopics().getList("prop-xyz/use/ns1"), Lists.newArrayList()); - String topicName = "persistent://prop-xyz/use/ns1/cursorreset"; + topicName = "persistent://prop-xyz/use/ns1/" + topicName; // create consumer and subscription ConsumerConfiguration conf = new ConsumerConfiguration(); @@ -1149,12 +1149,12 @@ public void persistentTopicsCursorReset() throws Exception { admin.persistentTopics().delete(topicName); } - @Test - public void persistentTopicsCursorResetAfterReset() throws Exception { + @Test(dataProvider = "topicName") + public void persistentTopicsCursorResetAfterReset(String topicName) throws Exception { admin.namespaces().setRetention("prop-xyz/use/ns1", new RetentionPolicies(10, 10)); assertEquals(admin.persistentTopics().getList("prop-xyz/use/ns1"), Lists.newArrayList()); - String topicName = "persistent://prop-xyz/use/ns1/cursorresetafterreset"; + topicName = "persistent://prop-xyz/use/ns1/" + topicName; // create consumer and subscription ConsumerConfiguration conf = new ConsumerConfiguration(); @@ -1234,10 +1234,10 @@ public void persistentTopicsCursorResetAfterReset() throws Exception { admin.persistentTopics().delete(topicName); } - @Test - public void partitionedTopicsCursorReset() throws Exception { + @Test(dataProvider = "topicName") + public void partitionedTopicsCursorReset(String topicName) throws Exception { admin.namespaces().setRetention("prop-xyz/use/ns1", new RetentionPolicies(10, 10)); - String topicName = "persistent://prop-xyz/use/ns1/partitionedcursorreset"; + topicName = "persistent://prop-xyz/use/ns1/" + topicName; admin.persistentTopics().createPartitionedTopic(topicName, 4); @@ -1485,4 +1485,109 @@ public void testPersistentTopicExpireMessageOnParitionTopic() throws Exception{ } + /** + * This test-case verifies that broker should support both url/uri encoding for topic-name. It calls below api with + * url-encoded and also uri-encoded topic-name in http request: a. PartitionedMetadataLookup b. TopicLookup c. Topic + * Stats + * + * @param topicName + * @throws Exception + */ + @Test(dataProvider = "topicName") + public void testPulsarAdminForUriAndUrlEncoding(String topicName) throws Exception { + final String ns1 = "prop-xyz/use/ns1"; + final String dn1 = "persistent://" + ns1 + "/" + topicName; + final String urlEncodedTopic = Codec.encode(topicName); + final String uriEncodedTopic = urlEncodedTopic.replaceAll("\\+", "%20"); + final int numOfPartitions = 4; + admin.persistentTopics().createPartitionedTopic(dn1, numOfPartitions); + // Create a consumer to get stats on this topic + Consumer consumer1 = pulsarClient.subscribe(dn1, "my-subscriber-name", new ConsumerConfiguration()); + + PersistentTopicsImpl persistent = (PersistentTopicsImpl) admin.persistentTopics(); + Field field = PersistentTopicsImpl.class.getDeclaredField("persistentTopics"); + field.setAccessible(true); + WebTarget persistentTopics = (WebTarget) field.get(persistent); + + // (1) Get PartitionedMetadata : with Url and Uri encoding + final CompletableFuture urlEncodedPartitionedMetadata = new CompletableFuture<>(); + // (a) Url encoding + persistent.asyncGetRequest(persistentTopics.path(ns1).path(urlEncodedTopic).path("partitions"), + new InvocationCallback() { + @Override + public void completed(PartitionedTopicMetadata response) { + urlEncodedPartitionedMetadata.complete(response); + } + + @Override + public void failed(Throwable e) { + Assert.fail(e.getMessage()); + } + }); + final CompletableFuture uriEncodedPartitionedMetadata = new CompletableFuture<>(); + // (b) Uri encoding + persistent.asyncGetRequest(persistentTopics.path(ns1).path(uriEncodedTopic).path("partitions"), + new InvocationCallback() { + @Override + public void completed(PartitionedTopicMetadata response) { + uriEncodedPartitionedMetadata.complete(response); + } + + @Override + public void failed(Throwable e) { + uriEncodedPartitionedMetadata.completeExceptionally(e); + } + }); + assertEquals(urlEncodedPartitionedMetadata.get().partitions, numOfPartitions); + assertEquals(urlEncodedPartitionedMetadata.get().partitions, (uriEncodedPartitionedMetadata.get().partitions)); + + // (2) Get Topic Lookup + LookupImpl lookup = (LookupImpl) admin.lookups(); + Field field2 = LookupImpl.class.getDeclaredField("v2lookup"); + field2.setAccessible(true); + WebTarget target2 = (WebTarget) field2.get(lookup); + // (a) Url encoding + LookupData urlEncodedLookupData = lookup + .request(target2.path("/destination/persistent").path(ns1 + "/" + urlEncodedTopic)) + .get(LookupData.class); + // (b) Uri encoding + LookupData uriEncodedLookupData = lookup + .request(target2.path("/destination/persistent").path(ns1 + "/" + uriEncodedTopic)) + .get(LookupData.class); + Assert.assertNotNull(urlEncodedLookupData.getBrokerUrl()); + assertEquals(urlEncodedLookupData.getBrokerUrl(), uriEncodedLookupData.getBrokerUrl()); + + // (3) Get Topic Stats + final CompletableFuture urlStats = new CompletableFuture<>(); + // (a) Url encoding + persistent.asyncGetRequest(persistentTopics.path(ns1).path(urlEncodedTopic + "-partition-1").path("stats"), + new InvocationCallback() { + @Override + public void completed(PersistentTopicStats response) { + urlStats.complete(response); + } + + @Override + public void failed(Throwable e) { + urlStats.completeExceptionally(e); + } + }); + // (b) Uri encoding + final CompletableFuture uriStats = new CompletableFuture<>(); + persistent.asyncGetRequest(persistentTopics.path(ns1).path(uriEncodedTopic + "-partition-1").path("stats"), + new InvocationCallback() { + @Override + public void completed(PersistentTopicStats response) { + uriStats.complete(response); + } + + @Override + public void failed(Throwable e) { + uriStats.completeExceptionally(e); + } + }); + assertEquals(urlStats.get().subscriptions.size(), 1); + assertEquals(uriStats.get().subscriptions.size(), 1); + } + } diff --git a/pulsar-common/src/test/java/com/yahoo/pulsar/common/lookup/data/LookupDataTest.java b/pulsar-common/src/test/java/com/yahoo/pulsar/common/lookup/data/LookupDataTest.java index 5be68f9edd4a9..63c89aa663ee9 100644 --- a/pulsar-common/src/test/java/com/yahoo/pulsar/common/lookup/data/LookupDataTest.java +++ b/pulsar-common/src/test/java/com/yahoo/pulsar/common/lookup/data/LookupDataTest.java @@ -19,10 +19,10 @@ import java.util.Map; -import com.fasterxml.jackson.databind.ObjectMapper; import org.testng.annotations.Test; -import com.yahoo.pulsar.common.lookup.data.LookupData; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.yahoo.pulsar.common.util.Codec; import com.yahoo.pulsar.common.util.ObjectMapperFactory; @Test @@ -50,4 +50,14 @@ void serializeToJsonTest() throws Exception { assertEquals(jsonMap.get("nativeUrl"), "pulsar://localhost:8888"); assertEquals(jsonMap.get("httpUrl"), "http://localhost:8080"); } + + @Test + void testUrlEncoder() { + final String str = "specialCharacters_+&*%{}() \\/$@#^%"; + final String urlEncoded = Codec.encode(str); + final String uriEncoded = urlEncoded.replaceAll("//+", "%20"); + assertEquals("specialCharacters_%2B%26*%25%7B%7D%28%29+%5C%2F%24%40%23%5E%25", urlEncoded); + assertEquals(str, Codec.decode(urlEncoded)); + assertEquals(Codec.decode(urlEncoded), Codec.decode(uriEncoded)); + } }