From fcb359299e9fd784afcebbc3967927a7f1f7c25c Mon Sep 17 00:00:00 2001 From: Jiawen Wang <74594733+summeriiii@users.noreply.github.com> Date: Thu, 24 Oct 2024 17:34:31 +0800 Subject: [PATCH] [fix][client] Fix Reader.hasMessageAvailable return wrong value after seeking by timestamp with startMessageIdInclusive (#23502) --- .../apache/pulsar/client/impl/ReaderTest.java | 40 +++++++++++++++++++ .../pulsar/client/impl/ConsumerImpl.java | 2 + 2 files changed, 42 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index 12228220b18bd..a6a3f83ebc37d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -906,6 +906,46 @@ public void testHasMessageAvailableAfterSeekTimestamp(boolean initializeLastMess } } + @Test + public void testHasMessageAvailableAfterSeekTimestampWithMessageIdInclusive() throws Exception { + final String topic = "persistent://my-property/my-ns/" + + "testHasMessageAvailableAfterSeekTimestampWithMessageInclusive"; + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + final long timestampBeforeSend = System.currentTimeMillis(); + final MessageId sentMsgId = producer.send("msg"); + + final List messageIds = new ArrayList<>(); + messageIds.add(MessageId.earliest); + messageIds.add(sentMsgId); + messageIds.add(MessageId.latest); + + for (MessageId messageId : messageIds) { + @Cleanup + Reader reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1) + .startMessageIdInclusive() + .startMessageId(messageId).create(); + assertTrue(reader.hasMessageAvailable()); + + reader.seek(System.currentTimeMillis()); + assertFalse(reader.hasMessageAvailable()); + Message message = reader.readNext(10, TimeUnit.SECONDS); + assertNull(message); + } + + for (MessageId messageId : messageIds) { + @Cleanup + Reader reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1) + .startMessageIdInclusive() + .startMessageId(messageId).create(); + assertTrue(reader.hasMessageAvailable()); + + reader.seek(timestampBeforeSend); + assertTrue(reader.hasMessageAvailable()); + } + } + @Test public void testReaderBuilderStateOnRetryFailure() throws Exception { String ns = "my-property/my-ns"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index b7010a1ddc7b4..be01bd00eb300 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2515,6 +2515,8 @@ public CompletableFuture hasMessageAvailableAsync() { .result(); if (lastMessageId.getEntryId() < 0) { completehasMessageAvailableWithValue(booleanFuture, false); + } else if (hasSoughtByTimestamp) { + completehasMessageAvailableWithValue(booleanFuture, result < 0); } else { completehasMessageAvailableWithValue(booleanFuture, resetIncludeHead ? result <= 0 : result < 0);