diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java index 27ce07f26c452..1b6f939c1ffb6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java @@ -454,14 +454,13 @@ protected boolean setUsageInMonitoredEntity(ResourceGroupMonitoringClass monClas bytesUsed = monEntity.usedLocallySinceLastReport.bytes; messagesUsed = monEntity.usedLocallySinceLastReport.messages; - + monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0; if (sendReport) { p.setBytesPerPeriod(bytesUsed); p.setMessagesPerPeriod(messagesUsed); monEntity.lastReportedValues.bytes = bytesUsed; monEntity.lastReportedValues.messages = messagesUsed; monEntity.numSuppressedUsageReports = 0; - monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0; monEntity.totalUsedLocally.bytes += bytesUsed; monEntity.totalUsedLocally.messages += messagesUsed; monEntity.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java index 476e08003bbce..7ba0ba86d124b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java @@ -72,34 +72,50 @@ public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) { rgConfig.setPublishRateInMsgs(2000); service.resourceGroupCreate(rgName, rgConfig); - org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup = service.resourceGroupGet(rgName); BytesAndMessagesCount bytesAndMessagesCount = new BytesAndMessagesCount(); bytesAndMessagesCount.bytes = 20; bytesAndMessagesCount.messages = 10; - resourceGroup.incrementLocalUsageStats(ResourceGroupMonitoringClass.Publish, bytesAndMessagesCount); + + org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup = service.resourceGroupGet(rgName); + for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) { + resourceGroup.incrementLocalUsageStats(value, bytesAndMessagesCount); + } + + // Case1: Suppress report ResourceUsage. + needReport.set(false); ResourceUsage resourceUsage = new ResourceUsage(); resourceGroup.rgFillResourceUsage(resourceUsage); assertFalse(resourceUsage.hasDispatch()); assertFalse(resourceUsage.hasPublish()); + for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) { + PerMonitoringClassFields monitoredEntity = + resourceGroup.getMonitoredEntity(value); + assertEquals(monitoredEntity.usedLocallySinceLastReport.messages, 0); + assertEquals(monitoredEntity.usedLocallySinceLastReport.bytes, 0); + assertEquals(monitoredEntity.totalUsedLocally.messages, 0); + assertEquals(monitoredEntity.totalUsedLocally.bytes, 0); + assertEquals(monitoredEntity.lastReportedValues.messages, 0); + assertEquals(monitoredEntity.lastReportedValues.bytes, 0); + } - PerMonitoringClassFields publishMonitoredEntity = - resourceGroup.getMonitoredEntity(ResourceGroupMonitoringClass.Publish); - assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, bytesAndMessagesCount.messages); - assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, bytesAndMessagesCount.bytes); - assertEquals(publishMonitoredEntity.totalUsedLocally.messages, 0); - assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, 0); - assertEquals(publishMonitoredEntity.lastReportedValues.messages, 0); - assertEquals(publishMonitoredEntity.lastReportedValues.bytes, 0); - + // Case2: Report ResourceUsage. + for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) { + resourceGroup.incrementLocalUsageStats(value, bytesAndMessagesCount); + } needReport.set(true); + resourceUsage = new ResourceUsage(); resourceGroup.rgFillResourceUsage(resourceUsage); assertTrue(resourceUsage.hasDispatch()); assertTrue(resourceUsage.hasPublish()); - assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, 0); - assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, 0); - assertEquals(publishMonitoredEntity.totalUsedLocally.messages, bytesAndMessagesCount.messages); - assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, bytesAndMessagesCount.bytes); - assertEquals(publishMonitoredEntity.lastReportedValues.messages, bytesAndMessagesCount.messages); - assertEquals(publishMonitoredEntity.lastReportedValues.bytes, bytesAndMessagesCount.bytes); + for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) { + PerMonitoringClassFields monitoredEntity = + resourceGroup.getMonitoredEntity(value); + assertEquals(monitoredEntity.usedLocallySinceLastReport.messages, 0); + assertEquals(monitoredEntity.usedLocallySinceLastReport.bytes, 0); + assertEquals(monitoredEntity.totalUsedLocally.messages, bytesAndMessagesCount.messages); + assertEquals(monitoredEntity.totalUsedLocally.bytes, bytesAndMessagesCount.bytes); + assertEquals(monitoredEntity.lastReportedValues.messages, bytesAndMessagesCount.messages); + assertEquals(monitoredEntity.lastReportedValues.bytes, bytesAndMessagesCount.bytes); + } } } \ No newline at end of file