
Partitioned Consumer and Listener Threads #80

Closed
sschepens opened this issue Oct 24, 2016 · 4 comments
@sschepens (Contributor) commented Oct 24, 2016

I understand that Pulsar Client could and should definitely be shared among Publisher and Consumer instances to take advantage of Netty's IO threads and share them among all instances.

But there seems to be an issue with PartitionedConsumerImpl in particular, because it uses a messageListener to receive messages from its internal ConsumerImpls.

Listeners are executed in a pool of configurable size, but PartitionedConsumerImpl makes blocking calls, so this is definitely a problem. If, for some reason, a PartitionedConsumer stops consuming messages, it will block a number of listener threads equal to the number of partitions it has, or maybe even more.

I don't really have a solution, but we should definitely have a better approach for PartitionedConsumerImpl.
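To make the failure mode concrete, here is a minimal, hypothetical sketch (all names are invented, not Pulsar's actual internals) of how one consumer whose listener blocks can starve another consumer sharing the same fixed-size listener pool:

```java
import java.util.concurrent.*;

// Hypothetical illustration of the starvation problem described above:
// a shared single-thread listener pool, one consumer whose callback
// blocks, and a second healthy consumer whose callback never runs
// until the blocked one releases the thread.
public class StarvationSketch {
    public static boolean demo() throws Exception {
        ExecutorService listenerPool = Executors.newFixedThreadPool(1);
        CountDownLatch blocker = new CountDownLatch(1);

        // Stalled consumer: its listener blocks (e.g. on a full queue).
        listenerPool.submit(() -> {
            try { blocker.await(); } catch (InterruptedException ignored) {}
        });

        // Healthy consumer: its listener cannot run while the pool is occupied.
        CountDownLatch ran = new CountDownLatch(1);
        listenerPool.submit(ran::countDown);

        boolean starved = !ran.await(200, TimeUnit.MILLISECONDS);
        blocker.countDown();                  // release the stalled listener
        boolean runsAfterwards = ran.await(1, TimeUnit.SECONDS);
        listenerPool.shutdown();
        return starved && runsAfterwards;
    }
}
```

With more partitions than pool threads, every thread can end up parked this way, which is the scaling concern raised here.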

@merlimat (Contributor) commented

The PartitionedConsumerImpl uses a dedicated internal executor to handle that. See:

https://github.com/yahoo/pulsar/blob/master/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java#L47

The individual ConsumerImpl instances inside the PartitionedConsumerImpl will use that internalListenerExecutor to push messages into the common queue. If the application stops calling receive(), the common queue will get full and that will block the internal listener thread, but it will not block the IO threads, since they just enqueue on the individual queues (which don't have a size limit) and move on.
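The flow described above can be sketched with plain `java.util.concurrent` queues (a simplified model, not the actual Pulsar code; the queue names are assumptions): the unbounded per-partition queue absorbs messages from the IO side, while the single internal listener thread is the only one that blocks once the bounded common queue fills up.

```java
import java.util.concurrent.*;

// Simplified model of the described queue flow: IO threads feed an
// unbounded per-partition queue, an internal listener thread moves
// messages into a bounded common queue, and only that listener thread
// blocks when the application stops draining the common queue.
public class QueueFlowSketch {
    // Returns the common queue's size after the listener stalls.
    public static int demo() throws Exception {
        BlockingQueue<String> partitionQueue = new LinkedBlockingQueue<>(); // unbounded
        BlockingQueue<String> commonQueue = new ArrayBlockingQueue<>(2);    // bounded

        // "IO thread" side: enqueue never blocks, the queue is unbounded.
        for (int i = 0; i < 10; i++) partitionQueue.put("msg-" + i);

        // "Internal listener thread": transfers messages and blocks once
        // the common queue is full (the application stopped receiving).
        ExecutorService listener = Executors.newSingleThreadExecutor();
        listener.submit(() -> {
            try {
                while (true) commonQueue.put(partitionQueue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread.sleep(200);                    // give the listener time to stall
        int stalledSize = commonQueue.size(); // capped at the bound: 2
        listener.shutdownNow();
        return stalledSize;
    }
}
```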

@sschepens (Contributor, Author) commented Oct 24, 2016

Yes, I realize it will not block IO threads, but it will block other PartitionedConsumerImpl instances that are using the same internalListenerExecutor. A Client creates an internalListenerExecutor of size 1 by default, and even increasing the size is not a really good solution: if we intend to create many consumers, dedicating one thread per consumer doesn't scale.
It would be great if PartitionedConsumerImpl could be a little smarter about handling messages, but this would probably need more communication between PartitionedConsumerImpl and its internal ConsumerImpls than we currently have. For example, a ConsumerImpl could try to add a message to the PartitionedConsumerImpl; if it doesn't succeed, the message could simply remain in the ConsumerImpl's own queue. Then, PartitionedConsumerImpl could somehow notify its internal consumers that it has space, so they pop messages from their queues.

@merlimat (Contributor) commented

OK, that is a good point.

The key part is to leave the messages in the individual topic queues, so that we can stop the broker from pushing more messages. If that can be done without resorting to blocking the intermediate listener thread, that would be great.
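One shape this could take (a sketch under assumed names, not the eventual fix) is a fully pull-based receive(): messages stay in each topic's own queue, so per-partition flow control toward the broker still works, and there is no intermediate listener thread to block at all.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical pull-based alternative: receive() polls the per-partition
// queues round-robin; nothing is copied into a shared bounded queue and
// no intermediate thread exists that could block.
public class PullBasedReceive {
    final Queue<String>[] partitionQueues;
    int next = 0; // round-robin cursor for fairness across partitions

    @SuppressWarnings("unchecked")
    PullBasedReceive(int partitions) {
        partitionQueues = new Queue[partitions];
        for (int i = 0; i < partitions; i++) partitionQueues[i] = new ArrayDeque<>();
    }

    // Called by the per-partition delivery path.
    void deliver(int partition, String msg) {
        partitionQueues[partition].add(msg);
    }

    // Round-robin poll across partitions; null if everything is empty.
    String receive() {
        for (int i = 0; i < partitionQueues.length; i++) {
            Queue<String> q = partitionQueues[(next + i) % partitionQueues.length];
            if (!q.isEmpty()) {
                next = (next + i + 1) % partitionQueues.length;
                return q.remove();
            }
        }
        return null;
    }
}
```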

merlimat added the type/enhancement label Oct 24, 2016
merlimat added a commit to merlimat/pulsar that referenced this issue Nov 14, 2016
merlimat added a commit that referenced this issue Nov 14, 2016
@merlimat (Contributor) commented

Fixed by #106

merlimat added this to the 1.16 milestone Nov 14, 2016
merlimat self-assigned this Nov 14, 2016
sijie added a commit to sijie/pulsar that referenced this issue Mar 4, 2018
hrsakai pushed a commit to hrsakai/pulsar that referenced this issue Dec 10, 2020
hangc0276 pushed a commit to hangc0276/pulsar that referenced this issue May 26, 2021
The current code causes too many lookups. This PR tries to cache the topic lookup result for topics served by this broker. For topics not served by this broker, we should not cache the result, since we cannot track topic reloading and caching would cause conflicts.
dragonls pushed a commit to dragonls/pulsar that referenced this issue Oct 21, 2022