Skip to content

Commit

Permalink
[fix][broker] Correctly set byte and message out totals per subscript…
Browse files Browse the repository at this point in the history
…ion (apache#18451)

Signed-off-by: Paul Gier <paul.gier@datastax.com>

Fixes apache#15819

The existing code calculates the pulsar_out_bytes_total and pulsar_out_messages_total per subscription metrics by adding the values from the currently connected consumers. This produces incorrect values as soon as one or more of the consumers disconnects from the subscription.

This changes these two metrics to directly use the subscription stats for these values, and match the output of `pulsar-admin topic stats`.

Signed-off-by: Paul Gier <paul.gier@datastax.com>

Fixes apache#15819

### Motivation

The prometheus metrics for pulsar_out_bytes_total and pulsar_out_messages_total should never decrease,
and they should match the output seen when using pulsar-admin.

### Modifications

Changed the calculation of pulsar_out_bytes_total and pulsar_out_messages_total to directly use the
subscription stats instead of calculating these values by summing the values of the currently connected
consumers.

### Verifying this change

- [X] Make sure that the change passes the CI checks.

Added a unit test to cover this case.

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] Anything that affects deployment

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [X] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

### Matching PR in forked repository

PR in forked repository: pgier#2

(cherry picked from commit c03e33e)
  • Loading branch information
pgier authored and michaeljmarshall committed Nov 19, 2022
1 parent fcdf50e commit 54dccf9
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl
AggregatedSubscriptionStats subsStats) {
stats.subscriptionsCount++;
stats.msgBacklog += subscriptionStats.msgBacklog;
subsStats.bytesOutCounter = subscriptionStats.bytesOutCounter;
subsStats.msgOutCounter = subscriptionStats.msgOutCounter;
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
Expand All @@ -127,8 +129,6 @@ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl
subsStats.msgRateOut += cStats.msgRateOut;
subsStats.messageAckRate += cStats.messageAckRate;
subsStats.msgThroughputOut += cStats.msgThroughputOut;
subsStats.bytesOutCounter += cStats.bytesOutCounter;
subsStats.msgOutCounter += cStats.msgOutCounter;
if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
subsStats.blockedSubscriptionOnUnackedMsgs = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,88 @@ public void testPerTopicStats() throws Exception {
c2.close();
}

/**
* Test that the total message and byte counts for a topic are not reset when a consumer disconnects.
*
* @throws Exception
*/
@Test
public void testPerTopicStatsReconnect() throws Exception {
Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();

Consumer<byte[]> c1 = pulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("test")
.subscribe();

final int messages = 5;
final int pulsarMessageOverhead = 31; // Number of extra bytes pulsar adds to each message
final int messageSizeBytes = "my-message-n".getBytes().length + pulsarMessageOverhead;

for (int i = 0; i < messages; i++) {
String message = "my-message-" + i;
p1.send(message.getBytes());
}

for (int i = 0; i < messages; i++) {
c1.acknowledge(c1.receive());
}

c1.close();

for (int i = 0; i < messages; i++) {
String message = "my-message-" + i;
p1.send(message.getBytes());
}

Consumer<byte[]> c2 = pulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("test")
.subscribe();

for (int i = 0; i < messages; i++) {
c2.acknowledge(c2.receive());
}

p1.close();
c2.close();

ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
String metricsStr = statsOut.toString();
Multimap<String, Metric> metrics = parseMetrics(metricsStr);

metrics.entries().forEach(e -> {
System.out.println(e.getKey() + ": " + e.getValue());
});

List<Metric> cm = (List<Metric>) metrics.get("pulsar_in_bytes_total");
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).value, (messageSizeBytes * messages * 2));
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");

cm = (List<Metric>) metrics.get("pulsar_in_messages_total");
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).value, (messages * 2));
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");

cm = (List<Metric>) metrics.get("pulsar_out_bytes_total");
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).value, (messageSizeBytes * messages * 2));
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
assertEquals(cm.get(0).tags.get("subscription"), "test");

cm = (List<Metric>) metrics.get("pulsar_out_messages_total");
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).value, (messages * 2));
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
assertEquals(cm.get(0).tags.get("subscription"), "test");
}

@Test
public void testPerTopicExpiredStat() throws Exception {
String ns = "prop/ns-abc1";
Expand Down

0 comments on commit 54dccf9

Please sign in to comment.