-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add unack-message threshold to restrict consumer for receiving messages without acknowledging-msg up to the threshold #48
Conversation
CLA is valid! |
2 similar comments
CLA is valid! |
CLA is valid! |
oldPermits); | ||
if (!blockedConsumerOnUnackedMsgs) { | ||
if (log.isDebugEnabled()) { | ||
log.debug("[{}] Added more flow control message permits {} (old was: {})", this, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we are updating the messagePermits
regardless, we should keep this log outside the if condition.
additionalNumberOfMessages, oldPermits); | ||
} | ||
subscription.consumerFlow(this, additionalNumberOfMessages); | ||
unackedMessages.addAndGet(additionalNumberOfMessages); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we update the unackedMessages
while dispatching messages?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as we talked: we are not moving increment of unackedMessages at sendMessages(..) because this method is being called asynchronously and before it updates unackedMessages counter: if flowPermits(...) get triggered then it will add more permits and call dispatcher to dispatch more messages.
@@ -66,14 +66,19 @@ | |||
private final ConcurrentOpenHashSet<PositionImpl> pendingAcks; | |||
|
|||
private final ConsumerStats stats; | |||
|
|||
private final int maxUnackedMessages; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should be able to disable this feature easily. What happens if we set maxUnackedMessages
to 0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To disable feature we can configure maxUnackedMessages =Integer.MAX_VALUE
. Right now, configured value of maxUnackedMessages
is 50000. Should we increase it to higher number.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to use 0 for "unlimited"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright. I have added method shouldBlockConsumerOnUnackMsgs()
which will enable this feature on sharedSubscription if maxUnackedMessages
is > 0.
58ca6d8
to
923636e
Compare
@@ -97,6 +97,10 @@ tlsTrustCertsFilePath= | |||
# Accept untrusted TLS certificate from client | |||
tlsAllowInsecureConnection=false | |||
|
|||
# Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending | |||
# messages to consumer once, this limit reaches until consumer starts acknowledging messages back |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can mention here that setting the value to 0 will make it unlimited
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes.. added the documentation..
0ec780f
to
f7c99ff
Compare
@@ -66,14 +66,19 @@ | |||
private final ConcurrentOpenHashSet<PositionImpl> pendingAcks; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added change: to consider number of messages in a batch-msg while blocking consumer.
4cfa842
to
910fcf5
Compare
…es without acknowledging-msg up to the threshold
@sboobna can you give a final pass? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Change initial version to 0.0.1
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com> Fix the map of pendingReqs concurrent issue.
In pulsar cpp client, the default event time is set as minus value and caused error when use kafka consumer to consume message produced by pulsar cpp/cgo client. In this change, if we checked the event time is not set, using publish time to set it.
Motivation
If consumer keep tries to receive messages without acknowledging to broker then broker won't know last-deleted/received message entry. and if broker/consumer crashes/restarts in between then next time broker needs to send all the messages again as client didn't ack those messages.
Modifications
maxUnackedMessagesPerConsumer
limit till which consumer can consume messages without sending ackmaxUnackedMessagesPerConsumer
limit, broker stops sending further messages until consumer sends acknowledgement formaxUnackedMessagesPerConsumer/2
messages.Result
maxUnackedMessagesPerConsumer
number of messages.