diff --git a/pulsar-client-cpp/lib/AckGroupingTracker.h b/pulsar-client-cpp/lib/AckGroupingTracker.h index a5b3d66cf7609..f4410e45fab7c 100644 --- a/pulsar-client-cpp/lib/AckGroupingTracker.h +++ b/pulsar-client-cpp/lib/AckGroupingTracker.h @@ -35,11 +35,16 @@ namespace pulsar { * Default ACK grouping tracker, it actually neither tracks ACK requests nor sends them to brokers. * It can be directly used by consumers for non-persistent topics. */ -class AckGroupingTracker { +class AckGroupingTracker : public std::enable_shared_from_this { public: AckGroupingTracker() = default; virtual ~AckGroupingTracker() = default; + /** + * Start tracking the ACK requests. + */ + virtual void start() {} + /** * Since ACK requests are grouped and delayed, we need to do some best-effort duplicate check to * discard messages that are being resent after a disconnection and for which the user has @@ -102,7 +107,7 @@ class AckGroupingTracker { const std::set& msgIds); }; // class AckGroupingTracker -using AckGroupingTrackerScopedPtr = std::unique_ptr; +using AckGroupingTrackerPtr = std::shared_ptr; } // namespace pulsar #endif /* LIB_ACKGROUPINGTRACKER_H_ */ diff --git a/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc b/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc index 5c61ccaf16481..a651c7af4ea58 100644 --- a/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc +++ b/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc @@ -33,11 +33,11 @@ namespace pulsar { DECLARE_LOG_OBJECT(); -AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr, HandlerBase& handler, - uint64_t consumerId, long ackGroupingTimeMs, - long ackGroupingMaxSize) +AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr, + const HandlerBasePtr& handlerPtr, uint64_t consumerId, + long ackGroupingTimeMs, long ackGroupingMaxSize) : AckGroupingTracker(), - handler_(handler), + handlerWeakPtr_(handlerPtr), consumerId_(consumerId), nextCumulativeAckMsgId_(MessageId::earliest()), requireCumulativeAck_(false), @@ -51,9 +51,10 @@ AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr, Ha mutexTimer_() { LOG_DEBUG("ACK grouping is enabled, grouping time " << ackGroupingTimeMs << "ms, grouping max size " << ackGroupingMaxSize); - this->scheduleTimer(); } +void AckGroupingTrackerEnabled::start() { this->scheduleTimer(); } + bool AckGroupingTrackerEnabled::isDuplicate(const MessageId& msgId) { { // Check if the message ID is already ACKed by a previous (or pending) cumulative request. @@ -94,7 +95,12 @@ void AckGroupingTrackerEnabled::close() { } void AckGroupingTrackerEnabled::flush() { - auto cnx = this->handler_.getCnx().lock(); + auto handler = handlerWeakPtr_.lock(); + if (!handler) { + LOG_WARN("Reference to the HandlerBase is not valid."); + return; + } + auto cnx = handler->getCnx().lock(); if (cnx == nullptr) { LOG_DEBUG("Connection is not ready, grouping ACK failed."); return; @@ -143,7 +149,8 @@ void AckGroupingTrackerEnabled::scheduleTimer() { std::lock_guard lock(this->mutexTimer_); this->timer_ = this->executor_->createDeadlineTimer(); this->timer_->expires_from_now(boost::posix_time::milliseconds(std::max(1L, this->ackGroupingTimeMs_))); - this->timer_->async_wait([this](const boost::system::error_code& ec) -> void { + auto self = shared_from_this(); + this->timer_->async_wait([this, self](const boost::system::error_code& ec) -> void { if (!ec) { this->flush(); this->scheduleTimer(); diff --git a/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h b/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h index 9fb871a251f0e..c3926aa492bf2 100644 --- a/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h +++ b/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h @@ -42,14 +42,15 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker { /** * Constructing ACK grouping tracker for peresistent topics. * @param[in] clientPtr pointer to client object. - * @param[in] handler the connection handler. + * @param[in] handlerPtr the shared pointer to connection handler. * @param[in] consumerId consumer ID that this tracker belongs to. * @param[in] ackGroupingTimeMs ACK grouping time window in milliseconds. * @param[in] ackGroupingMaxSize max. number of ACK requests can be grouped. */ - AckGroupingTrackerEnabled(ClientImplPtr clientPtr, HandlerBase& handler, uint64_t consumerId, + AckGroupingTrackerEnabled(ClientImplPtr clientPtr, const HandlerBasePtr& handlerPtr, uint64_t consumerId, long ackGroupingTimeMs, long ackGroupingMaxSize); + void start() override; bool isDuplicate(const MessageId& msgId) override; void addAcknowledge(const MessageId& msgId) override; void addAcknowledgeCumulative(const MessageId& msgId) override; @@ -62,7 +63,7 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker { void scheduleTimer(); //! The connection handler. - HandlerBase& handler_; + HandlerBaseWeakPtr handlerWeakPtr_; //! ID of the consumer that this tracker belongs to. uint64_t consumerId_; diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index dedd9b2451a77..e85fbf1a82d10 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -62,6 +62,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, brokerConsumerStats_(), consumerStatsBasePtr_(), negativeAcksTracker_(client, *this, conf), + ackGroupingTrackerPtr_(std::make_shared()), msgCrypto_(), readCompacted_(conf.isReadCompacted()), lastMessageInBroker_(Optional::of(MessageId())) { @@ -102,23 +103,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, if (conf.isEncryptionEnabled()) { msgCrypto_ = std::make_shared(consumerStr_, false); } - - // Initialize ACK grouping tracker. - if (TopicName::get(topic)->isPersistent()) { - // Persistent topic, ACK requests need to be sent to broker. - if (conf.getAckGroupingTimeMs() > 0) { - // Grouping ACK is ENABLED because grouping time value is a positive value. - this->ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled( - client, *this, this->consumerId_, conf.getAckGroupingTimeMs(), conf.getAckGroupingMaxSize())); - } else { - // Grouping ACK is DISABLED because grouping time value is a non-positive value. - this->ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(*this, this->consumerId_)); - } - } else { - // Non-persistent topic, ACK requests do NOT need to be sent to broker. - LOG_INFO(getName() << "ACK will NOT be sent to broker for this non-persistent topic."); - this->ackGroupingTrackerPtr_.reset(new AckGroupingTracker()); - } } ConsumerImpl::~ConsumerImpl() { @@ -143,7 +127,24 @@ const std::string& ConsumerImpl::getSubscriptionName() const { return originalSu const std::string& ConsumerImpl::getTopic() const { return topic_; } -void ConsumerImpl::start() { grabCnx(); } +void ConsumerImpl::start() { + HandlerBase::start(); + + // Initialize ackGroupingTrackerPtr_ here because the shared_from_this() was not initialized until the + // constructor completed. + if (TopicName::get(topic_)->isPersistent()) { + if (config_.getAckGroupingTimeMs() > 0) { + ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled( + client_.lock(), shared_from_this(), consumerId_, config_.getAckGroupingTimeMs(), + config_.getAckGroupingMaxSize())); + } else { + ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(*this, consumerId_)); + } + } else { + LOG_INFO(getName() << "ACK will NOT be sent to broker for this non-persistent topic."); + } + ackGroupingTrackerPtr_->start(); +} void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { Lock lock(mutex_); @@ -871,7 +872,9 @@ void ConsumerImpl::closeAsync(ResultCallback callback) { state_ = Closing; // Flush pending grouped ACK requests. - this->ackGroupingTrackerPtr_->close(); + if (ackGroupingTrackerPtr_) { + ackGroupingTrackerPtr_->close(); + } ClientConnectionPtr cnx = getCnx().lock(); if (!cnx) { diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h index 9306a913702bf..e122a4ab10552 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.h +++ b/pulsar-client-cpp/lib/ConsumerImpl.h @@ -196,7 +196,7 @@ class ConsumerImpl : public ConsumerImplBase, BatchAcknowledgementTracker batchAcknowledgementTracker_; BrokerConsumerStatsImpl brokerConsumerStats_; NegativeAcksTracker negativeAcksTracker_; - AckGroupingTrackerScopedPtr ackGroupingTrackerPtr_; + AckGroupingTrackerPtr ackGroupingTrackerPtr_; MessageCryptoPtr msgCrypto_; const bool readCompacted_; diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index acad16a0f4791..e3ae2f2837682 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -3513,6 +3513,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerSingleAckBehavior) { // Send ACK. AckGroupingTrackerMock tracker(false); + tracker.start(); for (auto msgIdx = 0; msgIdx < numMsg; ++msgIdx) { auto connPtr = connWeakPtr.lock(); ASSERT_NE(connPtr, nullptr); @@ -3563,6 +3564,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerMultiAckBehavior) { // Send ACK. AckGroupingTrackerMock tracker(false); + tracker.start(); std::set restMsgId(recvMsgId.begin(), recvMsgId.end()); ASSERT_EQ(restMsgId.size(), numMsg); ASSERT_TRUE(tracker.callDoImmediateAck(connWeakPtr, consumerImpl.getConsumerId(), restMsgId)); @@ -3669,9 +3671,10 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerDisabledCumulativeAck) { class AckGroupingTrackerEnabledMock : public AckGroupingTrackerEnabled { public: - AckGroupingTrackerEnabledMock(ClientImplPtr clientPtr, HandlerBase &handler, uint64_t consumerId, - long ackGroupingTimeMs, long ackGroupingMaxSize) - : AckGroupingTrackerEnabled(clientPtr, handler, consumerId, ackGroupingTimeMs, ackGroupingMaxSize) {} + AckGroupingTrackerEnabledMock(ClientImplPtr clientPtr, const HandlerBasePtr &handlerPtr, + uint64_t consumerId, long ackGroupingTimeMs, long ackGroupingMaxSize) + : AckGroupingTrackerEnabled(clientPtr, handlerPtr, consumerId, ackGroupingTimeMs, + ackGroupingMaxSize) {} const std::set &getPendingIndividualAcks() { return this->pendingIndividualAcks_; } const long getAckGroupingTimeMs() { return this->ackGroupingTimeMs_; } const long getAckGroupingMaxSize() { return this->ackGroupingMaxSize_; } @@ -3698,7 +3701,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledIndividualAck) { Consumer consumer; ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer)); - auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer); + auto consumerImpl = PulsarFriend::getConsumerImplPtr(consumer); // Sending and receiving messages. for (auto count = 0; count < numMsg; ++count) { @@ -3713,22 +3716,23 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledIndividualAck) { recvMsgId.emplace_back(msg.getMessageId()); } - AckGroupingTrackerEnabledMock tracker(clientImplPtr, consumerImpl, consumerImpl.getConsumerId(), - ackGroupingTimeMs, ackGroupingMaxSize); - ASSERT_EQ(tracker.getPendingIndividualAcks().size(), 0); - ASSERT_EQ(tracker.getAckGroupingTimeMs(), ackGroupingTimeMs); - ASSERT_EQ(tracker.getAckGroupingMaxSize(), ackGroupingMaxSize); + auto tracker = std::make_shared( + clientImplPtr, consumerImpl, consumerImpl->getConsumerId(), ackGroupingTimeMs, ackGroupingMaxSize); + tracker->start(); + ASSERT_EQ(tracker->getPendingIndividualAcks().size(), 0); + ASSERT_EQ(tracker->getAckGroupingTimeMs(), ackGroupingTimeMs); + ASSERT_EQ(tracker->getAckGroupingMaxSize(), ackGroupingMaxSize); for (auto &msgId : recvMsgId) { - ASSERT_FALSE(tracker.isDuplicate(msgId)); - tracker.addAcknowledge(msgId); - ASSERT_TRUE(tracker.isDuplicate(msgId)); + ASSERT_FALSE(tracker->isDuplicate(msgId)); + tracker->addAcknowledge(msgId); + ASSERT_TRUE(tracker->isDuplicate(msgId)); } - ASSERT_EQ(tracker.getPendingIndividualAcks().size(), recvMsgId.size()); + ASSERT_EQ(tracker->getPendingIndividualAcks().size(), recvMsgId.size()); std::this_thread::sleep_for(std::chrono::seconds(2)); - ASSERT_EQ(tracker.getPendingIndividualAcks().size(), 0); + ASSERT_EQ(tracker->getPendingIndividualAcks().size(), 0); for (auto &msgId : recvMsgId) { - ASSERT_FALSE(tracker.isDuplicate(msgId)); + ASSERT_FALSE(tracker->isDuplicate(msgId)); } consumer.close(); @@ -3757,7 +3761,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) { Consumer consumer; ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer)); - auto &consumerImpl0 = PulsarFriend::getConsumerImpl(consumer); + auto consumerImpl0 = PulsarFriend::getConsumerImplPtr(consumer); // Sending and receiving messages. for (auto count = 0; count < numMsg; ++count) { @@ -3773,32 +3777,33 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) { } std::sort(recvMsgId.begin(), recvMsgId.end()); - AckGroupingTrackerEnabledMock tracker0(clientImplPtr, consumerImpl0, consumerImpl0.getConsumerId(), - ackGroupingTimeMs, ackGroupingMaxSize); - ASSERT_EQ(tracker0.getNextCumulativeAckMsgId(), MessageId::earliest()); - ASSERT_FALSE(tracker0.requireCumulativeAck()); + auto tracker0 = std::make_shared( + clientImplPtr, consumerImpl0, consumerImpl0->getConsumerId(), ackGroupingTimeMs, ackGroupingMaxSize); + tracker0->start(); + ASSERT_EQ(tracker0->getNextCumulativeAckMsgId(), MessageId::earliest()); + ASSERT_FALSE(tracker0->requireCumulativeAck()); auto targetMsgId = recvMsgId[numMsg / 2]; for (auto idx = 0; idx <= numMsg / 2; ++idx) { - ASSERT_FALSE(tracker0.isDuplicate(recvMsgId[idx])); + ASSERT_FALSE(tracker0->isDuplicate(recvMsgId[idx])); } - tracker0.addAcknowledgeCumulative(targetMsgId); + tracker0->addAcknowledgeCumulative(targetMsgId); for (auto idx = 0; idx <= numMsg / 2; ++idx) { - ASSERT_TRUE(tracker0.isDuplicate(recvMsgId[idx])); + ASSERT_TRUE(tracker0->isDuplicate(recvMsgId[idx])); } - ASSERT_EQ(tracker0.getNextCumulativeAckMsgId(), targetMsgId); - ASSERT_TRUE(tracker0.requireCumulativeAck()); + ASSERT_EQ(tracker0->getNextCumulativeAckMsgId(), targetMsgId); + ASSERT_TRUE(tracker0->requireCumulativeAck()); std::this_thread::sleep_for(std::chrono::seconds(2)); - ASSERT_FALSE(tracker0.requireCumulativeAck()); + ASSERT_FALSE(tracker0->requireCumulativeAck()); for (auto idx = 0; idx <= numMsg / 2; ++idx) { - ASSERT_TRUE(tracker0.isDuplicate(recvMsgId[idx])); + ASSERT_TRUE(tracker0->isDuplicate(recvMsgId[idx])); } consumer.close(); std::this_thread::sleep_for(std::chrono::seconds(1)); ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer)); - auto &consumerImpl1 = PulsarFriend::getConsumerImpl(consumer); + auto consumerImpl1 = PulsarFriend::getConsumerImplPtr(consumer); std::set restMsgId(recvMsgId.begin() + numMsg / 2 + 1, recvMsgId.end()); for (auto count = numMsg / 2 + 1; count < numMsg; ++count) { Message msg; @@ -3808,10 +3813,11 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) { Message msg; auto ret = consumer.receive(msg, 1000); ASSERT_EQ(ResultTimeout, ret) << "Received redundant message: " << msg.getDataAsString(); - AckGroupingTrackerEnabledMock tracker1(clientImplPtr, consumerImpl1, consumerImpl1.getConsumerId(), - ackGroupingTimeMs, ackGroupingMaxSize); - tracker1.addAcknowledgeCumulative(recvMsgId[numMsg - 1]); - tracker1.close(); + auto tracker1 = std::make_shared( + clientImplPtr, consumerImpl1, consumerImpl1->getConsumerId(), ackGroupingTimeMs, ackGroupingMaxSize); + tracker1->start(); + tracker1->addAcknowledgeCumulative(recvMsgId[numMsg - 1]); + tracker1->close(); consumer.close(); ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer)); diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h index 26a1fe800b844..87b48d2a8b3fe 100644 --- a/pulsar-client-cpp/tests/PulsarFriend.h +++ b/pulsar-client-cpp/tests/PulsarFriend.h @@ -82,6 +82,10 @@ class PulsarFriend { return *consumerImpl; } + static std::shared_ptr getConsumerImplPtr(Consumer consumer) { + return std::static_pointer_cast(consumer.impl_); + } + static std::shared_ptr getClientImplPtr(Client client) { return client.impl_; } static void setNegativeAckEnabled(Consumer consumer, bool enabled) {