Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Add fine-grain authorization to ns/topic management endpoints #22305

Merged
merged 7 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
case COMPACT:
case OFFLOAD:
case UNLOAD:
case TRIM_TOPIC:
case DELETE_METADATA:
case UPDATE_METADATA:
case ADD_BUNDLE_RANGE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
Expand Down Expand Up @@ -714,10 +712,7 @@ private CompletableFuture<Void> provisionPartitionedTopicPath(int numPartitions,
}

protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync() {
return validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ)
.thenCompose((__) -> getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> {
return getSchemaCompatibilityStrategyAsyncWithoutAuth().whenComplete((__, ex) -> {
if (ex != null) {
log.error("[{}] Failed to get schema compatibility strategy of topic {} {}",
clientAppId(), topicName, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2339,102 +2339,110 @@ protected void internalSetMaxTopicsPerNamespace(Integer maxTopicsPerNamespace) {
}

protected void internalSetProperty(String key, String value, AsyncResponse asyncResponse) {
validatePoliciesReadOnlyAccess();
updatePoliciesAsync(namespaceName, policies -> {
policies.properties.put(key, value);
return policies;
}).thenAccept(v -> {
log.info("[{}] Successfully set property for key {} on namespace {}", clientAppId(), key,
namespaceName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to set property for key {} on namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.properties.put(key, value);
return policies;
}))
.thenAccept(v -> {
log.info("[{}] Successfully set property for key {} on namespace {}", clientAppId(), key,
namespaceName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to set property for key {} on namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalSetProperties(Map<String, String> properties, AsyncResponse asyncResponse) {
validatePoliciesReadOnlyAccess();
updatePoliciesAsync(namespaceName, policies -> {
policies.properties.putAll(properties);
return policies;
}).thenAccept(v -> {
log.info("[{}] Successfully set {} properties on namespace {}", clientAppId(), properties.size(),
namespaceName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to set {} properties on namespace {}", clientAppId(), properties.size(),
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.properties.putAll(properties);
return policies;
}))
.thenAccept(v -> {
log.info("[{}] Successfully set {} properties on namespace {}", clientAppId(), properties.size(),
namespaceName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to set {} properties on namespace {}", clientAppId(), properties.size(),
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalGetProperty(String key, AsyncResponse asyncResponse) {
getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
asyncResponse.resume(policies.properties.get(key));
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to get property for key {} of namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.properties.get(key)))
.exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to get property for key {} of namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalGetProperties(AsyncResponse asyncResponse) {
getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
asyncResponse.resume(policies.properties);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to get properties of namespace {}", clientAppId(), namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.properties))
.exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to get properties of namespace {}", clientAppId(), namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalRemoveProperty(String key, AsyncResponse asyncResponse) {
validatePoliciesReadOnlyAccess();

AtomicReference<String> oldVal = new AtomicReference<>(null);
updatePoliciesAsync(namespaceName, policies -> {
oldVal.set(policies.properties.remove(key));
return policies;
}).thenAccept(v -> {
asyncResponse.resume(oldVal.get());
log.info("[{}] Successfully remove property for key {} on namespace {}", clientAppId(), key,
namespaceName);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to remove property for key {} on namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
oldVal.set(policies.properties.remove(key));
return policies;
})).thenAccept(v -> {
asyncResponse.resume(oldVal.get());
log.info("[{}] Successfully remove property for key {} on namespace {}", clientAppId(), key,
namespaceName);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to remove property for key {} on namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalClearProperties(AsyncResponse asyncResponse) {
validatePoliciesReadOnlyAccess();
AtomicReference<Integer> clearedCount = new AtomicReference<>(0);
updatePoliciesAsync(namespaceName, policies -> {
clearedCount.set(policies.properties.size());
policies.properties.clear();
return policies;
}).thenAccept(v -> {
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully clear {} properties on namespace {}", clientAppId(), clearedCount.get(),
namespaceName);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to clear {} properties on namespace {}", clientAppId(), clearedCount.get(),
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
clearedCount.set(policies.properties.size());
policies.properties.clear();
return policies;
}))
.thenAccept(v -> {
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully clear {} properties on namespace {}", clientAppId(), clearedCount.get(),
namespaceName);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to clear {} properties on namespace {}", clientAppId(), clearedCount.get(),
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

private CompletableFuture<Void> updatePoliciesAsync(NamespaceName ns, Function<Policies, Policies> updateFunction) {
Expand Down
Loading
Loading