
[Bug] Dispatcher will be closing and reopening when Key_shared consumers have different policies (AUTO_SPLIT/STICKY, allowOutOfOrderDelivery true/false) #23272

Closed
lhotari opened this issue Sep 9, 2024 · 2 comments · Fixed by #23449
Labels
category/reliability The function does not work properly in certain specific environments or failures. e.g. data lost type/bug The PR fixed a bug or issue reported a bug

Comments

@lhotari
Member

lhotari commented Sep 9, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

All released versions, including the master branch.

Minimal reproduce step

The conclusion is currently based on the source code:

```java
            case Key_Shared:
                KeySharedMeta ksm = consumer.getKeySharedMeta();
                if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
                        || !((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher)
                                .hasSameKeySharedPolicy(ksm)) {
                    previousDispatcher = dispatcher;
                    dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
                            topic.getBrokerService().getPulsar().getConfiguration(), ksm);
                }
                break;
            default:
                return FutureUtil.failedFuture(
                        new ServerMetadataException("Unsupported subscription type"));
        }
        if (previousDispatcher != null) {
            previousDispatcher.close().thenRun(() -> {
                log.info("[{}][{}] Successfully closed previous dispatcher", topicName, subName);
            }).exceptionally(ex -> {
                log.error("[{}][{}] Failed to close previous dispatcher", topicName, subName, ex);
                return null;
            });
        }
```

What did you expect to see?

When multiple consumers use different Key_Shared policies, this should be handled properly.
One possibility is to keep the policy of the already connected consumers, reject any consumer with a different policy, and return a proper error message.
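As a minimal sketch of that approach, the compatibility check could compare the incoming consumer's policy with the one already in effect and reject on mismatch instead of replacing the dispatcher. The class, record, and method names below are hypothetical stand-ins (not Pulsar's actual API) for `KeySharedMeta` and the `hasSameKeySharedPolicy` check shown above:

```java
// Hypothetical, self-contained sketch of rejecting consumers with an
// incompatible Key_Shared policy instead of swapping the dispatcher.
public class KeySharedPolicyCheck {

    // Minimal stand-in for Pulsar's KeySharedMeta: the two fields that
    // differ in this issue (AUTO_SPLIT/STICKY, allowOutOfOrderDelivery).
    record KeySharedMeta(String keySharedMode, boolean allowOutOfOrderDelivery) {}

    // Mirrors the hasSameKeySharedPolicy(ksm) check from the dispatcher.
    static boolean hasSameKeySharedPolicy(KeySharedMeta current, KeySharedMeta incoming) {
        return current.keySharedMode().equals(incoming.keySharedMode())
                && current.allowOutOfOrderDelivery() == incoming.allowOutOfOrderDelivery();
    }

    // Instead of creating a new dispatcher and closing the old one
    // asynchronously, fail the add with a descriptive error.
    static void addConsumer(KeySharedMeta currentPolicy, KeySharedMeta consumerPolicy) {
        if (!hasSameKeySharedPolicy(currentPolicy, consumerPolicy)) {
            throw new IllegalStateException(
                    "Consumer's KeyShared policy does not match the policy of the "
                            + "already connected consumers on this subscription");
        }
    }

    public static void main(String[] args) {
        KeySharedMeta existing = new KeySharedMeta("AUTO_SPLIT", false);
        KeySharedMeta matching = new KeySharedMeta("AUTO_SPLIT", false);
        KeySharedMeta mismatched = new KeySharedMeta("STICKY", true);

        addConsumer(existing, matching); // accepted: same policy

        boolean rejected = false;
        try {
            addConsumer(existing, mismatched); // rejected: different policy
        } catch (IllegalStateException e) {
            rejected = true;
        }
        System.out.println(rejected); // prints "true"
    }
}
```

With this shape, the existing dispatcher is never torn down while active, and the rejected consumer receives an error it can act on.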

What did you see instead?

Based on the source code, the current solution cannot work:

  • a new dispatcher is created while the existing dispatcher is still active
  • the existing dispatcher is closed asynchronously
  • no proper error messages are logged or returned that could help resolve the situation

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@lhotari lhotari added type/bug The PR fixed a bug or issue reported a bug category/reliability The function does not work properly in certain specific environments or failures. e.g. data lost labels Sep 9, 2024
@pdolif
Contributor

pdolif commented Sep 28, 2024

@lhotari I tried to implement this: pdolif@e241cff
When the policy of a new consumer is incompatible with the existing consumers of the subscription it is rejected.

I also wrote a test, but it works only when the transaction coordinator is disabled in the setup() method. I could not find out how to make the test work with the transaction coordinator enabled. When calling addConsumer() on the subscription, the pendingAckHandleFuture seems to block.

@lhotari
Member Author

lhotari commented Oct 14, 2024

> @lhotari I tried to implement this: pdolif@e241cff When the policy of a new consumer is incompatible with the existing consumers of the subscription it is rejected.
>
> I also wrote a test but it works only when disabling the transaction coordinator in the setup() method. I could not find out how to make the test work with the transaction coordinator being enabled. When calling addConsumer() on the subscription the pendingAckHandleFuture seems to block.

makes sense. good work, @pdolif! Thanks for the contribution.
