From f9496396e29b95fe5dded98d445708c3f307ee8d Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 21 Dec 2022 11:24:34 +0200 Subject: [PATCH] [improve][broker] Use validateTopicOperationAsync methods in all cases - remove the unused validateTopicOperation methods --- .../admin/impl/PersistentTopicsBase.java | 29 +++++++++---------- .../pulsar/broker/lookup/TopicLookupBase.java | 9 ------ .../pulsar/broker/web/PulsarWebResource.java | 8 ----- 3 files changed, 14 insertions(+), 32 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index d9b0b9b3e2cdb..a3cfecc075066 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1657,7 +1657,7 @@ private void internalUpdateSubscriptionPropertiesForNonPartitionedTopic(AsyncRes String subName, Map subscriptionProperties, boolean authoritative) { validateTopicOwnershipAsync(topicName, authoritative) - .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME, subName)) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName)) .thenCompose(__ -> getTopicReferenceAsync(topicName)) .thenCompose(topic -> { Subscription sub = topic.getSubscription(subName); @@ -1684,7 +1684,7 @@ private void internalGetSubscriptionPropertiesForNonPartitionedTopic(AsyncRespon String subName, boolean authoritative) { validateTopicOwnershipAsync(topicName, authoritative) - .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME, subName)) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName)) .thenCompose(__ -> getTopicReferenceAsync(topicName)) .thenApply((Topic topic) -> { Subscription sub = topic.getSubscription(subName); @@ -1787,7 +1787,7 @@ protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse, private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) { validateTopicOwnershipAsync(topicName, authoritative) - .thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE, subName)) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.UNSUBSCRIBE, subName)) .thenCompose(__ -> getTopicReferenceAsync(topicName)) .thenCompose(topic -> { Subscription sub = topic.getSubscription(subName); @@ -2339,18 +2339,17 @@ private void internalCreateSubscriptionForNonPartitionedTopic( Map properties) { validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(__ -> { - validateTopicOperation(topicName, TopicOperation.SUBSCRIBE, subscriptionName); - return pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName) - .thenCompose(isAllowAutoTopicCreation -> pulsar().getBrokerService() - .getTopic(topicName.toString(), isAllowAutoTopicCreation)); - }).thenApply(optTopic -> { - if (optTopic.isPresent()) { - return optTopic.get(); - } else { - throw new RestException(Status.PRECONDITION_FAILED, - "Topic does not exist and cannot be auto-created"); - } + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SUBSCRIBE, subscriptionName)) + .thenCompose(__ -> pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)) + .thenCompose(isAllowAutoTopicCreation -> pulsar().getBrokerService() + .getTopic(topicName.toString(), isAllowAutoTopicCreation)) + .thenApply(optTopic -> { + if (optTopic.isPresent()) { + return optTopic.get(); + } else { + throw new RestException(Status.PRECONDITION_FAILED, + "Topic does not exist and cannot be auto-created"); + } }).thenCompose(topic -> { if (topic.getSubscriptions().containsKey(subscriptionName)) { throw new RestException(Status.CONFLICT, "Subscription already exists for topic"); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java index 0acde9947226b..3b64d2a9f8393 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java @@ -137,15 +137,6 @@ protected CompletableFuture internalLookupTopicAsync(TopicName topic }); } - private void validateAdminAndClientPermission(TopicName topic) throws RestException, Exception { - try { - validateTopicOperation(topic, TopicOperation.LOOKUP); - } catch (Exception e) { - // unknown error marked as internal server error - throw new RestException(e); - } - } - protected String internalGetNamespaceBundle(TopicName topicName) { validateNamespaceOperation(topicName.getNamespaceObject(), NamespaceOperation.GET_BUNDLE); try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 58b9436e6bf5c..8f238614fe3d8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -1226,14 +1226,6 @@ && pulsar().getBrokerService().isAuthorizationEnabled()) { return CompletableFuture.completedFuture(null); } - public void validateTopicOperation(TopicName topicName, TopicOperation operation) { - validateTopicOperation(topicName, operation, null); - } - - public void validateTopicOperation(TopicName topicName, TopicOperation operation, String subscription) { - sync(()-> validateTopicOperationAsync(topicName, operation, subscription)); - } - public CompletableFuture validateTopicOperationAsync(TopicName topicName, TopicOperation operation) { return validateTopicOperationAsync(topicName, operation, null); }