Skip to content

Commit

Permalink
[improve][broker] Add fine-grain authorization to ns/topic management…
Browse files Browse the repository at this point in the history
… endpoints (#22305)

(cherry picked from commit fd34d4a)

# Conflicts:
#	pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
  • Loading branch information
Technoboy- authored and lhotari committed Mar 20, 2024
1 parent a013f4a commit 99eb49a
Show file tree
Hide file tree
Showing 6 changed files with 683 additions and 171 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
case COMPACT:
case OFFLOAD:
case UNLOAD:
case TRIM_TOPIC:
case DELETE_METADATA:
case UPDATE_METADATA:
case ADD_BUNDLE_RANGE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
Expand Down Expand Up @@ -711,10 +709,7 @@ private CompletableFuture<Void> provisionPartitionedTopicPath(int numPartitions,
}

protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync() {
return validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ)
.thenCompose((__) -> getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> {
return getSchemaCompatibilityStrategyAsyncWithoutAuth().whenComplete((__, ex) -> {
if (ex != null) {
log.error("[{}] Failed to get schema compatibility strategy of topic {} {}",
clientAppId(), topicName, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2354,102 +2354,110 @@ protected void internalSetMaxTopicsPerNamespace(Integer maxTopicsPerNamespace) {
}

protected void internalSetProperty(String key, String value, AsyncResponse asyncResponse) {
validatePoliciesReadOnlyAccess();
updatePoliciesAsync(namespaceName, policies -> {
policies.properties.put(key, value);
return policies;
}).thenAccept(v -> {
log.info("[{}] Successfully set property for key {} on namespace {}", clientAppId(), key,
namespaceName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to set property for key {} on namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.properties.put(key, value);
return policies;
}))
.thenAccept(v -> {
log.info("[{}] Successfully set property for key {} on namespace {}", clientAppId(), key,
namespaceName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to set property for key {} on namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalSetProperties(Map<String, String> properties, AsyncResponse asyncResponse) {
validatePoliciesReadOnlyAccess();
updatePoliciesAsync(namespaceName, policies -> {
policies.properties.putAll(properties);
return policies;
}).thenAccept(v -> {
log.info("[{}] Successfully set {} properties on namespace {}", clientAppId(), properties.size(),
namespaceName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to set {} properties on namespace {}", clientAppId(), properties.size(),
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.properties.putAll(properties);
return policies;
}))
.thenAccept(v -> {
log.info("[{}] Successfully set {} properties on namespace {}", clientAppId(), properties.size(),
namespaceName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to set {} properties on namespace {}", clientAppId(), properties.size(),
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalGetProperty(String key, AsyncResponse asyncResponse) {
getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
asyncResponse.resume(policies.properties.get(key));
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to get property for key {} of namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.properties.get(key)))
.exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to get property for key {} of namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalGetProperties(AsyncResponse asyncResponse) {
getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
asyncResponse.resume(policies.properties);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to get properties of namespace {}", clientAppId(), namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.properties))
.exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to get properties of namespace {}", clientAppId(), namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalRemoveProperty(String key, AsyncResponse asyncResponse) {
validatePoliciesReadOnlyAccess();

AtomicReference<String> oldVal = new AtomicReference<>(null);
updatePoliciesAsync(namespaceName, policies -> {
oldVal.set(policies.properties.remove(key));
return policies;
}).thenAccept(v -> {
asyncResponse.resume(oldVal.get());
log.info("[{}] Successfully remove property for key {} on namespace {}", clientAppId(), key,
namespaceName);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to remove property for key {} on namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
oldVal.set(policies.properties.remove(key));
return policies;
})).thenAccept(v -> {
asyncResponse.resume(oldVal.get());
log.info("[{}] Successfully remove property for key {} on namespace {}", clientAppId(), key,
namespaceName);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to remove property for key {} on namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalClearProperties(AsyncResponse asyncResponse) {
validatePoliciesReadOnlyAccess();
AtomicReference<Integer> clearedCount = new AtomicReference<>(0);
updatePoliciesAsync(namespaceName, policies -> {
clearedCount.set(policies.properties.size());
policies.properties.clear();
return policies;
}).thenAccept(v -> {
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully clear {} properties on namespace {}", clientAppId(), clearedCount.get(),
namespaceName);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to clear {} properties on namespace {}", clientAppId(), clearedCount.get(),
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
clearedCount.set(policies.properties.size());
policies.properties.clear();
return policies;
}))
.thenAccept(v -> {
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully clear {} properties on namespace {}", clientAppId(), clearedCount.get(),
namespaceName);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to clear {} properties on namespace {}", clientAppId(), clearedCount.get(),
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

private CompletableFuture<Void> updatePoliciesAsync(NamespaceName ns, Function<Policies, Policies> updateFunction) {
Expand Down
Loading

0 comments on commit 99eb49a

Please sign in to comment.