Disable shard idle with segment replication. #4118

Merged 1 commit, Aug 4, 2022
@@ -193,6 +193,18 @@ protected ReferenceManager<OpenSearchDirectoryReader> getReferenceManager(Search
return readerManager;
}

/**
 * Refreshes of this engine only happen internally, when a new set of segments is received; external
 * refresh attempts are ignored, so we can return false here. Note that the Engine's existing implementation
 * reads DirectoryReader.isCurrent after acquiring a searcher. With this engine's NRTReplicationReaderManager,
 * that check uses StandardDirectoryReader's implementation, which decides whether the reader is current by
 * comparing the on-disk SegmentInfos version against the reader's; at refresh points isCurrent will always
 * return false and refreshNeeded true. Even when that check reports a refresh as needed, we ignore it and
 * only ever refresh with incoming SegmentInfos.
 */
@Override
public boolean refreshNeeded() {
return false;
}

@Override
public Closeable acquireHistoryRetentionLock() {
throw new UnsupportedOperationException("Not implemented");
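For context, here is a minimal illustrative sketch (not part of this PR) of the currency check the Javadoc above refers to. Lucene's StandardDirectoryReader reports isCurrent by comparing the SegmentInfos version on disk with the version the reader was opened against; the index path below is hypothetical, and at least one commit is assumed to exist there.

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

import java.nio.file.Paths;

public class IsCurrentSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical index location; assumes at least one commit exists there.
        try (Directory dir = FSDirectory.open(Paths.get("/tmp/example-index"));
             DirectoryReader reader = DirectoryReader.open(dir)) {
            // isCurrent() compares the reader's SegmentInfos version with the latest commit on disk.
            SegmentInfos onDisk = SegmentInfos.readLatestCommit(dir);
            boolean stale = onDisk.getVersion() > reader.getVersion();
            // After a new commit lands, isCurrent() flips to false and a ReferenceManager would
            // report refreshNeeded() == true; this is exactly the signal NRTReplicationEngine ignores.
            System.out.println("isCurrent=" + reader.isCurrent() + ", stale=" + stale);
        }
    }
}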
@@ -3778,6 +3778,10 @@ public boolean scheduledRefresh() {
if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it
&& isSearchIdle()
&& indexSettings.isExplicitRefresh() == false
&& indexSettings.isSegRepEnabled() == false
// Indices with segment replication enabled ignore shard idle and never wait on a refresh. Primary shards push out
// new segments only after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas only
// refresh after receiving a new set of segments.
&& active.get()) { // it must be active otherwise we might not free up segment memory once the shard became inactive
// let's skip this refresh since we are search idle and
// don't necessarily need to refresh. The next searcher access will register a refreshListener and that will
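For reference, a hedged sketch of the settings interplay this condition guards: an index with segment replication enabled keeps refreshing on schedule even once it goes search idle. The builder below reuses the settings keys that appear in the test added by this PR; index creation plumbing is omitted.

// Sketch: segment replication plus a search-idle window. With the new condition above,
// scheduledRefresh() will no longer skip refreshes for this index once it goes idle.
Settings indexSettings = Settings.builder()
    .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
    .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueSeconds(30))
    .build();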
@@ -0,0 +1,59 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase;
import org.opensearch.indices.replication.common.ReplicationType;

public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase {

private static final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();

public void testIgnoreShardIdle() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);

final int numDocs = shards.indexDocs(randomInt(10));
primary.refresh("test");
replicateSegments(primary, shards.getReplicas());
shards.assertAllEqual(numDocs);

primary.scheduledRefresh();
replica.scheduledRefresh();

primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));

// Update the search_idle setting, this will put both shards into search idle.
Settings updatedSettings = Settings.builder()
.put(settings)
.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO)
.build();
primary.indexSettings().getScopedSettings().applySettings(updatedSettings);
replica.indexSettings().getScopedSettings().applySettings(updatedSettings);

primary.scheduledRefresh();
replica.scheduledRefresh();

// Shards without segrep will register a new RefreshListener on the engine and return true when registered,
// assert with segrep enabled that awaitShardSearchActive does not register a listener.
primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
}
}
}
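For contrast, the non-segrep path described in the comments above could be exercised with something like the following sketch. This is a hypothetical test, not part of the PR: it assumes the default engine (document replication) and java.util.concurrent.CountDownLatch/TimeUnit imports, and it follows scheduledRefresh()'s contract of returning false when a refresh is skipped.

public void testShardIdleWithoutSegRep() throws Exception {
    // Hypothetical contrast sketch: default engine, no segment replication.
    try (ReplicationGroup shards = createGroup(0, Settings.EMPTY)) {
        shards.startAll();
        final IndexShard primary = shards.getPrimary();
        // Make the shard search idle immediately.
        primary.indexSettings()
            .getScopedSettings()
            .applySettings(Settings.builder().put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO).build());
        shards.indexDocs(1);
        assertFalse(primary.scheduledRefresh()); // skipped: search idle without segrep
        final CountDownLatch latch = new CountDownLatch(1);
        // Unlike the segrep case above, a RefreshListener is registered here.
        primary.awaitShardSearchActive(registered -> {
            assertTrue("a RefreshListener should be registered", registered);
            latch.countDown();
        });
        assertTrue(primary.scheduledRefresh()); // the pending listener now forces a refresh
        assertTrue(latch.await(5, TimeUnit.SECONDS));
    }
}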
@@ -139,6 +139,16 @@ protected ReplicationGroup createGroup(int replicas, Settings settings) throws I
return new ReplicationGroup(metadata);
}

protected ReplicationGroup createGroup(int replicas, Settings settings, EngineFactory engineFactory) throws IOException {
IndexMetadata metadata = buildIndexMetadata(replicas, settings, indexMapping);
return new ReplicationGroup(metadata) {
@Override
protected EngineFactory getEngineFactory(ShardRouting routing) {
return engineFactory;
}
};
}

protected IndexMetadata buildIndexMetadata(int replicas) throws IOException {
return buildIndexMetadata(replicas, indexMapping);
}
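The segment replication test above uses this new overload to swap in the NRT engine; for example:

// From SegmentReplicationIndexShardTests.testIgnoreShardIdle:
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
    shards.startAll();
    final IndexShard primary = shards.getPrimary();
    final IndexShard replica = shards.getReplicas().get(0);
    // ... index docs, replicate segments, assert ...
}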
@@ -33,9 +33,15 @@

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.junit.Assert;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.PlainActionFuture;
@@ -58,6 +64,7 @@
import org.opensearch.common.lucene.uid.Versions;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.internal.io.IOUtils;
@@ -82,6 +89,7 @@
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.breaker.HierarchyCircuitBreakerService;
@@ -94,7 +102,14 @@
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.recovery.StartRecoveryRequest;
import org.opensearch.indices.replication.CheckpointInfoResponse;
import org.opensearch.indices.replication.GetSegmentFilesResponse;
import org.opensearch.indices.replication.SegmentReplicationSource;
import org.opensearch.indices.replication.SegmentReplicationTarget;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.CopyState;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.IndexId;
@@ -112,6 +127,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -122,6 +138,7 @@
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Mockito.mock;
import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting;

/**
@@ -1133,4 +1150,117 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) {
}
};
}

/**
 * Segment replication specific test method - replicates segments to a list of replicas from a given primary.
 * This helper uses a real {@link SegmentReplicationTarget} for each replica, backed by a mock
 * {@link SegmentReplicationSource} that writes all segments directly to the target.
 */
public final void replicateSegments(IndexShard primaryShard, List<IndexShard> replicaShards) throws IOException, InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(replicaShards.size());
Store.MetadataSnapshot primaryMetadata;
try (final GatedCloseable<SegmentInfos> segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) {
final SegmentInfos primarySegmentInfos = segmentInfosSnapshot.get();
primaryMetadata = primaryShard.store().getMetadata(primarySegmentInfos);
}
final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primaryShard.shardId), primaryShard);

final ReplicationCollection<SegmentReplicationTarget> replicationCollection = new ReplicationCollection<>(logger, threadPool);
final SegmentReplicationSource source = new SegmentReplicationSource() {
@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
listener.onResponse(
new CheckpointInfoResponse(
copyState.getCheckpoint(),
copyState.getMetadataSnapshot(),
copyState.getInfosBytes(),
copyState.getPendingDeleteFiles()
)
);
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
Store store,
ActionListener<GetSegmentFilesResponse> listener
) {
try (
final ReplicationCollection.ReplicationRef<SegmentReplicationTarget> replicationRef = replicationCollection.get(
replicationId
)
) {
writeFileChunks(replicationRef.get(), primaryShard, filesToFetch.toArray(new StoreFileMetadata[] {}));
// Respond only on success; responding after onFailure would notify the listener twice.
listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
} catch (IOException e) {
listener.onFailure(e);
}
}
};

for (IndexShard replica : replicaShards) {
final SegmentReplicationTarget target = new SegmentReplicationTarget(
ReplicationCheckpoint.empty(replica.shardId),
replica,
source,
new ReplicationListener() {
@Override
public void onDone(ReplicationState state) {
try (final GatedCloseable<SegmentInfos> snapshot = replica.getSegmentInfosSnapshot()) {
final SegmentInfos replicaInfos = snapshot.get();
final Store.MetadataSnapshot replicaMetadata = replica.store().getMetadata(replicaInfos);
final Store.RecoveryDiff recoveryDiff = primaryMetadata.recoveryDiff(replicaMetadata);
assertTrue(recoveryDiff.missing.isEmpty());
assertTrue(recoveryDiff.different.isEmpty());
assertEquals(recoveryDiff.identical.size(), primaryMetadata.size());
assertEquals(primaryMetadata.getCommitUserData(), replicaMetadata.getCommitUserData());
} catch (Exception e) {
throw ExceptionsHelper.convertToRuntime(e);
}
countDownLatch.countDown();
}

@Override
public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
logger.error("Unexpected replication failure in test", e);
Assert.fail("test replication should not fail: " + e);
}
}
);
replicationCollection.start(target, TimeValue.timeValueMillis(5000));
target.startReplication(new ActionListener<>() {
@Override
public void onResponse(Void o) {
replicationCollection.markAsDone(target.getId());
}

@Override
public void onFailure(Exception e) {
replicationCollection.fail(target.getId(), new OpenSearchException("Segment Replication failed", e), true);
}
});
}
assertTrue("Timed out waiting for replication to complete", countDownLatch.await(3, TimeUnit.SECONDS));
}

private void writeFileChunks(SegmentReplicationTarget target, IndexShard primary, StoreFileMetadata[] files) throws IOException {
for (StoreFileMetadata md : files) {
try (IndexInput in = primary.store().directory().openInput(md.name(), IOContext.READONCE)) {
int pos = 0;
while (pos < md.length()) {
int length = between(1, Math.toIntExact(md.length() - pos));
byte[] buffer = new byte[length];
in.readBytes(buffer, 0, length);
target.writeFileChunk(md, pos, new BytesArray(buffer), pos + length == md.length(), 0, mock(ActionListener.class));
pos += length;
}
}
}
}
}
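Putting the pieces together, testIgnoreShardIdle above drives this helper as follows:

final int numDocs = shards.indexDocs(randomInt(10));
primary.refresh("test");                          // primary refresh produces new segments
replicateSegments(primary, shards.getReplicas()); // copy them to each replica and diff the stores
shards.assertAllEqual(numDocs);                   // primary and replicas now serve the same docs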