Skip to content

Commit

Permalink
[fix][broker] Pass subscriptionName to auth service (apache#17123) (a…
Browse files Browse the repository at this point in the history
…pache#19423)

Co-authored-by: Michael Marshall <mmarshall@apache.org>

### Motivation

Cherry-pick apache#17123

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

### Matching PR in forked repository

PR in forked repository: <!-- ENTER URL HERE -->

<!--
After opening this PR, the build in apache/pulsar will fail and instructions will
be provided for opening a PR in the PR author's forked repository.

apache/pulsar pull requests should be first tested in your own fork since the 
apache/pulsar CI based on GitHub Actions has constrained resources and quota.
GitHub Actions provides separate quota for pull requests that are executed in 
a forked repository.

The tests will be run in the forked repository until all PR review comments have
been handled, the tests pass and the PR is approved by a reviewer.
-->
  • Loading branch information
liangyepianzhou authored Feb 3, 2023
1 parent a28ee0e commit 503457b
Showing 1 changed file with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1608,7 +1608,7 @@ protected void internalDeleteSubscription(AsyncResponse asyncResponse, String su
private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
String subName, boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
.thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE))
.thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE, subName))
.thenCompose(__ -> {
Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
Expand Down Expand Up @@ -1641,7 +1641,7 @@ private void internalUpdateSubscriptionPropertiesForNonPartitionedTopic(AsyncRes
String subName, Map<String, String> subscriptionProperties,
boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
.thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))
.thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME, subName))
.thenCompose(__ -> {
Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
Expand Down Expand Up @@ -1740,7 +1740,7 @@ protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse,
private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse asyncResponse,
String subName, boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
.thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE))
.thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE, subName))
.thenCompose(__ -> {
Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
Expand Down Expand Up @@ -1883,7 +1883,7 @@ protected void internalSkipMessages(AsyncResponse asyncResponse, String subName,
future = CompletableFuture.completedFuture(null);
}
future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName))
.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenCompose(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
Expand Down Expand Up @@ -2289,7 +2289,7 @@ private void internalCreateSubscriptionForNonPartitionedTopic(

validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> {
validateTopicOperation(topicName, TopicOperation.SUBSCRIBE);
validateTopicOperation(topicName, TopicOperation.SUBSCRIBE, subscriptionName);
return pulsar().getBrokerService().getTopic(topicName.toString(), isAllowAutoTopicCreation);
}).thenApply(optTopic -> {
if (optTopic.isPresent()) {
Expand Down Expand Up @@ -2687,7 +2687,7 @@ protected Response internalPeekNthMessage(String subName, int messagePosition, b
}

validateTopicOwnership(topicName, authoritative);
validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES, subName);

if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), topicName,
Expand Down Expand Up @@ -3575,7 +3575,7 @@ protected void internalExpireMessagesByTimestamp(AsyncResponse asyncResponse, St
}
future.thenCompose(__ ->
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES))
.thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES, subName))
.thenCompose(unused2 ->
// If the topic name is a partition name, no need to get partition topic metadata again
getPartitionedTopicMetadataAsync(topicName, authoritative, false)
Expand Down Expand Up @@ -3724,7 +3724,7 @@ protected void internalExpireMessagesByPosition(AsyncResponse asyncResponse, Str
}

future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES, subName))
.thenCompose(__ -> {
log.info("[{}][{}] received expire messages on subscription {} to position {}", clientAppId(),
topicName, subName, messageId);
Expand Down

0 comments on commit 503457b

Please sign in to comment.