Skip to content

Commit

Permalink
Add a unit test to test repeated updates of resource-group.
Browse files Browse the repository at this point in the history
  • Loading branch information
Bharani Chadalavada committed Aug 10, 2021
1 parent 9af5e53 commit 2daf479
Showing 1 changed file with 51 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
*/
package org.apache.pulsar.broker.resourcegroup;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS;
import static org.apache.pulsar.common.policies.path.PolicyPath.path;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import com.google.common.collect.Sets;
import java.util.Random;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand All @@ -34,6 +38,7 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
public class ResourceGroupConfigListenerTest extends MockedPulsarServiceBaseTest {

ResourceGroup testAddRg = new ResourceGroup();
Expand Down Expand Up @@ -166,6 +171,51 @@ public void testResourceGroupCreateMany() throws Exception {
});
}

@Test
public void testResourceGroupUpdateLoop() throws PulsarAdminException {

ResourceGroup zooRg = new ResourceGroup();
pulsar.getPulsarResources().getResourcegroupResources().getStore().registerListener(
notification -> {
String notifyPath = notification.getPath();
String rgName = notifyPath.substring(notifyPath.lastIndexOf('/') + 1);
if (!notifyPath.startsWith(path(RESOURCEGROUPS))) {
return;
}
if (RESOURCEGROUPS.equals(rgName)) {
return;
}
pulsar.getPulsarResources().getResourcegroupResources()
.getAsync(notifyPath).whenComplete((optionalRg, ex) -> {
if (ex != null) {
return;
}
if (optionalRg.isPresent()) {
ResourceGroup resourceGroup = optionalRg.get();

zooRg.setDispatchRateInBytes(resourceGroup.getDispatchRateInBytes());
zooRg.setDispatchRateInMsgs(resourceGroup.getDispatchRateInMsgs());
zooRg.setPublishRateInBytes(resourceGroup.getPublishRateInBytes());
zooRg.setPublishRateInMsgs(resourceGroup.getPublishRateInMsgs());
}
});
}
);
ResourceGroup rg = new ResourceGroup();
rg.setPublishRateInMsgs(-1);
rg.setPublishRateInBytes(10);
rg.setDispatchRateInMsgs(10);
rg.setDispatchRateInBytes(20);
createResourceGroup("myrg", rg);

for (int i = 0; i < 100; i++) {
rg.setPublishRateInMsgs(i);
updateResourceGroup("myrg", rg);
}

Awaitility.await().untilAsserted(() -> assertEquals(zooRg.getPublishRateInMsgs(), rg.getPublishRateInMsgs()));
}

private void prepareData() throws PulsarAdminException {
admin.clusters().createCluster(clusterName, ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());

Expand All @@ -175,4 +225,4 @@ private void prepareData() throws PulsarAdminException {
testAddRg.setDispatchRateInBytes(200);

}
}
}

0 comments on commit 2daf479

Please sign in to comment.