Skip to content

Commit

Permalink
[improve][admin]internalGetMessageById shouldn't be allowed on partit…
Browse files Browse the repository at this point in the history
…ioned topic (#19013)

Co-authored-by: gavingaozhangmin <gavingaozhangmin@didiglobal.com>
(cherry picked from commit b05fddb)
  • Loading branch information
gaozhangmin authored and lhotari committed Mar 6, 2024
1 parent e530c8b commit c9d2e26
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2763,60 +2763,65 @@ private PositionImpl calculatePositionAckSet(boolean isExcluded, int batchSize,
return seekPosition;
}

protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId, long entryId,
boolean authoritative) {
// will redirect if the topic not owned by current broker
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
.thenCompose(__ -> {
CompletableFuture<Void> ret;
if (topicName.isGlobal()) {
ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
ret = CompletableFuture.completedFuture(null);
}
return ret;
})
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenAccept(topic -> {
ManagedLedgerImpl ledger =
(ManagedLedgerImpl) ((PersistentTopic) topic).getManagedLedger();
ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId),
new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryFailed(ManagedLedgerException exception,
Object ctx) {
asyncResponse.resume(new RestException(exception));
}
protected CompletableFuture<Response> internalGetMessageById(long ledgerId, long entryId, boolean authoritative) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}
return future.thenCompose(__ -> {
if (topicName.isPartitioned()) {
return CompletableFuture.completedFuture(null);
} else {
return getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenAccept(topicMetadata -> {
if (topicMetadata.partitions > 0) {
log.warn("[{}] Not supported getMessageById operation on partitioned-topic {}",
clientAppId(), topicName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"GetMessageById is not allowed on partitioned-topic");
}
});

@Override
public void readEntryComplete(Entry entry, Object ctx) {
try {
asyncResponse.resume(generateResponseWithEntry(entry));
} catch (IOException exception) {
asyncResponse.resume(new RestException(exception));
} finally {
if (entry != null) {
entry.release();
}
}
}
})
.thenCompose(ignore -> validateTopicOwnershipAsync(topicName, authoritative))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
CompletableFuture<Response> results = new CompletableFuture<>();
ManagedLedgerImpl ledger =
(ManagedLedgerImpl) ((PersistentTopic) topic).getManagedLedger();
ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId),
new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryFailed(ManagedLedgerException exception,
Object ctx) {
throw new RestException(exception);
}

@Override
public void readEntryComplete(Entry entry, Object ctx) {
try {
results.complete(generateResponseWithEntry(entry));
} catch (IOException exception) {
throw new RestException(exception);
} finally {
if (entry != null) {
entry.release();
}
}
}

@Override
public String toString() {
return String.format("Topic [%s] internal get message by id",
PersistentTopicsBase.this.topicName);
}
}, null);
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
clientAppId(), ledgerId, entryId, topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}, null);
return results;
});
}

protected CompletableFuture<MessageId> internalGetMessageIdByTimestampAsync(long timestamp, boolean authoritative) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -814,14 +814,18 @@ public void getMessageByID(@Suspended final AsyncResponse asyncResponse, @PathPa
@PathParam("topic") @Encoded String encodedTopic, @PathParam("ledgerId") Long ledgerId,
@PathParam("entryId") Long entryId,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalGetMessageById(asyncResponse, ledgerId, entryId, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
validateTopicName(property, cluster, namespace, encodedTopic);
internalGetMessageById(ledgerId, entryId, authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
clientAppId(), ledgerId, entryId, topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1886,14 +1886,18 @@ public void getMessageById(
@PathParam("entryId") long entryId,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalGetMessageById(asyncResponse, ledgerId, entryId, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
validateTopicName(tenant, namespace, encodedTopic);
internalGetMessageById(ledgerId, entryId, authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
clientAppId(), ledgerId, entryId, topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
Expand Down

0 comments on commit c9d2e26

Please sign in to comment.