Skip to content

Commit

Permalink
[C++] Fix hasMessageAvailable returns wrong value for last message (#…
Browse files Browse the repository at this point in the history
…13883)

### Motivation

In C++ client, there is a corner case that when a reader's start message ID is the last message of a topic, `hasMessageAvailable` returns true. However, it should return false because the start message ID is exclusive and in this case `readNext` would never return a message unless new messages arrived.

### Modifications

The current C++ implementation of `hasMessageAvailable` is from long days ago and has many problems. So this PR migrates the Java implementation of `hasMessageAvailable` to C++ client.

Since after the modifications we need to access `startMessageId` in `hasMessageAvailable`, which is called in a different thread from `connectionOpened` that might modify `startMessageId`. We use a common mutex `mutexForMessageIds` to protect the access to `lastDequedMessageId_` and `lastMessageIdInBroker_`.

To fix the original tests when `startMessageId` is latest, this PR adds a `GetLastMessageIdResponse` as the response of `GetLastMessageId` request. The  `GetLastMessageIdResponse` contains the `consumer_mark_delete_position` introduced from #9652 to compare with `last_message_id` when `startMessageId` is latest.

### Verifying this change

This change added tests `ReaderTest#testHasMessageAvailableWhenCreated` and `MessageIdTest# testCompareLedgerAndEntryId`.
  • Loading branch information
BewareMyPower authored Jan 24, 2022
1 parent 56c8764 commit e50493e
Show file tree
Hide file tree
Showing 9 changed files with 263 additions and 71 deletions.
23 changes: 14 additions & 9 deletions pulsar-client-cpp/lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "ProducerImpl.h"
#include "ConsumerImpl.h"
#include "checksum/ChecksumProvider.h"
#include "MessageIdUtil.h"

DECLARE_LOG_OBJECT()

Expand Down Expand Up @@ -1072,7 +1073,7 @@ void ClientConnection::handleIncomingCommand() {
PendingGetLastMessageIdRequestsMap::iterator it =
pendingGetLastMessageIdRequests_.find(error.request_id());
if (it != pendingGetLastMessageIdRequests_.end()) {
Promise<Result, MessageId> getLastMessageIdPromise = it->second;
auto getLastMessageIdPromise = it->second;
pendingGetLastMessageIdRequests_.erase(it);
lock.unlock();

Expand Down Expand Up @@ -1191,15 +1192,18 @@ void ClientConnection::handleIncomingCommand() {
pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id());

if (it != pendingGetLastMessageIdRequests_.end()) {
Promise<Result, MessageId> getLastMessageIdPromise = it->second;
auto getLastMessageIdPromise = it->second;
pendingGetLastMessageIdRequests_.erase(it);
lock.unlock();

MessageIdData messageIdData = getLastMessageIdResponse.last_message_id();
MessageId messageId = MessageId(messageIdData.partition(), messageIdData.ledgerid(),
messageIdData.entryid(), messageIdData.batch_index());

getLastMessageIdPromise.setValue(messageId);
if (getLastMessageIdResponse.has_consumer_mark_delete_position()) {
getLastMessageIdPromise.setValue(
{toMessageId(getLastMessageIdResponse.last_message_id()),
toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())});
} else {
getLastMessageIdPromise.setValue(
{toMessageId(getLastMessageIdResponse.last_message_id())});
}
} else {
lock.unlock();
LOG_WARN(
Expand Down Expand Up @@ -1610,9 +1614,10 @@ Commands::ChecksumType ClientConnection::getChecksumType() const {
return getServerProtocolVersion() >= proto::v6 ? Commands::Crc32c : Commands::None;
}

Future<Result, MessageId> ClientConnection::newGetLastMessageId(uint64_t consumerId, uint64_t requestId) {
Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(uint64_t consumerId,
uint64_t requestId) {
Lock lock(mutex_);
Promise<Result, MessageId> promise;
Promise<Result, GetLastMessageIdResponse> promise;
if (isClosed()) {
lock.unlock();
LOG_ERROR(cnxString_ << " Client is not connected to the broker");
Expand Down
5 changes: 3 additions & 2 deletions pulsar-client-cpp/lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include <set>
#include <lib/BrokerConsumerStatsImpl.h>
#include "lib/PeriodicTask.h"
#include "lib/GetLastMessageIdResponse.h"

using namespace pulsar;

Expand Down Expand Up @@ -156,7 +157,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien

Future<Result, BrokerConsumerStatsImpl> newConsumerStats(uint64_t consumerId, uint64_t requestId);

Future<Result, MessageId> newGetLastMessageId(uint64_t consumerId, uint64_t requestId);
Future<Result, GetLastMessageIdResponse> newGetLastMessageId(uint64_t consumerId, uint64_t requestId);

Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId);

Expand Down Expand Up @@ -306,7 +307,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl>> PendingConsumerStatsMap;
PendingConsumerStatsMap pendingConsumerStatsMap_;

typedef std::map<long, Promise<Result, MessageId>> PendingGetLastMessageIdRequestsMap;
typedef std::map<long, Promise<Result, GetLastMessageIdResponse>> PendingGetLastMessageIdRequestsMap;
PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_;

typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;
Expand Down
112 changes: 68 additions & 44 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "pulsar/Result.h"
#include "pulsar/MessageId.h"
#include "Utils.h"
#include "MessageIdUtil.h"
#include "AckGroupingTracker.h"
#include "AckGroupingTrackerEnabled.h"
#include "AckGroupingTrackerDisabled.h"
Expand All @@ -51,7 +52,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
hasParent_(hasParent),
consumerTopicType_(consumerTopicType),
subscriptionMode_(subscriptionMode),
startMessageId_(startMessageId),
// This is the initial capacity of the queue
incomingMessages_(std::max(config_.getReceiverQueueSize(), 1)),
availablePermits_(0),
Expand All @@ -63,7 +63,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
negativeAcksTracker_(client, *this, conf),
ackGroupingTrackerPtr_(std::make_shared<AckGroupingTracker>()),
readCompacted_(conf.isReadCompacted()),
lastMessageInBroker_(Optional<MessageId>::of(MessageId())),
startMessageId_(startMessageId),
maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoOldestChunkedMessageOnQueueFull()) {
std::stringstream consumerStrStream;
Expand Down Expand Up @@ -161,8 +161,9 @@ void ConsumerImpl::start() {

void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
Lock lock(mutex_);
if (state_ == Closed) {
lock.unlock();
const auto state = state_;
lock.unlock();
if (state == Closed) {
LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed");
return;
}
Expand All @@ -171,23 +172,24 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
// sending the subscribe request.
cnx->registerConsumer(consumerId_, shared_from_this());

Lock lockForMessageId(mutexForMessageId_);
Optional<MessageId> firstMessageInQueue = clearReceiveQueue();
unAckedMessageTrackerPtr_->clear();
batchAcknowledgementTracker_.clear();

if (subscriptionMode_ == Commands::SubscriptionModeNonDurable) {
// Update startMessageId so that we can discard messages after delivery
// restarts
startMessageId_ = firstMessageInQueue;
}
const auto startMessageId = startMessageId_;
lockForMessageId.unlock();

lock.unlock();
unAckedMessageTrackerPtr_->clear();
batchAcknowledgementTracker_.clear();

ClientImplPtr client = client_.lock();
uint64_t requestId = client->newRequestId();
SharedBuffer cmd = Commands::newSubscribe(
topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
startMessageId_, readCompacted_, config_.getProperties(), config_.getSchema(), getInitialPosition(),
startMessageId, readCompacted_, config_.getProperties(), config_.getSchema(), getInitialPosition(),
config_.isReplicateSubscriptionStateEnabled(), config_.getKeySharedPolicy(),
config_.getPriorityLevel());
cnx->sendRequestWithId(cmd, requestId)
Expand Down Expand Up @@ -538,6 +540,9 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
batchAcknowledgementTracker_.receivedMessage(batchedMessage);
LOG_DEBUG("Received Batch messages of size - " << batchSize
<< " -- msgId: " << batchedMessage.getMessageId());
Lock lock(mutexForMessageId_);
const auto startMessageId = startMessageId_;
lock.unlock();

int skippedMessages = 0;

Expand All @@ -547,14 +552,14 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
msg.impl_->setRedeliveryCount(redeliveryCount);
msg.impl_->setTopicName(batchedMessage.getTopicName());

if (startMessageId_.is_present()) {
if (startMessageId.is_present()) {
const MessageId& msgId = msg.getMessageId();

// If we are receiving a batch message, we need to discard messages that were prior
// to the startMessageId
if (msgId.ledgerId() == startMessageId_.value().ledgerId() &&
msgId.entryId() == startMessageId_.value().entryId() &&
msgId.batchIndex() <= startMessageId_.value().batchIndex()) {
if (msgId.ledgerId() == startMessageId.value().ledgerId() &&
msgId.entryId() == startMessageId.value().entryId() &&
msgId.batchIndex() <= startMessageId.value().batchIndex()) {
LOG_DEBUG(getName() << "Ignoring message from before the startMessageId"
<< msg.getMessageId());
++skippedMessages;
Expand Down Expand Up @@ -686,7 +691,7 @@ void ConsumerImpl::internalListener() {
trackMessage(msg.getMessageId());
try {
consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
lastDequedMessageId_ = msg.getMessageId();
messageListener_(Consumer(shared_from_this()), msg);
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from listener" << e.what());
Expand Down Expand Up @@ -820,8 +825,9 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
}

void ConsumerImpl::messageProcessed(Message& msg, bool track) {
Lock lock(mutex_);
lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
Lock lock(mutexForMessageId_);
lastDequedMessageId_ = msg.getMessageId();
lock.unlock();

ClientConnectionPtr currentCnx = getCnx().lock();
if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) {
Expand Down Expand Up @@ -853,11 +859,11 @@ Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
previousMessageId = MessageId(-1, nextMessageId.ledgerId(), nextMessageId.entryId() - 1, -1);
}
return Optional<MessageId>::of(previousMessageId);
} else if (lastDequedMessage_.is_present()) {
} else if (lastDequedMessageId_ != MessageId::earliest()) {
// If the queue was empty we need to restart from the message just after the last one that has been
// dequeued
// in the past
return lastDequedMessage_;
return Optional<MessageId>::of(lastDequedMessageId_);
} else {
// No message was received or dequeued by this consumer. Next message would still be the
// startMessageId
Expand Down Expand Up @@ -1193,6 +1199,9 @@ void ConsumerImpl::brokerConsumerStatsListener(Result res, BrokerConsumerStatsIm

void ConsumerImpl::handleSeek(Result result, ResultCallback callback) {
if (result == ResultOk) {
Lock lock(mutexForMessageId_);
lastDequedMessageId_ = MessageId::earliest();
lock.unlock();
LOG_INFO(getName() << "Seek successfully");
} else {
LOG_ERROR(getName() << "Failed to seek: " << strResult(result));
Expand Down Expand Up @@ -1267,37 +1276,42 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {

bool ConsumerImpl::isReadCompacted() { return readCompacted_; }

inline bool hasMoreMessages(const MessageId& lastMessageIdInBroker, const MessageId& messageId) {
return lastMessageIdInBroker > messageId && lastMessageIdInBroker.entryId() != -1;
}

void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) {
MessageId lastDequed = this->lastMessageIdDequed();
MessageId lastInBroker = this->lastMessageIdInBroker();
if (lastInBroker > lastDequed && lastInBroker.entryId() != -1) {
callback(ResultOk, true);
return;
}
Lock lock(mutexForMessageId_);
const auto messageId =
(lastDequedMessageId_ == MessageId::earliest()) ? startMessageId_.value() : lastDequedMessageId_;

getLastMessageIdAsync([lastDequed, callback](Result result, MessageId messageId) {
if (result == ResultOk) {
if (messageId > lastDequed && messageId.entryId() != -1) {
callback(ResultOk, true);
if (messageId == MessageId::latest()) {
lock.unlock();
getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) {
if (result != ResultOk) {
callback(result, {});
return;
}
if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) {
// We only care about comparing ledger ids and entry ids as mark delete position doesn't have
// other ids such as batch index
callback(ResultOk, compareLedgerAndEntryId(response.getMarkDeletePosition(),
response.getLastMessageId()) < 0);
} else {
callback(ResultOk, false);
}
} else {
callback(result, false);
}
});
}

void ConsumerImpl::brokerGetLastMessageIdListener(Result res, MessageId messageId,
BrokerGetLastMessageIdCallback callback) {
Lock lock(mutex_);
if (messageId > lastMessageIdInBroker()) {
lastMessageInBroker_ = Optional<MessageId>::of(messageId);
lock.unlock();
callback(res, messageId);
});
} else {
if (hasMoreMessages(lastMessageIdInBroker_, messageId)) {
lock.unlock();
callback(ResultOk, true);
return;
}
lock.unlock();
callback(res, lastMessageIdInBroker());

getLastMessageIdAsync([callback, messageId](Result result, const GetLastMessageIdResponse& response) {
callback(result, (result == ResultOk) && hasMoreMessages(response.getLastMessageId(), messageId));
});
}
}

Expand All @@ -1321,9 +1335,19 @@ void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback
LOG_DEBUG(getName() << " Sending getLastMessageId Command for Consumer - " << getConsumerId()
<< ", requestId - " << requestId);

auto self = shared_from_this();
cnx->newGetLastMessageId(consumerId_, requestId)
.addListener(std::bind(&ConsumerImpl::brokerGetLastMessageIdListener, shared_from_this(),
std::placeholders::_1, std::placeholders::_2, callback));
.addListener([this, self, callback](Result result, const GetLastMessageIdResponse& response) {
if (result == ResultOk) {
LOG_DEBUG(getName() << "getLastMessageId: " << response);
Lock lock(mutexForMessageId_);
lastMessageIdInBroker_ = response.getLastMessageId();
lock.unlock();
} else {
LOG_ERROR(getName() << "Failed to getLastMessageId: " << result);
}
callback(result, response);
});
} else {
LOG_ERROR(getName() << " Operation not supported since server protobuf version "
<< cnx->getServerProtocolVersion() << " is older than proto::v12");
Expand Down
23 changes: 8 additions & 15 deletions pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "lib/UnAckedMessageTrackerDisabled.h"
#include "MessageCrypto.h"
#include "AckGroupingTracker.h"
#include "GetLastMessageIdResponse.h"

#include "CompressionCodec.h"
#include <boost/dynamic_bitset.hpp>
Expand All @@ -54,7 +55,7 @@ class ExecutorService;
class ConsumerImpl;
class BatchAcknowledgementTracker;
typedef std::shared_ptr<MessageCrypto> MessageCryptoPtr;
typedef std::function<void(Result result, MessageId messageId)> BrokerGetLastMessageIdCallback;
typedef std::function<void(Result, const GetLastMessageIdResponse&)> BrokerGetLastMessageIdCallback;

enum ConsumerTopicType
{
Expand Down Expand Up @@ -193,10 +194,8 @@ class ConsumerImpl : public ConsumerImplBase,
bool hasParent_;
ConsumerTopicType consumerTopicType_;

Commands::SubscriptionMode subscriptionMode_;
Optional<MessageId> startMessageId_;
const Commands::SubscriptionMode subscriptionMode_;

Optional<MessageId> lastDequedMessage_;
UnboundedBlockingQueue<Message> incomingMessages_;
std::queue<ReceiveCallback> pendingReceives_;
std::atomic_int availablePermits_;
Expand All @@ -217,17 +216,11 @@ class ConsumerImpl : public ConsumerImplBase,
MessageCryptoPtr msgCrypto_;
const bool readCompacted_;

Optional<MessageId> lastMessageInBroker_;
void brokerGetLastMessageIdListener(Result res, MessageId messageId,
BrokerGetLastMessageIdCallback callback);

const MessageId& lastMessageIdDequed() {
return lastDequedMessage_.is_present() ? lastDequedMessage_.value() : MessageId::earliest();
}

const MessageId& lastMessageIdInBroker() {
return lastMessageInBroker_.is_present() ? lastMessageInBroker_.value() : MessageId::earliest();
}
// Make the access to `startMessageId_`, `lastDequedMessageId_` and `lastMessageIdInBroker_` thread safe
mutable std::mutex mutexForMessageId_;
Optional<MessageId> startMessageId_;
MessageId lastDequedMessageId_{MessageId::earliest()};
MessageId lastMessageIdInBroker_{MessageId::earliest()};

class ChunkedMessageCtx {
public:
Expand Down
Loading

0 comments on commit e50493e

Please sign in to comment.