Skip to content

Commit

Permalink
[improve][broker] Add fine-grain authorization to retention admin API (
Browse files Browse the repository at this point in the history
…#22163)

(cherry picked from commit 6ec473e)
  • Loading branch information
mattisonchao committed Mar 4, 2024
1 parent 21660bd commit f0da29b
Show file tree
Hide file tree
Showing 3 changed files with 335 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2406,7 +2406,8 @@ public void getRetention(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.READ)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetRetention(applied, isGlobal))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
Expand All @@ -2433,7 +2434,8 @@ public void setRetention(@Suspended final AsyncResponse asyncResponse,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Retention policies for the specified topic") RetentionPolicies retention) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.WRITE)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetRetention(retention, isGlobal))
.thenRun(() -> {
try {
Expand Down Expand Up @@ -2469,7 +2471,8 @@ public void removeRetention(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.WRITE)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveRetention(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove retention: namespace={}, topic={}",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;

import static org.awaitility.Awaitility.await;
import io.jsonwebtoken.Jwts;
import java.util.Set;
import java.util.UUID;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.security.MockedPulsarStandalone;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


public final class TopicPoliciesAuthZTest extends MockedPulsarStandalone {

private PulsarAdmin superUserAdmin;

private PulsarAdmin tenantManagerAdmin;

private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString();
private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
.claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();

@SneakyThrows
@BeforeClass
public void before() {
configureTokenAuthentication();
configureDefaultAuthorization();
start();
this.superUserAdmin =PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(SUPER_USER_TOKEN))
.build();
final TenantInfo tenantInfo = superUserAdmin.tenants().getTenantInfo("public");
tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT);
superUserAdmin.tenants().updateTenant("public", tenantInfo);
this.tenantManagerAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
.build();
}


@SneakyThrows
@AfterClass
public void after() {
close();
}


@SneakyThrows
@Test
public void testRetention() {
final String random = UUID.randomUUID().toString();
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createNonPartitionedTopic(topic);

@Cleanup
final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
final RetentionPolicies definedRetentionPolicy = new RetentionPolicies(1, 1);
// test superuser
superUserAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);

// because the topic policies is eventual consistency, we should wait here
await().untilAsserted(() -> {
final RetentionPolicies receivedRetentionPolicy = superUserAdmin.topicPolicies().getRetention(topic);
Assert.assertEquals(receivedRetentionPolicy, definedRetentionPolicy);
});
superUserAdmin.topicPolicies().removeRetention(topic);

await().untilAsserted(() -> {
final RetentionPolicies retention = superUserAdmin.topicPolicies().getRetention(topic);
Assert.assertNull(retention);
});

// test tenant manager

tenantManagerAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
await().untilAsserted(() -> {
final RetentionPolicies receivedRetentionPolicy = tenantManagerAdmin.topicPolicies().getRetention(topic);
Assert.assertEquals(receivedRetentionPolicy, definedRetentionPolicy);
});
tenantManagerAdmin.topicPolicies().removeRetention(topic);
await().untilAsserted(() -> {
final RetentionPolicies retention = tenantManagerAdmin.topicPolicies().getRetention(topic);
Assert.assertNull(retention);
});

// test nobody

try {
subAdmin.topicPolicies().getRetention(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}

try {

subAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}

try {
subAdmin.topicPolicies().removeRetention(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}

// test sub user with permissions
for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
subject, Set.of(action));
try {
subAdmin.topicPolicies().getRetention(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}

try {

subAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}

try {
subAdmin.topicPolicies().removeRetention(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.security;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import javax.crypto.SecretKey;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;


public abstract class MockedPulsarStandalone implements AutoCloseable {

@Getter
private final ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
private PulsarTestContext pulsarTestContext;

@Getter
private PulsarService pulsarService;
private PulsarAdmin serviceInternalAdmin;


{
serviceConfiguration.setClusterName(TEST_CLUSTER_NAME);
serviceConfiguration.setBrokerShutdownTimeoutMs(0L);
serviceConfiguration.setBrokerServicePort(Optional.of(0));
serviceConfiguration.setBrokerServicePortTls(Optional.of(0));
serviceConfiguration.setAdvertisedAddress("localhost");
serviceConfiguration.setWebServicePort(Optional.of(0));
serviceConfiguration.setWebServicePortTls(Optional.of(0));
serviceConfiguration.setNumExecutorThreadPoolSize(5);
serviceConfiguration.setExposeBundlesMetricsInPrometheus(true);
}


protected static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);

private static final String BROKER_INTERNAL_CLIENT_SUBJECT = "broker_internal";
private static final String BROKER_INTERNAL_CLIENT_TOKEN = Jwts.builder()
.claim("sub", BROKER_INTERNAL_CLIENT_SUBJECT).signWith(SECRET_KEY).compact();
protected static final String SUPER_USER_SUBJECT = "super-user";
protected static final String SUPER_USER_TOKEN = Jwts.builder()
.claim("sub", SUPER_USER_SUBJECT).signWith(SECRET_KEY).compact();
protected static final String NOBODY_SUBJECT = "nobody";
protected static final String NOBODY_TOKEN = Jwts.builder()
.claim("sub", NOBODY_SUBJECT).signWith(SECRET_KEY).compact();


@SneakyThrows
protected void configureTokenAuthentication() {
serviceConfiguration.setAuthenticationEnabled(true);
serviceConfiguration.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
// internal client
serviceConfiguration.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
final Map<String, String> brokerClientAuthParams = new HashMap<>();
brokerClientAuthParams.put("token", BROKER_INTERNAL_CLIENT_TOKEN);
final String brokerClientAuthParamStr = MAPPER.writeValueAsString(brokerClientAuthParams);
serviceConfiguration.setBrokerClientAuthenticationParameters(brokerClientAuthParamStr);

Properties properties = serviceConfiguration.getProperties();
if (properties == null) {
properties = new Properties();
serviceConfiguration.setProperties(properties);
}
properties.put("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));

}



protected void configureDefaultAuthorization() {
serviceConfiguration.setAuthorizationEnabled(true);
serviceConfiguration.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
serviceConfiguration.setSuperUserRoles(Set.of(SUPER_USER_SUBJECT, BROKER_INTERNAL_CLIENT_SUBJECT));
}


@SneakyThrows
protected void start() {
this.pulsarTestContext = PulsarTestContext.builder()
.spyByDefault()
.config(serviceConfiguration)
.withMockZookeeper(false)
.build();
this.pulsarService = pulsarTestContext.getPulsarService();
this.serviceInternalAdmin = pulsarService.getAdminClient();
setupDefaultTenantAndNamespace();
}

private void setupDefaultTenantAndNamespace() throws Exception {
if (!serviceInternalAdmin.clusters().getClusters().contains(TEST_CLUSTER_NAME)) {
serviceInternalAdmin.clusters().createCluster(TEST_CLUSTER_NAME,
ClusterData.builder().serviceUrl(pulsarService.getWebServiceAddress()).build());
}
if (!serviceInternalAdmin.tenants().getTenants().contains(DEFAULT_TENANT)) {
serviceInternalAdmin.tenants().createTenant(DEFAULT_TENANT, TenantInfo.builder().allowedClusters(
Sets.newHashSet(TEST_CLUSTER_NAME)).build());
}
if (!serviceInternalAdmin.namespaces().getNamespaces(DEFAULT_TENANT).contains(DEFAULT_NAMESPACE)) {
serviceInternalAdmin.namespaces().createNamespace(DEFAULT_NAMESPACE);
}
}


@Override
public void close() throws Exception {
if (pulsarTestContext != null) {
pulsarTestContext.close();
}
}

// Utils
protected static final ObjectMapper mapper = new ObjectMapper();

// Static name
private static final String DEFAULT_TENANT = "public";
private static final String DEFAULT_NAMESPACE = "public/default";
private static final String TEST_CLUSTER_NAME = "test-standalone";

private static final ObjectMapper MAPPER = ObjectMapperFactory.getMapper().getObjectMapper();
}

0 comments on commit f0da29b

Please sign in to comment.