diff --git a/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h b/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h new file mode 100644 index 0000000000000..e24606059b2d5 --- /dev/null +++ b/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h @@ -0,0 +1,63 @@ +// +// Created by Jai Asher on 3/20/17. +// + +#ifndef PULSAR_CPP_BROKERCONSUMERSTATS_H +#define PULSAR_CPP_BROKERCONSUMERSTATS_H + +#include +#include +#include +#include +#include +#include + +namespace pulsar { +class BrokerConsumerStats { + public: + /** Returns true if the Message is Expired **/ + virtual bool isValid() const; + + /** Returns the rate of messages delivered to the consumer. msg/s */ + virtual double getMsgRateOut() const; + + /** Returns the throughput delivered to the consumer. bytes/s */ + virtual double getMsgThroughputOut() const; + + /** Returns the rate of messages redelivered by this consumer. msg/s */ + virtual double getMsgRateRedeliver() const; + + /** Returns the Name of the consumer */ + virtual const std::string getConsumerName() const; + + /** Returns the Number of available message permits for the consumer */ + virtual uint64_t getAvailablePermits() const; + + /** Returns the Number of unacknowledged messages for the consumer */ + virtual uint64_t getUnackedMessages() const; + + /** Returns true if the consumer is blocked due to unacked messages. */ + virtual bool isBlockedConsumerOnUnackedMsgs() const; + + /** Returns the Address of this consumer */ + virtual const std::string getAddress() const; + + /** Returns the Timestamp of connection */ + virtual const std::string getConnectedSince() const; + + /** Returns Whether this subscription is Exclusive or Shared or Failover */ + virtual const ConsumerType getType() const; + + /** Returns the rate of messages expired on this subscription. msg/s */ + virtual double getMsgRateExpired() const; + + /** Returns the Number of messages in the subscription backlog */ + virtual uint64_t getMsgBacklog() const; + + friend std::ostream& operator<<(std::ostream &os, const BrokerConsumerStats &obj); +}; +typedef boost::shared_ptr BrokerConsumerStatsPtr; +typedef boost::function BrokerConsumerStatsCallback; + +} +#endif //PULSAR_CPP_BROKERCONSUMERSTATS_H diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h b/pulsar-client-cpp/include/pulsar/Consumer.h index 598c6b1319a63..d24e1c7aded5d 100644 --- a/pulsar-client-cpp/include/pulsar/Consumer.h +++ b/pulsar-client-cpp/include/pulsar/Consumer.h @@ -23,6 +23,8 @@ #include #include #include +#include +#include #pragma GCC visibility push(default) class PulsarFriend; @@ -37,80 +39,6 @@ typedef boost::function ResultCallback; /// Callback definition for MessageListener typedef boost::function MessageListener; -enum ConsumerType { - /** - * There can be only 1 consumer on the same topic with the same consumerName - */ - ConsumerExclusive, - - /** - * Multiple consumers will be able to use the same consumerName and the messages - * will be dispatched according to a round-robin rotation between the connected consumers - */ - ConsumerShared, - - /** Only one consumer is active on the subscription; Subscription can have N consumers - * connected one of which will get promoted to master if the current master becomes inactive - */ - - ConsumerFailover -}; - -class BrokerConsumerStats { - private: - /* - * validTillInMs_ - Stats will be valid till this time. - */ - boost::posix_time::ptime validTill_; - public: - BrokerConsumerStats(); - BrokerConsumerStats(boost::posix_time::ptime& validTill, double msgRateOut, double msgThroughputOut, - double msgRateRedeliver, std::string consumerName, int availablePermits, - int unackedMessages, bool blockedConsumerOnUnackedMsgs, std::string address, - std::string connectedSince, std::string type, double msgRateExpired, long msgBacklog); - - /** Returns true if the Message is Expired **/ - bool isValid() const; - - /** Total rate of messages delivered to the consumer. msg/s */ - double msgRateOut_; - - /** Total throughput delivered to the consumer. bytes/s */ - double msgThroughputOut_; - - /** Total rate of messages redelivered by this consumer. msg/s */ - double msgRateRedeliver_; - - /** Name of the consumer */ - std::string consumerName_; - - /** Number of available message permits for the consumer */ - int availablePermits_; - - /** Number of unacknowledged messages for the consumer */ - int unackedMessages_; - - /** Flag to verify if consumer is blocked due to reaching threshold of unacked messages */ - bool blockedConsumerOnUnackedMsgs_; - - /** Address of this consumer */ - std::string address_; - - /** Timestamp of connection */ - std::string connectedSince_; - - /// Whether this subscription is Exclusive or Shared or Failover - std::string type_; - - /// Total rate of messages expired on this subscription. msg/s - double msgRateExpired_; - - /// Number of messages in the subscription backlog - long msgBacklog_; - - friend std::ostream& operator<<(std::ostream& os, const BrokerConsumerStats& obj); -}; - /** * Class specifying the configuration of a consumer. */ @@ -347,12 +275,21 @@ class Consumer { * still valid. * * @param brokerConsumerStats - if the function returns ResultOk, this object will contain consumer stats - * @param partitionIndex - optional parameter which is to be populated only if the topic is partitioned. * * @note This is a blocking call with timeout of thirty seconds. */ - Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex = -1); + Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats); + /** + * Asynchronous call to gets Consumer Stats from broker. + * The stats are cached for 30 seconds, if a call is made before the stats returned by the previous call expires + * then cached data will be returned. BrokerConsumerStats::isValid() function can be used to check if the stats are + * still valid. + * + * @param callback - callback function to get the brokerConsumerStats, + * if result is ResultOk then the brokerConsumerStats will be populated + */ + void getConsumerStatsAsync(BrokerConsumerStatsCallback callback); private: typedef boost::shared_ptr ConsumerImplBasePtr; friend class PulsarFriend; diff --git a/pulsar-client-cpp/include/pulsar/ConsumerType.h b/pulsar-client-cpp/include/pulsar/ConsumerType.h new file mode 100644 index 0000000000000..85f2bb3e1a90e --- /dev/null +++ b/pulsar-client-cpp/include/pulsar/ConsumerType.h @@ -0,0 +1,28 @@ +// +// Created by Jai Asher on 3/20/17. +// + +#ifndef PULSAR_CPP_CONSUMERTYPE_H +#define PULSAR_CPP_CONSUMERTYPE_H + +namespace pulsar { + enum ConsumerType { + /** + * There can be only 1 consumer on the same topic with the same consumerName + */ + ConsumerExclusive, + + /** + * Multiple consumers will be able to use the same consumerName and the messages + * will be dispatched according to a round-robin rotation between the connected consumers + */ + ConsumerShared, + + /** Only one consumer is active on the subscription; Subscription can have N consumers + * connected one of which will get promoted to master if the current master becomes inactive + */ + ConsumerFailover + }; +} + +#endif //PULSAR_CPP_CONSUMERTYPE_H diff --git a/pulsar-client-cpp/lib/BrokerConsumerStats.cc b/pulsar-client-cpp/lib/BrokerConsumerStats.cc new file mode 100644 index 0000000000000..249bcb8f4feb8 --- /dev/null +++ b/pulsar-client-cpp/lib/BrokerConsumerStats.cc @@ -0,0 +1,77 @@ +// +// Created by Jai Asher on 3/20/17. +// +#include + +namespace pulsar { + bool BrokerConsumerStats::isValid() const { + return false; + } + + std::ostream& operator<<(std::ostream &os, const BrokerConsumerStats& obj) { + os << "\nBrokerConsumerStats [" + << "validTill_ = " << obj.isValid() + << ", msgRateOut_ = " << obj.getMsgRateOut() + << ", msgThroughputOut_ = " << obj.getMsgThroughputOut() + << ", msgRateRedeliver_ = " << obj.getMsgRateRedeliver() + << ", consumerName_ = " << obj.getConsumerName() + << ", availablePermits_ = " << obj.getAvailablePermits() + << ", unackedMessages_ = " << obj.getUnackedMessages() + << ", blockedConsumerOnUnackedMsgs_ = " << obj.isBlockedConsumerOnUnackedMsgs() + << ", address_ = " << obj.getAddress() + << ", connectedSince_ = " << obj.getConnectedSince() + << ", type_ = " << obj.getType() + << ", msgRateExpired_ = " << obj.getMsgRateExpired() + << ", msgBacklog_ = " << obj.getMsgBacklog() + << "]"; + return os; + } + + double BrokerConsumerStats::getMsgRateOut() const { + return 0; + } + + double BrokerConsumerStats::getMsgThroughputOut() const { + return 0; + } + + double BrokerConsumerStats::getMsgRateRedeliver() const { + return 0; + } + + const std::string BrokerConsumerStats::getConsumerName() const { + return ""; + } + + uint64_t BrokerConsumerStats::getAvailablePermits() const { + return 0; + } + + uint64_t BrokerConsumerStats::getUnackedMessages() const { + return 0; + } + + bool BrokerConsumerStats::isBlockedConsumerOnUnackedMsgs() const { + return false; + } + + const std::string BrokerConsumerStats::getAddress() const { + return ""; + } + + const std::string BrokerConsumerStats::getConnectedSince() const { + return ""; + } + + const ConsumerType BrokerConsumerStats::getType() const { + return ConsumerExclusive; + } + + double BrokerConsumerStats::getMsgRateExpired() const { + return 0; + } + + uint64_t BrokerConsumerStats::getMsgBacklog() const { + return 0; + } +} \ No newline at end of file diff --git a/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc new file mode 100644 index 0000000000000..fe6e204132c34 --- /dev/null +++ b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc @@ -0,0 +1,114 @@ +// +// Created by Jai Asher on 3/20/17. +// +#include +#include + +namespace pulsar { + BrokerConsumerStatsImpl::BrokerConsumerStatsImpl() : validTill_(boost::posix_time::microsec_clock::universal_time()) {}; + + BrokerConsumerStatsImpl::BrokerConsumerStatsImpl(double msgRateOut, double msgThroughputOut, + double msgRateRedeliver, std::string consumerName, + uint64_t availablePermits, + uint64_t unackedMessages, bool blockedConsumerOnUnackedMsgs, + std::string address, + std::string connectedSince, const std::string& type, + double msgRateExpired, uint64_t msgBacklog) : + msgRateOut_(msgRateOut), + msgThroughputOut_(msgThroughputOut), + msgRateRedeliver_(msgRateRedeliver), + consumerName_(consumerName), + availablePermits_(availablePermits), + unackedMessages_(unackedMessages), + blockedConsumerOnUnackedMsgs_(blockedConsumerOnUnackedMsgs), + address_(address), + connectedSince_(connectedSince), + type_(convertStringToConsumerType(type)), + msgRateExpired_(msgRateExpired), + msgBacklog_(msgBacklog) {} + + bool BrokerConsumerStatsImpl::isValid() const { + return boost::posix_time::microsec_clock::universal_time() <= validTill_; + } + + std::ostream& operator<<(std::ostream &os, const BrokerConsumerStatsImpl& obj) { + os << "\nBrokerConsumerStatsImpl [" + << "validTill_ = " << obj.isValid() + << ", msgRateOut_ = " << obj.getMsgRateOut() + << ", msgThroughputOut_ = " << obj.getMsgThroughputOut() + << ", msgRateRedeliver_ = " << obj.getMsgRateRedeliver() + << ", consumerName_ = " << obj.getConsumerName() + << ", availablePermits_ = " << obj.getAvailablePermits() + << ", unackedMessages_ = " << obj.getUnackedMessages() + << ", blockedConsumerOnUnackedMsgs_ = " << obj.isBlockedConsumerOnUnackedMsgs() + << ", address_ = " << obj.getAddress() + << ", connectedSince_ = " << obj.getConnectedSince() + << ", type_ = " << obj.getType() + << ", msgRateExpired_ = " << obj.getMsgRateExpired() + << ", msgBacklog_ = " << obj.getMsgBacklog() + << "]"; + return os; + } + + double BrokerConsumerStatsImpl::getMsgRateOut() const { + return msgRateOut_; + } + + double BrokerConsumerStatsImpl::getMsgThroughputOut() const { + return msgThroughputOut_; + } + + double BrokerConsumerStatsImpl::getMsgRateRedeliver() const { + return msgRateRedeliver_; + } + + const std::string BrokerConsumerStatsImpl::getConsumerName() const { + return consumerName_; + } + + uint64_t BrokerConsumerStatsImpl::getAvailablePermits() const { + return availablePermits_; + } + + uint64_t BrokerConsumerStatsImpl::getUnackedMessages() const { + return unackedMessages_; + } + + bool BrokerConsumerStatsImpl::isBlockedConsumerOnUnackedMsgs() const { + return blockedConsumerOnUnackedMsgs_; + } + + const std::string BrokerConsumerStatsImpl::getAddress() const { + return address_; + } + + const std::string BrokerConsumerStatsImpl::getConnectedSince() const { + return connectedSince_; + } + + const ConsumerType BrokerConsumerStatsImpl::getType() const { + return type_; + } + + double BrokerConsumerStatsImpl::getMsgRateExpired() const { + return msgRateExpired_; + } + + uint64_t BrokerConsumerStatsImpl::getMsgBacklog() const { + return msgBacklog_; + } + + void BrokerConsumerStatsImpl::setCacheTime(uint64_t cacehTimeInMs) { + validTill_ = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(cacehTimeInMs); + } + + ConsumerType BrokerConsumerStatsImpl::convertStringToConsumerType(const std::string& str) { + if (str == "ConsumerFailover" || str == "Failover") { + return ConsumerFailover; + } else if (str == "ConsumerShared" || str == "Shared") { + return ConsumerShared; + } else { + return ConsumerExclusive; + } + } +} \ No newline at end of file diff --git a/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.h b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.h new file mode 100644 index 0000000000000..67fb47b36a70d --- /dev/null +++ b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.h @@ -0,0 +1,113 @@ +// +// Created by Jai Asher on 3/20/17. +// + +#ifndef PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H +#define PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H + +#include +#include +#include +#include +#include +#include + +namespace pulsar { +class BrokerConsumerStatsImpl : public BrokerConsumerStats { + private: + /** validTill_ - Stats will be valid till this time.*/ + boost::posix_time::ptime validTill_; + + /** Total rate of messages delivered to the consumer. msg/s */ + double msgRateOut_; + + /** Total throughput delivered to the consumer. bytes/s */ + double msgThroughputOut_; + + /** Total rate of messages redelivered by this consumer. msg/s */ + double msgRateRedeliver_; + + /** Name of the consumer */ + std::string consumerName_; + + /** Number of available message permits for the consumer */ + uint64_t availablePermits_; + + /** Number of unacknowledged messages for the consumer */ + uint64_t unackedMessages_; + + /** Flag to verify if consumer is blocked due to reaching threshold of unacked messages */ + bool blockedConsumerOnUnackedMsgs_; + + /** Address of this consumer */ + std::string address_; + + /** Timestamp of connection */ + std::string connectedSince_; + + /** Whether this subscription is Exclusive or Shared or Failover */ + ConsumerType type_; + + /** Total rate of messages expired on this subscription. msg/s */ + double msgRateExpired_; + + /** Number of messages in the subscription backlog */ + uint64_t msgBacklog_; + +public: + + BrokerConsumerStatsImpl(); + + BrokerConsumerStatsImpl(double msgRateOut, double msgThroughputOut, double msgRateRedeliver, + std::string consumerName, uint64_t availablePermits, + uint64_t unackedMessages, bool blockedConsumerOnUnackedMsgs, + std::string address, std::string connectedSince, const std::string& type, + double msgRateExpired, uint64_t msgBacklog); + + /** Returns true if the Message is Expired **/ + virtual bool isValid() const; + + /** Returns the rate of messages delivered to the consumer. msg/s */ + virtual double getMsgRateOut() const; + + /** Returns the throughput delivered to the consumer. bytes/s */ + virtual double getMsgThroughputOut() const; + + /** Returns the rate of messages redelivered by this consumer. msg/s */ + virtual double getMsgRateRedeliver() const; + + /** Returns the Name of the consumer */ + virtual const std::string getConsumerName() const; + + /** Returns the Number of available message permits for the consumer */ + virtual uint64_t getAvailablePermits() const; + + /** Returns the Number of unacknowledged messages for the consumer */ + virtual uint64_t getUnackedMessages() const; + + /** Returns true if the consumer is blocked due to unacked messages. */ + virtual bool isBlockedConsumerOnUnackedMsgs() const; + + /** Returns the Address of this consumer */ + virtual const std::string getAddress() const; + + /** Returns the Timestamp of connection */ + virtual const std::string getConnectedSince() const; + + /** Returns Whether this subscription is Exclusive or Shared or Failover */ + virtual const ConsumerType getType() const; + + /** Returns the rate of messages expired on this subscription. msg/s */ + virtual double getMsgRateExpired() const; + + /** Returns the Number of messages in the subscription backlog */ + virtual uint64_t getMsgBacklog() const; + + void setCacheTime(uint64_t cacehTimeInMs); + + friend std::ostream& operator<<(std::ostream &os, const BrokerConsumerStatsImpl &obj); + + static ConsumerType convertStringToConsumerType(const std::string& str); +}; +} +#endif //PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index bcb35be7a64bd..e3c668e0260e6 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -124,7 +124,6 @@ havePendingPingRequest_(false), keepAliveTimer_(), maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()), consumerStatsRequestTimer_(executor_->createDeadlineTimer()), -consumerStatsTTLMs_(30 * 1000), numOfPendingLookupRequest_(0), isTlsAllowInsecureConnection_(false) { if (clientConfiguration.isUseTls()) { @@ -205,7 +204,7 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte } void ClientConnection::startConsumerStatsTimer(std::vector consumerStatsRequests) { - std::vector > consumerStatsPromises; + std::vector > consumerStatsPromises; Lock lock(mutex_); for (int i = 0; i < consumerStatsRequests.size(); i++) { @@ -715,7 +714,7 @@ void ClientConnection::handleIncomingCommand() { PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.find( consumerStatsResponse.request_id()); if (it != pendingConsumerStatsMap_.end()) { - Promise consumerStatsPromise = it->second; + Promise consumerStatsPromise = it->second; pendingConsumerStatsMap_.erase(it); lock.unlock(); @@ -730,10 +729,7 @@ void ClientConnection::handleIncomingCommand() { cnxString_ << "ConsumerStatsResponse command - Received consumer stats response from server. req_id: " << consumerStatsResponse.request_id() << " Stats: "); - boost::posix_time::ptime validTill = now() + milliseconds(consumerStatsTTLMs_); - BrokerConsumerStats brokerStats = - BrokerConsumerStats(validTill, - consumerStatsResponse.msgrateout(), + BrokerConsumerStatsImpl brokerStats(consumerStatsResponse.msgrateout(), consumerStatsResponse.msgthroughputout(), consumerStatsResponse.msgrateredeliver(), consumerStatsResponse.consumername(), @@ -923,11 +919,11 @@ void ClientConnection::handleIncomingCommand() { } } -Future +Future ClientConnection::newConsumerStats(const std::string topicName, const std::string subscriptionName, uint64_t consumerId, uint64_t requestId) { Lock lock(mutex_); - Promise promise; + Promise promise; if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString_ << " Client is not connected to the broker"); diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h index 0617b46b58aa3..981e70ac09fbb 100644 --- a/pulsar-client-cpp/lib/ClientConnection.h +++ b/pulsar-client-cpp/lib/ClientConnection.h @@ -40,6 +40,7 @@ #include "UtilAllocator.h" #include #include +#include using namespace pulsar; @@ -132,11 +133,9 @@ class ClientConnection : public boost::enable_shared_from_this Commands::ChecksumType getChecksumType() const; - Future newConsumerStats(const std::string topicName, const std::string subscriptionName, + Future newConsumerStats(const std::string topicName, const std::string subscriptionName, uint64_t consumerId, uint64_t requestId) ; private: - long consumerStatsTTLMs_ ; - struct PendingRequestData { Promise promise; DeadlineTimerPtr timer; @@ -254,7 +253,7 @@ class ClientConnection : public boost::enable_shared_from_this typedef std::map ConsumersMap; ConsumersMap consumers_; - typedef std::map > PendingConsumerStatsMap; + typedef std::map > PendingConsumerStatsMap; PendingConsumerStatsMap pendingConsumerStatsMap_; diff --git a/pulsar-client-cpp/lib/Consumer.cc b/pulsar-client-cpp/lib/Consumer.cc index f05d0035853e2..36b135ab5b555 100644 --- a/pulsar-client-cpp/lib/Consumer.cc +++ b/pulsar-client-cpp/lib/Consumer.cc @@ -18,55 +18,13 @@ #include #include "ConsumerImpl.h" #include "Utils.h" +#include +#include namespace pulsar { const std::string EMPTY_STRING; -BrokerConsumerStats::BrokerConsumerStats():validTill_(now()) {}; - -BrokerConsumerStats::BrokerConsumerStats(boost::posix_time::ptime& validTill, double msgRateOut, double msgThroughputOut, - double msgRateRedeliver, std::string consumerName, int availablePermits, - int unackedMessages, bool blockedConsumerOnUnackedMsgs, std::string address, - std::string connectedSince, std::string type, double msgRateExpired, long msgBacklog): - validTill_(validTill), - msgRateOut_(msgRateOut), - msgThroughputOut_(msgThroughputOut), - msgRateRedeliver_(msgRateRedeliver), - consumerName_(consumerName), - availablePermits_(availablePermits), - unackedMessages_(unackedMessages), - blockedConsumerOnUnackedMsgs_(blockedConsumerOnUnackedMsgs), - address_(address), - connectedSince_(connectedSince), - type_(type), - msgRateExpired_(msgRateExpired), - msgBacklog_(msgBacklog) -{} - -bool BrokerConsumerStats::isValid() const { - return now() <= validTill_; -} - -std::ostream& operator<<(std::ostream& os, const BrokerConsumerStats& obj) { - os << "\nBrokerConsumerStats [" - << "validTill_ = " << obj.validTill_ - << ", msgRateOut_ = " << obj.msgRateOut_ - << ", msgThroughputOut_ = " << obj.msgThroughputOut_ - << ", msgRateRedeliver_ = " << obj.msgRateRedeliver_ - << ", consumerName_ = " << obj.consumerName_ - << ", availablePermits_ = " << obj.availablePermits_ - << ", unackedMessages_ = " << obj.unackedMessages_ - << ", blockedConsumerOnUnackedMsgs_ = " << obj.blockedConsumerOnUnackedMsgs_ - << ", address_ = " << obj.address_ - << ", connectedSince_ = " << obj.connectedSince_ - << ", type_ = " << obj.type_ - << ", msgRateExpired_ = " << obj.msgRateExpired_ - << ", msgBacklog_ = " << obj.msgBacklog_ - << "]"; - return os; -} - struct ConsumerConfiguration::Impl { long unAckedMessagesTimeoutMs; ConsumerType consumerType; @@ -304,10 +262,38 @@ void Consumer::redeliverUnacknowledgedMessages() { } } -Result Consumer::getConsumerStats(BrokerConsumerStats& BrokerConsumerStats, int partitionIndex) { +static void listener(Result result, BrokerConsumerStats& brokerConsumerStats, + BrokerConsumerStats* stats, Result &res, LatchPtr latchPtr) { + std::cout<<"JAI 1: "<<*stats; + std::cout<<"JAI 2: "<countdown(); +} + +Result Consumer::getConsumerStats(BrokerConsumerStats& brokerConsumerStats) { if (!impl_) { return ResultConsumerNotInitialized; } - return impl_->getConsumerStats(BrokerConsumerStats, partitionIndex); + // Can't use promises or future here since it leads to data being copied which leads to object splicing + Result res; + LatchPtr latchPtr = boost::make_shared(1); + getConsumerStatsAsync(boost::bind(listener, _1, _2, &brokerConsumerStats, boost::ref(res), latchPtr)); + latchPtr->wait(); + std::cout<<"JAI 2: "<getConsumerStatsAsync(callback); } } diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index 3f9b8541d8eca..f2b367c670c20 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -687,22 +687,21 @@ int ConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); } -Result ConsumerImpl::getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex) { - if (partitionIndex != -1) { - LOG_WARN(getName() << "Ignoring the partitionIndex since the topic is not partitioned") - } - - if (!isOpen()) { +void ConsumerImpl::getConsumerStatsAsync(BrokerConsumerStatsCallback callback) { + Lock lock(mutex_); + if (state_ != Ready) { LOG_ERROR(getName() << "Client connection is not open, please try again later.") - return ResultConsumerNotInitialized; + lock.unlock(); + return callback(ResultConsumerNotInitialized, brokerConsumerStats_); } if (brokerConsumerStats_.isValid()) { LOG_DEBUG(getName() << "Serving data from cache"); - brokerConsumerStats = brokerConsumerStats_; - return ResultOk; + BrokerConsumerStatsImpl brokerConsumerStats = brokerConsumerStats_; + lock.unlock(); + return callback(ResultOk, brokerConsumerStats); } - + lock.unlock(); ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { @@ -712,19 +711,31 @@ Result ConsumerImpl::getConsumerStats(BrokerConsumerStats& brokerConsumerStats, LOG_DEBUG(getName() << " Sending ConsumerStats Command for Consumer - " << getConsumerId() << ", requestId - "<newConsumerStats(topic_, subscription_, consumerId_, requestId).get(consumerStats); - if (res == ResultOk) { - brokerConsumerStats = brokerConsumerStats_ = consumerStats; - } - return res; + cnx->newConsumerStats(topic_, subscription_, consumerId_, requestId).addListener( + boost::bind(&ConsumerImpl::brokerConsumerStatsListener, shared_from_this(), _1, _2, callback)); + return; } else { LOG_ERROR(getName() << " Operation not supported since server protobuf version " << cnx->getServerProtocolVersion() << " is older than proto::v7"); - return ResultOperationNotSupported; + return callback(ResultUnsupportedVersionError, brokerConsumerStats_); } } LOG_ERROR(getName() << " Client Connection not ready for Consumer"); - return ResultNotConnected; + return callback(ResultNotConnected, brokerConsumerStats_); +} + +void ConsumerImpl::brokerConsumerStatsListener(Result res, BrokerConsumerStatsImpl brokerConsumerStats + , BrokerConsumerStatsCallback callback) { + + if (res == ResultOk) { + Lock lock(mutex_); + LOG_ERROR("JAI: RECEIVED "< #include "BatchAcknowledgementTracker.h" #include +#include using namespace pulsar; @@ -91,8 +92,8 @@ enum ConsumerTopicType { virtual Result pauseMessageListener(); virtual Result resumeMessageListener(); virtual void redeliverUnacknowledgedMessages(); - virtual Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex = -1); -protected: + virtual void getConsumerStatsAsync(BrokerConsumerStatsCallback callback); + protected: void connectionOpened(const ClientConnectionPtr& cnx); void connectionFailed(Result result); void handleCreateConsumer(const ClientConnectionPtr& cnx, Result result); @@ -114,6 +115,7 @@ enum ConsumerTopicType { void increaseAvailablePermits(const ClientConnectionPtr& currentCnx); void drainIncomingMessageQueue(size_t count); unsigned int receiveIndividualMessagesFromBatch(Message &batchedMessage); + void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, BrokerConsumerStatsCallback); boost::mutex mutexForReceiveWithZeroQueueSize; const ConsumerConfiguration config_; @@ -134,7 +136,7 @@ enum ConsumerTopicType { CompressionCodecProvider compressionCodecProvider_; UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_; BatchAcknowledgementTracker batchAcknowledgementTracker_; - BrokerConsumerStats brokerConsumerStats_; + BrokerConsumerStatsImpl brokerConsumerStats_; }; } /* namespace pulsar */ diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h index 5d48317f8fdc4..d675d6896bda0 100644 --- a/pulsar-client-cpp/lib/ConsumerImplBase.h +++ b/pulsar-client-cpp/lib/ConsumerImplBase.h @@ -47,7 +47,7 @@ class ConsumerImplBase { virtual void redeliverUnacknowledgedMessages() = 0; virtual const std::string& getName() const = 0; virtual int getNumOfPrefetchedMessages() const = 0; - virtual Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex = -1) = 0; + virtual void getConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0; }; } #endif //PULSAR_CONSUMER_IMPL_BASE_HEADER diff --git a/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.cc b/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.cc new file mode 100644 index 0000000000000..37d45b0d03fa7 --- /dev/null +++ b/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.cc @@ -0,0 +1,154 @@ +// +// Created by Jai Asher on 3/20/17. +// +#include +#include +#include +#include + +namespace pulsar { + + const std::string PartitionedBrokerConsumerStatsImpl::DELIMITER = ";"; + + PartitionedBrokerConsumerStatsImpl::PartitionedBrokerConsumerStatsImpl(size_t size) { + statsList_.resize(size); + } + + bool PartitionedBrokerConsumerStatsImpl::isValid() const { + bool isValid = true; + for (int i = 0; i +#include +#include +#include +#include +#include +#include +#include +#include + +namespace pulsar { +class PartitionedBrokerConsumerStatsImpl : public BrokerConsumerStats { + private: + std::vector statsList_; + static const std::string DELIMITER; + public: + + PartitionedBrokerConsumerStatsImpl(size_t size); + + /** Returns true if the Message is Expired **/ + virtual bool isValid() const; + + /** Returns the rate of messages delivered to the consumer. msg/s */ + virtual double getMsgRateOut() const; + + /** Returns the throughput delivered to the consumer. bytes/s */ + virtual double getMsgThroughputOut() const; + + /** Returns the rate of messages redelivered by this consumer. msg/s */ + virtual double getMsgRateRedeliver() const; + + /** Returns the Name of the consumer */ + virtual const std::string getConsumerName() const; + + /** Returns the Number of available message permits for the consumer */ + virtual uint64_t getAvailablePermits() const; + + /** Returns the Number of unacknowledged messages for the consumer */ + virtual uint64_t getUnackedMessages() const; + + /** Returns true if the consumer is blocked due to unacked messages. */ + virtual bool isBlockedConsumerOnUnackedMsgs() const; + + /** Returns the Address of this consumer */ + virtual const std::string getAddress() const; + + /** Returns the Timestamp of connection */ + virtual const std::string getConnectedSince() const; + + /** Returns Whether this subscription is Exclusive or Shared or Failover */ + virtual const ConsumerType getType() const; + + /** Returns the rate of messages expired on this subscription. msg/s */ + virtual double getMsgRateExpired() const; + + /** Returns the Number of messages in the subscription backlog */ + virtual uint64_t getMsgBacklog() const; + + /** Returns the BrokerConsumerStatsImpl at of ith partition */ + BrokerConsumerStatsImpl getBrokerConsumerStats(int index); + + void add(BrokerConsumerStatsImpl stats, int index); + + void clear(); + + friend std::ostream& operator<<(std::ostream &os, const PartitionedBrokerConsumerStatsImpl &obj); +}; +typedef boost::shared_ptr PartitionedBrokerConsumerStatsPtr; + +} +#endif //PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc index 5de6993902c5c..917b765672e9e 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc @@ -15,11 +15,6 @@ */ #include "PartitionedConsumerImpl.h" -#include "LogUtils.h" -#include -#include "pulsar/Result.h" -#include "MessageImpl.h" -#include "Utils.h" DECLARE_LOG_OBJECT() @@ -380,12 +375,39 @@ namespace pulsar { return messages_.size(); } - Result PartitionedConsumerImpl::getConsumerStats(BrokerConsumerStats& BrokerConsumerStats, int partitionIndex) { - if (partitionIndex >= numPartitions_ && partitionIndex < 0 && consumers_.size() <= partitionIndex) - { - LOG_ERROR(getName() << " PartitionIndex must be positive and less than number of partitiones") - return ResultInvalidConfiguration; + void PartitionedConsumerImpl::getConsumerStatsAsync(BrokerConsumerStatsCallback callback) { + Lock lock(mutex_); + PartitionedBrokerConsumerStatsPtr statsPtr = boost::make_shared(numPartitions_); + if (numPartitions_ != consumers_.size()) { + lock.unlock(); + return callback(ResultConsumerNotInitialized, *statsPtr); + } + LatchPtr latchPtr = boost::make_shared(numPartitions_); + ConsumerList consumerList = consumers_; + lock.unlock(); + + for (int i = 0; igetConsumerStatsAsync(boost::bind(&PartitionedConsumerImpl::handleGetConsumerStats, + shared_from_this(), _1, _2, latchPtr, + statsPtr, i, callback)); + } + + } + + void PartitionedConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerStats& brokerConsumerStats, + LatchPtr latchPtr, PartitionedBrokerConsumerStatsPtr statsPtr, + size_t index, BrokerConsumerStatsCallback callback) { + Lock lock(mutex_); + if (res == ResultOk) { + latchPtr->countdown(); + statsPtr->add((BrokerConsumerStatsImpl&)brokerConsumerStats, index); + } else { + lock.unlock(); + return callback(res, *statsPtr); + } + if (latchPtr->getCount() == 0) { + lock.unlock(); + callback(ResultOk, *statsPtr); } - return consumers_[partitionIndex]->getConsumerStats(BrokerConsumerStats); } } diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h index 3338adda125a2..3f7c0fb77b57f 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h @@ -25,6 +25,9 @@ #include "boost/enable_shared_from_this.hpp" #include "ConsumerImplBase.h" #include "lib/UnAckedMessageTrackerDisabled.h" +#include +#include + namespace pulsar { class PartitionedConsumerImpl; class PartitionedConsumerImpl: public ConsumerImplBase, public boost::enable_shared_from_this { @@ -60,7 +63,10 @@ namespace pulsar { virtual void redeliverUnacknowledgedMessages(); virtual const std::string& getName() const; virtual int getNumOfPrefetchedMessages() const ; - virtual Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex); + virtual void getConsumerStatsAsync(BrokerConsumerStatsCallback callback); + void handleGetConsumerStats(Result , BrokerConsumerStats&, LatchPtr, + PartitionedBrokerConsumerStatsPtr, size_t, BrokerConsumerStatsCallback); + private: const ClientImplPtr client_; const std::string subscriptionName_; diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index 3225dee3daf29..02868cf8786ee 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -281,7 +281,7 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) { // reserving a spot and going forward - not blocking if (!conf_.getBlockIfQueueFull() && !pendingMessagesQueue_.tryReserve(1)) { LOG_DEBUG(getName() << " - Producer Queue is full"); - // If queue is full sending the batch immediately, no point waiting till batchMessageimeout + // If queue is full sending the batch immediately, no point waiting till batchMessagetimeout if (batchMessageContainer) { LOG_DEBUG(getName() << " - sending batch message immediately"); batchMessageContainer->sendMessage(); diff --git a/pulsar-client-cpp/tests/ConsumerStatsTest.cc b/pulsar-client-cpp/tests/ConsumerStatsTest.cc index b1bfecba04fd0..917eaa8df2ead 100644 --- a/pulsar-client-cpp/tests/ConsumerStatsTest.cc +++ b/pulsar-client-cpp/tests/ConsumerStatsTest.cc @@ -18,21 +18,18 @@ #include #include #include -#include -#include "DestinationName.h" +#include #include -#include #include "boost/date_time/posix_time/posix_time.hpp" #include "CustomRoutingPolicy.h" #include #include "lib/Future.h" #include "lib/Utils.h" -#include -#include "LogUtils.h" #include "PulsarFriend.h" -#include #include "ConsumerTest.h" #include "HttpHelper.h" +#include +#include DECLARE_LOG_OBJECT(); using namespace pulsar; @@ -40,7 +37,20 @@ using namespace pulsar; static std::string lookupUrl = "pulsar://localhost:8885"; static std::string adminUrl = "http://localhost:8765/"; +void callbackFunction(Result result, BrokerConsumerStats& brokerConsumerStats, long expectedBacklog, Latch& latch, int index) { + PartitionedBrokerConsumerStatsImpl stats = (PartitionedBrokerConsumerStatsImpl&) brokerConsumerStats; + LOG_DEBUG(stats); + ASSERT_EQ(expectedBacklog, stats.getBrokerConsumerStats(index).getMsgBacklog()); + latch.countdown(); +} +void simpleCallbackFunction(Result result, BrokerConsumerStats& brokerConsumerStats, Result expectedResult, + uint64_t expectedBacklog, ConsumerType expectedConsumerType) { + LOG_DEBUG(brokerConsumerStats); + ASSERT_EQ(result, expectedResult); + ASSERT_EQ(brokerConsumerStats.getMsgBacklog(), expectedBacklog); + ASSERT_EQ(brokerConsumerStats.getType(), expectedConsumerType); +} TEST(ConsumerStatsTest, testBacklogInfo) { long epochTime=time(NULL); std::string testName="testBacklogInfo-" + boost::lexical_cast(epochTime); @@ -74,12 +84,9 @@ TEST(ConsumerStatsTest, testBacklogInfo) { producer.send(msg); } + LOG_DEBUG("Calling consumer.getConsumerStats"); BrokerConsumerStats consumerStats; - Result res = consumer.getConsumerStats(consumerStats); - ASSERT_EQ(res, ResultOk); - - LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages); + consumer.getConsumerStatsAsync(boost::bind(simpleCallbackFunction, _1, _2, ResultOk, numOfMessages, ConsumerExclusive)); for (int i = numOfMessages; i<(numOfMessages*2); i++) { std::string messageContent = prefix + boost::lexical_cast(i); @@ -88,11 +95,12 @@ TEST(ConsumerStatsTest, testBacklogInfo) { } usleep(35 * 1000 * 1000); - res = consumer.getConsumerStats(consumerStats); + Result res = consumer.getConsumerStats(consumerStats); ASSERT_EQ(res, ResultOk); LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, 2 * numOfMessages); + ASSERT_EQ(consumerStats.getMsgBacklog(), 2 * numOfMessages); + ASSERT_EQ(consumerStats.getType(), ConsumerExclusive); consumer.unsubscribe(); } @@ -135,7 +143,7 @@ TEST(ConsumerStatsTest, testFailure) { ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages); + ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages); consumer.unsubscribe(); ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); @@ -180,7 +188,7 @@ TEST(ConsumerStatsTest, testCachingMechanism) { ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages); + ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages); for (int i = numOfMessages; i<(numOfMessages*2); i++) { std::string messageContent = prefix + boost::lexical_cast(i); @@ -192,7 +200,7 @@ TEST(ConsumerStatsTest, testCachingMechanism) { ASSERT_TRUE(consumerStats.isValid()); ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages); + ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages); LOG_DEBUG("Still Expecting cached results"); usleep(10 * 1000 * 1000); @@ -200,7 +208,7 @@ TEST(ConsumerStatsTest, testCachingMechanism) { ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages); + ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages); LOG_DEBUG("Now expecting new results"); usleep(25 * 1000 * 1000); @@ -208,8 +216,84 @@ TEST(ConsumerStatsTest, testCachingMechanism) { ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages * 2); + ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages * 2); consumer.unsubscribe(); ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); +} + + +TEST(ConsumerStatsTest, testAsyncCallOnPartitionedTopic) { + long epochTime=time(NULL); + std::string testName="testAsyncCallOnPartitionedTopic-" + boost::lexical_cast(epochTime); + Client client(lookupUrl); + std::string topicName = "persistent://property/cluster/namespace/" + testName; + std::string subName = "subscription-name"; + + // call admin api to create partitioned topics + std::string url = adminUrl + "admin/persistent/property/cluster/namespace/" + testName + "/partitions"; + int res = makePutRequest(url, "7"); + + LOG_INFO("res = "< consumerPromise; + BrokerConsumerStats consumerStats; + client.subscribeAsync(topicName, subName, WaitForCallbackValue(consumerPromise)); + ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); + Future consumerFuture = consumerPromise.getFuture(); + Result result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); + + // handling dangling subscriptions + consumer.unsubscribe(); + ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); + client.subscribe(topicName, subName, consumer); + + // Producing messages + Producer producer; + int numOfMessages = 7 * 5; // 5 message per partition + Promise producerPromise; + ProducerConfiguration config; + config.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution); + client.createProducerAsync(topicName, config, WaitForCallbackValue(producerPromise)); + Future producerFuture = producerPromise.getFuture(); + result = producerFuture.get(producer); + ASSERT_EQ(ResultOk, result); + + std::string prefix = testName + "-"; + for (int i = 0; i(i); + Message msg = MessageBuilder().build(); + producer.send(msg); + } + + // Expecting return from 4 callbacks + Latch latch(4); + consumer.getConsumerStatsAsync(boost::bind(callbackFunction, _1, _2, 5, latch, 0)); + + // Now we have 10 messages per partition + for (int i = numOfMessages; i<(numOfMessages*2); i++) { + std::string messageContent = prefix + boost::lexical_cast(i); + Message msg = MessageBuilder().build(); + producer.send(msg); + } + + // Expecting cached result + consumer.getConsumerStatsAsync(boost::bind(callbackFunction, _1, _2, 5, latch, 0)); + + // Expecting fresh results since the partition index is different + consumer.getConsumerStatsAsync(boost::bind(callbackFunction, _1, _2, 10, latch, 2)); + + Message msg; + while (consumer.receive(msg)) { + // Do nothing + } + + // Expecting the backlog to be the same since we didn't acknowledge the messages + consumer.getConsumerStatsAsync(boost::bind(callbackFunction, _1, _2, 10, latch, 3)); + + // Wait for ten seconds only + ASSERT_TRUE(latch.wait(milliseconds(10 * 1000))); } \ No newline at end of file