Skip to content

Commit

Permalink
[fix][broker] Correctly set byte and message out totals per subscription
Browse files Browse the repository at this point in the history
Fixes apache#15819

The existing code calculates the pulsar_out_bytes_total and pulsar_out_messages_total
per subscription metrics by adding the values from the currently connected consumers.
This produces incorrect values as soon as one or more of the consumers disconnects
from the subscription.

This changes these two metrics to directly use the subscription stats for these
values, and match the output of `pulsar-admin topic stats`.

Signed-off-by: Paul Gier <paul.gier@datastax.com>
  • Loading branch information
pgier committed Nov 14, 2022
1 parent fdf86d3 commit f61bd1c
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl
AggregatedSubscriptionStats subsStats) {
stats.subscriptionsCount++;
stats.msgBacklog += subscriptionStats.msgBacklog;
subsStats.bytesOutCounter = subscriptionStats.bytesOutCounter;
subsStats.msgOutCounter = subscriptionStats.msgOutCounter;
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
Expand All @@ -133,8 +135,6 @@ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl
subsStats.msgRateOut += cStats.msgRateOut;
subsStats.messageAckRate += cStats.messageAckRate;
subsStats.msgThroughputOut += cStats.msgThroughputOut;
subsStats.bytesOutCounter += cStats.bytesOutCounter;
subsStats.msgOutCounter += cStats.msgOutCounter;
if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
subsStats.blockedSubscriptionOnUnackedMsgs = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,88 @@ public void testPerTopicStats() throws Exception {
c2.close();
}

/**
* Test that the total message and byte counts for a topic are not reset when a consumer disconnects.
*
* @throws Exception
*/
@Test
public void testPerTopicStatsReconnect() throws Exception {
Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();

Consumer<byte[]> c1 = pulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("test")
.subscribe();

final int messages = 5;
final int pulsarMessageOverhead = 31; // Number of extra bytes pulsar adds to each message
final int messageSizeBytes = "my-message-n".getBytes().length + pulsarMessageOverhead;

for (int i = 0; i < messages; i++) {
String message = "my-message-" + i;
p1.send(message.getBytes());
}

for (int i = 0; i < messages; i++) {
c1.acknowledge(c1.receive());
}

c1.close();

for (int i = 0; i < messages; i++) {
String message = "my-message-" + i;
p1.send(message.getBytes());
}

Consumer<byte[]> c2 = pulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("test")
.subscribe();

for (int i = 0; i < messages; i++) {
c2.acknowledge(c2.receive());
}

p1.close();
c2.close();

ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
String metricsStr = statsOut.toString();
Multimap<String, Metric> metrics = parseMetrics(metricsStr);

metrics.entries().forEach(e -> {
System.out.println(e.getKey() + ": " + e.getValue());
});

List<Metric> cm = (List<Metric>) metrics.get("pulsar_in_bytes_total");
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).value, (messageSizeBytes * messages * 2));
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");

cm = (List<Metric>) metrics.get("pulsar_in_messages_total");
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).value, (messages * 2));
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");

cm = (List<Metric>) metrics.get("pulsar_out_bytes_total");
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).value, (messageSizeBytes * messages * 2));
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
assertEquals(cm.get(0).tags.get("subscription"), "test");

cm = (List<Metric>) metrics.get("pulsar_out_messages_total");
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).value, (messages * 2));
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
assertEquals(cm.get(0).tags.get("subscription"), "test");
}

@Test
public void testPerTopicExpiredStat() throws Exception {
String ns = "prop/ns-abc1";
Expand Down

0 comments on commit f61bd1c

Please sign in to comment.