Skip to content

Commit

Permalink
Add testThrottlingWithMultipleOnBeginSubmitsThreadsWithLock
Browse files Browse the repository at this point in the history
Signed-off-by: Sumit Bansal <sumitsb@amazon.com>
  • Loading branch information
sumitasr committed Sep 3, 2024
1 parent 9f7edc3 commit ae5c5e0
Showing 1 changed file with 76 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import static java.lang.Thread.sleep;
import static org.opensearch.test.ClusterServiceUtils.setState;

/**
Expand Down Expand Up @@ -474,6 +476,80 @@ public void testThrottlingWithLock() {
assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey));
}

public void testThrottlingWithMultipleOnBeginSubmitsThreadsWithLock() {
ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats();
String taskKey = "test";
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
return clusterService.getClusterManagerService().getMinNodeVersion();
}, throttlingStats);
ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, true);

throttler.updateLimit(taskKey, 5);

// adding 3 tasks
throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3));

// adding 3 more tasks, these tasks should be throttled
// taskCount in Queue: 3 Threshold: 5
assertThrows(
ClusterManagerThrottlingException.class,
() -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3))
);
assertEquals(3L, throttlingStats.getThrottlingCount(taskKey));

// remove one task
throttler.onBeginProcessing(getMockUpdateTaskList(taskKey, throttlingKey, 1));

// add 3 tasks should pass now.
// taskCount in Queue: 2 Threshold: 5
throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3));

final CountDownLatch latch = new CountDownLatch(1);
// Taking lock on tasksCount will not impact throttling behaviour now.
var threadToLock = new Thread(() -> {
throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 10L;
});
});
threadToLock.start();

// submit 1000 threads to verify throttlingCount behaviour as well
final CountDownLatch latch2 = new CountDownLatch(1000);
IntStream.range(0, 1000).forEach(i -> new Thread(() -> {
// adding one task will throttle
// taskCount in Queue: 5 Threshold: 5
final ClusterManagerThrottlingException exception = assertThrows(
ClusterManagerThrottlingException.class,
() -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1))
);
assertEquals("Throttling Exception : Limit exceeded for test", exception.getMessage());
assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(taskKey));
latch2.countDown();
}).start());

try {
latch2.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertEquals(1003L, throttlingStats.getThrottlingCount(taskKey));

try {
latch.countDown();
// Wait to complete and then assert on new tasksCount that got modified by threadToLock Thread
threadToLock.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey));
}

private List<TaskBatcherTests.TestTaskBatcher.UpdateTask> getMockUpdateTaskList(
String taskKey,
ClusterManagerTaskThrottler.ThrottlingKey throttlingKey,
Expand Down

0 comments on commit ae5c5e0

Please sign in to comment.