Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

C++ Client - Async call for getConsumerStat #255

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
//
// Created by Jai Asher on 3/20/17.
//

#ifndef PULSAR_CPP_BROKERCONSUMERSTATS_H
#define PULSAR_CPP_BROKERCONSUMERSTATS_H

#include <boost/date_time/posix_time/ptime.hpp>
#include <string.h>
#include <iostream>
#include <pulsar/Result.h>
#include <boost/function.hpp>
#include <pulsar/ConsumerType.h>

namespace pulsar {
class BrokerConsumerStats {
public:
/** Returns true if the Message is Expired **/
virtual bool isValid() const;

/** Returns the rate of messages delivered to the consumer. msg/s */
virtual double getMsgRateOut() const;

/** Returns the throughput delivered to the consumer. bytes/s */
virtual double getMsgThroughputOut() const;

/** Returns the rate of messages redelivered by this consumer. msg/s */
virtual double getMsgRateRedeliver() const;

/** Returns the Name of the consumer */
virtual const std::string getConsumerName() const;

/** Returns the Number of available message permits for the consumer */
virtual uint64_t getAvailablePermits() const;

/** Returns the Number of unacknowledged messages for the consumer */
virtual uint64_t getUnackedMessages() const;

/** Returns true if the consumer is blocked due to unacked messages. */
virtual bool isBlockedConsumerOnUnackedMsgs() const;

/** Returns the Address of this consumer */
virtual const std::string getAddress() const;

/** Returns the Timestamp of connection */
virtual const std::string getConnectedSince() const;

/** Returns Whether this subscription is Exclusive or Shared or Failover */
virtual const ConsumerType getType() const;

/** Returns the rate of messages expired on this subscription. msg/s */
virtual double getMsgRateExpired() const;

/** Returns the Number of messages in the subscription backlog */
virtual uint64_t getMsgBacklog() const;

friend std::ostream& operator<<(std::ostream &os, const BrokerConsumerStats &obj);
};
typedef boost::shared_ptr<BrokerConsumerStats> BrokerConsumerStatsPtr;
typedef boost::function<void(Result result, BrokerConsumerStats& brokerConsumerStats)> BrokerConsumerStatsCallback;

}
#endif //PULSAR_CPP_BROKERCONSUMERSTATS_H
89 changes: 13 additions & 76 deletions pulsar-client-cpp/include/pulsar/Consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <pulsar/Result.h>
#include <boost/date_time/posix_time/ptime.hpp>
#include <iostream>
#include <pulsar/ConsumerType.h>
#include <pulsar/BrokerConsumerStats.h>
#pragma GCC visibility push(default)

class PulsarFriend;
Expand All @@ -37,80 +39,6 @@ typedef boost::function<void(Result result)> ResultCallback;
/// Callback definition for MessageListener
typedef boost::function<void(Consumer consumer, const Message& msg)> MessageListener;

enum ConsumerType {
/**
* There can be only 1 consumer on the same topic with the same consumerName
*/
ConsumerExclusive,

/**
* Multiple consumers will be able to use the same consumerName and the messages
* will be dispatched according to a round-robin rotation between the connected consumers
*/
ConsumerShared,

/** Only one consumer is active on the subscription; Subscription can have N consumers
* connected one of which will get promoted to master if the current master becomes inactive
*/

ConsumerFailover
};

class BrokerConsumerStats {
private:
/*
* validTillInMs_ - Stats will be valid till this time.
*/
boost::posix_time::ptime validTill_;
public:
BrokerConsumerStats();
BrokerConsumerStats(boost::posix_time::ptime& validTill, double msgRateOut, double msgThroughputOut,
double msgRateRedeliver, std::string consumerName, int availablePermits,
int unackedMessages, bool blockedConsumerOnUnackedMsgs, std::string address,
std::string connectedSince, std::string type, double msgRateExpired, long msgBacklog);

/** Returns true if the Message is Expired **/
bool isValid() const;

/** Total rate of messages delivered to the consumer. msg/s */
double msgRateOut_;

/** Total throughput delivered to the consumer. bytes/s */
double msgThroughputOut_;

/** Total rate of messages redelivered by this consumer. msg/s */
double msgRateRedeliver_;

/** Name of the consumer */
std::string consumerName_;

/** Number of available message permits for the consumer */
int availablePermits_;

/** Number of unacknowledged messages for the consumer */
int unackedMessages_;

/** Flag to verify if consumer is blocked due to reaching threshold of unacked messages */
bool blockedConsumerOnUnackedMsgs_;

/** Address of this consumer */
std::string address_;

/** Timestamp of connection */
std::string connectedSince_;

/// Whether this subscription is Exclusive or Shared or Failover
std::string type_;

/// Total rate of messages expired on this subscription. msg/s
double msgRateExpired_;

/// Number of messages in the subscription backlog
long msgBacklog_;

friend std::ostream& operator<<(std::ostream& os, const BrokerConsumerStats& obj);
};

/**
* Class specifying the configuration of a consumer.
*/
Expand Down Expand Up @@ -347,12 +275,21 @@ class Consumer {
* still valid.
*
* @param brokerConsumerStats - if the function returns ResultOk, this object will contain consumer stats
* @param partitionIndex - optional parameter which is to be populated only if the topic is partitioned.
*
* @note This is a blocking call with timeout of thirty seconds.
*/
Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex = -1);
Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats);

/**
* Asynchronous call to gets Consumer Stats from broker.
* The stats are cached for 30 seconds, if a call is made before the stats returned by the previous call expires
* then cached data will be returned. BrokerConsumerStats::isValid() function can be used to check if the stats are
* still valid.
*
* @param callback - callback function to get the brokerConsumerStats,
* if result is ResultOk then the brokerConsumerStats will be populated
*/
void getConsumerStatsAsync(BrokerConsumerStatsCallback callback);
private:
typedef boost::shared_ptr<ConsumerImplBase> ConsumerImplBasePtr;
friend class PulsarFriend;
Expand Down
28 changes: 28 additions & 0 deletions pulsar-client-cpp/include/pulsar/ConsumerType.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//
// Created by Jai Asher on 3/20/17.
//

#ifndef PULSAR_CPP_CONSUMERTYPE_H
#define PULSAR_CPP_CONSUMERTYPE_H

namespace pulsar {
enum ConsumerType {
/**
* There can be only 1 consumer on the same topic with the same consumerName
*/
ConsumerExclusive,

/**
* Multiple consumers will be able to use the same consumerName and the messages
* will be dispatched according to a round-robin rotation between the connected consumers
*/
ConsumerShared,

/** Only one consumer is active on the subscription; Subscription can have N consumers
* connected one of which will get promoted to master if the current master becomes inactive
*/
ConsumerFailover
};
}

#endif //PULSAR_CPP_CONSUMERTYPE_H
77 changes: 77 additions & 0 deletions pulsar-client-cpp/lib/BrokerConsumerStats.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//
// Created by Jai Asher on 3/20/17.
//
#include <pulsar/BrokerConsumerStats.h>

namespace pulsar {
bool BrokerConsumerStats::isValid() const {
return false;
}

std::ostream& operator<<(std::ostream &os, const BrokerConsumerStats& obj) {
os << "\nBrokerConsumerStats ["
<< "validTill_ = " << obj.isValid()
<< ", msgRateOut_ = " << obj.getMsgRateOut()
<< ", msgThroughputOut_ = " << obj.getMsgThroughputOut()
<< ", msgRateRedeliver_ = " << obj.getMsgRateRedeliver()
<< ", consumerName_ = " << obj.getConsumerName()
<< ", availablePermits_ = " << obj.getAvailablePermits()
<< ", unackedMessages_ = " << obj.getUnackedMessages()
<< ", blockedConsumerOnUnackedMsgs_ = " << obj.isBlockedConsumerOnUnackedMsgs()
<< ", address_ = " << obj.getAddress()
<< ", connectedSince_ = " << obj.getConnectedSince()
<< ", type_ = " << obj.getType()
<< ", msgRateExpired_ = " << obj.getMsgRateExpired()
<< ", msgBacklog_ = " << obj.getMsgBacklog()
<< "]";
return os;
}

double BrokerConsumerStats::getMsgRateOut() const {
return 0;
}

double BrokerConsumerStats::getMsgThroughputOut() const {
return 0;
}

double BrokerConsumerStats::getMsgRateRedeliver() const {
return 0;
}

const std::string BrokerConsumerStats::getConsumerName() const {
return "";
}

uint64_t BrokerConsumerStats::getAvailablePermits() const {
return 0;
}

uint64_t BrokerConsumerStats::getUnackedMessages() const {
return 0;
}

bool BrokerConsumerStats::isBlockedConsumerOnUnackedMsgs() const {
return false;
}

const std::string BrokerConsumerStats::getAddress() const {
return "";
}

const std::string BrokerConsumerStats::getConnectedSince() const {
return "";
}

const ConsumerType BrokerConsumerStats::getType() const {
return ConsumerExclusive;
}

double BrokerConsumerStats::getMsgRateExpired() const {
return 0;
}

uint64_t BrokerConsumerStats::getMsgBacklog() const {
return 0;
}
}
Loading