Skip to content

Commit

Permalink
fix incorrect message data
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Feb 24, 2024
1 parent c16040d commit 605e9d9
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ public void testPermitsIfHalfAckBatchMessage() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp");
final String subName = "s1";
final int receiverQueueSize = 1000;
final int ackedMessagesCountInTheFistStep = 2;
admin.topics().createNonPartitionedTopic(topicName);
admin.topics(). createSubscription(topicName, subName, MessageId.earliest);
ConsumerBuilder<String> consumerBuilder = pulsarClient.newConsumer(Schema.STRING)
Expand All @@ -432,15 +433,15 @@ public void testPermitsIfHalfAckBatchMessage() throws Exception {
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
.create();
CompletableFuture<MessageId> lastSent = null;
for (int i = 0; i < 100; i++) {
for (int i = 1; i <= 100; i++) {
lastSent = producer. sendAsync(i + "");
}
producer.flush();
lastSent.join();

// Ack 2 messages, and trigger a redelivery.
Consumer<String> consumer1 = consumerBuilder.subscribe();
for (int i = 0; i < 2; i++) {
for (int i = 0; i < ackedMessagesCountInTheFistStep; i++) {
Message msg = consumer1. receive(2, TimeUnit.SECONDS);
assertNotNull(msg);
consumer1.acknowledge(msg);
Expand All @@ -452,12 +453,15 @@ public void testPermitsIfHalfAckBatchMessage() throws Exception {
ConsumerImpl<String> consumer2 = (ConsumerImpl<String>) consumerBuilder.subscribe();
Awaitility.await().until(() -> consumer2.isConnected());
List<MessageId> messages = new ArrayList<>();
int nextMessageValue = ackedMessagesCountInTheFistStep + 1;
while (true) {
Message msg = consumer2. receive(2, TimeUnit.SECONDS);
Message<String> msg = consumer2.receive(2, TimeUnit.SECONDS);
if (msg == null) {
break;
}
assertEquals(msg.getValue(), nextMessageValue + "");
messages.add(msg.getMessageId());
nextMessageValue++;
}
assertEquals(messages.size(), 98);
consumer2.acknowledge(messages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,10 @@ protected <V> MessageImpl<V> newSingleMessage(final int index,
return null;
}

if (ackBitSet != null && !ackBitSet.get(index)) {
return null;
}

BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), getPartitionIndex(), index, numMessages, ackSetInMessageId);

Expand Down Expand Up @@ -1635,16 +1639,18 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata,
int skippedMessages = 0;
try {
for (int i = 0; i < batchSize; ++i) {
if (ackBitSet != null && !ackBitSet.get(i)) {
// If it is not in ackBitSet, it means Broker does not want to deliver it to the client, and did not
// decrease the permits in the broker-side.
// So do not acquire more permits for this message.
continue;
}
final MessageImpl<T> message = newSingleMessage(i, batchSize, brokerEntryMetadata, msgMetadata,
singleMessageMetadata, uncompressedPayload, batchMessage, schema, true,
ackBitSet, ackSetInMessageId, redeliveryCount, consumerEpoch);
if (message == null) {
if (ackBitSet != null && !ackBitSet.get(i)) {
// If it is not in ackBitSet, it means Broker does not want to deliver it to the client, and
// did not decrease the permits in the broker-side.
// So do not acquire more permits for this message.
// Why not skip this single message in the first line of for-loop block? We need call
// "newSingleMessage" to move "payload.readerIndex" to a correct value to get the correct data.
continue;
}
skippedMessages++;
continue;
}
Expand Down

0 comments on commit 605e9d9

Please sign in to comment.