Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Redelivery rates to Consumer and Subscription stats #95

Merged
merged 1 commit into from
Nov 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class Consumer {
private final long consumerId;
private final String consumerName;
private final Rate msgOut;
private final Rate msgRedeliver;

// Represents how many messages we can safely send to the consumer without
// overflowing its receiving queue. The consumer will use Flow commands to
Expand Down Expand Up @@ -90,6 +91,7 @@ public Consumer(Subscription subscription, SubType subType, long consumerId, Str
this.maxUnackedMessages = maxUnackedMessages;
this.cnx = cnx;
this.msgOut = new Rate();
this.msgRedeliver = new Rate();
this.appId = appId;

stats = new ConsumerStats();
Expand Down Expand Up @@ -357,8 +359,10 @@ private boolean shouldBlockConsumerOnUnackMsgs() {

public void updateRates() {
msgOut.calculateRate();
msgRedeliver.calculateRate();
stats.msgRateOut = msgOut.getRate();
stats.msgThroughputOut = msgOut.getValueRate();
stats.msgRateRedeliver = msgRedeliver.getRate();
}

public ConsumerStats getStats() {
Expand Down Expand Up @@ -452,6 +456,11 @@ public void redeliverUnacknowledgedMessages() {
subscription.redeliverUnacknowledgedMessages(this);
flowConsumerBlockedPermits(this);
if (pendingAcks != null) {
int totalRedeliveryMessages = 0;
for (Integer batchSize : pendingAcks.values()) {
totalRedeliveryMessages += batchSize;
}
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages);
pendingAcks.clear();
}

Expand All @@ -474,6 +483,7 @@ public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {
blockedConsumerOnUnackedMsgs = false;

subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages);

int numberOfBlockedPermits = Math.min(totalRedeliveryMessages,
permitsReceivedWhileConsumerBlocked.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,8 @@ public PersistentSubscriptionStats getStats() {
subStats.consumers.add(consumerStats);
subStats.msgRateOut += consumerStats.msgRateOut;
subStats.msgThroughputOut += consumerStats.msgThroughputOut;
subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
subStats.unackedMessages += consumerStats.unackedMessages;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,8 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
subscriptions.forEach((subscriptionName, subscription) -> {
double subMsgRateOut = 0;
double subMsgThroughputOut = 0;
double subMsgRateRedeliver = 0;
long subUnackedMessages = 0;

// Start subscription name & consumers
try {
Expand All @@ -834,6 +836,8 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
ConsumerStats consumerStats = consumer.getStats();
subMsgRateOut += consumerStats.msgRateOut;
subMsgThroughputOut += consumerStats.msgThroughputOut;
subMsgRateRedeliver += consumerStats.msgRateRedeliver;
subUnackedMessages += consumerStats.unackedMessages;

// Populate consumer specific stats here
destStatsStream.startObject();
Expand All @@ -844,6 +848,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
destStatsStream.writePair("connectedSince", consumerStats.connectedSince);
destStatsStream.writePair("msgRateOut", consumerStats.msgRateOut);
destStatsStream.writePair("msgThroughputOut", consumerStats.msgThroughputOut);
destStatsStream.writePair("msgRateRedeliver", consumerStats.msgRateRedeliver);
destStatsStream.endObject();
}

Expand All @@ -855,6 +860,8 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
destStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
destStatsStream.writePair("msgRateOut", subMsgRateOut);
destStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
destStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
destStatsStream.writePair("unackedMessages", subUnackedMessages);
destStatsStream.writePair("type", subscription.getTypeString());

// Close consumers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,96 @@ public void testBrokerServicePersistentTopicStats() throws Exception {
assertEquals(subStats.msgBacklog, 0);
}

@Test
public void testBrokerServicePersistentRedeliverTopicStats() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/successSharedTopic";
final String subName = "successSharedSub";

PersistentTopicStats stats;
PersistentSubscriptionStats subStats;

ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Shared);
Consumer consumer = pulsarClient.subscribe(topicName, subName, conf);
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName);
assertNotNull(topicRef);

rolloverPerIntervalStats();
stats = topicRef.getStats();
subStats = stats.subscriptions.values().iterator().next();

// subscription stats
assertEquals(stats.subscriptions.keySet().size(), 1);
assertEquals(subStats.msgBacklog, 0);
assertEquals(subStats.consumers.size(), 1);

Producer producer = pulsarClient.createProducer(topicName);
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

rolloverPerIntervalStats();
stats = topicRef.getStats();
subStats = stats.subscriptions.values().iterator().next();

// publisher stats
assertEquals(subStats.msgBacklog, 10);
assertEquals(stats.publishers.size(), 1);
assertTrue(stats.publishers.get(0).msgRateIn > 0.0);
assertTrue(stats.publishers.get(0).msgThroughputIn > 0.0);
assertTrue(stats.publishers.get(0).averageMsgSize > 0.0);

// aggregated publish stats
assertEquals(stats.msgRateIn, stats.publishers.get(0).msgRateIn);
assertEquals(stats.msgThroughputIn, stats.publishers.get(0).msgThroughputIn);
double diff = stats.averageMsgSize - stats.publishers.get(0).averageMsgSize;
assertTrue(Math.abs(diff) < 0.000001);

// consumer stats
assertTrue(subStats.consumers.get(0).msgRateOut > 0.0);
assertTrue(subStats.consumers.get(0).msgThroughputOut > 0.0);
assertEquals(subStats.msgRateRedeliver, 0.0);
assertEquals(subStats.consumers.get(0).unackedMessages, 10);

// aggregated consumer stats
assertEquals(subStats.msgRateOut, subStats.consumers.get(0).msgRateOut);
assertEquals(subStats.msgThroughputOut, subStats.consumers.get(0).msgThroughputOut);
assertEquals(subStats.msgRateRedeliver, subStats.consumers.get(0).msgRateRedeliver);
assertEquals(stats.msgRateOut, subStats.consumers.get(0).msgRateOut);
assertEquals(stats.msgThroughputOut, subStats.consumers.get(0).msgThroughputOut);
assertEquals(subStats.msgRateRedeliver, subStats.consumers.get(0).msgRateRedeliver);
assertEquals(subStats.unackedMessages, subStats.consumers.get(0).unackedMessages);

consumer.redeliverUnacknowledgedMessages();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

rolloverPerIntervalStats();
stats = topicRef.getStats();
subStats = stats.subscriptions.values().iterator().next();
assertTrue(subStats.msgRateRedeliver > 0.0);
assertEquals(subStats.msgRateRedeliver, subStats.consumers.get(0).msgRateRedeliver);

Message msg;
for (int i = 0; i < 10; i++) {
msg = consumer.receive();
consumer.acknowledge(msg);
}
consumer.close();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

rolloverPerIntervalStats();
stats = topicRef.getStats();
subStats = stats.subscriptions.values().iterator().next();

assertEquals(subStats.msgBacklog, 0);
}

@Test
public void testBrokerStatsMetrics() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/newTopic";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ public class ConsumerStats {
/** Total throughput delivered to the consumer. bytes/s */
public double msgThroughputOut;

/** Total rate of messages redelivered by this consumer. msg/s */
public double msgRateRedeliver;

/** Name of the consumer */
public String consumerName;

Expand All @@ -45,6 +48,7 @@ public ConsumerStats add(ConsumerStats stats) {
checkNotNull(stats);
this.msgRateOut += stats.msgRateOut;
this.msgThroughputOut += stats.msgThroughputOut;
this.msgRateRedeliver += stats.msgRateRedeliver;
this.availablePermits += stats.availablePermits;
this.unackedMessages += stats.unackedMessages;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,15 @@ public class PersistentSubscriptionStats {
/** Total throughput delivered on this subscription. bytes/s */
public double msgThroughputOut;

/** Total rate of messages redelivered on this subscription. msg/s */
public double msgRateRedeliver;

/** Number of messages in the subscription backlog */
public long msgBacklog;

/** Number of unacknowledged messages for the subscription */
public long unackedMessages;

/** whether this subscription is Exclusive or Shared or Failover */
public SubType type;

Expand All @@ -50,7 +56,9 @@ public PersistentSubscriptionStats() {
public void reset() {
msgRateOut = 0;
msgThroughputOut = 0;
msgRateRedeliver = 0;
msgBacklog = 0;
unackedMessages = 0;
msgRateExpired = 0;
consumers.clear();
}
Expand All @@ -61,7 +69,9 @@ public PersistentSubscriptionStats add(PersistentSubscriptionStats stats) {
checkNotNull(stats);
this.msgRateOut += stats.msgRateOut;
this.msgThroughputOut += stats.msgThroughputOut;
this.msgRateRedeliver += stats.msgRateRedeliver;
this.msgBacklog += stats.msgBacklog;
this.unackedMessages += stats.unackedMessages;
this.msgRateExpired += stats.msgRateExpired;
if (this.consumers.size() != stats.consumers.size()) {
for (int i = 0; i < stats.consumers.size(); i++) {
Expand Down