From f440eea6f2282bbf34b06c6e6d1b7e2ce201ab1a Mon Sep 17 00:00:00 2001 From: Paul Gier Date: Fri, 18 Nov 2022 22:55:39 -0600 Subject: [PATCH] [fix][broker] Correctly set byte and message out totals per subscription (#18451) Signed-off-by: Paul Gier Fixes #15819 The existing code calculates the pulsar_out_bytes_total and pulsar_out_messages_total per subscription metrics by adding the values from the currently connected consumers. This produces incorrect values as soon as one or more of the consumers disconnects from the subscription. This changes these two metrics to directly use the subscription stats for these values, and match the output of `pulsar-admin topic stats`. Signed-off-by: Paul Gier Fixes #15819 ### Motivation The prometheus metrics for pulsar_out_bytes_total and pulsar_out_messages_total should never decrease, and they should match the output seen when using pulsar-admin. ### Modifications Changed the calculation of pulsar_out_bytes_total and pulsar_out_messages_total to directly use the subscription stats instead of calculating these values by summing the values of the currently connected consumers. ### Verifying this change - [X] Make sure that the change passes the CI checks. Added a unit test to cover this case. ### Does this pull request potentially affect one of the following parts: *If the box was checked, please highlight the changes* - [ ] Dependencies (add or upgrade a dependency) - [ ] The public API - [ ] The schema - [ ] The default values of configurations - [ ] The threading model - [ ] The binary protocol - [ ] The REST endpoints - [ ] The admin CLI options - [ ] Anything that affects deployment ### Documentation - [ ] `doc` - [ ] `doc-required` - [X] `doc-not-needed` - [ ] `doc-complete` ### Matching PR in forked repository PR in forked repository: https://github.com/pgier/pulsar/pull/2 --- .../prometheus/NamespaceStatsAggregator.java | 4 +- .../broker/stats/PrometheusMetricsTest.java | 82 +++++++++++++++++++ 2 files changed, 84 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 4ba414623ab787..78215d194e4439 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -115,6 +115,8 @@ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl AggregatedSubscriptionStats subsStats) { stats.subscriptionsCount++; stats.msgBacklog += subscriptionStats.msgBacklog; + subsStats.bytesOutCounter = subscriptionStats.bytesOutCounter; + subsStats.msgOutCounter = subscriptionStats.msgOutCounter; subsStats.msgBacklog = subscriptionStats.msgBacklog; subsStats.msgDelayed = subscriptionStats.msgDelayed; subsStats.msgRateExpired = subscriptionStats.msgRateExpired; @@ -133,8 +135,6 @@ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl subsStats.msgRateOut += cStats.msgRateOut; subsStats.messageAckRate += cStats.messageAckRate; subsStats.msgThroughputOut += cStats.msgThroughputOut; - subsStats.bytesOutCounter += cStats.bytesOutCounter; - subsStats.msgOutCounter += cStats.msgOutCounter; if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) { subsStats.blockedSubscriptionOnUnackedMsgs = true; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 3a3dcc1ec64979..989672ac2da978 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -358,6 +358,88 @@ public void testPerTopicStats() throws Exception { c2.close(); } + /** + * Test that the total message and byte counts for a topic are not reset when a consumer disconnects. + * + * @throws Exception + */ + @Test + public void testPerTopicStatsReconnect() throws Exception { + Producer p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create(); + + Consumer c1 = pulsarClient.newConsumer() + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("test") + .subscribe(); + + final int messages = 5; + final int pulsarMessageOverhead = 31; // Number of extra bytes pulsar adds to each message + final int messageSizeBytes = "my-message-n".getBytes().length + pulsarMessageOverhead; + + for (int i = 0; i < messages; i++) { + String message = "my-message-" + i; + p1.send(message.getBytes()); + } + + for (int i = 0; i < messages; i++) { + c1.acknowledge(c1.receive()); + } + + c1.close(); + + for (int i = 0; i < messages; i++) { + String message = "my-message-" + i; + p1.send(message.getBytes()); + } + + Consumer c2 = pulsarClient.newConsumer() + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("test") + .subscribe(); + + for (int i = 0; i < messages; i++) { + c2.acknowledge(c2.receive()); + } + + p1.close(); + c2.close(); + + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + String metricsStr = statsOut.toString(); + Multimap metrics = parseMetrics(metricsStr); + + metrics.entries().forEach(e -> { + System.out.println(e.getKey() + ": " + e.getValue()); + }); + + List cm = (List) metrics.get("pulsar_in_bytes_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, (messageSizeBytes * messages * 2)); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + + cm = (List) metrics.get("pulsar_in_messages_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, (messages * 2)); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + + cm = (List) metrics.get("pulsar_out_bytes_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, (messageSizeBytes * messages * 2)); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(0).tags.get("subscription"), "test"); + + cm = (List) metrics.get("pulsar_out_messages_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, (messages * 2)); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(0).tags.get("subscription"), "test"); + } + @Test public void testPerTopicExpiredStat() throws Exception { String ns = "prop/ns-abc1";