diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 21fdd643dbb6f..9f4ae6d98c868 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -37,6 +37,7 @@ #include "ProducerImpl.h" #include "ConsumerImpl.h" #include "checksum/ChecksumProvider.h" +#include "MessageIdUtil.h" DECLARE_LOG_OBJECT() @@ -1052,7 +1053,7 @@ void ClientConnection::handleIncomingCommand() { PendingGetLastMessageIdRequestsMap::iterator it = pendingGetLastMessageIdRequests_.find(error.request_id()); if (it != pendingGetLastMessageIdRequests_.end()) { - Promise getLastMessageIdPromise = it->second; + auto getLastMessageIdPromise = it->second; pendingGetLastMessageIdRequests_.erase(it); lock.unlock(); @@ -1168,15 +1169,18 @@ void ClientConnection::handleIncomingCommand() { pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id()); if (it != pendingGetLastMessageIdRequests_.end()) { - Promise getLastMessageIdPromise = it->second; + auto getLastMessageIdPromise = it->second; pendingGetLastMessageIdRequests_.erase(it); lock.unlock(); - MessageIdData messageIdData = getLastMessageIdResponse.last_message_id(); - MessageId messageId = MessageId(messageIdData.partition(), messageIdData.ledgerid(), - messageIdData.entryid(), messageIdData.batch_index()); - - getLastMessageIdPromise.setValue(messageId); + if (getLastMessageIdResponse.has_consumer_mark_delete_position()) { + getLastMessageIdPromise.setValue( + {toMessageId(getLastMessageIdResponse.last_message_id()), + toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())}); + } else { + getLastMessageIdPromise.setValue( + {toMessageId(getLastMessageIdResponse.last_message_id())}); + } } else { lock.unlock(); LOG_WARN( @@ -1585,9 +1589,10 @@ Commands::ChecksumType ClientConnection::getChecksumType() const { return getServerProtocolVersion() >= proto::v6 ? 
Commands::Crc32c : Commands::None; } -Future ClientConnection::newGetLastMessageId(uint64_t consumerId, uint64_t requestId) { +Future ClientConnection::newGetLastMessageId(uint64_t consumerId, + uint64_t requestId) { Lock lock(mutex_); - Promise promise; + Promise promise; if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString_ << " Client is not connected to the broker"); diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h index 4ba99fa55fc15..150a37bc0fdba 100644 --- a/pulsar-client-cpp/lib/ClientConnection.h +++ b/pulsar-client-cpp/lib/ClientConnection.h @@ -46,6 +46,7 @@ #include #include #include "lib/PeriodicTask.h" +#include "lib/GetLastMessageIdResponse.h" using namespace pulsar; @@ -156,7 +157,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this newConsumerStats(uint64_t consumerId, uint64_t requestId); - Future newGetLastMessageId(uint64_t consumerId, uint64_t requestId); + Future newGetLastMessageId(uint64_t consumerId, uint64_t requestId); Future newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId); @@ -305,7 +306,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this> PendingConsumerStatsMap; PendingConsumerStatsMap pendingConsumerStatsMap_; - typedef std::map> PendingGetLastMessageIdRequestsMap; + typedef std::map> PendingGetLastMessageIdRequestsMap; PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_; typedef std::map> PendingGetNamespaceTopicsMap; diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index 4b6479d933a33..fa872f5a086a7 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -25,6 +25,7 @@ #include "pulsar/Result.h" #include "pulsar/MessageId.h" #include "Utils.h" +#include "MessageIdUtil.h" #include "AckGroupingTracker.h" #include "AckGroupingTrackerEnabled.h" #include "AckGroupingTrackerDisabled.h" @@ -50,7 +51,6 @@ 
ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, hasParent_(hasParent), consumerTopicType_(consumerTopicType), subscriptionMode_(subscriptionMode), - startMessageId_(startMessageId), // This is the initial capacity of the queue incomingMessages_(std::max(config_.getReceiverQueueSize(), 1)), availablePermits_(0), @@ -62,7 +62,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, negativeAcksTracker_(client, *this, conf), ackGroupingTrackerPtr_(std::make_shared()), readCompacted_(conf.isReadCompacted()), - lastMessageInBroker_(Optional::of(MessageId())) { + startMessageId_(startMessageId) { std::stringstream consumerStrStream; consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] "; consumerStr_ = consumerStrStream.str(); @@ -158,29 +158,31 @@ void ConsumerImpl::start() { void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { Lock lock(mutex_); - if (state_ == Closed) { - lock.unlock(); + const auto state = state_; + lock.unlock(); + if (state == Closed) { LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed"); return; } + Lock lockForMessageId(mutexForMessageId_); Optional firstMessageInQueue = clearReceiveQueue(); - unAckedMessageTrackerPtr_->clear(); - batchAcknowledgementTracker_.clear(); - if (subscriptionMode_ == Commands::SubscriptionModeNonDurable) { // Update startMessageId so that we can discard messages after delivery // restarts startMessageId_ = firstMessageInQueue; } + const auto startMessageId = startMessageId_; + lockForMessageId.unlock(); - lock.unlock(); + unAckedMessageTrackerPtr_->clear(); + batchAcknowledgementTracker_.clear(); ClientImplPtr client = client_.lock(); uint64_t requestId = client->newRequestId(); SharedBuffer cmd = Commands::newSubscribe( topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_, - startMessageId_, readCompacted_, config_.getProperties(), 
config_.getSchema(), getInitialPosition(), + startMessageId, readCompacted_, config_.getProperties(), config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(), config_.getKeySharedPolicy()); cnx->sendRequestWithId(cmd, requestId) .addListener( @@ -416,6 +418,9 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection batchAcknowledgementTracker_.receivedMessage(batchedMessage); LOG_DEBUG("Received Batch messages of size - " << batchSize << " -- msgId: " << batchedMessage.getMessageId()); + Lock lock(mutexForMessageId_); + const auto startMessageId = startMessageId_; + lock.unlock(); int skippedMessages = 0; @@ -425,14 +430,14 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection msg.impl_->setRedeliveryCount(redeliveryCount); msg.impl_->setTopicName(batchedMessage.getTopicName()); - if (startMessageId_.is_present()) { + if (startMessageId.is_present()) { const MessageId& msgId = msg.getMessageId(); // If we are receiving a batch message, we need to discard messages that were prior // to the startMessageId - if (msgId.ledgerId() == startMessageId_.value().ledgerId() && - msgId.entryId() == startMessageId_.value().entryId() && - msgId.batchIndex() <= startMessageId_.value().batchIndex()) { + if (msgId.ledgerId() == startMessageId.value().ledgerId() && + msgId.entryId() == startMessageId.value().entryId() && + msgId.batchIndex() <= startMessageId.value().batchIndex()) { LOG_DEBUG(getName() << "Ignoring message from before the startMessageId" << msg.getMessageId()); ++skippedMessages; @@ -563,7 +568,7 @@ void ConsumerImpl::internalListener() { trackMessage(msg); try { consumerStatsBasePtr_->receivedMessage(msg, ResultOk); - lastDequedMessage_ = Optional::of(msg.getMessageId()); + lastDequedMessageId_ = msg.getMessageId(); messageListener_(Consumer(shared_from_this()), msg); } catch (const std::exception& e) { LOG_ERROR(getName() << "Exception thrown from listener" << e.what()); @@ 
-697,8 +702,9 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) { } void ConsumerImpl::messageProcessed(Message& msg, bool track) { - Lock lock(mutex_); - lastDequedMessage_ = Optional::of(msg.getMessageId()); + Lock lock(mutexForMessageId_); + lastDequedMessageId_ = msg.getMessageId(); + lock.unlock(); ClientConnectionPtr currentCnx = getCnx().lock(); if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) { @@ -730,11 +736,11 @@ Optional ConsumerImpl::clearReceiveQueue() { previousMessageId = MessageId(-1, nextMessageId.ledgerId(), nextMessageId.entryId() - 1, -1); } return Optional::of(previousMessageId); - } else if (lastDequedMessage_.is_present()) { + } else if (lastDequedMessageId_ != MessageId::earliest()) { // If the queue was empty we need to restart from the message just after the last one that has been // dequeued // in the past - return lastDequedMessage_; + return Optional::of(lastDequedMessageId_); } else { // No message was received or dequeued by this consumer. 
Next message would still be the // startMessageId @@ -1070,6 +1076,9 @@ void ConsumerImpl::brokerConsumerStatsListener(Result res, BrokerConsumerStatsIm void ConsumerImpl::handleSeek(Result result, ResultCallback callback) { if (result == ResultOk) { + Lock lock(mutexForMessageId_); + lastDequedMessageId_ = MessageId::earliest(); + lock.unlock(); LOG_INFO(getName() << "Seek successfully"); } else { LOG_ERROR(getName() << "Failed to seek: " << strResult(result)); @@ -1144,37 +1153,42 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { bool ConsumerImpl::isReadCompacted() { return readCompacted_; } +inline bool hasMoreMessages(const MessageId& lastMessageIdInBroker, const MessageId& messageId) { + return lastMessageIdInBroker > messageId && lastMessageIdInBroker.entryId() != -1; +} + void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) { - MessageId lastDequed = this->lastMessageIdDequed(); - MessageId lastInBroker = this->lastMessageIdInBroker(); - if (lastInBroker > lastDequed && lastInBroker.entryId() != -1) { - callback(ResultOk, true); - return; - } + Lock lock(mutexForMessageId_); + const auto messageId = + (lastDequedMessageId_ == MessageId::earliest()) ? 
startMessageId_.value() : lastDequedMessageId_; - getLastMessageIdAsync([lastDequed, callback](Result result, MessageId messageId) { - if (result == ResultOk) { - if (messageId > lastDequed && messageId.entryId() != -1) { - callback(ResultOk, true); + if (messageId == MessageId::latest()) { + lock.unlock(); + getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) { + if (result != ResultOk) { + callback(result, {}); + return; + } + if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) { + // We only care about comparing ledger ids and entry ids as mark delete position doesn't have + // other ids such as batch index + callback(ResultOk, compareLedgerAndEntryId(response.getMarkDeletePosition(), + response.getLastMessageId()) < 0); } else { callback(ResultOk, false); } - } else { - callback(result, false); - } - }); -} - -void ConsumerImpl::brokerGetLastMessageIdListener(Result res, MessageId messageId, - BrokerGetLastMessageIdCallback callback) { - Lock lock(mutex_); - if (messageId > lastMessageIdInBroker()) { - lastMessageInBroker_ = Optional::of(messageId); - lock.unlock(); - callback(res, messageId); + }); } else { + if (hasMoreMessages(lastMessageIdInBroker_, messageId)) { + lock.unlock(); + callback(ResultOk, true); + return; + } lock.unlock(); - callback(res, lastMessageIdInBroker()); + + getLastMessageIdAsync([callback, messageId](Result result, const GetLastMessageIdResponse& response) { + callback(result, (result == ResultOk) && hasMoreMessages(response.getLastMessageId(), messageId)); + }); } } @@ -1198,9 +1212,19 @@ void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback LOG_DEBUG(getName() << " Sending getLastMessageId Command for Consumer - " << getConsumerId() << ", requestId - " << requestId); + auto self = shared_from_this(); cnx->newGetLastMessageId(consumerId_, requestId) - .addListener(std::bind(&ConsumerImpl::brokerGetLastMessageIdListener, 
shared_from_this(), - std::placeholders::_1, std::placeholders::_2, callback)); + .addListener([this, self, callback](Result result, const GetLastMessageIdResponse& response) { + if (result == ResultOk) { + LOG_DEBUG(getName() << "getLastMessageId: " << response); + Lock lock(mutexForMessageId_); + lastMessageIdInBroker_ = response.getLastMessageId(); + lock.unlock(); + } else { + LOG_ERROR(getName() << "Failed to getLastMessageId: " << result); + } + callback(result, response); + }); } else { LOG_ERROR(getName() << " Operation not supported since server protobuf version " << cnx->getServerProtocolVersion() << " is older than proto::v12"); diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h index b17620952ce5b..2ab716fa389c9 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.h +++ b/pulsar-client-cpp/lib/ConsumerImpl.h @@ -33,6 +33,7 @@ #include "lib/UnAckedMessageTrackerDisabled.h" #include "MessageCrypto.h" #include "AckGroupingTracker.h" +#include "GetLastMessageIdResponse.h" #include "CompressionCodec.h" #include @@ -53,7 +54,7 @@ class ExecutorService; class ConsumerImpl; class BatchAcknowledgementTracker; typedef std::shared_ptr MessageCryptoPtr; -typedef std::function BrokerGetLastMessageIdCallback; +typedef std::function BrokerGetLastMessageIdCallback; enum ConsumerTopicType { @@ -186,10 +187,8 @@ class ConsumerImpl : public ConsumerImplBase, bool hasParent_; ConsumerTopicType consumerTopicType_; - Commands::SubscriptionMode subscriptionMode_; - Optional startMessageId_; + const Commands::SubscriptionMode subscriptionMode_; - Optional lastDequedMessage_; UnboundedBlockingQueue incomingMessages_; std::queue pendingReceives_; std::atomic_int availablePermits_; @@ -210,17 +209,11 @@ class ConsumerImpl : public ConsumerImplBase, MessageCryptoPtr msgCrypto_; const bool readCompacted_; - Optional lastMessageInBroker_; - void brokerGetLastMessageIdListener(Result res, MessageId messageId, - BrokerGetLastMessageIdCallback 
callback); - - const MessageId& lastMessageIdDequed() { - return lastDequedMessage_.is_present() ? lastDequedMessage_.value() : MessageId::earliest(); - } - - const MessageId& lastMessageIdInBroker() { - return lastMessageInBroker_.is_present() ? lastMessageInBroker_.value() : MessageId::earliest(); - } + // Make the access to `startMessageId_`, `lastDequedMessageId_` and `lastMessageIdInBroker_` thread safe + mutable std::mutex mutexForMessageId_; + Optional startMessageId_; + MessageId lastDequedMessageId_{MessageId::earliest()}; + MessageId lastMessageIdInBroker_{MessageId::earliest()}; friend class PulsarFriend; diff --git a/pulsar-client-cpp/lib/GetLastMessageIdResponse.h b/pulsar-client-cpp/lib/GetLastMessageIdResponse.h new file mode 100644 index 0000000000000..0acb78394e115 --- /dev/null +++ b/pulsar-client-cpp/lib/GetLastMessageIdResponse.h @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#pragma once + +#include +#include + +namespace pulsar { + +class GetLastMessageIdResponse { + friend std::ostream& operator<<(std::ostream& os, const GetLastMessageIdResponse& response) { + os << "lastMessageId: " << response.lastMessageId_; + if (response.hasMarkDeletePosition_) { + os << ", markDeletePosition: " << response.markDeletePosition_; + } + return os; + } + + public: + GetLastMessageIdResponse() = default; + + GetLastMessageIdResponse(const MessageId& lastMessageId) + : lastMessageId_(lastMessageId), hasMarkDeletePosition_{false} {} + + GetLastMessageIdResponse(const MessageId& lastMessageId, const MessageId& markDeletePosition) + : lastMessageId_(lastMessageId), + markDeletePosition_(markDeletePosition), + hasMarkDeletePosition_(true) {} + + const MessageId& getLastMessageId() const noexcept { return lastMessageId_; } + const MessageId& getMarkDeletePosition() const noexcept { return markDeletePosition_; } + bool hasMarkDeletePosition() const noexcept { return hasMarkDeletePosition_; } + + private: + MessageId lastMessageId_; + MessageId markDeletePosition_; + bool hasMarkDeletePosition_{false};  // false unless a mark-delete position is supplied +}; + +} // namespace pulsar diff --git a/pulsar-client-cpp/lib/MessageIdUtil.h b/pulsar-client-cpp/lib/MessageIdUtil.h new file mode 100644 index 0000000000000..d6f80a10ea015 --- /dev/null +++ b/pulsar-client-cpp/lib/MessageIdUtil.h @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include "PulsarApi.pb.h" + +namespace pulsar { + +// Converts a protobuf MessageIdData (partition, ledger id, entry id, batch index) into a MessageId. +inline MessageId toMessageId(const proto::MessageIdData& messageIdData) { + return MessageId{messageIdData.partition(), static_cast(messageIdData.ledgerid()), + static_cast(messageIdData.entryid()), messageIdData.batch_index()}; +} + +namespace internal { +// Three-way comparison: returns -1, 0 or 1 when lhs is less than, equal to or greater than rhs. +template +static int compare(T lhs, T rhs) { + return (lhs < rhs) ? -1 : ((lhs == rhs) ? 0 : 1); +} +} // namespace internal + +// Compares two message ids by ledger id first and entry id second; partition and batch index are ignored. +// Returns -1, 0 or 1 in the same way as internal::compare. +inline int compareLedgerAndEntryId(const MessageId& lhs, const MessageId& rhs) { + auto result = internal::compare(lhs.ledgerId(), rhs.ledgerId()); + if (result != 0) { + return result; + } + return internal::compare(lhs.entryId(), rhs.entryId()); +} + +} // namespace pulsar diff --git a/pulsar-client-cpp/tests/MessageIdTest.cc b/pulsar-client-cpp/tests/MessageIdTest.cc index 06c25283794aa..55fa181da05f4 100644 --- a/pulsar-client-cpp/tests/MessageIdTest.cc +++ b/pulsar-client-cpp/tests/MessageIdTest.cc @@ -17,6 +17,7 @@ * under the License.
*/ #include +#include "lib/MessageIdUtil.h" #include "PulsarFriend.h" #include @@ -35,3 +36,25 @@ TEST(MessageIdTest, testSerialization) { ASSERT_EQ(msgId, deserialized); } + +TEST(MessageIdTest, testCompareLedgerAndEntryId) { + MessageId id1(-1, 2L, 1L, 0); + MessageId id2(-1, 2L, 1L, 1); + MessageId id3(-1, 2L, 2L, 0); + MessageId id4(-1, 3L, 0L, 0); + // id1 and id2 differ only in batch index, which must be ignored in both argument orders + ASSERT_EQ(compareLedgerAndEntryId(id1, id2), 0); + ASSERT_EQ(compareLedgerAndEntryId(id2, id1), 0); + + ASSERT_EQ(compareLedgerAndEntryId(id1, id3), -1); + ASSERT_EQ(compareLedgerAndEntryId(id3, id1), 1); + + ASSERT_EQ(compareLedgerAndEntryId(id1, id4), -1); + ASSERT_EQ(compareLedgerAndEntryId(id4, id1), 1); + + ASSERT_EQ(compareLedgerAndEntryId(id2, id4), -1); + ASSERT_EQ(compareLedgerAndEntryId(id4, id2), 1); + + ASSERT_EQ(compareLedgerAndEntryId(id3, id4), -1); + ASSERT_EQ(compareLedgerAndEntryId(id4, id3), 1); +} diff --git a/pulsar-client-cpp/tests/ReaderTest.cc b/pulsar-client-cpp/tests/ReaderTest.cc index d95038b3cef3c..8cd535caf7f5e 100644 --- a/pulsar-client-cpp/tests/ReaderTest.cc +++ b/pulsar-client-cpp/tests/ReaderTest.cc @@ -26,6 +26,7 @@ #include #include +#include #include DECLARE_LOG_OBJECT() @@ -577,3 +578,47 @@ TEST(ReaderTest, testIsConnected) { ASSERT_EQ(ResultOk, reader.close()); ASSERT_FALSE(reader.isConnected()); } + +TEST(ReaderTest, testHasMessageAvailableWhenCreated) { + const std::string topic = "testHasMessageAvailableWhenCreated-" + std::to_string(time(nullptr)); + Client client(serviceUrl); + + ProducerConfiguration producerConf; + producerConf.setBatchingMaxMessages(3); + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer)); + + std::vector messageIds; + constexpr int numMessages = 7; + Latch latch(numMessages); + for (int i = 0; i < numMessages; i++) { + producer.sendAsync(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), + [i, &messageIds, &latch](Result result, const MessageId& messageId) { + if (result == ResultOk) { +
LOG_INFO("Send " << i << " to " << messageId); + messageIds.emplace_back(messageId); + } else { + LOG_ERROR("Failed to send " << i << ": " << result); + } + latch.countdown(); + }); + } + latch.wait(std::chrono::seconds(3)); + ASSERT_EQ(messageIds.size(), numMessages); + + Reader reader; + bool hasMessageAvailable; + + for (size_t i = 0; i < messageIds.size() - 1; i++) { + ASSERT_EQ(ResultOk, client.createReader(topic, messageIds[i], {}, reader)); + ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); + EXPECT_TRUE(hasMessageAvailable); + } + + // The start message ID is exclusive by default, so when we start at the last message, there should be no + // message available. + ASSERT_EQ(ResultOk, client.createReader(topic, messageIds.back(), {}, reader)); + ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); + EXPECT_FALSE(hasMessageAvailable); + client.close(); +}