Skip to content

Commit

Permalink
[C++] Fix potential crash caused by UnAckedMessageTrackerEnabled's ti…
Browse files Browse the repository at this point in the history
…mer(issue like #apache#8519)

- pulsar-client-cpp Consumer do AcknowledgeCumulative just clean up `msgId`, not <= `msgId` in  `UnAckedMessageTrackerEnabled::removeMessagesTill`
- potential crash caused by UnAckedMessageTrackerEnabled's timer(see issue like apache#8519)

- When do AcknowledgeCumulative from application, earse <= `msgId` in UnAckedMessageTrackerEnabled, avoid redeliver unnecessary unacknowledged messages to Broker
- Use std::shared_ptr instead of std::unique_ptr for UnAckedMessageTrackerEnabled
- add `start()`, `close()` method to `UnAckedMessageTrackerEnabled` solve same issue see apache#8519
- add `isEmpty()`, `size()` method to `UnAckedMessageTrackerEnabled` for checking of test case
- when close `UnAckedMessageTrackerEnabled` and `AckGroupingTrackerEnabled`, reset shared_ptr `timer_`
- add unit test for `UnAckedMessageTrackerEnabled`
[C++] Fix potential crash caused by UnAckedMessageTrackerEnabled's timer(issue like #apache#8519)

- pulsar-client-cpp Consumer do AcknowledgeCumulative just clean up `msgId`, not <= `msgId` in  `UnAckedMessageTrackerEnabled::removeMessagesTill`
- potential crash caused by UnAckedMessageTrackerEnabled's timer(see issue like apache#8519)

- When do AcknowledgeCumulative from application, earse <= `msgId` in UnAckedMessageTrackerEnabled, avoid redeliver unnecessary unacknowledged messages to Broker
- Use std::shared_ptr instead of std::unique_ptr for UnAckedMessageTrackerEnabled
- add `start()`, `close()` method to `UnAckedMessageTrackerEnabled` solve same issue see apache#8519
- add `isEmpty()`, `size()` method to `UnAckedMessageTrackerEnabled` for checking of test case
- when close `UnAckedMessageTrackerEnabled` and `AckGroupingTrackerEnabled`, reset shared_ptr `timer_`
- add unit test for `UnAckedMessageTrackerEnabled`
  • Loading branch information
saosir committed Nov 30, 2020
1 parent c3217ef commit 6f4ff1b
Show file tree
Hide file tree
Showing 12 changed files with 260 additions and 20 deletions.
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ void AckGroupingTrackerEnabled::close() {
if (this->timer_) {
boost::system::error_code ec;
this->timer_->cancel(ec);
this->timer_.reset();
}
}

Expand Down
5 changes: 5 additions & 0 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ void ConsumerImpl::start() {
LOG_INFO(getName() << "ACK will NOT be sent to broker for this non-persistent topic.");
}
ackGroupingTrackerPtr_->start();
unAckedMessageTrackerPtr_->start();
}

void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
Expand Down Expand Up @@ -890,6 +891,10 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
ackGroupingTrackerPtr_->close();
}

if (unAckedMessageTrackerPtr_) {
unAckedMessageTrackerPtr_->close();
}

ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
state_ = Closed;
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class ConsumerImpl : public ConsumerImplBase,
bool messageListenerRunning_;
std::mutex messageListenerMutex_;
CompressionCodecProvider compressionCodecProvider_;
UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
BatchAcknowledgementTracker batchAcknowledgementTracker_;
BrokerConsumerStatsImpl brokerConsumerStats_;
NegativeAcksTracker negativeAcksTracker_;
Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ void MultiTopicsConsumerImpl::start() {
shared_from_this(), std::placeholders::_1,
std::placeholders::_2, *itr, topicsNeedCreate));
}

unAckedMessageTrackerPtr_->start();
}

void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer consumer,
Expand Down Expand Up @@ -400,6 +402,10 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
callback));
}

if (unAckedMessageTrackerPtr_) {
unAckedMessageTrackerPtr_->close();
}

// fail pending recieve
failPendingReceiveCallback();
}
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
ExecutorServicePtr listenerExecutor_;
MessageListener messageListener_;
Promise<Result, ConsumerImplBaseWeakPtr> multiTopicsConsumerCreatedPromise_;
UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
const std::vector<std::string>& topics_;
std::queue<ReceiveCallback> pendingReceives_;

Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ void PartitionedConsumerImpl::start() {
consumer++) {
(*consumer)->start();
}

unAckedMessageTrackerPtr_->start();
}

void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
Expand Down Expand Up @@ -370,6 +372,10 @@ void PartitionedConsumerImpl::closeAsync(ResultCallback callback) {
}
}
}

if (unAckedMessageTrackerPtr_) {
unAckedMessageTrackerPtr_->close();
}

// fail pending recieve
failPendingReceiveCallback();
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/PartitionedConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
void failPendingReceiveCallback();
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled);
Promise<Result, ConsumerImplBaseWeakPtr> partitionedConsumerCreatedPromise_;
UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
std::queue<ReceiveCallback> pendingReceives_;
void runPartitionUpdateTask();
void getPartitionMetadata();
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/UnAckedMessageTrackerDisabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class UnAckedMessageTrackerDisabled : public UnAckedMessageTrackerInterface {
void removeMessagesTill(const MessageId& msgId) {}
void removeTopicMessage(const std::string& topic) {}

bool isEmpty() { return true; };
long size() { return 0; };
void clear() {}
};
} // namespace pulsar
Expand Down
24 changes: 15 additions & 9 deletions pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,18 @@ DECLARE_LOG_OBJECT();

namespace pulsar {

void UnAckedMessageTrackerEnabled::timeoutHandler() {
timeoutHandlerHelper();
void UnAckedMessageTrackerEnabled::scheduleTimer() {
std::lock_guard<std::mutex> acquire(mutexTimer_);
ExecutorServicePtr executorService = client_->getIOExecutorProvider()->get();
timer_ = executorService->createDeadlineTimer();
timer_->expires_from_now(boost::posix_time::milliseconds(tickDurationInMs_));
timer_->async_wait([&](const boost::system::error_code& ec) {
auto self = shared_from_this();
timer_->async_wait([this, self](const boost::system::error_code& ec) {
if (ec) {
LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
} else {
timeoutHandler();
this->timeoutHandlerHelper();
this->scheduleTimer();
}
});
}
Expand Down Expand Up @@ -86,8 +88,6 @@ UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long
std::set<MessageId> msgIds;
timePartitions.push_back(msgIds);
}

timeoutHandler();
}

bool UnAckedMessageTrackerEnabled::add(const MessageId& m) {
Expand Down Expand Up @@ -158,9 +158,15 @@ void UnAckedMessageTrackerEnabled::clear() {
}
}

UnAckedMessageTrackerEnabled::~UnAckedMessageTrackerEnabled() {
if (timer_) {
timer_->cancel();
void UnAckedMessageTrackerEnabled::start() {
this->scheduleTimer();
}

void UnAckedMessageTrackerEnabled::close() {
std::lock_guard<std::mutex> acquire(mutexTimer_);
if (this->timer_) {
boost::system::error_code ec;
this->timer_.reset();
}
}
} /* namespace pulsar */
14 changes: 8 additions & 6 deletions pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,27 @@
namespace pulsar {
class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
public:
~UnAckedMessageTrackerEnabled();
virtual ~UnAckedMessageTrackerEnabled() { this->close(); }
UnAckedMessageTrackerEnabled(long timeoutMs, const ClientImplPtr, ConsumerImplBase&);
UnAckedMessageTrackerEnabled(long timeoutMs, long tickDuration, const ClientImplPtr, ConsumerImplBase&);
bool add(const MessageId& m);
bool remove(const MessageId& m);
void removeMessagesTill(const MessageId& msgId);
void removeTopicMessage(const std::string& topic);
void timeoutHandler();

void start() override;
void close() override;
bool isEmpty() override;
long size() override;
void clear();

private:
protected:
void scheduleTimer();
void timeoutHandlerHelper();
bool isEmpty();
long size();
std::map<MessageId, std::set<MessageId>&> messageIdPartitionMap;
std::deque<std::set<MessageId>> timePartitions;
std::mutex lock_;
DeadlineTimerPtr timer_;
std::mutex mutexTimer_;
ConsumerImplBase& consumerReference_;
ClientImplPtr client_;
long timeoutMs_;
Expand Down
8 changes: 6 additions & 2 deletions pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,23 @@
#include <boost/asio/error.hpp>
namespace pulsar {

class UnAckedMessageTrackerInterface {
class UnAckedMessageTrackerInterface : public std::enable_shared_from_this<UnAckedMessageTrackerInterface> {
public:
virtual ~UnAckedMessageTrackerInterface() {}
UnAckedMessageTrackerInterface() {}
virtual bool add(const MessageId& m) = 0;
virtual bool remove(const MessageId& m) = 0;
virtual void removeMessagesTill(const MessageId& msgId) = 0;
virtual void start() {}
virtual void close() {}
virtual bool isEmpty() = 0;
virtual long size() = 0;
virtual void clear() = 0;
// this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, should remove all it's
// message.
virtual void removeTopicMessage(const std::string& topic) = 0;
};

typedef std::unique_ptr<UnAckedMessageTrackerInterface> UnAckedMessageTrackerScopedPtr;
using UnAckedMessageTrackerPtr = std::shared_ptr<UnAckedMessageTrackerInterface>;
} // namespace pulsar
#endif /* LIB_UNACKEDMESSAGETRACKERINTERFACE_H_ */
Loading

0 comments on commit 6f4ff1b

Please sign in to comment.