From c9d2e2663003532cb6e1e1a6f53c2b3c2449a880 Mon Sep 17 00:00:00 2001 From: gaozhangmin Date: Mon, 9 Jan 2023 19:39:09 +0800 Subject: [PATCH] [improve][admin]internalGetMessageById shouldn't be allowed on partitioned topic (#19013) Co-authored-by: gavingaozhangmin (cherry picked from commit b05fddb1af03456438f27217dc6979be00fac19e) --- .../admin/impl/PersistentTopicsBase.java | 97 ++++++++++--------- .../broker/admin/v1/PersistentTopics.java | 20 ++-- .../broker/admin/v2/PersistentTopics.java | 20 ++-- 3 files changed, 75 insertions(+), 62 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index d31092c582945..b5609892b092d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2763,60 +2763,65 @@ private PositionImpl calculatePositionAckSet(boolean isExcluded, int batchSize, return seekPosition; } - protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId, long entryId, - boolean authoritative) { - // will redirect if the topic not owned by current broker - validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES)) - .thenCompose(__ -> { - CompletableFuture ret; - if (topicName.isGlobal()) { - ret = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - ret = CompletableFuture.completedFuture(null); - } - return ret; - }) - .thenCompose(__ -> getTopicReferenceAsync(topicName)) - .thenAccept(topic -> { - ManagedLedgerImpl ledger = - (ManagedLedgerImpl) ((PersistentTopic) topic).getManagedLedger(); - ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), - new AsyncCallbacks.ReadEntryCallback() { - @Override - public void readEntryFailed(ManagedLedgerException exception, - Object ctx) { - asyncResponse.resume(new RestException(exception)); - } + protected CompletableFuture internalGetMessageById(long ledgerId, long entryId, boolean authoritative) { + CompletableFuture future; + if (topicName.isGlobal()) { + future = validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + future = CompletableFuture.completedFuture(null); + } + return future.thenCompose(__ -> { + if (topicName.isPartitioned()) { + return CompletableFuture.completedFuture(null); + } else { + return getPartitionedTopicMetadataAsync(topicName, authoritative, false) + .thenAccept(topicMetadata -> { + if (topicMetadata.partitions > 0) { + log.warn("[{}] Not supported getMessageById operation on partitioned-topic {}", + clientAppId(), topicName); + throw new RestException(Status.METHOD_NOT_ALLOWED, + "GetMessageById is not allowed on partitioned-topic"); + } + }); - @Override - public void readEntryComplete(Entry entry, Object ctx) { - try { - asyncResponse.resume(generateResponseWithEntry(entry)); - } catch (IOException exception) { - asyncResponse.resume(new RestException(exception)); - } finally { - if (entry != null) { - entry.release(); - } - } + } + }) + .thenCompose(ignore -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES)) + .thenCompose(__ -> getTopicReferenceAsync(topicName)) + .thenCompose(topic -> { + CompletableFuture results = new CompletableFuture<>(); + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) ((PersistentTopic) topic).getManagedLedger(); + ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), + new AsyncCallbacks.ReadEntryCallback() { + @Override + public void readEntryFailed(ManagedLedgerException exception, + Object ctx) { + throw new RestException(exception); + } + + @Override + public void readEntryComplete(Entry entry, Object ctx) { + try { + results.complete(generateResponseWithEntry(entry)); + } catch (IOException exception) { + throw new RestException(exception); + } finally { + if (entry != null) { + entry.release(); } + } + } @Override public String toString() { return String.format("Topic [%s] internal get message by id", PersistentTopicsBase.this.topicName); } - }, null); - }).exceptionally(ex -> { - // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { - log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", - clientAppId(), ledgerId, entryId, topicName, ex); - } - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + }, null); + return results; + }); } protected CompletableFuture internalGetMessageIdByTimestampAsync(long timestamp, boolean authoritative) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 8e3f3adbfeeb3..45ee532685ff8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -814,14 +814,18 @@ public void getMessageByID(@Suspended final AsyncResponse asyncResponse, @PathPa @PathParam("topic") @Encoded String encodedTopic, @PathParam("ledgerId") Long ledgerId, @PathParam("entryId") Long entryId, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - try { - validateTopicName(property, cluster, namespace, encodedTopic); - internalGetMessageById(asyncResponse, ledgerId, entryId, authoritative); - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + validateTopicName(property, cluster, namespace, encodedTopic); + internalGetMessageById(ledgerId, entryId, authoritative) + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + // If the exception is not redirect exception we need to log it. + if (!isRedirectException(ex)) { + log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", + clientAppId(), ledgerId, entryId, topicName, ex); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 23f887e84d2b1..ebb4f3a34de4c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1886,14 +1886,18 @@ public void getMessageById( @PathParam("entryId") long entryId, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - try { - validateTopicName(tenant, namespace, encodedTopic); - internalGetMessageById(asyncResponse, ledgerId, entryId, authoritative); - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + validateTopicName(tenant, namespace, encodedTopic); + internalGetMessageById(ledgerId, entryId, authoritative) + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + // If the exception is not redirect exception we need to log it. + if (!isRedirectException(ex)) { + log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", + clientAppId(), ledgerId, entryId, topicName, ex); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET