diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index eee363aeed86b..15d23d6e4d85f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -2406,7 +2406,8 @@ public void getRetention(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetRetention(applied, isGlobal)) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { @@ -2433,7 +2434,8 @@ public void setRetention(@Suspended final AsyncResponse asyncResponse, @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @ApiParam(value = "Retention policies for the specified topic") RetentionPolicies retention) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetRetention(retention, isGlobal)) .thenRun(() -> { try { @@ -2469,7 +2471,8 @@ public void removeRetention(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalRemoveRetention(isGlobal)) .thenRun(() -> { log.info("[{}] Successfully remove retention: namespace={}, topic={}", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java new file mode 100644 index 0000000000000..9cf4c111dddbc --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.awaitility.Awaitility.await; +import io.jsonwebtoken.Jwts; +import java.util.Set; +import java.util.UUID; +import lombok.Cleanup; +import lombok.SneakyThrows; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.security.MockedPulsarStandalone; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public final class TopicPoliciesAuthZTest extends MockedPulsarStandalone { + + private PulsarAdmin superUserAdmin; + + private PulsarAdmin tenantManagerAdmin; + + private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); + private static final String TENANT_ADMIN_TOKEN = Jwts.builder() + .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact(); + + @SneakyThrows + @BeforeClass + public void before() { + configureTokenAuthentication(); + configureDefaultAuthorization(); + start(); + this.superUserAdmin =PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) + .build(); + final TenantInfo tenantInfo = superUserAdmin.tenants().getTenantInfo("public"); + tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT); + superUserAdmin.tenants().updateTenant("public", tenantInfo); + this.tenantManagerAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN)) + .build(); + } + + + @SneakyThrows + @AfterClass + public void after() { + close(); + } + + + @SneakyThrows + @Test + public void testRetention() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + final RetentionPolicies definedRetentionPolicy = new RetentionPolicies(1, 1); + // test superuser + superUserAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy); + + // because the topic policies is eventual consistency, we should wait here + await().untilAsserted(() -> { + final RetentionPolicies receivedRetentionPolicy = superUserAdmin.topicPolicies().getRetention(topic); + Assert.assertEquals(receivedRetentionPolicy, definedRetentionPolicy); + }); + superUserAdmin.topicPolicies().removeRetention(topic); + + await().untilAsserted(() -> { + final RetentionPolicies retention = superUserAdmin.topicPolicies().getRetention(topic); + Assert.assertNull(retention); + }); + + // test tenant manager + + tenantManagerAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy); + await().untilAsserted(() -> { + final RetentionPolicies receivedRetentionPolicy = tenantManagerAdmin.topicPolicies().getRetention(topic); + Assert.assertEquals(receivedRetentionPolicy, definedRetentionPolicy); + }); + tenantManagerAdmin.topicPolicies().removeRetention(topic); + await().untilAsserted(() -> { + final RetentionPolicies retention = tenantManagerAdmin.topicPolicies().getRetention(topic); + Assert.assertNull(retention); + }); + + // test nobody + + try { + subAdmin.topicPolicies().getRetention(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + + subAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + subAdmin.topicPolicies().removeRetention(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + // test sub user with permissions + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace("public/default", + subject, Set.of(action)); + try { + subAdmin.topicPolicies().getRetention(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + + subAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + subAdmin.topicPolicies().removeRetention(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject); + } + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java b/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java new file mode 100644 index 0000000000000..20dd2e1066a87 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.security; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Sets; +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.SignatureAlgorithm; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import javax.crypto.SecretKey; +import lombok.Getter; +import lombok.SneakyThrows; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.util.ObjectMapperFactory; + + +public abstract class MockedPulsarStandalone implements AutoCloseable { + + @Getter + private final ServiceConfiguration serviceConfiguration = new ServiceConfiguration(); + private PulsarTestContext pulsarTestContext; + + @Getter + private PulsarService pulsarService; + private PulsarAdmin serviceInternalAdmin; + + + { + serviceConfiguration.setClusterName(TEST_CLUSTER_NAME); + serviceConfiguration.setBrokerShutdownTimeoutMs(0L); + serviceConfiguration.setBrokerServicePort(Optional.of(0)); + serviceConfiguration.setBrokerServicePortTls(Optional.of(0)); + serviceConfiguration.setAdvertisedAddress("localhost"); + serviceConfiguration.setWebServicePort(Optional.of(0)); + serviceConfiguration.setWebServicePortTls(Optional.of(0)); + serviceConfiguration.setNumExecutorThreadPoolSize(5); + serviceConfiguration.setExposeBundlesMetricsInPrometheus(true); + } + + + protected static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + + private static final String BROKER_INTERNAL_CLIENT_SUBJECT = "broker_internal"; + private static final String BROKER_INTERNAL_CLIENT_TOKEN = Jwts.builder() + .claim("sub", BROKER_INTERNAL_CLIENT_SUBJECT).signWith(SECRET_KEY).compact(); + protected static final String SUPER_USER_SUBJECT = "super-user"; + protected static final String SUPER_USER_TOKEN = Jwts.builder() + .claim("sub", SUPER_USER_SUBJECT).signWith(SECRET_KEY).compact(); + protected static final String NOBODY_SUBJECT = "nobody"; + protected static final String NOBODY_TOKEN = Jwts.builder() + .claim("sub", NOBODY_SUBJECT).signWith(SECRET_KEY).compact(); + + + @SneakyThrows + protected void configureTokenAuthentication() { + serviceConfiguration.setAuthenticationEnabled(true); + serviceConfiguration.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName())); + // internal client + serviceConfiguration.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + final Map brokerClientAuthParams = new HashMap<>(); + brokerClientAuthParams.put("token", BROKER_INTERNAL_CLIENT_TOKEN); + final String brokerClientAuthParamStr = MAPPER.writeValueAsString(brokerClientAuthParams); + serviceConfiguration.setBrokerClientAuthenticationParameters(brokerClientAuthParamStr); + + Properties properties = serviceConfiguration.getProperties(); + if (properties == null) { + properties = new Properties(); + serviceConfiguration.setProperties(properties); + } + properties.put("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY)); + + } + + + + protected void configureDefaultAuthorization() { + serviceConfiguration.setAuthorizationEnabled(true); + serviceConfiguration.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName()); + serviceConfiguration.setSuperUserRoles(Set.of(SUPER_USER_SUBJECT, BROKER_INTERNAL_CLIENT_SUBJECT)); + } + + + @SneakyThrows + protected void start() { + this.pulsarTestContext = PulsarTestContext.builder() + .spyByDefault() + .config(serviceConfiguration) + .withMockZookeeper(false) + .build(); + this.pulsarService = pulsarTestContext.getPulsarService(); + this.serviceInternalAdmin = pulsarService.getAdminClient(); + setupDefaultTenantAndNamespace(); + } + + private void setupDefaultTenantAndNamespace() throws Exception { + if (!serviceInternalAdmin.clusters().getClusters().contains(TEST_CLUSTER_NAME)) { + serviceInternalAdmin.clusters().createCluster(TEST_CLUSTER_NAME, + ClusterData.builder().serviceUrl(pulsarService.getWebServiceAddress()).build()); + } + if (!serviceInternalAdmin.tenants().getTenants().contains(DEFAULT_TENANT)) { + serviceInternalAdmin.tenants().createTenant(DEFAULT_TENANT, TenantInfo.builder().allowedClusters( + Sets.newHashSet(TEST_CLUSTER_NAME)).build()); + } + if (!serviceInternalAdmin.namespaces().getNamespaces(DEFAULT_TENANT).contains(DEFAULT_NAMESPACE)) { + serviceInternalAdmin.namespaces().createNamespace(DEFAULT_NAMESPACE); + } + } + + + @Override + public void close() throws Exception { + if (pulsarTestContext != null) { + pulsarTestContext.close(); + } + } + + // Utils + protected static final ObjectMapper mapper = new ObjectMapper(); + + // Static name + private static final String DEFAULT_TENANT = "public"; + private static final String DEFAULT_NAMESPACE = "public/default"; + private static final String TEST_CLUSTER_NAME = "test-standalone"; + + private static final ObjectMapper MAPPER = ObjectMapperFactory.getMapper().getObjectMapper(); +}