diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 5b67b01115e7c..aedd68d416fe7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -63,6 +63,8 @@ import java.util.stream.Collectors; import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -672,8 +674,6 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError, ex.getMessage(), requestId); } else { - log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, - topicName, ex.getMessage(), ex); ServerError error = ServerError.ServiceNotReady; if (ex instanceof MetadataStoreException) { error = ServerError.MetadataError; @@ -685,6 +685,14 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa error = ServerError.MetadataError; } } + if (error == ServerError.TopicNotFound) { + log.info("Trying to get Partitioned Metadata for a resource not exist" + + "[{}] {}: {}", remoteAddress, + topicName, ex.getMessage()); + } else { + log.warn("Failed to get Partitioned Metadata [{}] {}: {}", + remoteAddress, topicName, ex.getMessage(), ex); + } commandSender.sendPartitionMetadataResponse(error, ex.getMessage(), requestId); } @@ -702,6 +710,16 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa return null; }).exceptionally(ex -> { logAuthException(remoteAddress, "partition-metadata", getPrincipal(), Optional.of(topicName), ex); + Throwable actEx = FutureUtil.unwrapCompletionException(ex); + if (actEx instanceof WebApplicationException restException) { + if (restException.getResponse().getStatus() == Response.Status.NOT_FOUND.getStatusCode()) { + writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError, + "Tenant or namespace or topic does not exist: " + topicName.getNamespace() , + requestId)); + lookupSemaphore.release(); + return null; + } + } final String msg = "Exception occurred while trying to authorize get Partition Metadata"; writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId)); @@ -3663,13 +3681,22 @@ protected void messageReceived() { private static void logAuthException(SocketAddress remoteAddress, String operation, String principal, Optional topic, Throwable ex) { String topicString = topic.map(t -> ", topic=" + t.toString()).orElse(""); - if (ex instanceof AuthenticationException) { + Throwable actEx = FutureUtil.unwrapCompletionException(ex); + if (actEx instanceof AuthenticationException) { log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}", - remoteAddress, operation, principal, topicString, ex.getMessage()); - } else { - log.error("[{}] Error trying to authenticate: operation={}, principal={}{}", - remoteAddress, operation, principal, topicString, ex); + remoteAddress, operation, principal, topicString, actEx.getMessage()); + return; + } else if (actEx instanceof WebApplicationException restException){ + // Do not print error log if users tries to access a not found resource. + if (restException.getResponse().getStatus() == Response.Status.NOT_FOUND.getStatusCode()) { + log.info("[{}] Trying to authenticate for a topic which under a namespace not exists: operation={}," + + " principal={}{}, reason: {}", + remoteAddress, operation, principal, topicString, actEx.getMessage()); + return; + } } + log.error("[{}] Error trying to authenticate: operation={}, principal={}{}", + remoteAddress, operation, principal, topicString, ex); } private static void logNamespaceNameAuthException(SocketAddress remoteAddress, String operation, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java index 87bc4267b48a3..e9a639697d9ff 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java @@ -578,4 +578,55 @@ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean config assertEquals(getLookupRequestPermits(), lookupPermitsBefore); }); } + + @Test(dataProvider = "topicDomains") + public void testNamespaceNotExist(TopicDomain topicDomain) throws Exception { + int lookupPermitsBefore = getLookupRequestPermits(); + final String namespaceNotExist = BrokerTestUtil.newUniqueName("public/ns"); + final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.toString() + "://" + namespaceNotExist + "/tp"); + PulsarClientImpl[] clientArray = getClientsToTest(false); + for (PulsarClientImpl client : clientArray) { + try { + PartitionedTopicMetadata topicMetadata = client + .getPartitionedTopicMetadata(topicNameStr, true, true) + .join(); + log.info("Get topic metadata: {}", topicMetadata.partitions); + fail("Expected a not found ex"); + } catch (Exception ex) { + Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex); + assertTrue(unwrapEx instanceof PulsarClientException.BrokerMetadataException || + unwrapEx instanceof PulsarClientException.TopicDoesNotExistException); + } + } + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + assertEquals(getLookupRequestPermits(), lookupPermitsBefore); + }); + } + + @Test(dataProvider = "topicDomains") + public void testTenantNotExist(TopicDomain topicDomain) throws Exception { + int lookupPermitsBefore = getLookupRequestPermits(); + final String tenantNotExist = BrokerTestUtil.newUniqueName("tenant"); + final String namespaceNotExist = BrokerTestUtil.newUniqueName(tenantNotExist + "/default"); + final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.toString() + "://" + namespaceNotExist + "/tp"); + PulsarClientImpl[] clientArray = getClientsToTest(false); + for (PulsarClientImpl client : clientArray) { + try { + PartitionedTopicMetadata topicMetadata = client + .getPartitionedTopicMetadata(topicNameStr, true, true) + .join(); + log.info("Get topic metadata: {}", topicMetadata.partitions); + fail("Expected a not found ex"); + } catch (Exception ex) { + Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex); + assertTrue(unwrapEx instanceof PulsarClientException.BrokerMetadataException || + unwrapEx instanceof PulsarClientException.TopicDoesNotExistException); + } + } + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + assertEquals(getLookupRequestPermits(), lookupPermitsBefore); + }); + } }