Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[C++] Fix potential crash caused by AckGroupTracker's timer (apache#8519
Browse files Browse the repository at this point in the history
)

### Motivation

The `AckGroupingTrackerEnabled`'s timer callback only captures `this`, which is a weak reference to the `AckGroupingTrackerEnabled ` instance. If the instance went out of the scope and destroyed, `this` would point to an invalid block.

Even if the destructor of `AckGroupingTrackerEnabled` cancels the timer, the callback may not be triggered immediately. There's still a possibility that when the callback is triggered, the error code is 0 but accessing to `this` is invalid. For example, there's a crash caused by the callback in production environment that is hard to reproduce:

```
#6 <signal handler called>
#7 0x00007fb4e67c5cb8 in ?? ()
#8 0x00007fb604981adb in operator() (ec=..., __closure=0x7fb52b0fb230)
   at /usr/local/src/apache-pulsar-microfocus/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc:148
#9 operator() (this=0x7fb52b0fb230) at /usr/local/include/boost/asio/detail/bind_handler.hpp:47
```

### Modifications

- Use `std::shared_ptr` instead of `std::unique_ptr` for `AckGroupingTrackerEnabled`, then capture the shared pointer in timer callback's lambda expression to extend the lifetime of `this`.
- Add `start()` method to `AckGroupingTracker` to avoid `std::bad_weak_ptr` because `shared_from_this()` in a constructor returns a null pointer.
- Use `std::weak_ptr` to reference `HandlerBase` in case that the handler may be invalid when the timer callback is triggered.

(cherry picked from commit cfa65d0)
(cherry picked from commit 98591c4)
  • Loading branch information
BewareMyPower authored and codelipenghui committed Nov 13, 2020
1 parent 10966e6 commit 924471b
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 64 deletions.
9 changes: 7 additions & 2 deletions pulsar-client-cpp/lib/AckGroupingTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,16 @@ namespace pulsar {
* Default ACK grouping tracker, it actually neither tracks ACK requests nor sends them to brokers.
* It can be directly used by consumers for non-persistent topics.
*/
class AckGroupingTracker {
class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracker> {
public:
AckGroupingTracker() = default;
virtual ~AckGroupingTracker() = default;

/**
* Start tracking the ACK requests.
*/
virtual void start() {}

/**
* Since ACK requests are grouped and delayed, we need to do some best-effort duplicate check to
* discard messages that are being resent after a disconnection and for which the user has
Expand Down Expand Up @@ -102,7 +107,7 @@ class AckGroupingTracker {
const std::set<MessageId>& msgIds);
}; // class AckGroupingTracker

using AckGroupingTrackerScopedPtr = std::unique_ptr<AckGroupingTracker>;
using AckGroupingTrackerPtr = std::shared_ptr<AckGroupingTracker>;

} // namespace pulsar
#endif /* LIB_ACKGROUPINGTRACKER_H_ */
21 changes: 14 additions & 7 deletions pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ namespace pulsar {

DECLARE_LOG_OBJECT();

AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr, HandlerBase& handler,
uint64_t consumerId, long ackGroupingTimeMs,
long ackGroupingMaxSize)
AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr,
const HandlerBasePtr& handlerPtr, uint64_t consumerId,
long ackGroupingTimeMs, long ackGroupingMaxSize)
: AckGroupingTracker(),
handler_(handler),
handlerWeakPtr_(handlerPtr),
consumerId_(consumerId),
nextCumulativeAckMsgId_(MessageId::earliest()),
requireCumulativeAck_(false),
Expand All @@ -51,9 +51,10 @@ AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr, Ha
mutexTimer_() {
LOG_DEBUG("ACK grouping is enabled, grouping time " << ackGroupingTimeMs << "ms, grouping max size "
<< ackGroupingMaxSize);
this->scheduleTimer();
}

void AckGroupingTrackerEnabled::start() { this->scheduleTimer(); }

bool AckGroupingTrackerEnabled::isDuplicate(const MessageId& msgId) {
{
// Check if the message ID is already ACKed by a previous (or pending) cumulative request.
Expand Down Expand Up @@ -94,7 +95,12 @@ void AckGroupingTrackerEnabled::close() {
}

void AckGroupingTrackerEnabled::flush() {
auto cnx = this->handler_.getCnx().lock();
auto handler = handlerWeakPtr_.lock();
if (!handler) {
LOG_WARN("Reference to the HandlerBase is not valid.");
return;
}
auto cnx = handler->getCnx().lock();
if (cnx == nullptr) {
LOG_DEBUG("Connection is not ready, grouping ACK failed.");
return;
Expand Down Expand Up @@ -143,7 +149,8 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
std::lock_guard<std::mutex> lock(this->mutexTimer_);
this->timer_ = this->executor_->createDeadlineTimer();
this->timer_->expires_from_now(boost::posix_time::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
this->timer_->async_wait([this](const boost::system::error_code& ec) -> void {
auto self = shared_from_this();
this->timer_->async_wait([this, self](const boost::system::error_code& ec) -> void {
if (!ec) {
this->flush();
this->scheduleTimer();
Expand Down
7 changes: 4 additions & 3 deletions pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,15 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker {
/**
* Constructing ACK grouping tracker for peresistent topics.
* @param[in] clientPtr pointer to client object.
* @param[in] handler the connection handler.
* @param[in] handlerPtr the shared pointer to connection handler.
* @param[in] consumerId consumer ID that this tracker belongs to.
* @param[in] ackGroupingTimeMs ACK grouping time window in milliseconds.
* @param[in] ackGroupingMaxSize max. number of ACK requests can be grouped.
*/
AckGroupingTrackerEnabled(ClientImplPtr clientPtr, HandlerBase& handler, uint64_t consumerId,
AckGroupingTrackerEnabled(ClientImplPtr clientPtr, const HandlerBasePtr& handlerPtr, uint64_t consumerId,
long ackGroupingTimeMs, long ackGroupingMaxSize);

void start() override;
bool isDuplicate(const MessageId& msgId) override;
void addAcknowledge(const MessageId& msgId) override;
void addAcknowledgeCumulative(const MessageId& msgId) override;
Expand All @@ -62,7 +63,7 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker {
void scheduleTimer();

//! The connection handler.
HandlerBase& handler_;
HandlerBaseWeakPtr handlerWeakPtr_;

//! ID of the consumer that this tracker belongs to.
uint64_t consumerId_;
Expand Down
41 changes: 22 additions & 19 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
brokerConsumerStats_(),
consumerStatsBasePtr_(),
negativeAcksTracker_(client, *this, conf),
ackGroupingTrackerPtr_(std::make_shared<AckGroupingTracker>()),
msgCrypto_(),
readCompacted_(conf.isReadCompacted()),
lastMessageInBroker_(Optional<MessageId>::of(MessageId())) {
Expand Down Expand Up @@ -102,23 +103,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
if (conf.isEncryptionEnabled()) {
msgCrypto_ = std::make_shared<MessageCrypto>(consumerStr_, false);
}

// Initialize ACK grouping tracker.
if (TopicName::get(topic)->isPersistent()) {
// Persistent topic, ACK requests need to be sent to broker.
if (conf.getAckGroupingTimeMs() > 0) {
// Grouping ACK is ENABLED because grouping time value is a positive value.
this->ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
client, *this, this->consumerId_, conf.getAckGroupingTimeMs(), conf.getAckGroupingMaxSize()));
} else {
// Grouping ACK is DISABLED because grouping time value is a non-positive value.
this->ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(*this, this->consumerId_));
}
} else {
// Non-persistent topic, ACK requests do NOT need to be sent to broker.
LOG_INFO(getName() << "ACK will NOT be sent to broker for this non-persistent topic.");
this->ackGroupingTrackerPtr_.reset(new AckGroupingTracker());
}
}

ConsumerImpl::~ConsumerImpl() {
Expand All @@ -143,7 +127,24 @@ const std::string& ConsumerImpl::getSubscriptionName() const { return originalSu

const std::string& ConsumerImpl::getTopic() const { return topic_; }

void ConsumerImpl::start() { grabCnx(); }
void ConsumerImpl::start() {
HandlerBase::start();

// Initialize ackGroupingTrackerPtr_ here because the shared_from_this() was not initialized until the
// constructor completed.
if (TopicName::get(topic_)->isPersistent()) {
if (config_.getAckGroupingTimeMs() > 0) {
ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
client_.lock(), shared_from_this(), consumerId_, config_.getAckGroupingTimeMs(),
config_.getAckGroupingMaxSize()));
} else {
ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(*this, consumerId_));
}
} else {
LOG_INFO(getName() << "ACK will NOT be sent to broker for this non-persistent topic.");
}
ackGroupingTrackerPtr_->start();
}

void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
Lock lock(mutex_);
Expand Down Expand Up @@ -871,7 +872,9 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
state_ = Closing;

// Flush pending grouped ACK requests.
this->ackGroupingTrackerPtr_->close();
if (ackGroupingTrackerPtr_) {
ackGroupingTrackerPtr_->close();
}

ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class ConsumerImpl : public ConsumerImplBase,
BatchAcknowledgementTracker batchAcknowledgementTracker_;
BrokerConsumerStatsImpl brokerConsumerStats_;
NegativeAcksTracker negativeAcksTracker_;
AckGroupingTrackerScopedPtr ackGroupingTrackerPtr_;
AckGroupingTrackerPtr ackGroupingTrackerPtr_;

MessageCryptoPtr msgCrypto_;
const bool readCompacted_;
Expand Down
70 changes: 38 additions & 32 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3513,6 +3513,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerSingleAckBehavior) {

// Send ACK.
AckGroupingTrackerMock tracker(false);
tracker.start();
for (auto msgIdx = 0; msgIdx < numMsg; ++msgIdx) {
auto connPtr = connWeakPtr.lock();
ASSERT_NE(connPtr, nullptr);
Expand Down Expand Up @@ -3563,6 +3564,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerMultiAckBehavior) {

// Send ACK.
AckGroupingTrackerMock tracker(false);
tracker.start();
std::set<MessageId> restMsgId(recvMsgId.begin(), recvMsgId.end());
ASSERT_EQ(restMsgId.size(), numMsg);
ASSERT_TRUE(tracker.callDoImmediateAck(connWeakPtr, consumerImpl.getConsumerId(), restMsgId));
Expand Down Expand Up @@ -3669,9 +3671,10 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerDisabledCumulativeAck) {

class AckGroupingTrackerEnabledMock : public AckGroupingTrackerEnabled {
public:
AckGroupingTrackerEnabledMock(ClientImplPtr clientPtr, HandlerBase &handler, uint64_t consumerId,
long ackGroupingTimeMs, long ackGroupingMaxSize)
: AckGroupingTrackerEnabled(clientPtr, handler, consumerId, ackGroupingTimeMs, ackGroupingMaxSize) {}
AckGroupingTrackerEnabledMock(ClientImplPtr clientPtr, const HandlerBasePtr &handlerPtr,
uint64_t consumerId, long ackGroupingTimeMs, long ackGroupingMaxSize)
: AckGroupingTrackerEnabled(clientPtr, handlerPtr, consumerId, ackGroupingTimeMs,
ackGroupingMaxSize) {}
const std::set<MessageId> &getPendingIndividualAcks() { return this->pendingIndividualAcks_; }
const long getAckGroupingTimeMs() { return this->ackGroupingTimeMs_; }
const long getAckGroupingMaxSize() { return this->ackGroupingMaxSize_; }
Expand All @@ -3698,7 +3701,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledIndividualAck) {

Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
auto consumerImpl = PulsarFriend::getConsumerImplPtr(consumer);

// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Expand All @@ -3713,22 +3716,23 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledIndividualAck) {
recvMsgId.emplace_back(msg.getMessageId());
}

AckGroupingTrackerEnabledMock tracker(clientImplPtr, consumerImpl, consumerImpl.getConsumerId(),
ackGroupingTimeMs, ackGroupingMaxSize);
ASSERT_EQ(tracker.getPendingIndividualAcks().size(), 0);
ASSERT_EQ(tracker.getAckGroupingTimeMs(), ackGroupingTimeMs);
ASSERT_EQ(tracker.getAckGroupingMaxSize(), ackGroupingMaxSize);
auto tracker = std::make_shared<AckGroupingTrackerEnabledMock>(
clientImplPtr, consumerImpl, consumerImpl->getConsumerId(), ackGroupingTimeMs, ackGroupingMaxSize);
tracker->start();
ASSERT_EQ(tracker->getPendingIndividualAcks().size(), 0);
ASSERT_EQ(tracker->getAckGroupingTimeMs(), ackGroupingTimeMs);
ASSERT_EQ(tracker->getAckGroupingMaxSize(), ackGroupingMaxSize);
for (auto &msgId : recvMsgId) {
ASSERT_FALSE(tracker.isDuplicate(msgId));
tracker.addAcknowledge(msgId);
ASSERT_TRUE(tracker.isDuplicate(msgId));
ASSERT_FALSE(tracker->isDuplicate(msgId));
tracker->addAcknowledge(msgId);
ASSERT_TRUE(tracker->isDuplicate(msgId));
}
ASSERT_EQ(tracker.getPendingIndividualAcks().size(), recvMsgId.size());
ASSERT_EQ(tracker->getPendingIndividualAcks().size(), recvMsgId.size());

std::this_thread::sleep_for(std::chrono::seconds(2));
ASSERT_EQ(tracker.getPendingIndividualAcks().size(), 0);
ASSERT_EQ(tracker->getPendingIndividualAcks().size(), 0);
for (auto &msgId : recvMsgId) {
ASSERT_FALSE(tracker.isDuplicate(msgId));
ASSERT_FALSE(tracker->isDuplicate(msgId));
}
consumer.close();

Expand Down Expand Up @@ -3757,7 +3761,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {

Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
auto &consumerImpl0 = PulsarFriend::getConsumerImpl(consumer);
auto consumerImpl0 = PulsarFriend::getConsumerImplPtr(consumer);

// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Expand All @@ -3773,32 +3777,33 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {
}
std::sort(recvMsgId.begin(), recvMsgId.end());

AckGroupingTrackerEnabledMock tracker0(clientImplPtr, consumerImpl0, consumerImpl0.getConsumerId(),
ackGroupingTimeMs, ackGroupingMaxSize);
ASSERT_EQ(tracker0.getNextCumulativeAckMsgId(), MessageId::earliest());
ASSERT_FALSE(tracker0.requireCumulativeAck());
auto tracker0 = std::make_shared<AckGroupingTrackerEnabledMock>(
clientImplPtr, consumerImpl0, consumerImpl0->getConsumerId(), ackGroupingTimeMs, ackGroupingMaxSize);
tracker0->start();
ASSERT_EQ(tracker0->getNextCumulativeAckMsgId(), MessageId::earliest());
ASSERT_FALSE(tracker0->requireCumulativeAck());

auto targetMsgId = recvMsgId[numMsg / 2];
for (auto idx = 0; idx <= numMsg / 2; ++idx) {
ASSERT_FALSE(tracker0.isDuplicate(recvMsgId[idx]));
ASSERT_FALSE(tracker0->isDuplicate(recvMsgId[idx]));
}
tracker0.addAcknowledgeCumulative(targetMsgId);
tracker0->addAcknowledgeCumulative(targetMsgId);
for (auto idx = 0; idx <= numMsg / 2; ++idx) {
ASSERT_TRUE(tracker0.isDuplicate(recvMsgId[idx]));
ASSERT_TRUE(tracker0->isDuplicate(recvMsgId[idx]));
}
ASSERT_EQ(tracker0.getNextCumulativeAckMsgId(), targetMsgId);
ASSERT_TRUE(tracker0.requireCumulativeAck());
ASSERT_EQ(tracker0->getNextCumulativeAckMsgId(), targetMsgId);
ASSERT_TRUE(tracker0->requireCumulativeAck());

std::this_thread::sleep_for(std::chrono::seconds(2));
ASSERT_FALSE(tracker0.requireCumulativeAck());
ASSERT_FALSE(tracker0->requireCumulativeAck());
for (auto idx = 0; idx <= numMsg / 2; ++idx) {
ASSERT_TRUE(tracker0.isDuplicate(recvMsgId[idx]));
ASSERT_TRUE(tracker0->isDuplicate(recvMsgId[idx]));
}
consumer.close();

std::this_thread::sleep_for(std::chrono::seconds(1));
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
auto &consumerImpl1 = PulsarFriend::getConsumerImpl(consumer);
auto consumerImpl1 = PulsarFriend::getConsumerImplPtr(consumer);
std::set<MessageId> restMsgId(recvMsgId.begin() + numMsg / 2 + 1, recvMsgId.end());
for (auto count = numMsg / 2 + 1; count < numMsg; ++count) {
Message msg;
Expand All @@ -3808,10 +3813,11 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {
Message msg;
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message: " << msg.getDataAsString();
AckGroupingTrackerEnabledMock tracker1(clientImplPtr, consumerImpl1, consumerImpl1.getConsumerId(),
ackGroupingTimeMs, ackGroupingMaxSize);
tracker1.addAcknowledgeCumulative(recvMsgId[numMsg - 1]);
tracker1.close();
auto tracker1 = std::make_shared<AckGroupingTrackerEnabledMock>(
clientImplPtr, consumerImpl1, consumerImpl1->getConsumerId(), ackGroupingTimeMs, ackGroupingMaxSize);
tracker1->start();
tracker1->addAcknowledgeCumulative(recvMsgId[numMsg - 1]);
tracker1->close();
consumer.close();

ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/tests/PulsarFriend.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ class PulsarFriend {
return *consumerImpl;
}

static std::shared_ptr<ConsumerImpl> getConsumerImplPtr(Consumer consumer) {
return std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
}

static std::shared_ptr<ClientImpl> getClientImplPtr(Client client) { return client.impl_; }

static void setNegativeAckEnabled(Consumer consumer, bool enabled) {
Expand Down

0 comments on commit 924471b

Please sign in to comment.