Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Authorization] Fix producer/consume permission can’t get v1/schema #16018

Merged
merged 2 commits into from
Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,32 +29,23 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
import org.apache.pulsar.common.protocol.schema.PostSchemaResponse;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -88,33 +79,12 @@ private String getSchemaId() {
}
}

public void getSchema(boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);
String schemaId = getSchemaId();
pulsar().getSchemaRegistryService().getSchema(schemaId).handle((schema, error) -> {
handleGetSchemaResponse(response, schema, error);
return null;
});
}

public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative) {
return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
.thenApply(__ -> getSchemaId())
.thenCompose(schemaId -> pulsar().getSchemaRegistryService().getSchema(schemaId));
}

public void getSchema(boolean authoritative, String version, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);
String schemaId = getSchemaId();
ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES);
bbVersion.putLong(Long.parseLong(version));
SchemaVersion v = pulsar().getSchemaRegistryService().versionFromBytes(bbVersion.array());
pulsar().getSchemaRegistryService().getSchema(schemaId, v).handle((schema, error) -> {
handleGetSchemaResponse(response, schema, error);
return null;
});
}

public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative, String version) {
return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
.thenApply(__ -> getSchemaId())
Expand All @@ -127,16 +97,6 @@ public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative
});
}

public void getAllSchemas(boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);

String schemaId = getSchemaId();
pulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(schemaId).handle((schema, error) -> {
handleGetAllSchemasResponse(response, schema, error);
return null;
});
}

public CompletableFuture<List<SchemaAndMetadata>> getAllSchemasAsync(boolean authoritative) {
return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
.thenCompose(__ -> {
Expand All @@ -145,24 +105,6 @@ public CompletableFuture<List<SchemaAndMetadata>> getAllSchemasAsync(boolean aut
});
}

public void deleteSchema(boolean authoritative, AsyncResponse response, boolean force) {
validateDestinationAndAdminOperation(authoritative);

String schemaId = getSchemaId();
pulsar().getSchemaRegistryService().deleteSchema(schemaId, defaultIfEmpty(clientAppId(), ""), force)
.handle((version, error) -> {
if (isNull(error)) {
response.resume(Response.ok()
.entity(DeleteSchemaResponse.builder().version(getLongSchemaVersion(version)).build())
.build());
} else {
log.error("[{}] Failed to delete schema for topic {}", clientAppId(), topicName, error);
response.resume(new RestException(error));
}
return null;
});
}

public CompletableFuture<SchemaVersion> deleteSchemaAsync(boolean authoritative, boolean force) {
return validateDestinationAndAdminOperationAsync(authoritative)
.thenCompose(__ -> {
Expand All @@ -172,61 +114,6 @@ public CompletableFuture<SchemaVersion> deleteSchemaAsync(boolean authoritative,
});
}

public void postSchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);

getSchemaCompatibilityStrategyAsync().thenAccept(schemaCompatibilityStrategy -> {
byte[] data;
if (SchemaType.KEY_VALUE.name().equals(payload.getType())) {
try {
data = DefaultImplementation.getDefaultImplementation()
.convertKeyValueDataStringToSchemaInfoSchema(payload.getSchema().getBytes(Charsets.UTF_8));
} catch (IOException conversionError) {
log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, conversionError);
response.resume(new RestException(conversionError));
return;
}
} else {
data = payload.getSchema().getBytes(Charsets.UTF_8);
}
pulsar().getSchemaRegistryService()
.putSchemaIfAbsent(getSchemaId(),
SchemaData.builder().data(data).isDeleted(false).timestamp(clock.millis())
.type(SchemaType.valueOf(payload.getType())).user(defaultIfEmpty(clientAppId(), ""))
.props(payload.getProperties()).build(),
schemaCompatibilityStrategy)
.thenAccept(version -> response.resume(
Response.accepted().entity(PostSchemaResponse.builder().version(version).build()).build()))
.exceptionally(error -> {
Throwable throwable = FutureUtil.unwrapCompletionException(error);
if (throwable instanceof IncompatibleSchemaException) {
response.resume(Response
.status(Response.Status.CONFLICT.getStatusCode(), throwable.getMessage())
.build());
} else if (throwable instanceof InvalidSchemaDataException) {
response.resume(Response.status(422, /* Unprocessable Entity */
throwable.getMessage()).build());
} else {
log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, throwable);
response.resume(new RestException(throwable));
}
return null;
});
}).exceptionally(error -> {
Throwable throwable = FutureUtil.unwrapCompletionException(error);
if (throwable instanceof RestException) {
// Unprocessable Entity
response.resume(Response
.status(((RestException) throwable).getResponse().getStatus(), throwable.getMessage())
.build());
} else {
log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, throwable);
response.resume(new RestException(throwable));
}
return null;
});
}

public CompletableFuture<SchemaVersion> postSchemaAsync(PostSchemaPayload payload, boolean authoritative) {
return validateDestinationAndAdminOperationAsync(authoritative)
.thenCompose(__ -> getSchemaCompatibilityStrategyAsync())
Expand Down Expand Up @@ -254,27 +141,6 @@ public CompletableFuture<SchemaVersion> postSchemaAsync(PostSchemaPayload payloa
});
}

public void testCompatibility(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);

String schemaId = getSchemaId();

getSchemaCompatibilityStrategyAsync().thenCompose(schemaCompatibilityStrategy -> pulsar()
.getSchemaRegistryService().isCompatible(schemaId,
SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
.timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
.user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build(),
schemaCompatibilityStrategy)
.thenAccept(isCompatible -> response.resume(Response.accepted()
.entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible)
.schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build())
.build())))
.exceptionally(error -> {
response.resume(new RestException(FutureUtil.unwrapCompletionException(error)));
return null;
});
}

public CompletableFuture<Pair<Boolean, SchemaCompatibilityStrategy>> testCompatibilityAsync(
PostSchemaPayload payload, boolean authoritative) {
return validateDestinationAndAdminOperationAsync(authoritative)
Expand All @@ -292,26 +158,6 @@ public CompletableFuture<Pair<Boolean, SchemaCompatibilityStrategy>> testCompati
});
}

public void getVersionBySchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);

String schemaId = getSchemaId();

pulsar().getSchemaRegistryService()
.findSchemaVersion(schemaId,
SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
.timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
.user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build())
.thenAccept(version -> response.resume(Response.accepted()
.entity(LongSchemaVersionResponse.builder().version(version).build()).build()))
.exceptionally(error -> {
Throwable throwable = FutureUtil.unwrapCompletionException(error);
log.error("[{}] Failed to get version by schema for topic {}", clientAppId(), topicName, throwable);
response.resume(new RestException(throwable));
return null;
});
}

public CompletableFuture<Long> getVersionBySchemaAsync(PostSchemaPayload payload, boolean authoritative) {
return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
.thenCompose(__ -> {
Expand Down Expand Up @@ -349,25 +195,6 @@ private static GetSchemaResponse convertSchemaAndMetadataToGetSchemaResponse(Sch
}
}

protected static void handleGetSchemaResponse(AsyncResponse response, SchemaAndMetadata schema, Throwable error) {
if (isNull(error)) {
if (isNull(schema)) {
response.resume(Response.status(
Response.Status.NOT_FOUND.getStatusCode(), "Schema not found").build());
} else if (schema.schema.isDeleted()) {
response.resume(Response.status(
Response.Status.NOT_FOUND.getStatusCode(), "Schema is deleted").build());
} else {
response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON)
.entity(convertSchemaAndMetadataToGetSchemaResponse(schema)).build());
}
} else {
log.error("Failed to get schema", error);
response.resume(new RestException(error));
}

}

protected GetSchemaResponse convertToSchemaResponse(SchemaAndMetadata schema) {
if (isNull(schema)) {
throw new RestException(Response.Status.NOT_FOUND.getStatusCode(), "Schema not found");
Expand All @@ -389,40 +216,6 @@ protected GetAllVersionsSchemaResponse convertToAllVersionsSchemaResponse(List<S
}
}

private static void handleGetAllSchemasResponse(AsyncResponse response, List<SchemaAndMetadata> schemas,
Throwable error) {
if (isNull(error)) {
if (isNull(schemas)) {
response.resume(Response.status(
Response.Status.NOT_FOUND.getStatusCode(), "Schemas not found").build());
} else {
response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON)
.entity(GetAllVersionsSchemaResponse.builder()
.getSchemaResponses(schemas.stream()
.map(SchemasResourceBase::convertSchemaAndMetadataToGetSchemaResponse)
.collect(Collectors.toList()))
.build())
.build());
}
} else {
log.error("Failed to get all schemas", error);
response.resume(new RestException(error));
}
}

private void validateDestinationAndAdminOperation(boolean authoritative) {
try {
validateAdminAccessForTenant(topicName.getTenant());
validateTopicOwnership(topicName, authoritative);
} catch (RestException e) {
if (e.getResponse().getStatus() == Response.Status.UNAUTHORIZED.getStatusCode()) {
throw new RestException(Response.Status.UNAUTHORIZED, e.getMessage());
} else {
throw e;
}
}
}

private CompletableFuture<Void> validateDestinationAndAdminOperationAsync(boolean authoritative) {
return validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateAdminAccessForTenantAsync(topicName.getTenant()));
Expand Down
Loading