From e50493ea17dd5f2f9d4527d74cc4f40e12439df2 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 24 Jan 2022 12:06:00 +0800 Subject: [PATCH] [C++] Fix hasMessageAvailable returns wrong value for last message (#13883) ### Motivation In C++ client, there is a corner case that when a reader's start message ID is the last message of a topic, `hasMessageAvailable` returns true. However, it should return false because the start message ID is exclusive and in this case `readNext` would never return a message unless new messages arrived. ### Modifications The current C++ implementation of `hasMessageAvailable` is from long days ago and has many problems. So this PR migrates the Java implementation of `hasMessageAvailable` to C++ client. Since after the modifications we need to access `startMessageId` in `hasMessageAvailable`, which is called in a different thread from `connectionOpened` that might modify `startMessageId`. We use a common mutex `mutexForMessageIds` to protect the access to `lastDequedMessageId_` and `lastMessageIdInBroker_`. To fix the original tests when `startMessageId` is latest, this PR adds a `GetLastMessageIdResponse` as the response of `GetLastMessageId` request. The `GetLastMessageIdResponse` contains the `consumer_mark_delete_position` introduced from https://github.com/apache/pulsar/pull/9652 to compare with `last_message_id` when `startMessageId` is latest. ### Verifying this change This change added tests `ReaderTest#testHasMessageAvailableWhenCreated` and `MessageIdTest# testCompareLedgerAndEntryId`. --- pulsar-client-cpp/lib/ClientConnection.cc | 23 ++-- pulsar-client-cpp/lib/ClientConnection.h | 5 +- pulsar-client-cpp/lib/ConsumerImpl.cc | 112 +++++++++++------- pulsar-client-cpp/lib/ConsumerImpl.h | 23 ++-- .../lib/GetLastMessageIdResponse.h | 56 +++++++++ pulsar-client-cpp/lib/MessageIdUtil.h | 44 +++++++ pulsar-client-cpp/lib/ReaderImpl.cc | 4 +- pulsar-client-cpp/tests/MessageIdTest.cc | 22 ++++ pulsar-client-cpp/tests/ReaderTest.cc | 45 +++++++ 9 files changed, 263 insertions(+), 71 deletions(-) create mode 100644 pulsar-client-cpp/lib/GetLastMessageIdResponse.h create mode 100644 pulsar-client-cpp/lib/MessageIdUtil.h diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 79bc1d74aa967..d246bf89287b8 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -37,6 +37,7 @@ #include "ProducerImpl.h" #include "ConsumerImpl.h" #include "checksum/ChecksumProvider.h" +#include "MessageIdUtil.h" DECLARE_LOG_OBJECT() @@ -1072,7 +1073,7 @@ void ClientConnection::handleIncomingCommand() { PendingGetLastMessageIdRequestsMap::iterator it = pendingGetLastMessageIdRequests_.find(error.request_id()); if (it != pendingGetLastMessageIdRequests_.end()) { - Promise getLastMessageIdPromise = it->second; + auto getLastMessageIdPromise = it->second; pendingGetLastMessageIdRequests_.erase(it); lock.unlock(); @@ -1191,15 +1192,18 @@ void ClientConnection::handleIncomingCommand() { pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id()); if (it != pendingGetLastMessageIdRequests_.end()) { - Promise getLastMessageIdPromise = it->second; + auto getLastMessageIdPromise = it->second; pendingGetLastMessageIdRequests_.erase(it); lock.unlock(); - MessageIdData messageIdData = getLastMessageIdResponse.last_message_id(); - MessageId messageId = MessageId(messageIdData.partition(), messageIdData.ledgerid(), - messageIdData.entryid(), messageIdData.batch_index()); - - getLastMessageIdPromise.setValue(messageId); + if (getLastMessageIdResponse.has_consumer_mark_delete_position()) { + getLastMessageIdPromise.setValue( + {toMessageId(getLastMessageIdResponse.last_message_id()), + toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())}); + } else { + getLastMessageIdPromise.setValue( + {toMessageId(getLastMessageIdResponse.last_message_id())}); + } } else { lock.unlock(); LOG_WARN( @@ -1610,9 +1614,10 @@ Commands::ChecksumType ClientConnection::getChecksumType() const { return getServerProtocolVersion() >= proto::v6 ? Commands::Crc32c : Commands::None; } -Future ClientConnection::newGetLastMessageId(uint64_t consumerId, uint64_t requestId) { +Future ClientConnection::newGetLastMessageId(uint64_t consumerId, + uint64_t requestId) { Lock lock(mutex_); - Promise promise; + Promise promise; if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString_ << " Client is not connected to the broker"); diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h index 48e6d57a0a23c..7ca5f378f30e5 100644 --- a/pulsar-client-cpp/lib/ClientConnection.h +++ b/pulsar-client-cpp/lib/ClientConnection.h @@ -46,6 +46,7 @@ #include #include #include "lib/PeriodicTask.h" +#include "lib/GetLastMessageIdResponse.h" using namespace pulsar; @@ -156,7 +157,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this newConsumerStats(uint64_t consumerId, uint64_t requestId); - Future newGetLastMessageId(uint64_t consumerId, uint64_t requestId); + Future newGetLastMessageId(uint64_t consumerId, uint64_t requestId); Future newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId); @@ -306,7 +307,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this> PendingConsumerStatsMap; PendingConsumerStatsMap pendingConsumerStatsMap_; - typedef std::map> PendingGetLastMessageIdRequestsMap; + typedef std::map> PendingGetLastMessageIdRequestsMap; PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_; typedef std::map> PendingGetNamespaceTopicsMap; diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index 4d4a135dbec69..fa817a094ce24 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -25,6 +25,7 @@ #include "pulsar/Result.h" #include "pulsar/MessageId.h" #include "Utils.h" +#include "MessageIdUtil.h" #include "AckGroupingTracker.h" #include "AckGroupingTrackerEnabled.h" #include "AckGroupingTrackerDisabled.h" @@ -51,7 +52,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, hasParent_(hasParent), consumerTopicType_(consumerTopicType), subscriptionMode_(subscriptionMode), - startMessageId_(startMessageId), // This is the initial capacity of the queue incomingMessages_(std::max(config_.getReceiverQueueSize(), 1)), availablePermits_(0), @@ -63,7 +63,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, negativeAcksTracker_(client, *this, conf), ackGroupingTrackerPtr_(std::make_shared()), readCompacted_(conf.isReadCompacted()), - lastMessageInBroker_(Optional::of(MessageId())), + startMessageId_(startMessageId), maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()), autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoOldestChunkedMessageOnQueueFull()) { std::stringstream consumerStrStream; @@ -161,8 +161,9 @@ void ConsumerImpl::start() { void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { Lock lock(mutex_); - if (state_ == Closed) { - lock.unlock(); + const auto state = state_; + lock.unlock(); + if (state == Closed) { LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed"); return; } @@ -171,23 +172,24 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { // sending the subscribe request. cnx->registerConsumer(consumerId_, shared_from_this()); + Lock lockForMessageId(mutexForMessageId_); Optional firstMessageInQueue = clearReceiveQueue(); - unAckedMessageTrackerPtr_->clear(); - batchAcknowledgementTracker_.clear(); - if (subscriptionMode_ == Commands::SubscriptionModeNonDurable) { // Update startMessageId so that we can discard messages after delivery // restarts startMessageId_ = firstMessageInQueue; } + const auto startMessageId = startMessageId_; + lockForMessageId.unlock(); - lock.unlock(); + unAckedMessageTrackerPtr_->clear(); + batchAcknowledgementTracker_.clear(); ClientImplPtr client = client_.lock(); uint64_t requestId = client->newRequestId(); SharedBuffer cmd = Commands::newSubscribe( topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_, - startMessageId_, readCompacted_, config_.getProperties(), config_.getSchema(), getInitialPosition(), + startMessageId, readCompacted_, config_.getProperties(), config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(), config_.getKeySharedPolicy(), config_.getPriorityLevel()); cnx->sendRequestWithId(cmd, requestId) @@ -538,6 +540,9 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection batchAcknowledgementTracker_.receivedMessage(batchedMessage); LOG_DEBUG("Received Batch messages of size - " << batchSize << " -- msgId: " << batchedMessage.getMessageId()); + Lock lock(mutexForMessageId_); + const auto startMessageId = startMessageId_; + lock.unlock(); int skippedMessages = 0; @@ -547,14 +552,14 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection msg.impl_->setRedeliveryCount(redeliveryCount); msg.impl_->setTopicName(batchedMessage.getTopicName()); - if (startMessageId_.is_present()) { + if (startMessageId.is_present()) { const MessageId& msgId = msg.getMessageId(); // If we are receiving a batch message, we need to discard messages that were prior // to the startMessageId - if (msgId.ledgerId() == startMessageId_.value().ledgerId() && - msgId.entryId() == startMessageId_.value().entryId() && - msgId.batchIndex() <= startMessageId_.value().batchIndex()) { + if (msgId.ledgerId() == startMessageId.value().ledgerId() && + msgId.entryId() == startMessageId.value().entryId() && + msgId.batchIndex() <= startMessageId.value().batchIndex()) { LOG_DEBUG(getName() << "Ignoring message from before the startMessageId" << msg.getMessageId()); ++skippedMessages; @@ -686,7 +691,7 @@ void ConsumerImpl::internalListener() { trackMessage(msg.getMessageId()); try { consumerStatsBasePtr_->receivedMessage(msg, ResultOk); - lastDequedMessage_ = Optional::of(msg.getMessageId()); + lastDequedMessageId_ = msg.getMessageId(); messageListener_(Consumer(shared_from_this()), msg); } catch (const std::exception& e) { LOG_ERROR(getName() << "Exception thrown from listener" << e.what()); @@ -820,8 +825,9 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) { } void ConsumerImpl::messageProcessed(Message& msg, bool track) { - Lock lock(mutex_); - lastDequedMessage_ = Optional::of(msg.getMessageId()); + Lock lock(mutexForMessageId_); + lastDequedMessageId_ = msg.getMessageId(); + lock.unlock(); ClientConnectionPtr currentCnx = getCnx().lock(); if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) { @@ -853,11 +859,11 @@ Optional ConsumerImpl::clearReceiveQueue() { previousMessageId = MessageId(-1, nextMessageId.ledgerId(), nextMessageId.entryId() - 1, -1); } return Optional::of(previousMessageId); - } else if (lastDequedMessage_.is_present()) { + } else if (lastDequedMessageId_ != MessageId::earliest()) { // If the queue was empty we need to restart from the message just after the last one that has been // dequeued // in the past - return lastDequedMessage_; + return Optional::of(lastDequedMessageId_); } else { // No message was received or dequeued by this consumer. Next message would still be the // startMessageId @@ -1193,6 +1199,9 @@ void ConsumerImpl::brokerConsumerStatsListener(Result res, BrokerConsumerStatsIm void ConsumerImpl::handleSeek(Result result, ResultCallback callback) { if (result == ResultOk) { + Lock lock(mutexForMessageId_); + lastDequedMessageId_ = MessageId::earliest(); + lock.unlock(); LOG_INFO(getName() << "Seek successfully"); } else { LOG_ERROR(getName() << "Failed to seek: " << strResult(result)); @@ -1267,37 +1276,42 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { bool ConsumerImpl::isReadCompacted() { return readCompacted_; } +inline bool hasMoreMessages(const MessageId& lastMessageIdInBroker, const MessageId& messageId) { + return lastMessageIdInBroker > messageId && lastMessageIdInBroker.entryId() != -1; +} + void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) { - MessageId lastDequed = this->lastMessageIdDequed(); - MessageId lastInBroker = this->lastMessageIdInBroker(); - if (lastInBroker > lastDequed && lastInBroker.entryId() != -1) { - callback(ResultOk, true); - return; - } + Lock lock(mutexForMessageId_); + const auto messageId = + (lastDequedMessageId_ == MessageId::earliest()) ? startMessageId_.value() : lastDequedMessageId_; - getLastMessageIdAsync([lastDequed, callback](Result result, MessageId messageId) { - if (result == ResultOk) { - if (messageId > lastDequed && messageId.entryId() != -1) { - callback(ResultOk, true); + if (messageId == MessageId::latest()) { + lock.unlock(); + getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) { + if (result != ResultOk) { + callback(result, {}); + return; + } + if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) { + // We only care about comparing ledger ids and entry ids as mark delete position doesn't have + // other ids such as batch index + callback(ResultOk, compareLedgerAndEntryId(response.getMarkDeletePosition(), + response.getLastMessageId()) < 0); } else { callback(ResultOk, false); } - } else { - callback(result, false); - } - }); -} - -void ConsumerImpl::brokerGetLastMessageIdListener(Result res, MessageId messageId, - BrokerGetLastMessageIdCallback callback) { - Lock lock(mutex_); - if (messageId > lastMessageIdInBroker()) { - lastMessageInBroker_ = Optional::of(messageId); - lock.unlock(); - callback(res, messageId); + }); } else { + if (hasMoreMessages(lastMessageIdInBroker_, messageId)) { + lock.unlock(); + callback(ResultOk, true); + return; + } lock.unlock(); - callback(res, lastMessageIdInBroker()); + + getLastMessageIdAsync([callback, messageId](Result result, const GetLastMessageIdResponse& response) { + callback(result, (result == ResultOk) && hasMoreMessages(response.getLastMessageId(), messageId)); + }); } } @@ -1321,9 +1335,19 @@ void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback LOG_DEBUG(getName() << " Sending getLastMessageId Command for Consumer - " << getConsumerId() << ", requestId - " << requestId); + auto self = shared_from_this(); cnx->newGetLastMessageId(consumerId_, requestId) - .addListener(std::bind(&ConsumerImpl::brokerGetLastMessageIdListener, shared_from_this(), - std::placeholders::_1, std::placeholders::_2, callback)); + .addListener([this, self, callback](Result result, const GetLastMessageIdResponse& response) { + if (result == ResultOk) { + LOG_DEBUG(getName() << "getLastMessageId: " << response); + Lock lock(mutexForMessageId_); + lastMessageIdInBroker_ = response.getLastMessageId(); + lock.unlock(); + } else { + LOG_ERROR(getName() << "Failed to getLastMessageId: " << result); + } + callback(result, response); + }); } else { LOG_ERROR(getName() << " Operation not supported since server protobuf version " << cnx->getServerProtocolVersion() << " is older than proto::v12"); diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h index 2bdb82fc7630a..346d3515ad7f6 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.h +++ b/pulsar-client-cpp/lib/ConsumerImpl.h @@ -33,6 +33,7 @@ #include "lib/UnAckedMessageTrackerDisabled.h" #include "MessageCrypto.h" #include "AckGroupingTracker.h" +#include "GetLastMessageIdResponse.h" #include "CompressionCodec.h" #include @@ -54,7 +55,7 @@ class ExecutorService; class ConsumerImpl; class BatchAcknowledgementTracker; typedef std::shared_ptr MessageCryptoPtr; -typedef std::function BrokerGetLastMessageIdCallback; +typedef std::function BrokerGetLastMessageIdCallback; enum ConsumerTopicType { @@ -193,10 +194,8 @@ class ConsumerImpl : public ConsumerImplBase, bool hasParent_; ConsumerTopicType consumerTopicType_; - Commands::SubscriptionMode subscriptionMode_; - Optional startMessageId_; + const Commands::SubscriptionMode subscriptionMode_; - Optional lastDequedMessage_; UnboundedBlockingQueue incomingMessages_; std::queue pendingReceives_; std::atomic_int availablePermits_; @@ -217,17 +216,11 @@ class ConsumerImpl : public ConsumerImplBase, MessageCryptoPtr msgCrypto_; const bool readCompacted_; - Optional lastMessageInBroker_; - void brokerGetLastMessageIdListener(Result res, MessageId messageId, - BrokerGetLastMessageIdCallback callback); - - const MessageId& lastMessageIdDequed() { - return lastDequedMessage_.is_present() ? lastDequedMessage_.value() : MessageId::earliest(); - } - - const MessageId& lastMessageIdInBroker() { - return lastMessageInBroker_.is_present() ? lastMessageInBroker_.value() : MessageId::earliest(); - } + // Make the access to `startMessageId_`, `lastDequedMessageId_` and `lastMessageIdInBroker_` thread safe + mutable std::mutex mutexForMessageId_; + Optional startMessageId_; + MessageId lastDequedMessageId_{MessageId::earliest()}; + MessageId lastMessageIdInBroker_{MessageId::earliest()}; class ChunkedMessageCtx { public: diff --git a/pulsar-client-cpp/lib/GetLastMessageIdResponse.h b/pulsar-client-cpp/lib/GetLastMessageIdResponse.h new file mode 100644 index 0000000000000..0acb78394e115 --- /dev/null +++ b/pulsar-client-cpp/lib/GetLastMessageIdResponse.h @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include + +namespace pulsar { + +class GetLastMessageIdResponse { + friend std::ostream& operator<<(std::ostream& os, const GetLastMessageIdResponse& response) { + os << "lastMessageId: " << response.lastMessageId_; + if (response.hasMarkDeletePosition_) { + os << ", markDeletePosition: " << response.markDeletePosition_; + } + return os; + } + + public: + GetLastMessageIdResponse() = default; + + GetLastMessageIdResponse(const MessageId& lastMessageId) + : lastMessageId_(lastMessageId), hasMarkDeletePosition_{false} {} + + GetLastMessageIdResponse(const MessageId& lastMessageId, const MessageId& markDeletePosition) + : lastMessageId_(lastMessageId), + markDeletePosition_(markDeletePosition), + hasMarkDeletePosition_(true) {} + + const MessageId& getLastMessageId() const noexcept { return lastMessageId_; } + const MessageId& getMarkDeletePosition() const noexcept { return markDeletePosition_; } + bool hasMarkDeletePosition() const noexcept { return hasMarkDeletePosition_; } + + private: + MessageId lastMessageId_; + MessageId markDeletePosition_; + bool hasMarkDeletePosition_; +}; + +} // namespace pulsar diff --git a/pulsar-client-cpp/lib/MessageIdUtil.h b/pulsar-client-cpp/lib/MessageIdUtil.h new file mode 100644 index 0000000000000..d6f80a10ea015 --- /dev/null +++ b/pulsar-client-cpp/lib/MessageIdUtil.h @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include "PulsarApi.pb.h" + +namespace pulsar { + +inline MessageId toMessageId(const proto::MessageIdData& messageIdData) { + return MessageId{messageIdData.partition(), static_cast(messageIdData.ledgerid()), + static_cast(messageIdData.entryid()), messageIdData.batch_index()}; +} + +namespace internal { +template +static int compare(T lhs, T rhs) { + return (lhs < rhs) ? -1 : ((lhs == rhs) ? 0 : 1); +} +} // namespace internal + +inline int compareLedgerAndEntryId(const MessageId& lhs, const MessageId& rhs) { + auto result = internal::compare(lhs.ledgerId(), rhs.ledgerId()); + if (result != 0) { + return result; + } + return internal::compare(lhs.entryId(), rhs.entryId()); +} + +} // namespace pulsar diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc index 48f5d5866f453..0a7b3215437bf 100644 --- a/pulsar-client-cpp/lib/ReaderImpl.cc +++ b/pulsar-client-cpp/lib/ReaderImpl.cc @@ -139,7 +139,9 @@ void ReaderImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { } void ReaderImpl::getLastMessageIdAsync(GetLastMessageIdCallback callback) { - consumer_->getLastMessageIdAsync(callback); + consumer_->getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) { + callback(result, response.getLastMessageId()); + }); } ReaderImplWeakPtr ReaderImpl::getReaderImplWeakPtr() { return readerImplWeakPtr_; } diff --git a/pulsar-client-cpp/tests/MessageIdTest.cc b/pulsar-client-cpp/tests/MessageIdTest.cc index 06c25283794aa..55fa181da05f4 100644 --- a/pulsar-client-cpp/tests/MessageIdTest.cc +++ b/pulsar-client-cpp/tests/MessageIdTest.cc @@ -17,6 +17,7 @@ * under the License. */ #include +#include "lib/MessageIdUtil.h" #include "PulsarFriend.h" #include @@ -35,3 +36,24 @@ TEST(MessageIdTest, testSerialization) { ASSERT_EQ(msgId, deserialized); } + +TEST(MessageIdTest, testCompareLedgerAndEntryId) { + MessageId id1(-1, 2L, 1L, 0); + MessageId id2(-1, 2L, 1L, 1); + MessageId id3(-1, 2L, 2L, 0); + MessageId id4(-1, 3L, 0L, 0); + ASSERT_EQ(compareLedgerAndEntryId(id1, id2), 0); + ASSERT_EQ(compareLedgerAndEntryId(id1, id2), 0); + + ASSERT_EQ(compareLedgerAndEntryId(id1, id3), -1); + ASSERT_EQ(compareLedgerAndEntryId(id3, id1), 1); + + ASSERT_EQ(compareLedgerAndEntryId(id1, id4), -1); + ASSERT_EQ(compareLedgerAndEntryId(id4, id1), 1); + + ASSERT_EQ(compareLedgerAndEntryId(id2, id4), -1); + ASSERT_EQ(compareLedgerAndEntryId(id4, id2), 1); + + ASSERT_EQ(compareLedgerAndEntryId(id3, id4), -1); + ASSERT_EQ(compareLedgerAndEntryId(id4, id3), 1); +} diff --git a/pulsar-client-cpp/tests/ReaderTest.cc b/pulsar-client-cpp/tests/ReaderTest.cc index d95038b3cef3c..8cd535caf7f5e 100644 --- a/pulsar-client-cpp/tests/ReaderTest.cc +++ b/pulsar-client-cpp/tests/ReaderTest.cc @@ -26,6 +26,7 @@ #include #include +#include #include DECLARE_LOG_OBJECT() @@ -577,3 +578,47 @@ TEST(ReaderTest, testIsConnected) { ASSERT_EQ(ResultOk, reader.close()); ASSERT_FALSE(reader.isConnected()); } + +TEST(ReaderTest, testHasMessageAvailableWhenCreated) { + const std::string topic = "testHasMessageAvailableWhenCreated-" + std::to_string(time(nullptr)); + Client client(serviceUrl); + + ProducerConfiguration producerConf; + producerConf.setBatchingMaxMessages(3); + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer)); + + std::vector messageIds; + constexpr int numMessages = 7; + Latch latch(numMessages); + for (int i = 0; i < numMessages; i++) { + producer.sendAsync(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), + [i, &messageIds, &latch](Result result, const MessageId& messageId) { + if (result == ResultOk) { + LOG_INFO("Send " << i << " to " << messageId); + messageIds.emplace_back(messageId); + } else { + LOG_ERROR("Failed to send " << i << ": " << messageId); + } + latch.countdown(); + }); + } + latch.wait(std::chrono::seconds(3)); + ASSERT_EQ(messageIds.size(), numMessages); + + Reader reader; + bool hasMessageAvailable; + + for (size_t i = 0; i < messageIds.size() - 1; i++) { + ASSERT_EQ(ResultOk, client.createReader(topic, messageIds[i], {}, reader)); + ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); + EXPECT_TRUE(hasMessageAvailable); + } + + // The start message ID is exclusive by default, so when we start at the last message, there should be no + // message available. + ASSERT_EQ(ResultOk, client.createReader(topic, messageIds.back(), {}, reader)); + ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); + EXPECT_FALSE(hasMessageAvailable); + client.close(); +}