Skip to content

Commit

Permalink
[fix][client] Fix Reader.hasMessageAvailable return wrong value after…
Browse files Browse the repository at this point in the history
… seeking by timestamp with startMessageIdInclusive (apache#23502)
  • Loading branch information
summeriiii authored Oct 24, 2024
1 parent ff4a25e commit fcb3592
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,46 @@ public void testHasMessageAvailableAfterSeekTimestamp(boolean initializeLastMess
}
}

@Test
public void testHasMessageAvailableAfterSeekTimestampWithMessageIdInclusive() throws Exception {
final String topic = "persistent://my-property/my-ns/" +
"testHasMessageAvailableAfterSeekTimestampWithMessageInclusive";

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
final long timestampBeforeSend = System.currentTimeMillis();
final MessageId sentMsgId = producer.send("msg");

final List<MessageId> messageIds = new ArrayList<>();
messageIds.add(MessageId.earliest);
messageIds.add(sentMsgId);
messageIds.add(MessageId.latest);

for (MessageId messageId : messageIds) {
@Cleanup
Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
.startMessageIdInclusive()
.startMessageId(messageId).create();
assertTrue(reader.hasMessageAvailable());

reader.seek(System.currentTimeMillis());
assertFalse(reader.hasMessageAvailable());
Message<String> message = reader.readNext(10, TimeUnit.SECONDS);
assertNull(message);
}

for (MessageId messageId : messageIds) {
@Cleanup
Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
.startMessageIdInclusive()
.startMessageId(messageId).create();
assertTrue(reader.hasMessageAvailable());

reader.seek(timestampBeforeSend);
assertTrue(reader.hasMessageAvailable());
}
}

@Test
public void testReaderBuilderStateOnRetryFailure() throws Exception {
String ns = "my-property/my-ns";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2515,6 +2515,8 @@ public CompletableFuture<Boolean> hasMessageAvailableAsync() {
.result();
if (lastMessageId.getEntryId() < 0) {
completehasMessageAvailableWithValue(booleanFuture, false);
} else if (hasSoughtByTimestamp) {
completehasMessageAvailableWithValue(booleanFuture, result < 0);
} else {
completehasMessageAvailableWithValue(booleanFuture,
resetIncludeHead ? result <= 0 : result < 0);
Expand Down

0 comments on commit fcb3592

Please sign in to comment.