diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 4ba414623ab787..78215d194e4439 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -115,6 +115,8 @@ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl AggregatedSubscriptionStats subsStats) { stats.subscriptionsCount++; stats.msgBacklog += subscriptionStats.msgBacklog; + subsStats.bytesOutCounter = subscriptionStats.bytesOutCounter; + subsStats.msgOutCounter = subscriptionStats.msgOutCounter; subsStats.msgBacklog = subscriptionStats.msgBacklog; subsStats.msgDelayed = subscriptionStats.msgDelayed; subsStats.msgRateExpired = subscriptionStats.msgRateExpired; @@ -133,8 +135,6 @@ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl subsStats.msgRateOut += cStats.msgRateOut; subsStats.messageAckRate += cStats.messageAckRate; subsStats.msgThroughputOut += cStats.msgThroughputOut; - subsStats.bytesOutCounter += cStats.bytesOutCounter; - subsStats.msgOutCounter += cStats.msgOutCounter; if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) { subsStats.blockedSubscriptionOnUnackedMsgs = true; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index a33c54a9ed4228..ed50b09f9becbb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -352,6 +352,88 @@ public void testPerTopicStats() throws Exception { c2.close(); } + /** + * Test that the total message and byte counts for a topic are not reset when a consumer disconnects. + * + * @throws Exception + */ + @Test + public void testPerTopicStatsReconnect() throws Exception { + Producer p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create(); + + Consumer c1 = pulsarClient.newConsumer() + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("test") + .subscribe(); + + final int messages = 5; + final int pulsarMessageOverhead = 31; // Number of extra bytes pulsar adds to each message + final int messageSizeBytes = "my-message-n".getBytes().length + pulsarMessageOverhead; + + for (int i = 0; i < messages; i++) { + String message = "my-message-" + i; + p1.send(message.getBytes()); + } + + for (int i = 0; i < messages; i++) { + c1.acknowledge(c1.receive()); + } + + c1.close(); + + for (int i = 0; i < messages; i++) { + String message = "my-message-" + i; + p1.send(message.getBytes()); + } + + Consumer c2 = pulsarClient.newConsumer() + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("test") + .subscribe(); + + for (int i = 0; i < messages; i++) { + c2.acknowledge(c2.receive()); + } + + p1.close(); + c2.close(); + + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + String metricsStr = statsOut.toString(); + Multimap metrics = parseMetrics(metricsStr); + + metrics.entries().forEach(e -> { + System.out.println(e.getKey() + ": " + e.getValue()); + }); + + List cm = (List) metrics.get("pulsar_in_bytes_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, (messageSizeBytes * messages * 2)); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + + cm = (List) metrics.get("pulsar_in_messages_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, (messages * 2)); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + + cm = (List) metrics.get("pulsar_out_bytes_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, (messageSizeBytes * messages * 2)); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(0).tags.get("subscription"), "test"); + + cm = (List) metrics.get("pulsar_out_messages_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, (messages * 2)); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(0).tags.get("subscription"), "test"); + } + @Test public void testPerTopicExpiredStat() throws Exception { String ns = "prop/ns-abc1";