Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] Refresh ns policy when deciding auto topic creation eligibility #19097

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,15 @@ public Optional<Policies> getPoliciesIfCached(NamespaceName ns) {
}

public CompletableFuture<Optional<Policies>> getPoliciesAsync(NamespaceName ns) {
return getCache().get(joinPath(BASE_POLICIES_PATH, ns.toString()));
return getPoliciesAsync(ns, false);
}

public CompletableFuture<Optional<Policies>> getPoliciesAsync(NamespaceName ns, boolean refresh) {
if (refresh) {
return refreshAndGetAsync(joinPath(BASE_POLICIES_PATH, ns.toString()));
} else {
return getAsync(joinPath(BASE_POLICIES_PATH, ns.toString()));
}
}

public void setPolicies(NamespaceName ns, Function<Policies, Policies> function) throws MetadataStoreException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,45 @@

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.mockito.Mockito;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class NamespaceResourcesTest {

private MetadataStore mockMetadataStore;
private NamespaceResources namespaceResources;

@BeforeMethod
public void setUp() {
mockMetadataStore = Mockito.mock(MetadataStore.class);
Mockito.doReturn(Mockito.mock(MetadataCache.class)).when(mockMetadataStore).getMetadataCache(Policies.class);
Mockito.doReturn(CompletableFuture.completedFuture(null)).when(mockMetadataStore).sync(Mockito.anyString());
namespaceResources = new NamespaceResources(mockMetadataStore, 0);
}
@Test
public void test_pathIsFromNamespace() {
assertFalse(NamespaceResources.pathIsFromNamespace("/admin/clusters"));
assertFalse(NamespaceResources.pathIsFromNamespace("/admin/policies"));
assertFalse(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant"));
assertTrue(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant/my-ns"));
}

@Test
public void testGetPolicesAsync() {
namespaceResources.getPoliciesAsync(NamespaceName.get("public/default"));
Mockito.verify(mockMetadataStore, Mockito.never()).sync(Mockito.anyString());
}

@Test
public void testGetPolicesAsyncAndRefresh() {
namespaceResources.getPoliciesAsync(NamespaceName.get("public/default"), true);
Mockito.verify(mockMetadataStore, Mockito.times(1)).sync("/admin/policies/public/default");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ protected CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadat
.thenCompose(__ -> {
if (checkAllowAutoCreation) {
return pulsar().getBrokerService()
.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName);
.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName, true);
} else {
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
Expand All @@ -87,7 +86,6 @@
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
Expand Down Expand Up @@ -2236,7 +2234,7 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su
internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
subscriptionName, targetMessageId, authoritative, replicated, properties);
} else {
pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName, true)
.thenCompose(allowAutoTopicCreation -> getPartitionedTopicMetadataAsync(topicName,
authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
Expand Down Expand Up @@ -2338,7 +2336,7 @@ private void internalCreateSubscriptionForNonPartitionedTopic(

validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SUBSCRIBE, subscriptionName))
.thenCompose(__ -> pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName))
.thenCompose(__ -> pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName, true))
.thenCompose(isAllowAutoTopicCreation -> pulsar().getBrokerService()
.getTopic(topicName.toString(), isAllowAutoTopicCreation))
.thenApply(optTopic -> {
Expand Down Expand Up @@ -4217,62 +4215,6 @@ protected void internalOffloadStatus(AsyncResponse asyncResponse, boolean author
});
}

public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not used anywhere, removed it to minimize the code paths we need to think about.

PulsarService pulsar, String clientAppId, String originalPrincipal,
AuthenticationDataSource authenticationData, TopicName topicName) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
CompletableFuture<Void> authorizationFuture = new CompletableFuture<>();
checkAuthorizationAsync(pulsar, topicName, clientAppId, authenticationData)
.thenRun(() -> authorizationFuture.complete(null))
.exceptionally(e -> {
Throwable throwable = FutureUtil.unwrapCompletionException(e);
if (throwable instanceof RestException) {
validateAdminAccessForTenantAsync(pulsar,
clientAppId, originalPrincipal, topicName.getTenant(), authenticationData)
.thenRun(() -> {
authorizationFuture.complete(null);
}).exceptionally(ex -> {
Throwable throwable2 = FutureUtil.unwrapCompletionException(ex);
if (throwable2 instanceof RestException) {
log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
authorizationFuture.completeExceptionally(new PulsarClientException(
String.format("Authorization failed %s on topic %s with error %s",
clientAppId, topicName, throwable2.getMessage())));
} else {
authorizationFuture.completeExceptionally(throwable2);
}
return null;
});
} else {
// throw without wrapping to PulsarClientException that considers: unknown error marked as
// internal server error
log.warn("Failed to authorize {} on topic {}", clientAppId, topicName, throwable);
authorizationFuture.completeExceptionally(throwable);
}
return null;
});

// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
authorizationFuture.thenCompose(__ ->
checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()))
.thenCompose(res ->
pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
metadata.partitions);
}
metadataFuture.complete(metadata);
})
.exceptionally(e -> {
metadataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
return null;
});
return metadataFuture;
}

/**
* Get partitioned topic metadata without checking the permission.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ protected CompletableFuture<LookupData> internalLookupTopicAsync(TopicName topic
? CompletableFuture.completedFuture(true)
: pulsar().getNamespaceService().checkTopicExists(topicName)
.thenCompose(exists -> exists ? CompletableFuture.completedFuture(true)
: pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName));
: pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName, true));

return existFuture;
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,6 @@ public void unloadNamespaceBundlesGracefully(int maxConcurrentUnload, boolean cl
public CompletableFuture<Optional<Topic>> getTopicIfExists(final String topic) {
return getTopic(topic, false /* createIfMissing */);
}

public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
return isAllowAutoTopicCreationAsync(topic)
.thenCompose(isAllowed -> getTopic(topic, isAllowed))
Expand Down Expand Up @@ -2908,12 +2907,31 @@ private void createPendingLoadTopic() {

public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(
TopicName topicName) {
return fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName, false);
}

public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(
TopicName topicName, boolean refresh) {
if (pulsar.getNamespaceService() == null) {
return FutureUtil.failedFuture(new NamingException("namespace service is not ready"));
}
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(topicName.getNamespaceObject());

if (refresh) {
return pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesAsync(topicName.getNamespaceObject(), true)
.thenCompose(policies ->
fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName, policies));
} else {
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(topicName.getNamespaceObject());
return fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName, policies);
}
}

private CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(
TopicName topicName, Optional<Policies> policies) {

return pulsar.getNamespaceService().checkTopicExists(topicName)
.thenCompose(topicExists -> {
return fetchPartitionedTopicMetadataAsync(topicName)
Expand Down Expand Up @@ -3155,14 +3173,20 @@ public Optional<Integer> getListenPortTls() {

public CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final String topic) {
TopicName topicName = TopicName.get(topic);
return isAllowAutoTopicCreationAsync(topicName);
return isAllowAutoTopicCreationAsync(topicName, false);
}

public CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final TopicName topicName) {
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(topicName.getNamespaceObject());
return isAllowAutoTopicCreationAsync(topicName, policies);
public CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final TopicName topicName, final boolean refresh) {
if (refresh) {
return pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesAsync(topicName.getNamespaceObject(), true)
.thenCompose(policies -> isAllowAutoTopicCreationAsync(topicName, policies));
} else {
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(topicName.getNamespaceObject());
return isAllowAutoTopicCreationAsync(topicName, policies);
}
}

private CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final TopicName topicName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.lookup.http;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -105,7 +106,7 @@ public void setUp() throws Exception {
doReturn(auth).when(brokerService).getAuthorizationService();
doReturn(new Semaphore(1000)).when(brokerService).getLookupRequestSemaphore();
doReturn(CompletableFuture.completedFuture(false)).when(brokerService)
.isAllowAutoTopicCreationAsync(any(TopicName.class));
.isAllowAutoTopicCreationAsync(any(TopicName.class), eq(true));
}

@Test
Expand Down