From f61bd1cf552185216aa5ff77d06ecfba4d5b05f0 Mon Sep 17 00:00:00 2001 From: Paul Gier Date: Sun, 13 Nov 2022 12:16:02 -0600 Subject: [PATCH] [fix][broker] Correctly set byte and message out totals per subscription Fixes #15819 The existing code calculates the pulsar_out_bytes_total and pulsar_out_messages_total per subscription metrics by adding the values from the currently connected consumers. This produces incorrect values as soon as one or more of the consumers disconnects from the subscription. This changes these two metrics to directly use the subscription stats for these values, and match the output of `pulsar-admin topic stats`. Signed-off-by: Paul Gier --- .../prometheus/NamespaceStatsAggregator.java | 4 +- .../broker/stats/PrometheusMetricsTest.java | 82 +++++++++++++++++++ 2 files changed, 84 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 4ba414623ab787..78215d194e4439 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -115,6 +115,8 @@ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl AggregatedSubscriptionStats subsStats) { stats.subscriptionsCount++; stats.msgBacklog += subscriptionStats.msgBacklog; + subsStats.bytesOutCounter = subscriptionStats.bytesOutCounter; + subsStats.msgOutCounter = subscriptionStats.msgOutCounter; subsStats.msgBacklog = subscriptionStats.msgBacklog; subsStats.msgDelayed = subscriptionStats.msgDelayed; subsStats.msgRateExpired = subscriptionStats.msgRateExpired; @@ -133,8 +135,6 @@ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl subsStats.msgRateOut += cStats.msgRateOut; subsStats.messageAckRate += cStats.messageAckRate; subsStats.msgThroughputOut += cStats.msgThroughputOut; - subsStats.bytesOutCounter += cStats.bytesOutCounter; - subsStats.msgOutCounter += cStats.msgOutCounter; if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) { subsStats.blockedSubscriptionOnUnackedMsgs = true; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index a33c54a9ed4228..ed50b09f9becbb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -352,6 +352,88 @@ public void testPerTopicStats() throws Exception { c2.close(); } + /** + * Test that the total message and byte counts for a topic are not reset when a consumer disconnects. + * + * @throws Exception + */ + @Test + public void testPerTopicStatsReconnect() throws Exception { + Producer p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create(); + + Consumer c1 = pulsarClient.newConsumer() + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("test") + .subscribe(); + + final int messages = 5; + final int pulsarMessageOverhead = 31; // Number of extra bytes pulsar adds to each message + final int messageSizeBytes = "my-message-n".getBytes().length + pulsarMessageOverhead; + + for (int i = 0; i < messages; i++) { + String message = "my-message-" + i; + p1.send(message.getBytes()); + } + + for (int i = 0; i < messages; i++) { + c1.acknowledge(c1.receive()); + } + + c1.close(); + + for (int i = 0; i < messages; i++) { + String message = "my-message-" + i; + p1.send(message.getBytes()); + } + + Consumer c2 = pulsarClient.newConsumer() + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("test") + .subscribe(); + + for (int i = 0; i < messages; i++) { + c2.acknowledge(c2.receive()); + } + + p1.close(); + c2.close(); + + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + String metricsStr = statsOut.toString(); + Multimap metrics = parseMetrics(metricsStr); + + metrics.entries().forEach(e -> { + System.out.println(e.getKey() + ": " + e.getValue()); + }); + + List cm = (List) metrics.get("pulsar_in_bytes_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, (messageSizeBytes * messages * 2)); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + + cm = (List) metrics.get("pulsar_in_messages_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, (messages * 2)); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + + cm = (List) metrics.get("pulsar_out_bytes_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, (messageSizeBytes * messages * 2)); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(0).tags.get("subscription"), "test"); + + cm = (List) metrics.get("pulsar_out_messages_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, (messages * 2)); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(0).tags.get("subscription"), "test"); + } + @Test public void testPerTopicExpiredStat() throws Exception { String ns = "prop/ns-abc1";