From f3b4e8632b6a69e5a175fec472310bf520d98051 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Mon, 13 Jun 2022 11:42:38 +0800 Subject: [PATCH] =?UTF-8?q?[fix][admin]=20Fix=20producer/consume=20permiss?= =?UTF-8?q?ion=20can=E2=80=99t=20get=20schema=20(#15956)=20(#16026)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cherry-pick #15956. ### Motivation Currently, we need admin permissions to operate the schema API. This is because the admin permission was defined when the schema API was first added. See #1381. Later, then adding authentication granularity with #6428, we don't change the schema API part. So leave the admin permission today. But the binary protocol allows the produce/consume permission to get the schema, so change the related method permission to `produce/consume`. --- .../admin/impl/SchemasResourceBase.java | 22 ++- .../broker/admin/v2/SchemasResource.java | 6 +- .../admin/AdminApiSchemaWithAuthTest.java | 139 ++++++++++++++++++ 3 files changed, 160 insertions(+), 7 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java index 5b119ec881db4..304b311cbea54 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java @@ -38,6 +38,7 @@ import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.internal.DefaultImplementation; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse; import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse; import org.apache.pulsar.common.protocol.schema.GetSchemaResponse; @@ -83,7 +84,7 @@ private String getSchemaId() { } public void getSchema(boolean authoritative, AsyncResponse response) { - validateDestinationAndAdminOperation(authoritative); + validateOwnershipAndOperation(authoritative, TopicOperation.GET_METADATA); String schemaId = getSchemaId(); pulsar().getSchemaRegistryService().getSchema(schemaId).handle((schema, error) -> { handleGetSchemaResponse(response, schema, error); @@ -92,7 +93,7 @@ public void getSchema(boolean authoritative, AsyncResponse response) { } public void getSchema(boolean authoritative, String version, AsyncResponse response) { - validateDestinationAndAdminOperation(authoritative); + validateOwnershipAndOperation(authoritative, TopicOperation.GET_METADATA); String schemaId = getSchemaId(); ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES); bbVersion.putLong(Long.parseLong(version)); @@ -104,7 +105,7 @@ public void getSchema(boolean authoritative, String version, AsyncResponse respo } public void getAllSchemas(boolean authoritative, AsyncResponse response) { - validateDestinationAndAdminOperation(authoritative); + validateOwnershipAndOperation(authoritative, TopicOperation.GET_METADATA); String schemaId = getSchemaId(); pulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(schemaId).handle((schema, error) -> { @@ -208,7 +209,7 @@ public void testCompatibility(PostSchemaPayload payload, boolean authoritative, public void getVersionBySchema( PostSchemaPayload payload, boolean authoritative, AsyncResponse response) { - validateDestinationAndAdminOperation(authoritative); + validateOwnershipAndOperation(authoritative, TopicOperation.GET_METADATA); String schemaId = getSchemaId(); @@ -302,5 +303,18 @@ private void validateDestinationAndAdminOperation(boolean authoritative) { } } + private void validateOwnershipAndOperation(boolean authoritative, TopicOperation operation) { + try { + validateTopicOwnership(topicName, authoritative); + validateTopicOperation(topicName, operation); + } catch (RestException e) { + if (e.getResponse().getStatus() == Response.Status.UNAUTHORIZED.getStatusCode()) { + throw new RestException(Response.Status.UNAUTHORIZED, e.getMessage()); + } else { + throw e; + } + } + } + private static final Logger log = LoggerFactory.getLogger(SchemasResourceBase.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java index 9e6f7e9f80a14..fc30149ccd7f1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java @@ -177,7 +177,7 @@ public void postSchema( @PathParam("namespace") String namespace, @PathParam("topic") String topic, @ApiParam( - value = "A JSON value presenting a schema playload. An example of the expected schema can be found down" + value = "A JSON value presenting a schema payload. An example of the expected schema can be found down" + " here.", examples = @Example( value = @ExampleProperty( @@ -212,7 +212,7 @@ public void testCompatibility( @PathParam("namespace") String namespace, @PathParam("topic") String topic, @ApiParam( - value = "A JSON value presenting a schema playload." + value = "A JSON value presenting a schema payload." + " An example of the expected schema can be found down here.", examples = @Example( value = @ExampleProperty( @@ -249,7 +249,7 @@ public void getVersionBySchema( @PathParam("namespace") String namespace, @PathParam("topic") String topic, @ApiParam( - value = "A JSON value presenting a schema playload." + value = "A JSON value presenting a schema payload." + " An example of the expected schema can be found down here.", examples = @Example( value = @ExampleProperty( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java new file mode 100644 index 0000000000000..29c0f97e61074 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import com.google.common.collect.Sets; +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.SignatureAlgorithm; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.mockito.Mockito; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import javax.crypto.SecretKey; +import java.util.Base64; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; +/** + * Unit tests for schema admin api. + */ +@Slf4j +@Test(groups = "broker-admin") +public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest { + + private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact(); + private static final String CONSUME_TOKEN = Jwts.builder().setSubject("consumer").signWith(SECRET_KEY).compact(); + + @BeforeMethod + @Override + public void setup() throws Exception { + conf.setAuthorizationEnabled(true); + conf.setAuthenticationEnabled(true); + conf.getProperties().setProperty("tokenSecretKey", "data:;base64," + + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded())); + Set providers = new HashSet<>(); + providers.add(AuthenticationProviderToken.class.getName()); + Set superUserRoles = new HashSet<>(); + superUserRoles.add("admin"); + conf.setSuperUserRoles(superUserRoles); + conf.setAuthenticationProviders(providers); + conf.setSystemTopicEnabled(false); + conf.setTopicLevelPoliciesEnabled(false); + super.internalSetup(); + + PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null + ? brokerUrl.toString() : brokerUrlTls.toString()) + .authentication(AuthenticationToken.class.getName(), + ADMIN_TOKEN); + admin = Mockito.spy(pulsarAdminBuilder.build()); + + // Setup namespaces + admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); + TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); + admin.tenants().createTenant("schematest", tenantInfo); + admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test")); + } + + @AfterMethod(alwaysRun = true) + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testGetCreateDeleteSchema() throws Exception { + String topicName = "persistent://schematest/test/testCreateSchema"; + PulsarAdmin adminWithoutPermission = PulsarAdmin.builder() + .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString()) + .build(); + PulsarAdmin adminWithAdminPermission = PulsarAdmin.builder() + .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString()) + .authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN) + .build(); + PulsarAdmin adminWithConsumePermission = PulsarAdmin.builder() + .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString()) + .authentication(AuthenticationToken.class.getName(), CONSUME_TOKEN) + .build(); + admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume)); + admin.topics().grantPermission(topicName, "producer", EnumSet.of(AuthAction.produce)); + + SchemaInfo si = Schema.BOOL.getSchemaInfo(); + assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().createSchema(topicName, si)); + adminWithAdminPermission.schemas().createSchema(topicName, si); + + assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().getSchemaInfo(topicName)); + SchemaInfo readSi = adminWithConsumePermission.schemas().getSchemaInfo(topicName); + assertEquals(readSi, si); + + assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().getSchemaInfo(topicName, 0)); + readSi = adminWithConsumePermission.schemas().getSchemaInfo(topicName, 0); + assertEquals(readSi, si); + List allSchemas = adminWithConsumePermission.schemas().getAllSchemas(topicName); + assertEquals(allSchemas.size(), 1); + + SchemaInfo schemaInfo2 = Schema.BOOL.getSchemaInfo(); + assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().testCompatibility(topicName, schemaInfo2)); + assertTrue(adminWithAdminPermission.schemas().testCompatibility(topicName, schemaInfo2).isCompatibility()); + + assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().getVersionBySchema(topicName, si)); + Long versionBySchema = adminWithConsumePermission.schemas().getVersionBySchema(topicName, si); + assertEquals(versionBySchema, Long.valueOf(0L)); + + assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().deleteSchema(topicName)); + adminWithAdminPermission.schemas().deleteSchema(topicName); + } +}