diff --git a/server/src/test/java/org/opensearch/index/ShardIndexingPressureConcurrentExecutionTests.java b/server/src/test/java/org/opensearch/index/ShardIndexingPressureConcurrentExecutionTests.java index 72ca8bff4087d..faab2f405010a 100644 --- a/server/src/test/java/org/opensearch/index/ShardIndexingPressureConcurrentExecutionTests.java +++ b/server/src/test/java/org/opensearch/index/ShardIndexingPressureConcurrentExecutionTests.java @@ -8,6 +8,8 @@ package org.opensearch.index; +import org.hamcrest.Matcher; +import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.cluster.service.ClusterService; @@ -23,6 +25,10 @@ import java.util.concurrent.atomic.AtomicInteger; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + public class ShardIndexingPressureConcurrentExecutionTests extends OpenSearchTestCase { private final Settings settings = Settings.builder() @@ -34,8 +40,8 @@ public class ShardIndexingPressureConcurrentExecutionTests extends OpenSearchTes .put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 100) .build(); - final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - final ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + private final ClusterService clusterService = new ClusterService(settings, clusterSettings, null); public enum OperationType { COORDINATING, @@ -71,15 +77,11 @@ public void testCoordinatingPrimaryThreadedUpdateToShardLimits() throws Exceptio NUM_THREADS * 15, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1).getCurrentCombinedCoordinatingAndPrimaryBytes() ); - assertTrue( + MatcherAssert.assertThat( (double) (NUM_THREADS * 15) / shardIndexingPressure.shardStats() .getIndexingPressureShardStats(shardId1) - .getCurrentPrimaryAndCoordinatingLimits() < 0.95 - ); - assertTrue( - (double) (NUM_THREADS * 15) / shardIndexingPressure.shardStats() - .getIndexingPressureShardStats(shardId1) - .getCurrentPrimaryAndCoordinatingLimits() > 0.75 + .getCurrentPrimaryAndCoordinatingLimits(), + isInOperatingFactorRange() ); for (int i = 0; i < NUM_THREADS; i++) { @@ -112,15 +114,11 @@ public void testReplicaThreadedUpdateToShardLimits() throws Exception { Releasable[] releasable = fireConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 15, OperationType.REPLICA); assertEquals(NUM_THREADS * 15, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaBytes()); - assertTrue( - (double) (NUM_THREADS * 15) / shardIndexingPressure.shardStats() - .getIndexingPressureShardStats(shardId1) - .getCurrentReplicaLimits() < 0.95 - ); - assertTrue( + MatcherAssert.assertThat( (double) (NUM_THREADS * 15) / shardIndexingPressure.shardStats() .getIndexingPressureShardStats(shardId1) - .getCurrentReplicaLimits() > 0.75 + .getCurrentReplicaLimits(), + isInOperatingFactorRange() ); for (int i = 0; i < NUM_THREADS; i++) { @@ -1087,4 +1085,11 @@ private void fireConcurrentAndParallelRequestsForUniformThroughPut( t.join(); } } + + private Matcher isInOperatingFactorRange() { + return allOf( + greaterThan(ShardIndexingPressureMemoryManager.LOWER_OPERATING_FACTOR.get(settings)), + lessThanOrEqualTo(ShardIndexingPressureMemoryManager.UPPER_OPERATING_FACTOR.get(settings)) + ); + } }