Skip to content

Commit

Permalink
[fix][broker] Clean stats when resource is closed
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece committed Aug 27, 2024
1 parent a5ebf73 commit e6acd0f
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.val;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -311,19 +313,20 @@ public void unRegisterNameSpace(String resourceGroupName, NamespaceName fqNamesp
topicToReplicatorsMap.forEach((key, value) -> {
TopicName topicName = TopicName.get(key);
if (topicName.getNamespaceObject().equals(fqNamespaceName)) {
value.forEach(n -> invalidateAllKeyForReplication.add(key));
topicToReplicatorsMap.remove(key);
value.forEach(n -> invalidateAllKeyForReplication.add(getReplicatorKey(topicName.toString(), n)));
}
});
replicationDispatchStats.invalidateAll(invalidateAllKeyForReplication);

Set<TopicName> invalidateAllKeyForConsumer = new HashSet<>();
Set<String> invalidateAllKeyForConsumer = new HashSet<>();
topicConsumeStats.asMap().forEach((key, value) -> {
TopicName topicName = TopicName.get(key);
if (topicName.getNamespaceObject().equals(fqNamespaceName)) {
invalidateAllKeyForConsumer.add(topicName);
invalidateAllKeyForConsumer.add(key);
}
});
topicConsumeStats.invalidate(invalidateAllKeyForConsumer);
topicConsumeStats.invalidateAll(invalidateAllKeyForConsumer);

aggregateLock.unlock();
// Dissociate this NS-name from the RG.
Expand Down Expand Up @@ -369,12 +372,14 @@ public void unRegisterTopic(TopicName topicName) {
remove.registerUsage(topicNameString, ResourceGroupRefTypes.Topics,
false, this.resourceUsageTransportManagerMgr);
rgTopicUnRegisters.labels(remove.resourceGroupName).inc();
topicProduceStats.invalidate(topicNameString);
topicConsumeStats.invalidate(topicNameString);
Set<String> replicators = topicToReplicatorsMap.remove(topicNameString);
if (replicators != null) {
replicationDispatchStats.invalidateAll(replicators);
}
}
topicProduceStats.invalidate(topicNameString);
topicConsumeStats.invalidate(topicNameString);
Set<String> replicators = topicToReplicatorsMap.remove(topicNameString);
if (replicators != null) {
List<String> keys = replicators.stream().map(n -> getReplicatorKey(topicNameString, n))
.collect(Collectors.toList());
replicationDispatchStats.invalidateAll(keys);
}
aggregateLock.unlock();
}
Expand Down Expand Up @@ -577,7 +582,7 @@ protected void updateStatsWithDiff(String topicName, String replicationRemoteClu
String key;
if (monClass == ResourceGroupMonitoringClass.ReplicationDispatch) {
key = getReplicatorKey(topicName, replicationRemoteCluster);
topicToReplicatorsMap.compute(key, (n, value) -> {
topicToReplicatorsMap.compute(topicName, (n, value) -> {
if (value == null) {
value = new CopyOnWriteArraySet<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,9 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep
}

@Test
public void testCleanupStatsWhenNamespaceDeleted()
throws PulsarAdminException, PulsarClientException, InterruptedException {
public void testCleanupStatsWhenUnRegisterTopic()
throws PulsarAdminException {
String tenantName = UUID.randomUUID().toString();
admin.tenants().createTenant(tenantName,
TenantInfo.builder().allowedClusters(new HashSet<>(admin.clusters().getClusters())).build());
String nsName = tenantName + "/" + UUID.randomUUID();
admin.namespaces().createNamespace(nsName);
org.apache.pulsar.common.policies.data.ResourceGroup rgConfig =
new org.apache.pulsar.common.policies.data.ResourceGroup();
final String rgName = UUID.randomUUID().toString();
Expand All @@ -307,32 +303,80 @@ public void testCleanupStatsWhenNamespaceDeleted()
rgConfig.setReplicationDispatchRateInBytes(2000L);
rgConfig.setReplicationDispatchRateInMsgs(400L);

admin.resourcegroups().createResourceGroup(rgName, rgConfig);
admin.namespaces().setNamespaceResourceGroup(nsName, rgName);
Awaitility.await().ignoreExceptions().untilAsserted(() -> {
Assert.assertNotNull(rgs.getNamespaceResourceGroup(NamespaceName.get(nsName)));
});
rgs.resourceGroupCreate(rgName, rgConfig);
String nsName = tenantName + "/" + UUID.randomUUID();
TopicName topicName = TopicName.get(nsName + "/" + UUID.randomUUID());
String topic = topicName.toString();

String topic = nsName + "/" + UUID.randomUUID();
@Cleanup
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
producer.send("hi".getBytes(StandardCharsets.UTF_8));
rgs.registerTopic(rgName, topicName);

rgs.aggregateResourceGroupLocalUsages();
producer.close();
// Simulate replicator
rgs.updateStatsWithDiff(topic, "remote-cluster", tenantName, nsName, 1, 1,
ResourceGroupMonitoringClass.ReplicationDispatch);
rgs.updateStatsWithDiff(topic, null, tenantName, nsName, 1, 1,
ResourceGroupMonitoringClass.Publish);
rgs.updateStatsWithDiff(topic, null, tenantName, nsName, 1, 1,
ResourceGroupMonitoringClass.Dispatch);
Assert.assertEquals(rgs.getTopicProduceStats().asMap().size(), 1);
Assert.assertEquals(rgs.getTopicConsumeStats().asMap().size(), 1);
Assert.assertEquals(rgs.getReplicationDispatchStats().asMap().size(), 1);
Assert.assertEquals(rgs.getTopicToReplicatorsMap().size(), 1);
Set<String> replicators = rgs.getTopicToReplicatorsMap().get(rgs.getTopicToReplicatorsMap().keys().nextElement());
Assert.assertEquals(replicators.size(), 1);

rgs.unRegisterTopic(TopicName.get(topic));

Assert.assertEquals(rgs.getTopicProduceStats().asMap().size(), 0);
Assert.assertEquals(rgs.getTopicConsumeStats().asMap().size(), 0);
Assert.assertEquals(rgs.getReplicationDispatchStats().asMap().size(), 0);
admin.topics().delete(topic);
admin.namespaces().deleteNamespace(nsName);
admin.resourcegroups().deleteResourceGroup(rgName);
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(rgs.getTopicProduceStats().asMap().size(), 0);
Assert.assertEquals(rgs.getTopicConsumeStats().asMap().size(), 0);
Assert.assertEquals(rgs.getReplicationDispatchStats().asMap().size(), 0);
Assert.assertNull(rgs.getNamespaceResourceGroup(NamespaceName.get(nsName)));
});
Assert.assertEquals(rgs.getReplicationDispatchStats().asMap().size(), 0);
Assert.assertEquals(rgs.getTopicToReplicatorsMap().size(), 0);

rgs.resourceGroupDelete(rgName);
}

@Test
public void testCleanupStatsWhenUnRegisterNamespace()
throws PulsarAdminException {
String tenantName = UUID.randomUUID().toString();
org.apache.pulsar.common.policies.data.ResourceGroup rgConfig =
new org.apache.pulsar.common.policies.data.ResourceGroup();
final String rgName = UUID.randomUUID().toString();
rgConfig.setPublishRateInBytes(15000L);
rgConfig.setPublishRateInMsgs(100);
rgConfig.setDispatchRateInBytes(40000L);
rgConfig.setDispatchRateInMsgs(500);
rgConfig.setReplicationDispatchRateInBytes(2000L);
rgConfig.setReplicationDispatchRateInMsgs(400L);

rgs.resourceGroupCreate(rgName, rgConfig);
String nsName = tenantName + "/" + UUID.randomUUID();
TopicName topicName = TopicName.get(nsName + "/" + UUID.randomUUID());
String topic = topicName.toString();

rgs.registerNameSpace(rgName, topicName.getNamespaceObject());

// Simulate replicator
rgs.updateStatsWithDiff(topic, "remote-cluster", tenantName, nsName, 1, 1,
ResourceGroupMonitoringClass.ReplicationDispatch);
rgs.updateStatsWithDiff(topic, null, tenantName, nsName, 1, 1,
ResourceGroupMonitoringClass.Publish);
rgs.updateStatsWithDiff(topic, null, tenantName, nsName, 1, 1,
ResourceGroupMonitoringClass.Dispatch);
Assert.assertEquals(rgs.getTopicProduceStats().asMap().size(), 1);
Assert.assertEquals(rgs.getTopicConsumeStats().asMap().size(), 1);
Assert.assertEquals(rgs.getReplicationDispatchStats().asMap().size(), 1);
Set<String> replicators = rgs.getTopicToReplicatorsMap().get(rgs.getTopicToReplicatorsMap().keys().nextElement());
Assert.assertEquals(replicators.size(), 1);

rgs.unRegisterNameSpace(rgName, topicName.getNamespaceObject());

Assert.assertEquals(rgs.getTopicProduceStats().asMap().size(), 0);
Assert.assertEquals(rgs.getTopicConsumeStats().asMap().size(), 0);
Assert.assertEquals(rgs.getReplicationDispatchStats().asMap().size(), 0);
Assert.assertEquals(rgs.getTopicToReplicatorsMap().size(), 0);

rgs.resourceGroupDelete(rgName);
}

@Test
Expand Down

0 comments on commit e6acd0f

Please sign in to comment.