[fix][client] Messages with inconsistent consumer epochs are not filtered when using batch receive and trigger timeout. #17318

Merged
merged 12 commits into from
Oct 11, 2022

Conversation

shibd
Member

@shibd shibd commented Aug 28, 2022

Motivation

When batch receive is used and the batch receive timeout is triggered, messages with inconsistent consumer epochs are not filtered out.

Modifications

  • Add a consumer epoch check to notifyPendingBatchReceivedCallBack.
  • Reuse notifyPendingBatchReceivedCallBack logic.
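
The two modifications above can be illustrated with a minimal sketch, assuming that a message is stale when its epoch is lower than the consumer's current epoch. This is not the Pulsar client code: `BatchDrainSketch`, `Msg`, `drainBatch`, `onBatchReady`, and `onBatchTimeout` are hypothetical names chosen for the illustration. The point is that the "batch is full" path and the timeout path share one drain routine, so the epoch filter cannot be skipped on either path.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch; none of these names come from the Pulsar client.
class BatchDrainSketch {

    // Minimal stand-in for a received message carrying a consumer epoch.
    static final class Msg {
        final String payload;
        final long consumerEpoch;

        Msg(String payload, long consumerEpoch) {
            this.payload = payload;
            this.consumerEpoch = consumerEpoch;
        }
    }

    // Single drain routine: polls queued messages and keeps only those whose
    // epoch is not older than currentEpoch (assumed staleness rule).
    static List<Msg> drainBatch(Queue<Msg> incoming, long currentEpoch, int maxMessages) {
        List<Msg> batch = new ArrayList<>();
        Msg msg;
        while (batch.size() < maxMessages && (msg = incoming.poll()) != null) {
            if (msg.consumerEpoch < currentEpoch) {
                continue; // stale epoch: drop instead of delivering
            }
            batch.add(msg);
        }
        return batch;
    }

    // Path taken when enough messages have arrived to complete a batch.
    static List<Msg> onBatchReady(Queue<Msg> incoming, long currentEpoch, int maxMessages) {
        return drainBatch(incoming, currentEpoch, maxMessages);
    }

    // Path taken when the batch receive timeout fires; same filter applies.
    static List<Msg> onBatchTimeout(Queue<Msg> incoming, long currentEpoch, int maxMessages) {
        return drainBatch(incoming, currentEpoch, maxMessages);
    }

    public static void main(String[] args) {
        Queue<Msg> incoming = new ArrayDeque<>();
        incoming.add(new Msg("old", 0L));
        incoming.add(new Msg("new", 1L));
        List<Msg> batch = onBatchTimeout(incoming, 1L, 10);
        System.out.println("delivered " + batch.size() + " message(s)");
    }
}
```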

Verifying this change

  • The testBatchReceiveRedeliveryAddEpoch unit test covers this scenario.

Documentation

  • doc-not-needed

Matching PR in forked repository

shibd#16

@shibd shibd changed the title from "[fix][client] When using batch receive timeout, will receive duplicate messages." to "[fix][client] Messages with inconsistent consumer epochs are not filtered when using batch receive and trigger timeout." on Aug 28, 2022
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Aug 28, 2022
@shibd
Member Author

shibd commented Aug 29, 2022

/pulsarbot run-failure-checks

@shibd shibd force-pushed the refactor_consuemr_br branch from 3912b48 to b4e7a52 Compare September 6, 2022 07:45
@shibd
Member Author

shibd commented Sep 6, 2022

@nodece Thanks, all comments fixed, PTAL.

@shibd
Member Author

shibd commented Sep 14, 2022

/pulsarbot run-failure-checks

@codelipenghui
Contributor

@congbobo184 Do you remember why the consumer epoch check happens after polling the messages from the receiver queue?

@shibd @congbobo184
Can we move it to the beginning of message receiving?

void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, ClientCnx cnx) {
    List<Long> ackSet = Collections.emptyList();
    if (cmdMessage.getAckSetsCount() > 0) {
        ackSet = new ArrayList<>(cmdMessage.getAckSetsCount());
        for (int i = 0; i < cmdMessage.getAckSetsCount(); i++) {
            ackSet.add(cmdMessage.getAckSetAt(i));
        }
    }
    int redeliveryCount = cmdMessage.getRedeliveryCount();
    MessageIdData messageId = cmdMessage.getMessageId();
    long consumerEpoch = DEFAULT_CONSUMER_EPOCH;
    // if broker send messages to client with consumerEpoch, we should set consumerEpoch to message
    if (cmdMessage.hasConsumerEpoch()) {
        consumerEpoch = cmdMessage.getConsumerEpoch();
    }

It looks like if a message has an invalid epoch, we don't need to add it to the receiver queue.
Queuing such messages also introduces a client-side stats issue: users never see the message, but the stats show the number of received messages increasing.
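
A minimal sketch of the suggestion, rejecting a stale-epoch message before it ever reaches the receiver queue, might look like the following. It is only an illustration, not the Pulsar client implementation: `EpochFilterSketch`, `Msg`, `onMessage`, and `bumpEpoch` are hypothetical names, and both the `-1` sentinel and the rule "stale means lower than the current epoch" are assumptions made for the sketch.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch; none of these names come from the Pulsar client.
class EpochFilterSketch {
    // Assumed sentinel meaning "the broker did not attach an epoch".
    static final long DEFAULT_CONSUMER_EPOCH = -1L;

    // Minimal stand-in for a received message carrying a consumer epoch.
    static final class Msg {
        final String payload;
        final long consumerEpoch;

        Msg(String payload, long consumerEpoch) {
            this.payload = payload;
            this.consumerEpoch = consumerEpoch;
        }
    }

    private final Queue<Msg> receiverQueue = new ArrayDeque<>();
    private long currentEpoch = 0L;
    private long skipped = 0L;

    // Simulates the consumer moving to a new epoch (for example after a redelivery request).
    void bumpEpoch() {
        currentEpoch++;
    }

    // Assumed rule: messages without an epoch are always valid; otherwise the
    // message epoch must not be older than the consumer's current epoch.
    boolean isValidEpoch(Msg msg) {
        return msg.consumerEpoch == DEFAULT_CONSUMER_EPOCH
                || msg.consumerEpoch >= currentEpoch;
    }

    // Checks the epoch first, so stale messages are never enqueued.
    boolean onMessage(Msg msg) {
        if (!isValidEpoch(msg)) {
            skipped++;
            return false;
        }
        receiverQueue.add(msg);
        return true;
    }

    int queued() {
        return receiverQueue.size();
    }

    long skipped() {
        return skipped;
    }

    public static void main(String[] args) {
        EpochFilterSketch consumer = new EpochFilterSketch();
        consumer.onMessage(new Msg("first", 0L));
        consumer.bumpEpoch();
        consumer.onMessage(new Msg("stale", 0L));
        System.out.println("queued=" + consumer.queued() + " skipped=" + consumer.skipped());
    }
}
```

With the check placed at receive time, nothing downstream of the queue needs to know about epochs, which is the simplification the comment is asking about.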

protected synchronized void messageProcessed(Message<?> msg) {
    ClientCnx currentCnx = cnx();
    ClientCnx msgCnx = ((MessageImpl<?>) msg).getCnx();
    lastDequeuedMessageId = msg.getMessageId();
    if (msgCnx != currentCnx) {
        // The processed message did belong to the old queue that was cleared after reconnection.
    } else {
        if (listener == null && !parentConsumerHasListener) {
            increaseAvailablePermits(currentCnx);
        }
        stats.updateNumMsgsReceived(msg);
        trackMessage(msg);
    }
    decreaseIncomingMessageSize(msg);
}

Messages with an invalid epoch should be treated as skipped messages, like here

It also makes sense to add skipped messages to ConsumerStats.
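
One way to picture the stats suggestion is a separate counter for skipped messages, so the received count only reflects what the user could actually see. The sketch below is an assumption-laden illustration and not the Pulsar `ConsumerStats` interface: `SkippedStatsSketch`, `record`, `received`, and `skipped` are hypothetical names.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch; this is not the Pulsar ConsumerStats interface.
class SkippedStatsSketch {
    private final LongAdder numMsgsReceived = new LongAdder();
    private final LongAdder numMsgsSkipped = new LongAdder();

    // Counts a message as received only if its epoch was valid;
    // otherwise it is counted as skipped and never inflates "received".
    void record(boolean validEpoch) {
        if (validEpoch) {
            numMsgsReceived.increment();
        } else {
            numMsgsSkipped.increment();
        }
    }

    long received() {
        return numMsgsReceived.sum();
    }

    long skipped() {
        return numMsgsSkipped.sum();
    }

    public static void main(String[] args) {
        SkippedStatsSketch stats = new SkippedStatsSketch();
        stats.record(true);
        stats.record(false);
        System.out.println("received=" + stats.received() + " skipped=" + stats.skipped());
    }
}
```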

@shibd shibd force-pushed the refactor_consuemr_br branch from a51f891 to ad81e5a Compare September 26, 2022 23:43
@shibd shibd force-pushed the refactor_consuemr_br branch from a2a73be to 2d8c1e6 Compare October 8, 2022 13:08
Contributor

@codelipenghui codelipenghui left a comment

Looks good to me.

@shibd I have left some minor comments, PTAL.

@shibd
Member Author

shibd commented Oct 10, 2022

/pulsarbot run-failure-checks

Labels
area/client doc-not-needed Your PR changes do not impact docs type/bug The PR fixed a bug or issue reported a bug
5 participants