Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-105 add support for updating the Subscription properties #15751

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1560,6 +1560,33 @@ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyn
});
}

private void internalUpdateSubscriptionPropertiesForNonPartitionedTopic(AsyncResponse asyncResponse,
String subName, Map<String, String> subscriptionProperties,
boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
.thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))
.thenCompose(__ -> {
Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
throw new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topicName.toString(), subName));
}
return sub.updateSubscriptionProperties(subscriptionProperties);
}).thenRun(() -> {
log.info("[{}][{}] Updated subscription {}", clientAppId(), topicName, subName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to update subscription {} {}", clientAppId(), topicName, subName, cause);
}
asyncResponse.resume(new RestException(cause));
return null;
});
}

protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse,
String subName, boolean authoritative) {
CompletableFuture<Void> future;
Expand Down Expand Up @@ -2289,6 +2316,85 @@ private void internalCreateSubscriptionForNonPartitionedTopic(
});
}

protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse, String subName,
Map<String, String> subscriptionProperties,
boolean authoritative) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}

future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
if (topicName.isPartitioned()) {
internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
subscriptionProperties, authoritative);
} else {
getPartitionedTopicMetadataAsync(topicName,
authoritative, false).thenAcceptAsync(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();

for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics()
.updateSubscriptionPropertiesAsync(topicNamePartition.toString(),
subName, subscriptionProperties));
} catch (Exception e) {
log.error("[{}] Failed to update properties for subscription {} {}",
clientAppId(), topicNamePartition, subName,
e);
asyncResponse.resume(new RestException(e));
return;
}
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topicName.toString(), subName)));
return null;
} else if (t instanceof PreconditionFailedException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Subscription has active connected consumers"));
return null;
} else {
log.error("[{}] Failed to update properties for subscription {} {}",
clientAppId(), topicName, subName, t);
asyncResponse.resume(new RestException(t));
return null;
}
}

asyncResponse.resume(Response.noContent().build());
return null;
});
} else {
internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
subscriptionProperties, authoritative);
}
}, pulsar().getExecutor()).exceptionally(ex -> {
log.error("[{}] Failed to update properties for subscription {} from topic {}",
clientAppId(), subName, topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to update subscription {} from topic {}",
clientAppId(), subName, topicName, ex);
}
Comment on lines +2389 to +2392
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will not get a RedirectException here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will double check, this code is basically copy/pasted from resetCursor IIRC

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should come from here

throw new WebApplicationException(Response.temporaryRedirect(redirect).build());

in validateTopicOwnershipAsync

resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
CompletableFuture<Void> ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1527,6 +1527,43 @@ public void resetCursor(
}
}

@PUT
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/properties")
@ApiOperation(value = "Replaces all the properties on the given subscription")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
@ApiResponse(code = 405, message = "Method Not Allowed"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void updateSubscriptionProperties(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Subscription to update", required = true)
@PathParam("subName") String encodedSubName,
@ApiParam(value = "The new properties") Map<String, String> subscriptionProperties,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalUpdateSubscriptionProperties(asyncResponse, decode(encodedSubName),
subscriptionProperties, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/resetcursor")
@ApiOperation(value = "Reset subscription to message position closest to given position.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,5 +174,50 @@ public void testCreateSubscriptionWithProperties(boolean partitioned) throws Exc
assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty());
}

// clear the properties on subscriptionName
admin.topics().updateSubscriptionProperties(topic, subscriptionName, new HashMap<>());

if (partitioned) {
PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topic);
for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
.getSubscriptions().get(subscriptionName);
assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
}

// aggregated properties
SubscriptionStats subscriptionStats = admin.topics().getPartitionedStats(topic, false)
.getSubscriptions().get(subscriptionName);
assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());

} else {
SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
}

// update the properties on subscriptionName
admin.topics().updateSubscriptionProperties(topic, subscriptionName, properties);

if (partitioned) {
PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topic);
for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
.getSubscriptions().get(subscriptionName);
assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
}

// aggregated properties
SubscriptionStats subscriptionStats = admin.topics().getPartitionedStats(topic, false)
.getSubscriptions().get(subscriptionName);
assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));

} else {
SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));

SubscriptionStats subscriptionStats2 = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName2);
assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty());
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1812,6 +1812,17 @@ CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptio
*/
void resetCursor(String topic, String subName, MessageId messageId, boolean isExcluded) throws PulsarAdminException;

/**
* Update Subscription Properties on a topic subscription.
* The new properties will override the existing values, properties that are not passed will be removed.
* @param topic
* @param subName
* @param subscriptionProperties
* @throws PulsarAdminException
*/
void updateSubscriptionProperties(String topic, String subName, Map<String, String> subscriptionProperties)
throws PulsarAdminException;

/**
* Reset cursor position on a topic subscription.
*
Expand All @@ -1836,6 +1847,16 @@ CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptio
*/
CompletableFuture<Void> resetCursorAsync(String topic, String subName, MessageId messageId, boolean isExcluded);

/**
* Update Subscription Properties on a topic subscription.
* The new properties will override the existing values, properties that are not passed will be removed.
* @param topic
* @param subName
* @param subscriptionProperties
*/
CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String subName,
Map<String, String> subscriptionProperties);

/**
* Reset cursor position on a topic subscription.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,25 @@ public void resetCursor(String topic, String subName, MessageId messageId) throw
sync(() -> resetCursorAsync(topic, subName, messageId));
}

@Override
public void updateSubscriptionProperties(String topic, String subName, Map<String, String> subscriptionProperties)
throws PulsarAdminException {
sync(() -> updateSubscriptionPropertiesAsync(topic, subName, subscriptionProperties));
}

@Override
public CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String subName,
Map<String, String> subscriptionProperties) {
TopicName tn = validateTopic(topic);
String encodedSubName = Codec.encode(subName);
WebTarget path = topicPath(tn, "subscription", encodedSubName,
"properties");
if (subscriptionProperties == null) {
subscriptionProperties = new HashMap<>();
}
return asyncPutRequest(path, Entity.entity(subscriptionProperties, MediaType.APPLICATION_JSON));
}

@Override
public void resetCursor(String topic, String subName, MessageId messageId
, boolean isExcluded) throws PulsarAdminException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1490,6 +1490,17 @@ public void topics() throws Exception {
cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest -r"));
verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, true, null);

cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 --clear"));
verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", new HashMap<>());

cmdTopics = new CmdTopics(() -> admin);
props = new HashMap<>();
props.put("a", "b");
props.put("c", "d");
cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 -p a=b -p c=d"));
verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", props);

cmdTopics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32"));
verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32, null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
Expand Down Expand Up @@ -96,6 +97,7 @@ public CmdTopics(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("subscriptions", new ListSubscriptions());
jcommander.addCommand("unsubscribe", new DeleteSubscription());
jcommander.addCommand("create-subscription", new CreateSubscription());
jcommander.addCommand("update-subscription-properties", new UpdateSubscriptionProperties());

jcommander.addCommand("stats", new GetStats());
jcommander.addCommand("stats-internal", new GetInternalStats());
Expand Down Expand Up @@ -956,6 +958,41 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Update the properties of a subscription on a topic")
private class UpdateSubscriptionProperties extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-s",
"--subscription" }, description = "Subscription to update", required = true)
private String subscriptionName;

@Parameter(names = {"--property", "-p"}, description = "key value pair properties(-p a=b -p c=d)",
required = false)
private java.util.List<String> properties;

@Parameter(names = {"--clear", "-c"}, description = "Remove all properties",
required = false)
private boolean clear;

@Override
void run() throws Exception {
String topic = validateTopicName(params);
Map<String, String> map = parseListKeyValueMap(properties);
if (map == null) {
map = Collections.emptyMap();
}
if ((map.isEmpty()) && !clear) {
throw new ParameterException("If you want to clear the properties you have to use --clear");
}
if (clear && !map.isEmpty()) {
throw new ParameterException("If you set --clear then you should not pass any properties");
}
getTopics().updateSubscriptionProperties(topic, subscriptionName, map);
}
}


@Parameters(commandDescription = "Reset position for subscription to a position that is closest to "
+ "timestamp or messageId.")
private class ResetCursor extends CliCommand {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void testCreateSubscriptionCommand() throws Exception {
}

@Test
public void testCreateSubscriptionWithPropertiesCommand() throws Exception {
public void testCreateUpdateSubscriptionWithPropertiesCommand() throws Exception {
String topic = "testCreateSubscriptionCommmand";

String subscriptionPrefix = "subscription-";
Expand All @@ -194,6 +194,29 @@ public void testCreateSubscriptionWithPropertiesCommand() throws Exception {
"" + subscriptionPrefix + i
);
result.assertNoOutput();

ContainerExecResult resultUpdate = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"topics",
"update-subscription-properties",
"-p",
"a=e",
"persistent://public/default/" + topic,
"--subscription",
"" + subscriptionPrefix + i
);
resultUpdate.assertNoOutput();

ContainerExecResult resultClear = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"topics",
"update-subscription-properties",
"-c",
"persistent://public/default/" + topic,
"--subscription",
"" + subscriptionPrefix + i
);
resultClear.assertNoOutput();
i++;
}
}
Expand Down