From ae5c5e0004f41f0f8cbd27a5009cef9aa3693f38 Mon Sep 17 00:00:00 2001 From: Sumit Bansal Date: Tue, 3 Sep 2024 14:44:56 +0530 Subject: [PATCH] Add testThrottlingWithMultipleOnBeginSubmitsThreadsWithLock Signed-off-by: Sumit Bansal --- .../ClusterManagerTaskThrottlerTests.java | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java index 78238eb7b0f84..17f056331813d 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java @@ -33,7 +33,9 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import static java.lang.Thread.sleep; import static org.opensearch.test.ClusterServiceUtils.setState; /** @@ -474,6 +476,80 @@ public void testThrottlingWithLock() { assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey)); } + public void testThrottlingWithMultipleOnBeginSubmitsThreadsWithLock() { + ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats(); + String taskKey = "test"; + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { + return clusterService.getClusterManagerService().getMinNodeVersion(); + }, throttlingStats); + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, true); + + throttler.updateLimit(taskKey, 5); + + // adding 3 tasks + throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); + + // adding 3 more tasks, these tasks should be throttled + // taskCount in Queue: 3 Threshold: 5 + assertThrows( + ClusterManagerThrottlingException.class, + () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)) + ); + assertEquals(3L, throttlingStats.getThrottlingCount(taskKey)); + + // remove one task + throttler.onBeginProcessing(getMockUpdateTaskList(taskKey, throttlingKey, 1)); + + // add 3 tasks should pass now. + // taskCount in Queue: 2 Threshold: 5 + throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); + + final CountDownLatch latch = new CountDownLatch(1); + // Taking lock on tasksCount will not impact throttling behaviour now. + var threadToLock = new Thread(() -> { + throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return 10L; + }); + }); + threadToLock.start(); + + // submit 1000 threads to verify throttlingCount behaviour as well + final CountDownLatch latch2 = new CountDownLatch(1000); + IntStream.range(0, 1000).forEach(i -> new Thread(() -> { + // adding one task will throttle + // taskCount in Queue: 5 Threshold: 5 + final ClusterManagerThrottlingException exception = assertThrows( + ClusterManagerThrottlingException.class, + () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)) + ); + assertEquals("Throttling Exception : Limit exceeded for test", exception.getMessage()); + assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(taskKey)); + latch2.countDown(); + }).start()); + + try { + latch2.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertEquals(1003L, throttlingStats.getThrottlingCount(taskKey)); + + try { + latch.countDown(); + // Wait to complete and then assert on new tasksCount that got modified by threadToLock Thread + threadToLock.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey)); + } + private List getMockUpdateTaskList( String taskKey, ClusterManagerTaskThrottler.ThrottlingKey throttlingKey,