Skip to content

Commit

Permalink
[Authorization] Fix producer/consume permission can’t get v1/schema (#…
Browse files Browse the repository at this point in the history
…16018)

* [Authorization] Fix producer/consume permission can’t get v1/schema
  • Loading branch information
yuruguo authored Aug 22, 2022
1 parent 43759d2 commit 95c1778
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,32 +29,23 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
import org.apache.pulsar.common.protocol.schema.PostSchemaResponse;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -88,33 +79,12 @@ private String getSchemaId() {
}
}

public void getSchema(boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);
String schemaId = getSchemaId();
pulsar().getSchemaRegistryService().getSchema(schemaId).handle((schema, error) -> {
handleGetSchemaResponse(response, schema, error);
return null;
});
}

public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative) {
return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
.thenApply(__ -> getSchemaId())
.thenCompose(schemaId -> pulsar().getSchemaRegistryService().getSchema(schemaId));
}

public void getSchema(boolean authoritative, String version, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);
String schemaId = getSchemaId();
ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES);
bbVersion.putLong(Long.parseLong(version));
SchemaVersion v = pulsar().getSchemaRegistryService().versionFromBytes(bbVersion.array());
pulsar().getSchemaRegistryService().getSchema(schemaId, v).handle((schema, error) -> {
handleGetSchemaResponse(response, schema, error);
return null;
});
}

public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative, String version) {
return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
.thenApply(__ -> getSchemaId())
Expand All @@ -127,16 +97,6 @@ public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative
});
}

public void getAllSchemas(boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);

String schemaId = getSchemaId();
pulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(schemaId).handle((schema, error) -> {
handleGetAllSchemasResponse(response, schema, error);
return null;
});
}

public CompletableFuture<List<SchemaAndMetadata>> getAllSchemasAsync(boolean authoritative) {
return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
.thenCompose(__ -> {
Expand All @@ -145,24 +105,6 @@ public CompletableFuture<List<SchemaAndMetadata>> getAllSchemasAsync(boolean aut
});
}

public void deleteSchema(boolean authoritative, AsyncResponse response, boolean force) {
validateDestinationAndAdminOperation(authoritative);

String schemaId = getSchemaId();
pulsar().getSchemaRegistryService().deleteSchema(schemaId, defaultIfEmpty(clientAppId(), ""), force)
.handle((version, error) -> {
if (isNull(error)) {
response.resume(Response.ok()
.entity(DeleteSchemaResponse.builder().version(getLongSchemaVersion(version)).build())
.build());
} else {
log.error("[{}] Failed to delete schema for topic {}", clientAppId(), topicName, error);
response.resume(new RestException(error));
}
return null;
});
}

public CompletableFuture<SchemaVersion> deleteSchemaAsync(boolean authoritative, boolean force) {
return validateDestinationAndAdminOperationAsync(authoritative)
.thenCompose(__ -> {
Expand All @@ -172,61 +114,6 @@ public CompletableFuture<SchemaVersion> deleteSchemaAsync(boolean authoritative,
});
}

public void postSchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);

getSchemaCompatibilityStrategyAsync().thenAccept(schemaCompatibilityStrategy -> {
byte[] data;
if (SchemaType.KEY_VALUE.name().equals(payload.getType())) {
try {
data = DefaultImplementation.getDefaultImplementation()
.convertKeyValueDataStringToSchemaInfoSchema(payload.getSchema().getBytes(Charsets.UTF_8));
} catch (IOException conversionError) {
log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, conversionError);
response.resume(new RestException(conversionError));
return;
}
} else {
data = payload.getSchema().getBytes(Charsets.UTF_8);
}
pulsar().getSchemaRegistryService()
.putSchemaIfAbsent(getSchemaId(),
SchemaData.builder().data(data).isDeleted(false).timestamp(clock.millis())
.type(SchemaType.valueOf(payload.getType())).user(defaultIfEmpty(clientAppId(), ""))
.props(payload.getProperties()).build(),
schemaCompatibilityStrategy)
.thenAccept(version -> response.resume(
Response.accepted().entity(PostSchemaResponse.builder().version(version).build()).build()))
.exceptionally(error -> {
Throwable throwable = FutureUtil.unwrapCompletionException(error);
if (throwable instanceof IncompatibleSchemaException) {
response.resume(Response
.status(Response.Status.CONFLICT.getStatusCode(), throwable.getMessage())
.build());
} else if (throwable instanceof InvalidSchemaDataException) {
response.resume(Response.status(422, /* Unprocessable Entity */
throwable.getMessage()).build());
} else {
log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, throwable);
response.resume(new RestException(throwable));
}
return null;
});
}).exceptionally(error -> {
Throwable throwable = FutureUtil.unwrapCompletionException(error);
if (throwable instanceof RestException) {
// Unprocessable Entity
response.resume(Response
.status(((RestException) throwable).getResponse().getStatus(), throwable.getMessage())
.build());
} else {
log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, throwable);
response.resume(new RestException(throwable));
}
return null;
});
}

public CompletableFuture<SchemaVersion> postSchemaAsync(PostSchemaPayload payload, boolean authoritative) {
return validateDestinationAndAdminOperationAsync(authoritative)
.thenCompose(__ -> getSchemaCompatibilityStrategyAsync())
Expand Down Expand Up @@ -254,27 +141,6 @@ public CompletableFuture<SchemaVersion> postSchemaAsync(PostSchemaPayload payloa
});
}

public void testCompatibility(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);

String schemaId = getSchemaId();

getSchemaCompatibilityStrategyAsync().thenCompose(schemaCompatibilityStrategy -> pulsar()
.getSchemaRegistryService().isCompatible(schemaId,
SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
.timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
.user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build(),
schemaCompatibilityStrategy)
.thenAccept(isCompatible -> response.resume(Response.accepted()
.entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible)
.schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build())
.build())))
.exceptionally(error -> {
response.resume(new RestException(FutureUtil.unwrapCompletionException(error)));
return null;
});
}

public CompletableFuture<Pair<Boolean, SchemaCompatibilityStrategy>> testCompatibilityAsync(
PostSchemaPayload payload, boolean authoritative) {
return validateDestinationAndAdminOperationAsync(authoritative)
Expand All @@ -292,26 +158,6 @@ public CompletableFuture<Pair<Boolean, SchemaCompatibilityStrategy>> testCompati
});
}

public void getVersionBySchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);

String schemaId = getSchemaId();

pulsar().getSchemaRegistryService()
.findSchemaVersion(schemaId,
SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
.timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
.user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build())
.thenAccept(version -> response.resume(Response.accepted()
.entity(LongSchemaVersionResponse.builder().version(version).build()).build()))
.exceptionally(error -> {
Throwable throwable = FutureUtil.unwrapCompletionException(error);
log.error("[{}] Failed to get version by schema for topic {}", clientAppId(), topicName, throwable);
response.resume(new RestException(throwable));
return null;
});
}

public CompletableFuture<Long> getVersionBySchemaAsync(PostSchemaPayload payload, boolean authoritative) {
return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
.thenCompose(__ -> {
Expand Down Expand Up @@ -349,25 +195,6 @@ private static GetSchemaResponse convertSchemaAndMetadataToGetSchemaResponse(Sch
}
}

protected static void handleGetSchemaResponse(AsyncResponse response, SchemaAndMetadata schema, Throwable error) {
if (isNull(error)) {
if (isNull(schema)) {
response.resume(Response.status(
Response.Status.NOT_FOUND.getStatusCode(), "Schema not found").build());
} else if (schema.schema.isDeleted()) {
response.resume(Response.status(
Response.Status.NOT_FOUND.getStatusCode(), "Schema is deleted").build());
} else {
response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON)
.entity(convertSchemaAndMetadataToGetSchemaResponse(schema)).build());
}
} else {
log.error("Failed to get schema", error);
response.resume(new RestException(error));
}

}

protected GetSchemaResponse convertToSchemaResponse(SchemaAndMetadata schema) {
if (isNull(schema)) {
throw new RestException(Response.Status.NOT_FOUND.getStatusCode(), "Schema not found");
Expand All @@ -389,40 +216,6 @@ protected GetAllVersionsSchemaResponse convertToAllVersionsSchemaResponse(List<S
}
}

private static void handleGetAllSchemasResponse(AsyncResponse response, List<SchemaAndMetadata> schemas,
Throwable error) {
if (isNull(error)) {
if (isNull(schemas)) {
response.resume(Response.status(
Response.Status.NOT_FOUND.getStatusCode(), "Schemas not found").build());
} else {
response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON)
.entity(GetAllVersionsSchemaResponse.builder()
.getSchemaResponses(schemas.stream()
.map(SchemasResourceBase::convertSchemaAndMetadataToGetSchemaResponse)
.collect(Collectors.toList()))
.build())
.build());
}
} else {
log.error("Failed to get all schemas", error);
response.resume(new RestException(error));
}
}

private void validateDestinationAndAdminOperation(boolean authoritative) {
try {
validateAdminAccessForTenant(topicName.getTenant());
validateTopicOwnership(topicName, authoritative);
} catch (RestException e) {
if (e.getResponse().getStatus() == Response.Status.UNAUTHORIZED.getStatusCode()) {
throw new RestException(Response.Status.UNAUTHORIZED, e.getMessage());
} else {
throw e;
}
}
}

private CompletableFuture<Void> validateDestinationAndAdminOperationAsync(boolean authoritative) {
return validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateAdminAccessForTenantAsync(topicName.getTenant()));
Expand Down
Loading

0 comments on commit 95c1778

Please sign in to comment.