diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 2f3312c4ae224..7603e0c2e437a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1560,6 +1560,33 @@ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyn }); } + private void internalUpdateSubscriptionPropertiesForNonPartitionedTopic(AsyncResponse asyncResponse, + String subName, Map subscriptionProperties, + boolean authoritative) { + validateTopicOwnershipAsync(topicName, authoritative) + .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME)) + .thenCompose(__ -> { + Topic topic = getTopicReference(topicName); + Subscription sub = topic.getSubscription(subName); + if (sub == null) { + throw new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topicName.toString(), subName)); + } + return sub.updateSubscriptionProperties(subscriptionProperties); + }).thenRun(() -> { + log.info("[{}][{}] Updated subscription {}", clientAppId(), topicName, subName); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + Throwable cause = ex.getCause(); + // If the exception is not redirect exception we need to log it. + if (!isRedirectException(ex)) { + log.error("[{}] Failed to update subscription {} {}", clientAppId(), topicName, subName, cause); + } + asyncResponse.resume(new RestException(cause)); + return null; + }); + } + protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) { CompletableFuture future; @@ -2289,6 +2316,85 @@ private void internalCreateSubscriptionForNonPartitionedTopic( }); } + protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse, String subName, + Map subscriptionProperties, + boolean authoritative) { + CompletableFuture future; + if (topicName.isGlobal()) { + future = validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + future = CompletableFuture.completedFuture(null); + } + + future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> { + if (topicName.isPartitioned()) { + internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName, + subscriptionProperties, authoritative); + } else { + getPartitionedTopicMetadataAsync(topicName, + authoritative, false).thenAcceptAsync(partitionMetadata -> { + if (partitionMetadata.partitions > 0) { + final List> futures = Lists.newArrayList(); + + for (int i = 0; i < partitionMetadata.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar().getAdminClient().topics() + .updateSubscriptionPropertiesAsync(topicNamePartition.toString(), + subName, subscriptionProperties)); + } catch (Exception e) { + log.error("[{}] Failed to update properties for subscription {} {}", + clientAppId(), topicNamePartition, subName, + e); + asyncResponse.resume(new RestException(e)); + return; + } + } + + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable t = exception.getCause(); + if (t instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topicName.toString(), subName))); + return null; + } else if (t instanceof PreconditionFailedException) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Subscription has active connected consumers")); + return null; + } else { + log.error("[{}] Failed to update properties for subscription {} {}", + clientAppId(), topicName, subName, t); + asyncResponse.resume(new RestException(t)); + return null; + } + } + + asyncResponse.resume(Response.noContent().build()); + return null; + }); + } else { + internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName, + subscriptionProperties, authoritative); + } + }, pulsar().getExecutor()).exceptionally(ex -> { + log.error("[{}] Failed to update properties for subscription {} from topic {}", + clientAppId(), subName, topicName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + }).exceptionally(ex -> { + // If the exception is not redirect exception we need to log it. + if (!isRedirectException(ex)) { + log.error("[{}] Failed to update subscription {} from topic {}", + clientAppId(), subName, topicName, ex); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative, MessageIdImpl messageId, boolean isExcluded, int batchIndex) { CompletableFuture ret; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 4cd405d3e9a49..3261e84b505a1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1527,6 +1527,43 @@ public void resetCursor( } } + @PUT + @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/properties") + @ApiOperation(value = "Replaces all the properties on the given subscription") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" + + "subscriber is not authorized to access this operation"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic/Subscription does not exist"), + @ApiResponse(code = 405, message = "Method Not Allowed"), + @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") + }) + public void updateSubscriptionProperties( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Subscription to update", required = true) + @PathParam("subName") String encodedSubName, + @ApiParam(value = "The new properties") Map subscriptionProperties, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + try { + validateTopicName(tenant, namespace, encodedTopic); + internalUpdateSubscriptionProperties(asyncResponse, decode(encodedSubName), + subscriptionProperties, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } + } + @POST @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/resetcursor") @ApiOperation(value = "Reset subscription to message position closest to given position.", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java index 2c37235ebdbd8..67416b4288e7a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java @@ -174,5 +174,50 @@ public void testCreateSubscriptionWithProperties(boolean partitioned) throws Exc assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty()); } + // clear the properties on subscriptionName + admin.topics().updateSubscriptionProperties(topic, subscriptionName, new HashMap<>()); + + if (partitioned) { + PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topic); + for (int i = 0; i < partitionedTopicMetadata.partitions; i++) { + SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i) + .getSubscriptions().get(subscriptionName); + assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty()); + } + + // aggregated properties + SubscriptionStats subscriptionStats = admin.topics().getPartitionedStats(topic, false) + .getSubscriptions().get(subscriptionName); + assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty()); + + } else { + SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); + assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty()); + } + + // update the properties on subscriptionName + admin.topics().updateSubscriptionProperties(topic, subscriptionName, properties); + + if (partitioned) { + PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topic); + for (int i = 0; i < partitionedTopicMetadata.partitions; i++) { + SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i) + .getSubscriptions().get(subscriptionName); + assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo")); + } + + // aggregated properties + SubscriptionStats subscriptionStats = admin.topics().getPartitionedStats(topic, false) + .getSubscriptions().get(subscriptionName); + assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo")); + + } else { + SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); + assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo")); + + SubscriptionStats subscriptionStats2 = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName2); + assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty()); + } + } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 48ef03cd67e9c..d07c67e11ac92 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1812,6 +1812,17 @@ CompletableFuture createSubscriptionAsync(String topic, String subscriptio */ void resetCursor(String topic, String subName, MessageId messageId, boolean isExcluded) throws PulsarAdminException; + /** + * Update Subscription Properties on a topic subscription. + * The new properties will override the existing values, properties that are not passed will be removed. + * @param topic + * @param subName + * @param subscriptionProperties + * @throws PulsarAdminException + */ + void updateSubscriptionProperties(String topic, String subName, Map subscriptionProperties) + throws PulsarAdminException; + /** * Reset cursor position on a topic subscription. * @@ -1836,6 +1847,16 @@ CompletableFuture createSubscriptionAsync(String topic, String subscriptio */ CompletableFuture resetCursorAsync(String topic, String subName, MessageId messageId, boolean isExcluded); + /** + * Update Subscription Properties on a topic subscription. + * The new properties will override the existing values, properties that are not passed will be removed. + * @param topic + * @param subName + * @param subscriptionProperties + */ + CompletableFuture updateSubscriptionPropertiesAsync(String topic, String subName, + Map subscriptionProperties); + /** * Reset cursor position on a topic subscription. * diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 5314872fc00a9..d3682a153178f 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -1181,6 +1181,25 @@ public void resetCursor(String topic, String subName, MessageId messageId) throw sync(() -> resetCursorAsync(topic, subName, messageId)); } + @Override + public void updateSubscriptionProperties(String topic, String subName, Map subscriptionProperties) + throws PulsarAdminException { + sync(() -> updateSubscriptionPropertiesAsync(topic, subName, subscriptionProperties)); + } + + @Override + public CompletableFuture updateSubscriptionPropertiesAsync(String topic, String subName, + Map subscriptionProperties) { + TopicName tn = validateTopic(topic); + String encodedSubName = Codec.encode(subName); + WebTarget path = topicPath(tn, "subscription", encodedSubName, + "properties"); + if (subscriptionProperties == null) { + subscriptionProperties = new HashMap<>(); + } + return asyncPutRequest(path, Entity.entity(subscriptionProperties, MediaType.APPLICATION_JSON)); + } + @Override public void resetCursor(String topic, String subName, MessageId messageId , boolean isExcluded) throws PulsarAdminException { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 2b46d2ab76042..324c18deb38c6 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -1490,6 +1490,17 @@ public void topics() throws Exception { cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest -r")); verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, true, null); + cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 --clear")); + verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", new HashMap<>()); + + cmdTopics = new CmdTopics(() -> admin); + props = new HashMap<>(); + props.put("a", "b"); + props.put("c", "d"); + cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 -p a=b -p c=d")); + verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", props); + cmdTopics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32")); verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32, null); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 0d27d74b86aef..fb78679c5f0b1 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -33,6 +33,7 @@ import io.netty.buffer.Unpooled; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -96,6 +97,7 @@ public CmdTopics(Supplier admin) { jcommander.addCommand("subscriptions", new ListSubscriptions()); jcommander.addCommand("unsubscribe", new DeleteSubscription()); jcommander.addCommand("create-subscription", new CreateSubscription()); + jcommander.addCommand("update-subscription-properties", new UpdateSubscriptionProperties()); jcommander.addCommand("stats", new GetStats()); jcommander.addCommand("stats-internal", new GetInternalStats()); @@ -956,6 +958,41 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Update the properties of a subscription on a topic") + private class UpdateSubscriptionProperties extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = { "-s", + "--subscription" }, description = "Subscription to update", required = true) + private String subscriptionName; + + @Parameter(names = {"--property", "-p"}, description = "key value pair properties(-p a=b -p c=d)", + required = false) + private java.util.List properties; + + @Parameter(names = {"--clear", "-c"}, description = "Remove all properties", + required = false) + private boolean clear; + + @Override + void run() throws Exception { + String topic = validateTopicName(params); + Map map = parseListKeyValueMap(properties); + if (map == null) { + map = Collections.emptyMap(); + } + if ((map.isEmpty()) && !clear) { + throw new ParameterException("If you want to clear the properties you have to use --clear"); + } + if (clear && !map.isEmpty()) { + throw new ParameterException("If you set --clear then you should not pass any properties"); + } + getTopics().updateSubscriptionProperties(topic, subscriptionName, map); + } + } + + @Parameters(commandDescription = "Reset position for subscription to a position that is closest to " + "timestamp or messageId.") private class ResetCursor extends CliCommand { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java index 66baadf590c26..6c00314a7e545 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java @@ -174,7 +174,7 @@ public void testCreateSubscriptionCommand() throws Exception { } @Test - public void testCreateSubscriptionWithPropertiesCommand() throws Exception { + public void testCreateUpdateSubscriptionWithPropertiesCommand() throws Exception { String topic = "testCreateSubscriptionCommmand"; String subscriptionPrefix = "subscription-"; @@ -194,6 +194,29 @@ public void testCreateSubscriptionWithPropertiesCommand() throws Exception { "" + subscriptionPrefix + i ); result.assertNoOutput(); + + ContainerExecResult resultUpdate = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "topics", + "update-subscription-properties", + "-p", + "a=e", + "persistent://public/default/" + topic, + "--subscription", + "" + subscriptionPrefix + i + ); + resultUpdate.assertNoOutput(); + + ContainerExecResult resultClear = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "topics", + "update-subscription-properties", + "-c", + "persistent://public/default/" + topic, + "--subscription", + "" + subscriptionPrefix + i + ); + resultClear.assertNoOutput(); i++; } }