Skip to content

Commit

Permalink
Bump 'mockito' from 5.2.0 to 5.4.0 (#8181)
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
reta authored and ashking94 committed Jun 22, 2023
1 parent 1f6a3bc commit e574297
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `com.networknt:json-schema-validator` from 1.0.83 to 1.0.84 (#8141)
- Bump `com.netflix.nebula:gradle-info-plugin` from 12.1.3 to 12.1.4 (#8139)
- Bump `commons-io:commons-io` from 2.12.0 to 2.13.0 in /plugins/discovery-azure-classic ([#8140](https://github.com/opensearch-project/OpenSearch/pull/8140))
- Bump `mockito` from 5.2.0 to 5.4.0 ([#8181](https://github.com/opensearch-project/OpenSearch/pull/8181))

### Changed
- Replace jboss-annotations-api_1.2_spec with jakarta.annotation-api ([#7836](https://github.com/opensearch-project/OpenSearch/pull/7836))
Expand Down
2 changes: 1 addition & 1 deletion buildSrc/version.properties
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ bouncycastle=1.70
randomizedrunner = 2.7.1
junit = 4.13.2
hamcrest = 2.1
mockito = 5.2.0
mockito = 5.4.0
objenesis = 3.2
bytebuddy = 1.14.3

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ protected void deleteRepo() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

protected void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) {
protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) {
logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation);
// The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in
/// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the
Expand All @@ -88,13 +88,14 @@ protected void setup(Path repoLocation, double ioFailureRate, String skipExcepti
.put("max_failure_number", maxFailure)
);

internalCluster().startDataOnlyNodes(1);
String dataNodeName = internalCluster().startDataOnlyNodes(1).get(0);
createIndex(INDEX_NAME);
logger.info("--> Created index={}", INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
logger.info("--> Cluster is yellow with no initializing shards");
ensureGreen(INDEX_NAME);
logger.info("--> Cluster is green");
return dataNodeName;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,25 @@
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT;
import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand Down Expand Up @@ -59,4 +67,116 @@ public void testWritesRejected() {
assertTrue(stats.rejectionCount > 0);
deleteRepo();
}

public void testWritesRejectedDueToConsecutiveFailureBreach() throws Exception {
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 10, ByteSizeUnit.KB.toIntBytes(1), 15, "failure_streak_count");
}

public void testWritesRejectedDueToBytesLagBreach() throws Exception {
validateBackpressure(ByteSizeUnit.BYTES.toIntBytes(2), 30, ByteSizeUnit.KB.toIntBytes(1), 15, "bytes_lag");
}

public void testWritesRejectedDueToTimeLagBreach() throws Exception {
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), 15, "time_lag");
}

private void validateBackpressure(
int initialDocSize,
int initialDocsToIndex,
int onFailureDocSize,
int onFailureDocsToIndex,
String breachMode
) throws Exception {
Path location = randomRepoPath().toAbsolutePath();
String dataNodeName = setup(location, 0d, "metadata", Long.MAX_VALUE);

Settings request = Settings.builder()
.put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true)
.put(MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 10)
.build();
ClusterUpdateSettingsResponse clusterUpdateResponse = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(request)
.get();
assertEquals(clusterUpdateResponse.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "true");
assertEquals(clusterUpdateResponse.getPersistentSettings().get(MIN_CONSECUTIVE_FAILURES_LIMIT.getKey()), "10");

logger.info("--> Indexing data");

String jsonString = generateString(initialDocSize);
BytesReference initialSource = new BytesArray(jsonString);
indexDocAndRefresh(initialSource, initialDocsToIndex);

((MockRepository) internalCluster().getInstance(RepositoriesService.class, dataNodeName).repository(REPOSITORY_NAME))
.setRandomControlIOExceptionRate(1d);

jsonString = generateString(onFailureDocSize);
BytesReference onFailureSource = new BytesArray(jsonString);
OpenSearchRejectedExecutionException ex = assertThrows(
OpenSearchRejectedExecutionException.class,
() -> indexDocAndRefresh(onFailureSource, onFailureDocsToIndex)
);
assertTrue(ex.getMessage().contains("rejected execution on primary shard"));
assertTrue(ex.getMessage().contains(breachMode));

RemoteRefreshSegmentTracker.Stats stats = stats();
assertTrue(stats.bytesLag > 0);
assertTrue(stats.refreshTimeLagMs > 0);
assertTrue(stats.localRefreshNumber - stats.remoteRefreshNumber > 0);
assertTrue(stats.rejectionCount > 0);

((MockRepository) internalCluster().getInstance(RepositoriesService.class, dataNodeName).repository(REPOSITORY_NAME))
.setRandomControlIOExceptionRate(0d);

assertBusy(() -> {
RemoteRefreshSegmentTracker.Stats finalStats = stats();
assertEquals(0, finalStats.bytesLag);
assertEquals(0, finalStats.refreshTimeLagMs);
assertEquals(0, finalStats.localRefreshNumber - finalStats.remoteRefreshNumber);
}, 30, TimeUnit.SECONDS);
deleteRepo();
}

private RemoteRefreshSegmentTracker.Stats stats() {
String shardId = "0";
RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get();
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId);
List<RemoteStoreStats> matches = Arrays.stream(response.getShards())
.filter(stat -> indexShardId.equals(stat.getStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
return matches.get(0).getStats();
}

private void indexDocAndRefresh(BytesReference source, int iterations) {
for (int i = 0; i < iterations; i++) {
client().prepareIndex(INDEX_NAME).setSource(source, XContentType.JSON).get();
refresh(INDEX_NAME);
}
}

/**
* Generates string of given sizeInBytes
*
* @param sizeInBytes size of the string
* @return the generated string
*/
private String generateString(int sizeInBytes) {
StringBuilder sb = new StringBuilder();
sb.append("{");
int i = 0;
// Based on local tests, 1 char is occupying 1 byte
while (sb.length() < sizeInBytes) {
String key = "field" + i;
String value = "value" + i;
sb.append("\"").append(key).append("\":\"").append(value).append("\",");
i++;
}
if (sb.length() > 1 && sb.charAt(sb.length() - 1) == ',') {
sb.setLength(sb.length() - 1);
}
sb.append("}");
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public long getFailureCount() {
return failureCounter.get();
}

private final double randomControlIOExceptionRate;
private volatile double randomControlIOExceptionRate;

private final double randomDataFileIOExceptionRate;

Expand Down Expand Up @@ -246,6 +246,10 @@ public synchronized void unblock() {
this.notifyAll();
}

public void setRandomControlIOExceptionRate(double randomControlIOExceptionRate) {
this.randomControlIOExceptionRate = randomControlIOExceptionRate;
}

public void blockOnDataFiles(boolean blocked) {
blockOnDataFiles = blocked;
}
Expand Down

0 comments on commit e574297

Please sign in to comment.