Skip to content

Commit

Permalink
[fix] [log] Do not print error log if tenant/namespace does not exist…
Browse files Browse the repository at this point in the history
… when calling get topic metadata (apache#23291)
  • Loading branch information
poorbarcode authored Sep 27, 2024
1 parent b1c5d96 commit 5583102
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedger;
Expand Down Expand Up @@ -672,8 +674,6 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError,
ex.getMessage(), requestId);
} else {
log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress,
topicName, ex.getMessage(), ex);
ServerError error = ServerError.ServiceNotReady;
if (ex instanceof MetadataStoreException) {
error = ServerError.MetadataError;
Expand All @@ -685,6 +685,14 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
error = ServerError.MetadataError;
}
}
if (error == ServerError.TopicNotFound) {
log.info("Trying to get Partitioned Metadata for a resource not exist"
+ "[{}] {}: {}", remoteAddress,
topicName, ex.getMessage());
} else {
log.warn("Failed to get Partitioned Metadata [{}] {}: {}",
remoteAddress, topicName, ex.getMessage(), ex);
}
commandSender.sendPartitionMetadataResponse(error, ex.getMessage(),
requestId);
}
Expand All @@ -702,6 +710,16 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
return null;
}).exceptionally(ex -> {
logAuthException(remoteAddress, "partition-metadata", getPrincipal(), Optional.of(topicName), ex);
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
if (actEx instanceof WebApplicationException restException) {
if (restException.getResponse().getStatus() == Response.Status.NOT_FOUND.getStatusCode()) {
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError,
"Tenant or namespace or topic does not exist: " + topicName.getNamespace() ,
requestId));
lookupSemaphore.release();
return null;
}
}
final String msg = "Exception occurred while trying to authorize get Partition Metadata";
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg,
requestId));
Expand Down Expand Up @@ -3663,13 +3681,22 @@ protected void messageReceived() {
private static void logAuthException(SocketAddress remoteAddress, String operation,
String principal, Optional<TopicName> topic, Throwable ex) {
String topicString = topic.map(t -> ", topic=" + t.toString()).orElse("");
if (ex instanceof AuthenticationException) {
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
if (actEx instanceof AuthenticationException) {
log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}",
remoteAddress, operation, principal, topicString, ex.getMessage());
} else {
log.error("[{}] Error trying to authenticate: operation={}, principal={}{}",
remoteAddress, operation, principal, topicString, ex);
remoteAddress, operation, principal, topicString, actEx.getMessage());
return;
} else if (actEx instanceof WebApplicationException restException){
// Do not print error log if users tries to access a not found resource.
if (restException.getResponse().getStatus() == Response.Status.NOT_FOUND.getStatusCode()) {
log.info("[{}] Trying to authenticate for a topic which under a namespace not exists: operation={},"
+ " principal={}{}, reason: {}",
remoteAddress, operation, principal, topicString, actEx.getMessage());
return;
}
}
log.error("[{}] Error trying to authenticate: operation={}, principal={}{}",
remoteAddress, operation, principal, topicString, ex);
}

private static void logNamespaceNameAuthException(SocketAddress remoteAddress, String operation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,4 +578,55 @@ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean config
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
});
}

@Test(dataProvider = "topicDomains")
public void testNamespaceNotExist(TopicDomain topicDomain) throws Exception {
int lookupPermitsBefore = getLookupRequestPermits();
final String namespaceNotExist = BrokerTestUtil.newUniqueName("public/ns");
final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.toString() + "://" + namespaceNotExist + "/tp");
PulsarClientImpl[] clientArray = getClientsToTest(false);
for (PulsarClientImpl client : clientArray) {
try {
PartitionedTopicMetadata topicMetadata = client
.getPartitionedTopicMetadata(topicNameStr, true, true)
.join();
log.info("Get topic metadata: {}", topicMetadata.partitions);
fail("Expected a not found ex");
} catch (Exception ex) {
Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
assertTrue(unwrapEx instanceof PulsarClientException.BrokerMetadataException ||
unwrapEx instanceof PulsarClientException.TopicDoesNotExistException);
}
}
// Verify: lookup semaphore has been releases.
Awaitility.await().untilAsserted(() -> {
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
});
}

@Test(dataProvider = "topicDomains")
public void testTenantNotExist(TopicDomain topicDomain) throws Exception {
int lookupPermitsBefore = getLookupRequestPermits();
final String tenantNotExist = BrokerTestUtil.newUniqueName("tenant");
final String namespaceNotExist = BrokerTestUtil.newUniqueName(tenantNotExist + "/default");
final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.toString() + "://" + namespaceNotExist + "/tp");
PulsarClientImpl[] clientArray = getClientsToTest(false);
for (PulsarClientImpl client : clientArray) {
try {
PartitionedTopicMetadata topicMetadata = client
.getPartitionedTopicMetadata(topicNameStr, true, true)
.join();
log.info("Get topic metadata: {}", topicMetadata.partitions);
fail("Expected a not found ex");
} catch (Exception ex) {
Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
assertTrue(unwrapEx instanceof PulsarClientException.BrokerMetadataException ||
unwrapEx instanceof PulsarClientException.TopicDoesNotExistException);
}
}
// Verify: lookup semaphore has been releases.
Awaitility.await().untilAsserted(() -> {
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
});
}
}

0 comments on commit 5583102

Please sign in to comment.