From 95c1778e3c2a6254b81a7c8c95315473a713121f Mon Sep 17 00:00:00 2001 From: Ruguo Yu Date: Tue, 23 Aug 2022 00:07:35 +0800 Subject: [PATCH] =?UTF-8?q?[Authorization]=20Fix=20producer/consume=20perm?= =?UTF-8?q?ission=20can=E2=80=99t=20get=20v1/schema=20(#16018)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [Authorization] Fix producer/consume permission can’t get v1/schema --- .../admin/impl/SchemasResourceBase.java | 207 ------------------ .../broker/admin/v1/SchemasResource.java | 96 +++++++- 2 files changed, 89 insertions(+), 214 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java index 175ab5ac27c73..a115b26407d18 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java @@ -29,32 +29,23 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; -import javax.ws.rs.container.AsyncResponse; -import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; -import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; -import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.internal.DefaultImplementation; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TopicOperation; -import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse; import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse; import org.apache.pulsar.common.protocol.schema.GetSchemaResponse; -import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse; -import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse; import org.apache.pulsar.common.protocol.schema.PostSchemaPayload; -import org.apache.pulsar.common.protocol.schema.PostSchemaResponse; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.LongSchemaVersion; import org.apache.pulsar.common.schema.SchemaType; -import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,33 +79,12 @@ private String getSchemaId() { } } - public void getSchema(boolean authoritative, AsyncResponse response) { - validateDestinationAndAdminOperation(authoritative); - String schemaId = getSchemaId(); - pulsar().getSchemaRegistryService().getSchema(schemaId).handle((schema, error) -> { - handleGetSchemaResponse(response, schema, error); - return null; - }); - } - public CompletableFuture getSchemaAsync(boolean authoritative) { return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA) .thenApply(__ -> getSchemaId()) .thenCompose(schemaId -> pulsar().getSchemaRegistryService().getSchema(schemaId)); } - public void getSchema(boolean authoritative, String version, AsyncResponse response) { - validateDestinationAndAdminOperation(authoritative); - String schemaId = getSchemaId(); - ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES); - bbVersion.putLong(Long.parseLong(version)); - SchemaVersion v = pulsar().getSchemaRegistryService().versionFromBytes(bbVersion.array()); - pulsar().getSchemaRegistryService().getSchema(schemaId, v).handle((schema, error) -> { - handleGetSchemaResponse(response, schema, error); - return null; - }); - } - public CompletableFuture getSchemaAsync(boolean authoritative, String version) { return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA) .thenApply(__ -> getSchemaId()) @@ -127,16 +97,6 @@ public CompletableFuture getSchemaAsync(boolean authoritative }); } - public void getAllSchemas(boolean authoritative, AsyncResponse response) { - validateDestinationAndAdminOperation(authoritative); - - String schemaId = getSchemaId(); - pulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(schemaId).handle((schema, error) -> { - handleGetAllSchemasResponse(response, schema, error); - return null; - }); - } - public CompletableFuture> getAllSchemasAsync(boolean authoritative) { return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA) .thenCompose(__ -> { @@ -145,24 +105,6 @@ public CompletableFuture> getAllSchemasAsync(boolean aut }); } - public void deleteSchema(boolean authoritative, AsyncResponse response, boolean force) { - validateDestinationAndAdminOperation(authoritative); - - String schemaId = getSchemaId(); - pulsar().getSchemaRegistryService().deleteSchema(schemaId, defaultIfEmpty(clientAppId(), ""), force) - .handle((version, error) -> { - if (isNull(error)) { - response.resume(Response.ok() - .entity(DeleteSchemaResponse.builder().version(getLongSchemaVersion(version)).build()) - .build()); - } else { - log.error("[{}] Failed to delete schema for topic {}", clientAppId(), topicName, error); - response.resume(new RestException(error)); - } - return null; - }); - } - public CompletableFuture deleteSchemaAsync(boolean authoritative, boolean force) { return validateDestinationAndAdminOperationAsync(authoritative) .thenCompose(__ -> { @@ -172,61 +114,6 @@ public CompletableFuture deleteSchemaAsync(boolean authoritative, }); } - public void postSchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) { - validateDestinationAndAdminOperation(authoritative); - - getSchemaCompatibilityStrategyAsync().thenAccept(schemaCompatibilityStrategy -> { - byte[] data; - if (SchemaType.KEY_VALUE.name().equals(payload.getType())) { - try { - data = DefaultImplementation.getDefaultImplementation() - .convertKeyValueDataStringToSchemaInfoSchema(payload.getSchema().getBytes(Charsets.UTF_8)); - } catch (IOException conversionError) { - log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, conversionError); - response.resume(new RestException(conversionError)); - return; - } - } else { - data = payload.getSchema().getBytes(Charsets.UTF_8); - } - pulsar().getSchemaRegistryService() - .putSchemaIfAbsent(getSchemaId(), - SchemaData.builder().data(data).isDeleted(false).timestamp(clock.millis()) - .type(SchemaType.valueOf(payload.getType())).user(defaultIfEmpty(clientAppId(), "")) - .props(payload.getProperties()).build(), - schemaCompatibilityStrategy) - .thenAccept(version -> response.resume( - Response.accepted().entity(PostSchemaResponse.builder().version(version).build()).build())) - .exceptionally(error -> { - Throwable throwable = FutureUtil.unwrapCompletionException(error); - if (throwable instanceof IncompatibleSchemaException) { - response.resume(Response - .status(Response.Status.CONFLICT.getStatusCode(), throwable.getMessage()) - .build()); - } else if (throwable instanceof InvalidSchemaDataException) { - response.resume(Response.status(422, /* Unprocessable Entity */ - throwable.getMessage()).build()); - } else { - log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, throwable); - response.resume(new RestException(throwable)); - } - return null; - }); - }).exceptionally(error -> { - Throwable throwable = FutureUtil.unwrapCompletionException(error); - if (throwable instanceof RestException) { - // Unprocessable Entity - response.resume(Response - .status(((RestException) throwable).getResponse().getStatus(), throwable.getMessage()) - .build()); - } else { - log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, throwable); - response.resume(new RestException(throwable)); - } - return null; - }); - } - public CompletableFuture postSchemaAsync(PostSchemaPayload payload, boolean authoritative) { return validateDestinationAndAdminOperationAsync(authoritative) .thenCompose(__ -> getSchemaCompatibilityStrategyAsync()) @@ -254,27 +141,6 @@ public CompletableFuture postSchemaAsync(PostSchemaPayload payloa }); } - public void testCompatibility(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) { - validateDestinationAndAdminOperation(authoritative); - - String schemaId = getSchemaId(); - - getSchemaCompatibilityStrategyAsync().thenCompose(schemaCompatibilityStrategy -> pulsar() - .getSchemaRegistryService().isCompatible(schemaId, - SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false) - .timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType())) - .user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build(), - schemaCompatibilityStrategy) - .thenAccept(isCompatible -> response.resume(Response.accepted() - .entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible) - .schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build()) - .build()))) - .exceptionally(error -> { - response.resume(new RestException(FutureUtil.unwrapCompletionException(error))); - return null; - }); - } - public CompletableFuture> testCompatibilityAsync( PostSchemaPayload payload, boolean authoritative) { return validateDestinationAndAdminOperationAsync(authoritative) @@ -292,26 +158,6 @@ public CompletableFuture> testCompati }); } - public void getVersionBySchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) { - validateDestinationAndAdminOperation(authoritative); - - String schemaId = getSchemaId(); - - pulsar().getSchemaRegistryService() - .findSchemaVersion(schemaId, - SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false) - .timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType())) - .user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build()) - .thenAccept(version -> response.resume(Response.accepted() - .entity(LongSchemaVersionResponse.builder().version(version).build()).build())) - .exceptionally(error -> { - Throwable throwable = FutureUtil.unwrapCompletionException(error); - log.error("[{}] Failed to get version by schema for topic {}", clientAppId(), topicName, throwable); - response.resume(new RestException(throwable)); - return null; - }); - } - public CompletableFuture getVersionBySchemaAsync(PostSchemaPayload payload, boolean authoritative) { return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA) .thenCompose(__ -> { @@ -349,25 +195,6 @@ private static GetSchemaResponse convertSchemaAndMetadataToGetSchemaResponse(Sch } } - protected static void handleGetSchemaResponse(AsyncResponse response, SchemaAndMetadata schema, Throwable error) { - if (isNull(error)) { - if (isNull(schema)) { - response.resume(Response.status( - Response.Status.NOT_FOUND.getStatusCode(), "Schema not found").build()); - } else if (schema.schema.isDeleted()) { - response.resume(Response.status( - Response.Status.NOT_FOUND.getStatusCode(), "Schema is deleted").build()); - } else { - response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON) - .entity(convertSchemaAndMetadataToGetSchemaResponse(schema)).build()); - } - } else { - log.error("Failed to get schema", error); - response.resume(new RestException(error)); - } - - } - protected GetSchemaResponse convertToSchemaResponse(SchemaAndMetadata schema) { if (isNull(schema)) { throw new RestException(Response.Status.NOT_FOUND.getStatusCode(), "Schema not found"); @@ -389,40 +216,6 @@ protected GetAllVersionsSchemaResponse convertToAllVersionsSchemaResponse(List schemas, - Throwable error) { - if (isNull(error)) { - if (isNull(schemas)) { - response.resume(Response.status( - Response.Status.NOT_FOUND.getStatusCode(), "Schemas not found").build()); - } else { - response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON) - .entity(GetAllVersionsSchemaResponse.builder() - .getSchemaResponses(schemas.stream() - .map(SchemasResourceBase::convertSchemaAndMetadataToGetSchemaResponse) - .collect(Collectors.toList())) - .build()) - .build()); - } - } else { - log.error("Failed to get all schemas", error); - response.resume(new RestException(error)); - } - } - - private void validateDestinationAndAdminOperation(boolean authoritative) { - try { - validateAdminAccessForTenant(topicName.getTenant()); - validateTopicOwnership(topicName, authoritative); - } catch (RestException e) { - if (e.getResponse().getStatus() == Response.Status.UNAUTHORIZED.getStatusCode()) { - throw new RestException(Response.Status.UNAUTHORIZED, e.getMessage()); - } else { - throw e; - } - } - } - private CompletableFuture validateDestinationAndAdminOperationAsync(boolean authoritative) { return validateTopicOwnershipAsync(topicName, authoritative) .thenCompose(__ -> validateAdminAccessForTenantAsync(topicName.getTenant())); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java index c6e4239a3aa30..13bfba351216b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java @@ -38,14 +38,20 @@ import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.admin.impl.SchemasResourceBase; +import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; +import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse; import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse; import org.apache.pulsar.common.protocol.schema.GetSchemaResponse; import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse; +import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse; import org.apache.pulsar.common.protocol.schema.PostSchemaPayload; import org.apache.pulsar.common.protocol.schema.PostSchemaResponse; import org.apache.pulsar.common.schema.LongSchemaVersion; +import org.apache.pulsar.common.util.FutureUtil; @Path("/schemas") @Api( @@ -53,6 +59,7 @@ description = "Schemas related admin APIs", tags = "schemas" ) +@Slf4j public class SchemasResource extends SchemasResourceBase { public SchemasResource() { @@ -81,7 +88,16 @@ public void getSchema( @Suspended final AsyncResponse response ) { validateTopicName(tenant, cluster, namespace, topic); - getSchema(authoritative, response); + getSchemaAsync(authoritative) + .thenApply(schemaAndMetadata -> convertToSchemaResponse(schemaAndMetadata)) + .thenApply(response::resume) + .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to get schema for topic {}", clientAppId(), topicName, ex); + } + resumeAsyncResponseExceptionally(response, ex); + return null; + }); } @GET @@ -107,7 +123,17 @@ public void getSchema( @Suspended final AsyncResponse response ) { validateTopicName(tenant, cluster, namespace, topic); - getSchema(authoritative, version, response); + getSchemaAsync(authoritative, version) + .thenApply(schemaAndMetadata -> convertToSchemaResponse(schemaAndMetadata)) + .thenAccept(response::resume) + .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to get schema for topic {} with version {}", + clientAppId(), topicName, version, ex); + } + resumeAsyncResponseExceptionally(response, ex); + return null; + }); } @GET @@ -132,7 +158,16 @@ public void getAllSchemas( @Suspended final AsyncResponse response ) { validateTopicName(tenant, cluster, namespace, topic); - getAllSchemas(authoritative, response); + getAllSchemasAsync(authoritative) + .thenApply(schemaAndMetadata -> convertToAllVersionsSchemaResponse(schemaAndMetadata)) + .thenAccept(response::resume) + .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to get all schemas for topic {}", clientAppId(), topicName, ex); + } + resumeAsyncResponseExceptionally(response, ex); + return null; + }); } @DELETE @@ -157,7 +192,17 @@ public void deleteSchema( @Suspended final AsyncResponse response ) { validateTopicName(tenant, cluster, namespace, topic); - deleteSchema(authoritative, response, force); + deleteSchemaAsync(authoritative, force) + .thenAccept(version -> { + response.resume(DeleteSchemaResponse.builder().version(getLongSchemaVersion(version)).build()); + }) + .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to delete schemas for topic {}", clientAppId(), topicName, ex); + } + resumeAsyncResponseExceptionally(response, ex); + return null; + }); } @POST @@ -195,7 +240,25 @@ public void postSchema( @Suspended final AsyncResponse response ) { validateTopicName(tenant, cluster, namespace, topic); - postSchema(payload, authoritative, response); + postSchemaAsync(payload, authoritative) + .thenAccept(version -> response.resume(PostSchemaResponse.builder().version(version).build())) + .exceptionally(ex -> { + Throwable root = FutureUtil.unwrapCompletionException(ex); + if (root instanceof IncompatibleSchemaException) { + response.resume(Response + .status(Response.Status.CONFLICT.getStatusCode(), root.getMessage()) + .build()); + } else if (root instanceof InvalidSchemaDataException) { + response.resume(Response.status(422, /* Unprocessable Entity */ + root.getMessage()).build()); + } else { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to post schemas for topic {}", clientAppId(), topicName, root); + } + resumeAsyncResponseExceptionally(response, ex); + } + return null; + }); } @POST @@ -232,7 +295,18 @@ public void testCompatibility( @Suspended final AsyncResponse response ) { validateTopicName(tenant, cluster, namespace, topic); - testCompatibility(payload, authoritative, response); + testCompatibilityAsync(payload, authoritative) + .thenAccept(pair -> response.resume(Response.accepted() + .entity(IsCompatibilityResponse.builder().isCompatibility(pair.getLeft()) + .schemaCompatibilityStrategy(pair.getRight().name()).build()) + .build())) + .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to test compatibility for topic {}", clientAppId(), topicName, ex); + } + resumeAsyncResponseExceptionally(response, ex); + return null; + }); } @POST @@ -270,6 +344,14 @@ public void getVersionBySchema( @Suspended final AsyncResponse response ) { validateTopicName(tenant, cluster, namespace, topic); - getVersionBySchema(payload, authoritative, response); + getVersionBySchemaAsync(payload, authoritative) + .thenAccept(version -> response.resume(LongSchemaVersionResponse.builder().version(version).build())) + .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to get version by schema for topic {}", clientAppId(), topicName, ex); + } + resumeAsyncResponseExceptionally(response, ex); + return null; + }); } }