From 49ae379571cc11e23ff26dde6d6631013f169518 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Mon, 18 Mar 2024 21:26:31 +0800 Subject: [PATCH 1/7] add --- .../PulsarAuthorizationProvider.java | 1 + .../pulsar/broker/admin/AdminResource.java | 7 +- .../broker/admin/impl/NamespacesBase.java | 166 ++++---- .../admin/impl/PersistentTopicsBase.java | 162 ++++---- .../broker/admin/NamespaceAuthZTest.java | 159 +++++++ .../pulsar/broker/admin/TopicAuthZTest.java | 387 ++++++++++++++++++ 6 files changed, 714 insertions(+), 168 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index acb6fce9b92e4..a39c3d0560760 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -597,6 +597,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, case COMPACT: case OFFLOAD: case UNLOAD: + case TRIM_TOPIC: case DELETE_METADATA: case UPDATE_METADATA: case ADD_BUNDLE_RANGE: diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 8eba6cc7b050b..618c4ca73e17a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -62,8 +62,6 @@ import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.policies.data.PolicyName; -import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; @@ -714,10 +712,7 @@ private CompletableFuture provisionPartitionedTopicPath(int numPartitions, } protected CompletableFuture getSchemaCompatibilityStrategyAsync() { - return validateTopicPolicyOperationAsync(topicName, - PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, - PolicyOperation.READ) - .thenCompose((__) -> getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> { + return getSchemaCompatibilityStrategyAsyncWithoutAuth().whenComplete((__, ex) -> { if (ex != null) { log.error("[{}] Failed to get schema compatibility strategy of topic {} {}", clientAppId(), topicName, ex); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 5531d977d074d..37ab552b7b79d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2339,102 +2339,110 @@ protected void internalSetMaxTopicsPerNamespace(Integer maxTopicsPerNamespace) { } protected void internalSetProperty(String key, String value, AsyncResponse asyncResponse) { - validatePoliciesReadOnlyAccess(); - updatePoliciesAsync(namespaceName, policies -> { - policies.properties.put(key, value); - return policies; - }).thenAccept(v -> { - log.info("[{}] Successfully set property for key {} on namespace {}", clientAppId(), key, - namespaceName); - asyncResponse.resume(Response.noContent().build()); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to set property for key {} on namespace {}", clientAppId(), key, - namespaceName, cause); - asyncResponse.resume(cause); - return null; - }); + validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { + policies.properties.put(key, value); + return policies; + })) + .thenAccept(v -> { + log.info("[{}] Successfully set property for key {} on namespace {}", clientAppId(), key, + namespaceName); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + Throwable cause = ex.getCause(); + log.error("[{}] Failed to set property for key {} on namespace {}", clientAppId(), key, + namespaceName, cause); + asyncResponse.resume(cause); + return null; + }); } protected void internalSetProperties(Map properties, AsyncResponse asyncResponse) { - validatePoliciesReadOnlyAccess(); - updatePoliciesAsync(namespaceName, policies -> { - policies.properties.putAll(properties); - return policies; - }).thenAccept(v -> { - log.info("[{}] Successfully set {} properties on namespace {}", clientAppId(), properties.size(), - namespaceName); - asyncResponse.resume(Response.noContent().build()); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to set {} properties on namespace {}", clientAppId(), properties.size(), - namespaceName, cause); - asyncResponse.resume(cause); - return null; - }); + validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { + policies.properties.putAll(properties); + return policies; + })) + .thenAccept(v -> { + log.info("[{}] Successfully set {} properties on namespace {}", clientAppId(), properties.size(), + namespaceName); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + Throwable cause = ex.getCause(); + log.error("[{}] Failed to set {} properties on namespace {}", clientAppId(), properties.size(), + namespaceName, cause); + asyncResponse.resume(cause); + return null; + }); } protected void internalGetProperty(String key, AsyncResponse asyncResponse) { - getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> { - asyncResponse.resume(policies.properties.get(key)); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to get property for key {} of namespace {}", clientAppId(), key, - namespaceName, cause); - asyncResponse.resume(cause); - return null; - }); + validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(policies -> asyncResponse.resume(policies.properties.get(key))) + .exceptionally(ex -> { + Throwable cause = ex.getCause(); + log.error("[{}] Failed to get property for key {} of namespace {}", clientAppId(), key, + namespaceName, cause); + asyncResponse.resume(cause); + return null; + }); } protected void internalGetProperties(AsyncResponse asyncResponse) { - getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> { - asyncResponse.resume(policies.properties); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to get properties of namespace {}", clientAppId(), namespaceName, cause); - asyncResponse.resume(cause); - return null; - }); + validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(policies -> asyncResponse.resume(policies.properties)) + .exceptionally(ex -> { + Throwable cause = ex.getCause(); + log.error("[{}] Failed to get properties of namespace {}", clientAppId(), namespaceName, cause); + asyncResponse.resume(cause); + return null; + }); } protected void internalRemoveProperty(String key, AsyncResponse asyncResponse) { - validatePoliciesReadOnlyAccess(); - AtomicReference oldVal = new AtomicReference<>(null); - updatePoliciesAsync(namespaceName, policies -> { - oldVal.set(policies.properties.remove(key)); - return policies; - }).thenAccept(v -> { - asyncResponse.resume(oldVal.get()); - log.info("[{}] Successfully remove property for key {} on namespace {}", clientAppId(), key, - namespaceName); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to remove property for key {} on namespace {}", clientAppId(), key, - namespaceName, cause); - asyncResponse.resume(cause); - return null; - }); + validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { + oldVal.set(policies.properties.remove(key)); + return policies; + })).thenAccept(v -> { + asyncResponse.resume(oldVal.get()); + log.info("[{}] Successfully remove property for key {} on namespace {}", clientAppId(), key, + namespaceName); + }).exceptionally(ex -> { + Throwable cause = ex.getCause(); + log.error("[{}] Failed to remove property for key {} on namespace {}", clientAppId(), key, + namespaceName, cause); + asyncResponse.resume(cause); + return null; + }); } protected void internalClearProperties(AsyncResponse asyncResponse) { - validatePoliciesReadOnlyAccess(); AtomicReference clearedCount = new AtomicReference<>(0); - updatePoliciesAsync(namespaceName, policies -> { - clearedCount.set(policies.properties.size()); - policies.properties.clear(); - return policies; - }).thenAccept(v -> { - asyncResponse.resume(Response.noContent().build()); - log.info("[{}] Successfully clear {} properties on namespace {}", clientAppId(), clearedCount.get(), - namespaceName); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to clear {} properties on namespace {}", clientAppId(), clearedCount.get(), - namespaceName, cause); - asyncResponse.resume(cause); - return null; - }); + validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { + clearedCount.set(policies.properties.size()); + policies.properties.clear(); + return policies; + })) + .thenAccept(v -> { + asyncResponse.resume(Response.noContent().build()); + log.info("[{}] Successfully clear {} properties on namespace {}", clientAppId(), clearedCount.get(), + namespaceName); + }).exceptionally(ex -> { + Throwable cause = ex.getCause(); + log.error("[{}] Failed to clear {} properties on namespace {}", clientAppId(), clearedCount.get(), + namespaceName, cause); + asyncResponse.resume(cause); + return null; + }); } private CompletableFuture updatePoliciesAsync(NamespaceName ns, Function updateFunction) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 638a12d3b97db..73fd635471c2f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -822,13 +822,13 @@ private CompletableFuture internalRemovePartitionsAuthenticationPoliciesAs protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authoritative) { log.info("[{}] Unloading topic {}", clientAppId(), topicName); - CompletableFuture future; - if (topicName.isGlobal()) { - future = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - future = CompletableFuture.completedFuture(null); - } - future.thenAccept(__ -> { + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.UNLOAD); + future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } + return CompletableFuture.completedFuture(null); + }).thenAccept(__ -> { // If the topic name is a partition name, no need to get partition topic metadata again if (topicName.isPartitioned()) { if (isTransactionCoordinatorAssign(topicName)) { @@ -1048,13 +1048,12 @@ protected CompletableFuture internalSetDeduplicationSnapshotInterval(Integ private void internalUnloadNonPartitionedTopicAsync(AsyncResponse asyncResponse, boolean authoritative) { validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.UNLOAD) .thenCompose(__ -> getTopicReferenceAsync(topicName)) .thenCompose(topic -> topic.close(false)) .thenRun(() -> { log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName); asyncResponse.resume(Response.noContent().build()); - })) + }) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. if (!isNot307And404Exception(ex)) { @@ -1067,16 +1066,12 @@ private void internalUnloadNonPartitionedTopicAsync(AsyncResponse asyncResponse, private void internalUnloadTransactionCoordinatorAsync(AsyncResponse asyncResponse, boolean authoritative) { validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.UNLOAD) - .thenCompose(v -> pulsar() - .getTransactionMetadataStoreService() - .removeTransactionMetadataStore( + .thenCompose(v -> pulsar().getTransactionMetadataStoreService().removeTransactionMetadataStore( TransactionCoordinatorID.get(topicName.getPartitionIndex()))) - .thenRun(() -> { - log.info("[{}] Successfully unloaded tc {}", clientAppId(), - topicName.getPartitionIndex()); - asyncResponse.resume(Response.noContent().build()); - })) + .thenRun(() -> { + log.info("[{}] Successfully unloaded tc {}", clientAppId(), topicName.getPartitionIndex()); + asyncResponse.resume(Response.noContent().build()); + }) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. if (!isNot307And404Exception(ex)) { @@ -1284,13 +1279,13 @@ protected CompletableFuture internalGetInternalSta } protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean authoritative) { - CompletableFuture future; - if (topicName.isGlobal()) { - future = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - future = CompletableFuture.completedFuture(null); - } - future.thenAccept(__ -> { + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.GET_STATS); + future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } + return CompletableFuture.completedFuture(null); + }).thenAccept(__ -> { // If the topic name is a partition name, no need to get partition topic metadata again if (topicName.isPartitioned()) { internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse); @@ -1394,13 +1389,13 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) { protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative, boolean perPartition, GetStatsOptions getStatsOptions) { - CompletableFuture future; - if (topicName.isGlobal()) { - future = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - future = CompletableFuture.completedFuture(null); - } - future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.GET_STATS); + future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } + return CompletableFuture.completedFuture(null); + }).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)).thenAccept(partitionMetadata -> { if (partitionMetadata.partitions == 0) { asyncResponse.resume(new RestException(Status.NOT_FOUND, @@ -1486,14 +1481,15 @@ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean } protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse, boolean authoritative) { - CompletableFuture future; - if (topicName.isGlobal()) { - future = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - future = CompletableFuture.completedFuture(null); - } - future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)) - .thenAccept(partitionMetadata -> { + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.GET_STATS); + future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + return CompletableFuture.completedFuture(null); + } + }).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)) + .thenAccept(partitionMetadata -> { if (partitionMetadata.partitions == 0) { asyncResponse.resume(new RestException(Status.NOT_FOUND, getPartitionedTopicNotFoundErrorMessage(topicName.toString()))); @@ -2246,13 +2242,14 @@ private CompletableFuture internalResetCursorForNonPartitionedTopic(String protected void internalCreateSubscription(AsyncResponse asyncResponse, String subscriptionName, MessageIdImpl messageId, boolean authoritative, boolean replicated, Map properties) { - CompletableFuture ret; - if (topicName.isGlobal()) { - ret = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - ret = CompletableFuture.completedFuture(null); - } - ret.thenAccept(__ -> { + CompletableFuture ret = validateTopicOperationAsync(topicName, TopicOperation.SUBSCRIBE, + subscriptionName); + ret.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } + return CompletableFuture.completedFuture(null); + }).thenAccept(__ -> { final MessageIdImpl targetMessageId = messageId == null ? (MessageIdImpl) MessageId.latest : messageId; log.info("[{}][{}] Creating subscription {} at message id {} with properties {}", clientAppId(), topicName, subscriptionName, targetMessageId, properties); @@ -2411,14 +2408,13 @@ private void internalCreateSubscriptionForNonPartitionedTopic( protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse, String subName, Map subscriptionProperties, boolean authoritative) { - CompletableFuture future; - if (topicName.isGlobal()) { - future = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - future = CompletableFuture.completedFuture(null); - } - - future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> { + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName); + future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } + return CompletableFuture.completedFuture(null); + }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> { if (topicName.isPartitioned()) { internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName, subscriptionProperties, authoritative); @@ -2490,14 +2486,13 @@ protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse, protected void internalAnalyzeSubscriptionBacklog(AsyncResponse asyncResponse, String subName, Optional position, boolean authoritative) { - CompletableFuture future; - if (topicName.isGlobal()) { - future = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - future = CompletableFuture.completedFuture(null); - } - - future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName); + future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } + return CompletableFuture.completedFuture(null); + }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) .thenCompose(__ -> { if (topicName.isPartitioned()) { return CompletableFuture.completedFuture(null); @@ -2529,14 +2524,13 @@ protected void internalAnalyzeSubscriptionBacklog(AsyncResponse asyncResponse, S protected void internalGetSubscriptionProperties(AsyncResponse asyncResponse, String subName, boolean authoritative) { - CompletableFuture future; - if (topicName.isGlobal()) { - future = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - future = CompletableFuture.completedFuture(null); - } - - future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> { + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName); + future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } + return CompletableFuture.completedFuture(null); + }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> { if (topicName.isPartitioned()) { internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName, authoritative); @@ -4215,13 +4209,14 @@ private CompletableFuture internalExpireMessagesNonPartitionedTopicByPosit protected void internalTriggerCompaction(AsyncResponse asyncResponse, boolean authoritative) { log.info("[{}] Trigger compaction on topic {}", clientAppId(), topicName); - CompletableFuture future; - if (topicName.isGlobal()) { - future = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - future = CompletableFuture.completedFuture(null); - } - future.thenAccept(__ -> { + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.COMPACT); + future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + return CompletableFuture.completedFuture(null); + } + }).thenAccept(__ -> { // If the topic name is a partition name, no need to get partition topic metadata again if (topicName.isPartitioned()) { internalTriggerCompactionNonPartitionedTopic(asyncResponse, authoritative); @@ -4653,11 +4648,12 @@ protected CompletableFuture internalTrimTopic(AsyncResponse asyncResponse, "Trim on a non-persistent topic is not allowed")); return null; } + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.TRIM_TOPIC); if (topicName.isPartitioned()) { - return validateTopicOperationAsync(topicName, TopicOperation.TRIM_TOPIC).thenCompose((x) + return future.thenCompose((x) -> trimNonPartitionedTopic(asyncResponse, topicName, authoritative)); } - return validateTopicOperationAsync(topicName, TopicOperation.TRIM_TOPIC) + return future .thenCompose(__ -> pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)) .thenCompose(metadata -> { if (metadata.partitions > 0) { @@ -5339,12 +5335,12 @@ private void internalGetReplicatedSubscriptionStatusForNonPartitionedTopic( } protected CompletableFuture internalGetSchemaCompatibilityStrategy(boolean applied) { + CompletableFuture future = validateTopicPolicyOperationAsync(topicName, + PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ); if (applied) { return getSchemaCompatibilityStrategyAsync(); } - return validateTopicPolicyOperationAsync(topicName, - PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, - PolicyOperation.READ) + return future .thenCompose(n -> getTopicPoliciesAsyncWithRetry(topicName).thenApply(op -> { if (!op.isPresent()) { return null; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java new file mode 100644 index 0000000000000..c864da32350e6 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.broker.admin; + +import io.jsonwebtoken.Jwts; +import lombok.Cleanup; +import lombok.SneakyThrows; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.security.MockedPulsarStandalone; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +public class NamespaceAuthZTest extends MockedPulsarStandalone { + + private PulsarAdmin superUserAdmin; + + private PulsarAdmin tenantManagerAdmin; + + private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); + private static final String TENANT_ADMIN_TOKEN = Jwts.builder() + .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact(); + + @SneakyThrows + @BeforeClass + public void before() { + configureTokenAuthentication(); + configureDefaultAuthorization(); + start(); + this.superUserAdmin =PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) + .build(); + final TenantInfo tenantInfo = superUserAdmin.tenants().getTenantInfo("public"); + tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT); + superUserAdmin.tenants().updateTenant("public", tenantInfo); + this.tenantManagerAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN)) + .build(); + } + + + @SneakyThrows + @AfterClass + public void after() { + if (superUserAdmin != null) { + superUserAdmin.close(); + } + if (tenantManagerAdmin != null) { + tenantManagerAdmin.close(); + } + close(); + } + + + @SneakyThrows + @Test + public void testProperties() { + final String random = UUID.randomUUID().toString(); + final String namespace = "public/default"; + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + // test superuser + Map properties = new HashMap<>(); + properties.put("key1", "value1"); + superUserAdmin.namespaces().setProperties(namespace, properties); + superUserAdmin.namespaces().setProperty(namespace, "key2", "value2"); + superUserAdmin.namespaces().getProperties(namespace); + superUserAdmin.namespaces().getProperty(namespace, "key2"); + superUserAdmin.namespaces().removeProperty(namespace, "key2"); + superUserAdmin.namespaces().clearProperties(namespace); + + // test tenant manager + tenantManagerAdmin.namespaces().setProperties(namespace, properties); + tenantManagerAdmin.namespaces().setProperty(namespace, "key2", "value2"); + tenantManagerAdmin.namespaces().getProperties(namespace); + tenantManagerAdmin.namespaces().getProperty(namespace, "key2"); + tenantManagerAdmin.namespaces().removeProperty(namespace, "key2"); + tenantManagerAdmin.namespaces().clearProperties(namespace); + + // test nobody + try { + subAdmin.namespaces().setProperties(namespace, properties); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + + subAdmin.namespaces().setProperty(namespace, "key2", "value2"); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + subAdmin.namespaces().getProperties(namespace); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + subAdmin.namespaces().getProperty(namespace, "key2"); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + subAdmin.namespaces().removeProperty(namespace, "key2"); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + subAdmin.namespaces().clearProperties(namespace); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java new file mode 100644 index 0000000000000..8fef7114d0622 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.broker.admin; + +import io.jsonwebtoken.Jwts; +import lombok.Cleanup; +import lombok.SneakyThrows; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.security.MockedPulsarStandalone; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +public class TopicAuthZTest extends MockedPulsarStandalone { + + private PulsarAdmin superUserAdmin; + + private PulsarAdmin tenantManagerAdmin; + + private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); + private static final String TENANT_ADMIN_TOKEN = Jwts.builder() + .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact(); + + @SneakyThrows + @BeforeClass + public void before() { + configureTokenAuthentication(); + configureDefaultAuthorization(); + start(); + this.superUserAdmin =PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) + .build(); + final TenantInfo tenantInfo = superUserAdmin.tenants().getTenantInfo("public"); + tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT); + superUserAdmin.tenants().updateTenant("public", tenantInfo); + this.tenantManagerAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN)) + .build(); + } + + + @SneakyThrows + @AfterClass + public void after() { + if (superUserAdmin != null) { + superUserAdmin.close(); + } + if (tenantManagerAdmin != null) { + tenantManagerAdmin.close(); + } + close(); + } + + + @SneakyThrows + @Test + public void testUnloadAndCompactAndTrim() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createPartitionedTopic(topic, 2); + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + // test superuser + superUserAdmin.topics().unload(topic); + superUserAdmin.topics().triggerCompaction(topic); + superUserAdmin.topics().trimTopic(TopicName.get(topic).getPartition(0).getLocalName()); + superUserAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false); + + // test tenant manager + tenantManagerAdmin.topics().unload(topic); + tenantManagerAdmin.topics().triggerCompaction(topic); + tenantManagerAdmin.topics().trimTopic(TopicName.get(topic).getPartition(0).getLocalName()); + tenantManagerAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false); + + // test nobody + try { + subAdmin.topics().unload(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + try { + subAdmin.topics().triggerCompaction(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + try { + subAdmin.topics().trimTopic(TopicName.get(topic).getPartition(0).getLocalName()); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + try { + subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.topics().grantPermission(topic, subject, Set.of(action)); + try { + subAdmin.topics().unload(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + try { + subAdmin.topics().triggerCompaction(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + try { + subAdmin.topics().trimTopic(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + try { + subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + superUserAdmin.topics().revokePermissions(topic, subject); + } + superUserAdmin.topics().deletePartitionedTopic(topic, true); + } + + @Test + @SneakyThrows + public void testGetManagedLedgerInfo() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createPartitionedTopic(topic, 2); + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + // test superuser + superUserAdmin.topics().getInternalInfo(topic); + + // test tenant manager + tenantManagerAdmin.topics().getInternalInfo(topic); + + // test nobody + try { + subAdmin.topics().getInternalInfo(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.topics().grantPermission(topic, subject, Set.of(action)); + if (action == AuthAction.produce || action == AuthAction.consume) { + subAdmin.topics().getInternalInfo(topic); + } else { + try { + subAdmin.topics().getInternalInfo(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + } + superUserAdmin.topics().revokePermissions(topic, subject); + } + superUserAdmin.topics().deletePartitionedTopic(topic, true); + } + + @Test + @SneakyThrows + public void testGetPartitionedStatsAndInternalStats() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createPartitionedTopic(topic, 2); + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + // test superuser + superUserAdmin.topics().getPartitionedStats(topic, false); + superUserAdmin.topics().getPartitionedInternalStats(topic); + + // test tenant manager + tenantManagerAdmin.topics().getPartitionedStats(topic, false); + tenantManagerAdmin.topics().getPartitionedInternalStats(topic); + + // test nobody + try { + subAdmin.topics().getPartitionedStats(topic, false); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + subAdmin.topics().getPartitionedInternalStats(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.topics().grantPermission(topic, subject, Set.of(action)); + if (action == AuthAction.produce || action == AuthAction.consume) { + subAdmin.topics().getPartitionedStats(topic, false); + subAdmin.topics().getPartitionedInternalStats(topic); + } else { + try { + subAdmin.topics().getPartitionedStats(topic, false); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + try { + subAdmin.topics().getPartitionedInternalStats(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + } + superUserAdmin.topics().revokePermissions(topic, subject); + } + superUserAdmin.topics().deletePartitionedTopic(topic, true); + } + + @Test + @SneakyThrows + public void testCreateSubscriptionAndUpdateSubscriptionProperties() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createPartitionedTopic(topic, 2); + AtomicInteger suffix = new AtomicInteger(1); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + // + superUserAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest); + + // test tenant manager + tenantManagerAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest); + + // test nobody + try { + subAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.topics().grantPermission(topic, subject, Set.of(action)); + if (action == AuthAction.consume) { + subAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest); + } else { + try { + subAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + } + superUserAdmin.topics().revokePermissions(topic, subject); + } + // test UpdateSubscriptionProperties + Map properties = new HashMap<>(); + superUserAdmin.topics().createSubscription(topic, "test-sub", MessageId.earliest); + // test superuser + superUserAdmin.topics().updateSubscriptionProperties(topic, "test-sub" , properties); + superUserAdmin.topics().getSubscriptionProperties(topic, "test-sub"); + superUserAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(), "test-sub", Optional.empty()); + + // test tenant manager + tenantManagerAdmin.topics().updateSubscriptionProperties(topic, "test-sub" , properties); + tenantManagerAdmin.topics().getSubscriptionProperties(topic, "test-sub"); + tenantManagerAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(), "test-sub", Optional.empty()); + + // test nobody + try { + subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + try { + subAdmin.topics().getSubscriptionProperties(topic, "test-sub"); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + try { + subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(), "test-sub", Optional.empty()); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.topics().grantPermission(topic, subject, Set.of(action)); + if (action == AuthAction.consume) { + subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties); + subAdmin.topics().getSubscriptionProperties(topic, "test-sub"); + subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(), "test-sub", Optional.empty()); + } else { + try { + subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + try { + subAdmin.topics().getSubscriptionProperties(topic, "test-sub"); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + try { + subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(), "test-sub", Optional.empty()); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + } + superUserAdmin.topics().revokePermissions(topic, subject); + } + superUserAdmin.topics().deletePartitionedTopic(topic, true); + } +} From 2039c6a8c0b77f0089d401e65c3b37bc792fd755 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 19 Mar 2024 11:30:16 +0800 Subject: [PATCH 2/7] fix --- .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 73fd635471c2f..27d556583c8b1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -5338,7 +5338,7 @@ protected CompletableFuture internalGetSchemaCompat CompletableFuture future = validateTopicPolicyOperationAsync(topicName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ); if (applied) { - return getSchemaCompatibilityStrategyAsync(); + return future.thenCompose(__ -> getSchemaCompatibilityStrategyAsync()); } return future .thenCompose(n -> getTopicPoliciesAsyncWithRetry(topicName).thenApply(op -> { From 6542815589fb1c2192b43baf786885218ee131c0 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 19 Mar 2024 13:31:30 +0800 Subject: [PATCH 3/7] address comment --- .../broker/admin/NamespaceAuthZTest.java | 49 ++--- .../pulsar/broker/admin/TopicAuthZTest.java | 190 ++++++------------ 2 files changed, 70 insertions(+), 169 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java index c864da32350e6..36d581b87fc62 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java @@ -113,47 +113,24 @@ public void testProperties() { tenantManagerAdmin.namespaces().clearProperties(namespace); // test nobody - try { - subAdmin.namespaces().setProperties(namespace, properties); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setProperties(namespace, properties)); - try { + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setProperty(namespace, "key2", "value2")); - subAdmin.namespaces().setProperty(namespace, "key2", "value2"); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getProperties(namespace)); - try { - subAdmin.namespaces().getProperties(namespace); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getProperty(namespace, "key2")); - try { - subAdmin.namespaces().getProperty(namespace, "key2"); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } - try { - subAdmin.namespaces().removeProperty(namespace, "key2"); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeProperty(namespace, "key2")); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().clearProperties(namespace)); - try { - subAdmin.namespaces().clearProperties(namespace); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java index 8fef7114d0622..06630a9dcfacc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java @@ -112,57 +112,34 @@ public void testUnloadAndCompactAndTrim() { tenantManagerAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false); // test nobody - try { - subAdmin.topics().unload(topic); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } - try { - subAdmin.topics().triggerCompaction(topic); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } - try { - subAdmin.topics().trimTopic(TopicName.get(topic).getPartition(0).getLocalName()); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } - try { - subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().unload(topic)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().triggerCompaction(topic)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().trimTopic(TopicName.get(topic).getPartition(0).getLocalName())); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false)); + // Test only super/admin can do the operation, other auth are not permitted. for (AuthAction action : AuthAction.values()) { superUserAdmin.topics().grantPermission(topic, subject, Set.of(action)); - try { - subAdmin.topics().unload(topic); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } - try { - subAdmin.topics().triggerCompaction(topic); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } - try { - subAdmin.topics().trimTopic(topic); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } - try { - subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().unload(topic)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().triggerCompaction(topic)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().trimTopic(topic)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false)); + superUserAdmin.topics().revokePermissions(topic, subject); } superUserAdmin.topics().deletePartitionedTopic(topic, true); @@ -190,24 +167,16 @@ public void testGetManagedLedgerInfo() { tenantManagerAdmin.topics().getInternalInfo(topic); // test nobody - try { - subAdmin.topics().getInternalInfo(topic); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getInternalInfo(topic)); for (AuthAction action : AuthAction.values()) { superUserAdmin.topics().grantPermission(topic, subject, Set.of(action)); if (action == AuthAction.produce || action == AuthAction.consume) { subAdmin.topics().getInternalInfo(topic); } else { - try { - subAdmin.topics().getInternalInfo(topic); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getInternalInfo(topic)); } superUserAdmin.topics().revokePermissions(topic, subject); } @@ -238,19 +207,11 @@ public void testGetPartitionedStatsAndInternalStats() { tenantManagerAdmin.topics().getPartitionedInternalStats(topic); // test nobody - try { - subAdmin.topics().getPartitionedStats(topic, false); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getPartitionedStats(topic, false)); - try { - subAdmin.topics().getPartitionedInternalStats(topic); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getPartitionedInternalStats(topic)); for (AuthAction action : AuthAction.values()) { superUserAdmin.topics().grantPermission(topic, subject, Set.of(action)); @@ -258,18 +219,11 @@ public void testGetPartitionedStatsAndInternalStats() { subAdmin.topics().getPartitionedStats(topic, false); subAdmin.topics().getPartitionedInternalStats(topic); } else { - try { - subAdmin.topics().getPartitionedStats(topic, false); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } - try { - subAdmin.topics().getPartitionedInternalStats(topic); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getPartitionedStats(topic, false)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getPartitionedInternalStats(topic)); } superUserAdmin.topics().revokePermissions(topic, subject); } @@ -298,25 +252,16 @@ public void testCreateSubscriptionAndUpdateSubscriptionProperties() { tenantManagerAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest); // test nobody - try { - subAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } - + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest)); for (AuthAction action : AuthAction.values()) { superUserAdmin.topics().grantPermission(topic, subject, Set.of(action)); if (action == AuthAction.consume) { subAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest); } else { - try { - subAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest)); } superUserAdmin.topics().revokePermissions(topic, subject); } @@ -334,25 +279,14 @@ public void testCreateSubscriptionAndUpdateSubscriptionProperties() { tenantManagerAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(), "test-sub", Optional.empty()); // test nobody - try { - subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } - try { - subAdmin.topics().getSubscriptionProperties(topic, "test-sub"); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } - try { - subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(), "test-sub", Optional.empty()); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getSubscriptionProperties(topic, "test-sub")); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(), "test-sub", Optional.empty())); for (AuthAction action : AuthAction.values()) { superUserAdmin.topics().grantPermission(topic, subject, Set.of(action)); @@ -361,24 +295,14 @@ public void testCreateSubscriptionAndUpdateSubscriptionProperties() { subAdmin.topics().getSubscriptionProperties(topic, "test-sub"); subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(), "test-sub", Optional.empty()); } else { - try { - subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } - try { - subAdmin.topics().getSubscriptionProperties(topic, "test-sub"); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } - try { - subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(), "test-sub", Optional.empty()); - Assert.fail("unexpected behaviour"); - } catch (PulsarAdminException ex) { - Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); - } + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getSubscriptionProperties(topic, "test-sub")); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(), "test-sub", Optional.empty())); } superUserAdmin.topics().revokePermissions(topic, subject); } From 895e2fb936aa18ef4c945ca17c806188c915274d Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 19 Mar 2024 18:40:18 +0800 Subject: [PATCH 4/7] fix --- .../pulsar/broker/admin/impl/NamespacesBase.java | 12 ++++++------ .../broker/admin/impl/PersistentTopicsBase.java | 4 +++- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 37ab552b7b79d..9d01530c60121 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2339,7 +2339,7 @@ protected void internalSetMaxTopicsPerNamespace(Integer maxTopicsPerNamespace) { } protected void internalSetProperty(String key, String value, AsyncResponse asyncResponse) { - validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE) + validateAdminAccessForTenantAsync(namespaceName.getTenant()) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { policies.properties.put(key, value); @@ -2359,7 +2359,7 @@ protected void internalSetProperty(String key, String value, AsyncResponse async } protected void internalSetProperties(Map properties, AsyncResponse asyncResponse) { - validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE) + validateAdminAccessForTenantAsync(namespaceName.getTenant()) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { policies.properties.putAll(properties); @@ -2379,7 +2379,7 @@ protected void internalSetProperties(Map properties, AsyncRespon } protected void internalGetProperty(String key, AsyncResponse asyncResponse) { - validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE) + validateAdminAccessForTenantAsync(namespaceName.getTenant()) .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) .thenAccept(policies -> asyncResponse.resume(policies.properties.get(key))) .exceptionally(ex -> { @@ -2392,7 +2392,7 @@ protected void internalGetProperty(String key, AsyncResponse asyncResponse) { } protected void internalGetProperties(AsyncResponse asyncResponse) { - validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE) + validateAdminAccessForTenantAsync(namespaceName.getTenant()) .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) .thenAccept(policies -> asyncResponse.resume(policies.properties)) .exceptionally(ex -> { @@ -2405,7 +2405,7 @@ protected void internalGetProperties(AsyncResponse asyncResponse) { protected void internalRemoveProperty(String key, AsyncResponse asyncResponse) { AtomicReference oldVal = new AtomicReference<>(null); - validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE) + validateAdminAccessForTenantAsync(namespaceName.getTenant()) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { oldVal.set(policies.properties.remove(key)); @@ -2425,7 +2425,7 @@ protected void internalRemoveProperty(String key, AsyncResponse asyncResponse) { protected void internalClearProperties(AsyncResponse asyncResponse) { AtomicReference clearedCount = new AtomicReference<>(0); - validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE) + validateAdminAccessForTenantAsync(namespaceName.getTenant()) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { clearedCount.set(policies.properties.size()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 27d556583c8b1..372dc0d63fb16 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -488,7 +488,9 @@ protected CompletableFuture internalCreateNonPartitionedTopicAsync(boolean protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) { getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> { if (metadata != null) { - tryCreatePartitionsAsync(metadata.partitions).thenAccept(v -> { + CompletableFuture future = validateNamespaceOperationAsync(topicName.getNamespaceObject(), + NamespaceOperation.CREATE_TOPIC); + future.thenCompose(__ -> tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> { asyncResponse.resume(Response.noContent().build()); }).exceptionally(e -> { log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName); From a249aec453e62b908814404153b924bfd820f8c8 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 19 Mar 2024 22:56:19 +0800 Subject: [PATCH 5/7] address comment --- .../admin/impl/PersistentTopicsBase.java | 6 ++-- .../broker/admin/NamespaceAuthZTest.java | 27 ++++++++++++++ .../pulsar/broker/admin/TopicAuthZTest.java | 36 ++++++++++++++++++- 3 files changed, 65 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 372dc0d63fb16..29c24cc64179b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2410,7 +2410,7 @@ private void internalCreateSubscriptionForNonPartitionedTopic( protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse, String subName, Map subscriptionProperties, boolean authoritative) { - CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName); + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.SUBSCRIBE, subName); future.thenCompose(__ -> { if (topicName.isGlobal()) { return validateGlobalNamespaceOwnershipAsync(namespaceName); @@ -2488,7 +2488,7 @@ protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse, protected void internalAnalyzeSubscriptionBacklog(AsyncResponse asyncResponse, String subName, Optional position, boolean authoritative) { - CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName); + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.GET_STATS, subName); future.thenCompose(__ -> { if (topicName.isGlobal()) { return validateGlobalNamespaceOwnershipAsync(namespaceName); @@ -2526,7 +2526,7 @@ protected void internalAnalyzeSubscriptionBacklog(AsyncResponse asyncResponse, S protected void internalGetSubscriptionProperties(AsyncResponse asyncResponse, String subName, boolean authoritative) { - CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName); + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.GET_METADATA, subName); future.thenCompose(__ -> { if (topicName.isGlobal()) { return validateGlobalNamespaceOwnershipAsync(namespaceName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java index 36d581b87fc62..ce0b925614c55 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java @@ -25,6 +25,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.security.MockedPulsarStandalone; import org.testng.Assert; @@ -33,8 +34,10 @@ import org.testng.annotations.Test; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.UUID; +@Test(groups = "broker-admin") public class NamespaceAuthZTest extends MockedPulsarStandalone { private PulsarAdmin superUserAdmin; @@ -132,5 +135,29 @@ public void testProperties() { Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> subAdmin.namespaces().clearProperties(namespace)); + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setProperties(namespace, properties)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setProperty(namespace, "key2", "value2")); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getProperties(namespace)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getProperty(namespace, "key2")); + + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeProperty(namespace, "key2")); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().clearProperties(namespace)); + + superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); + } + superUserAdmin.topics().delete(topic, true); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java index 06630a9dcfacc..e23f9bbaf9b30 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java @@ -41,6 +41,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +@Test(groups = "broker-admin") public class TopicAuthZTest extends MockedPulsarStandalone { private PulsarAdmin superUserAdmin; @@ -232,7 +233,7 @@ public void testGetPartitionedStatsAndInternalStats() { @Test @SneakyThrows - public void testCreateSubscriptionAndUpdateSubscriptionProperties() { + public void testCreateSubscriptionAndUpdateSubscriptionPropertiesAndAnalyzeSubscriptionBacklog() { final String random = UUID.randomUUID().toString(); final String topic = "persistent://public/default/" + random; final String subject = UUID.randomUUID().toString(); @@ -308,4 +309,37 @@ public void testCreateSubscriptionAndUpdateSubscriptionProperties() { } superUserAdmin.topics().deletePartitionedTopic(topic, true); } + + @Test + @SneakyThrows + public void testCreateMissingPartition() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createPartitionedTopic(topic, 2); + AtomicInteger suffix = new AtomicInteger(1); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + // + superUserAdmin.topics().createMissedPartitions(topic); + + // test tenant manager + tenantManagerAdmin.topics().createMissedPartitions(topic); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().createMissedPartitions(topic)); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.topics().grantPermission(topic, subject, Set.of(action)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().createMissedPartitions(topic)); + superUserAdmin.topics().revokePermissions(topic, subject); + } + superUserAdmin.topics().deletePartitionedTopic(topic, true); + } } From 71a207fb49e892ab4ca24e054bbec72dd08653b8 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 19 Mar 2024 23:02:11 +0800 Subject: [PATCH 6/7] fix --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 29c24cc64179b..18107d12f69c6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1068,8 +1068,10 @@ private void internalUnloadNonPartitionedTopicAsync(AsyncResponse asyncResponse, private void internalUnloadTransactionCoordinatorAsync(AsyncResponse asyncResponse, boolean authoritative) { validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(v -> pulsar().getTransactionMetadataStoreService().removeTransactionMetadataStore( - TransactionCoordinatorID.get(topicName.getPartitionIndex()))) + .thenCompose(v -> pulsar() + .getTransactionMetadataStoreService() + .removeTransactionMetadataStore( + TransactionCoordinatorID.get(topicName.getPartitionIndex()))) .thenRun(() -> { log.info("[{}] Successfully unloaded tc {}", clientAppId(), topicName.getPartitionIndex()); asyncResponse.resume(Response.noContent().build()); From b7c4bacd1a4c1cabd5fa4ddc43bd002ee4a8db33 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Wed, 20 Mar 2024 10:55:53 +0800 Subject: [PATCH 7/7] address comment --- .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 18107d12f69c6..10cf5edd3c366 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2490,7 +2490,7 @@ protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse, protected void internalAnalyzeSubscriptionBacklog(AsyncResponse asyncResponse, String subName, Optional position, boolean authoritative) { - CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.GET_STATS, subName); + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName); future.thenCompose(__ -> { if (topicName.isGlobal()) { return validateGlobalNamespaceOwnershipAsync(namespaceName); @@ -2528,7 +2528,7 @@ protected void internalAnalyzeSubscriptionBacklog(AsyncResponse asyncResponse, S protected void internalGetSubscriptionProperties(AsyncResponse asyncResponse, String subName, boolean authoritative) { - CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.GET_METADATA, subName); + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName); future.thenCompose(__ -> { if (topicName.isGlobal()) { return validateGlobalNamespaceOwnershipAsync(namespaceName);