Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Fix Flaky in Test SegmentReplicationRelocationIT #6637

Merged
merged 10 commits into from
Mar 15, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.ClusterState;
Expand All @@ -33,6 +34,7 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

/**
* This test class verifies primary shard relocation with segment replication as replication strategy.
Expand Down Expand Up @@ -494,4 +496,73 @@ public void testAddNewReplicaFailure() throws Exception {
assertTrue(clusterHealthResponse.isTimedOut());
ensureYellow(INDEX_NAME);
}

public void testFlushAfterRelocation() throws Exception {
// Starting two nodes with primary and replica shards respectively.
final String primaryNode = internalCluster().startNode();
prepareCreate(
INDEX_NAME,
Settings.builder()
// we want to control refreshes
.put("index.refresh_interval", -1)
).get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

// Start another empty node for relocation
final String newPrimary = internalCluster().startNode();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("3")
.execute()
.actionGet();
assertEquals(clusterHealthResponse.isTimedOut(), false);
ensureGreen(INDEX_NAME);

// Start indexing docs
final int initialDocCount = scaledRandomIntBetween(2000, 3000);
for (int i = 0; i < initialDocCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}

// Verify segment replication event never happened on replica shard
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.execute()
.actionGet();
assertFalse(segmentReplicationStatsResponse.hasSegmentReplicationStats());

// Relocate primary to new primary. When new primary starts it does perform a flush.
logger.info("--> relocate the shard from primary to newPrimary");
ActionFuture<ClusterRerouteResponse> relocationListener = client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(INDEX_NAME, 0, primaryNode, newPrimary))
.execute();
clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
.execute()
.actionGet();
assertEquals(clusterHealthResponse.isTimedOut(), false);

// Verify if all docs are present in replica after relocation, if new relocated primary doesn't flush after relocation the below
// assert
// will fail
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can be accomodated in a single line.

assertBusy(
() -> {
assertHitCount(
client(replicaNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(),
initialDocCount
);
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,22 @@ public List<Segment> segments(boolean verbose) {
}

@Override
public void refresh(String source) throws EngineException {}
public void refresh(String source) throws EngineException {
maybeRefresh(source);
}

@Override
public boolean maybeRefresh(String source) throws EngineException {
return false;
try {
return readerManager.maybeRefresh();
} catch (IOException e) {
try {
failEngine("refresh failed source[" + source + "]", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new RefreshFailedEngineException(shardId, e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public class NRTReplicationReaderManager extends OpenSearchReaderManager {
@Override
protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader referenceToRefresh) throws IOException {
Objects.requireNonNull(referenceToRefresh);
// checks if an actual refresh (change in segments) happened
if (unwrapStandardReader(referenceToRefresh).getSegmentInfos().version == currentInfos.version) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you pls add a unit test for this change in NRTReplicationEngineTests ?

return null;
}
final List<LeafReader> subs = new ArrayList<>();
final StandardDirectoryReader standardDirectoryReader = unwrapStandardReader(referenceToRefresh);
for (LeafReaderContext ctx : standardDirectoryReader.leaves()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void beforeRefresh() throws IOException {

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh && shard.state() != IndexShardState.CLOSED && shard.getReplicationTracker().isPrimaryMode()) {
if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode()) {
publisher.publish(shard, shard.getLatestReplicationCheckpoint());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,12 @@ public void updateShardState(
: "a primary relocation is completed by the cluster-managerr, but primary mode is not active " + currentRouting;

changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");

// Flush here after relocation of primary, so that replica get all changes from new primary rather than waiting for more
// docs to get indexed.
if (indexSettings.isSegRepEnabled()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment here on why this is required? I think this is so new primary shards will push to replicas after relocation rather than waiting for more docs to get indexed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1
Also, it would be better to add a test verifying segrep is forced on replica copies.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, new primary shards will push to replicas after relocation rather than waiting for more docs to get indexed

flush(new FlushRequest().waitIfOngoing(true).force(true));
}
} else if (currentRouting.primary()
&& currentRouting.relocating()
&& replicationTracker.isRelocated()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,12 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.replication.ReplicationMode;
import org.opensearch.action.support.replication.ReplicationOperation;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.action.support.replication.ReplicationTask;
import org.opensearch.action.support.replication.TransportReplicationAction;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
Expand All @@ -33,18 +27,22 @@
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.shard.ShardNotInPrimaryModeException;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

import org.opensearch.action.support.replication.ReplicationMode;

/**
* Replication action responsible for publishing checkpoint to a replica shard.
*
Expand Down Expand Up @@ -110,32 +108,35 @@ public ReplicationMode getReplicationMode(IndexShard indexShard) {
* Publish checkpoint request to shard
*/
final void publish(IndexShard indexShard, ReplicationCheckpoint checkpoint) {
String primaryAllocationId = indexShard.routingEntry().allocationId().getId();
long primaryTerm = indexShard.getPendingPrimaryTerm();
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
threadContext.markAsSystemContext();
PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint);
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request);
final ReplicationTimer timer = new ReplicationTimer();
timer.start();
transportService.sendChildRequest(
indexShard.recoveryState().getTargetNode(),
transportPrimaryAction,
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
task,
transportOptions,
new TransportResponseHandler<ReplicationResponse>() {
@Override
public ReplicationResponse read(StreamInput in) throws IOException {
return newResponseInstance(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

final List<ShardRouting> replicationTargets = indexShard.getReplicationGroup().getReplicationTargets();
for (ShardRouting replicationTarget : replicationTargets) {
if (replicationTarget.primary()) {
continue;
}
final DiscoveryNode node = clusterService.state().nodes().get(replicationTarget.currentNodeId());
final ConcreteReplicaRequest<PublishCheckpointRequest> replicaRequest = new ConcreteReplicaRequest<>(
request,
replicationTarget.allocationId().getId(),
primaryTerm,
indexShard.getLastKnownGlobalCheckpoint(),
indexShard.getMaxSeqNoOfUpdatesOrDeletes()
);
final ReplicationTimer timer = new ReplicationTimer();
timer.start();
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request);
ActionListener<ReplicationOperation.ReplicaResponse> listener = new ActionListener<>() {
@Override
public void onResponse(ReplicationOperation.ReplicaResponse replicaResponse) {
public void handleResponse(ReplicationResponse response) {
timer.stop();
logger.trace(
() -> new ParameterizedMessage(
Expand All @@ -150,36 +151,29 @@ public void onResponse(ReplicationOperation.ReplicaResponse replicaResponse) {
}

@Override
public void onFailure(Exception e) {
public void handleException(TransportException e) {
timer.stop();
logger.trace("[shardId {}] Failed to publish checkpoint, timing: {}", indexShard.shardId().getId(), timer.time());
task.setPhase("finished");
taskManager.unregister(task);
if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
// node shutting down
return;
}
if (ExceptionsHelper.unwrap(
e,
NodeClosedException.class,
IndexNotFoundException.class,
AlreadyClosedException.class,
IndexShardClosedException.class
IndexShardClosedException.class,
ShardNotInPrimaryModeException.class
) != null) {
// the index was deleted or the shard is closed
// Node is shutting down or the index was deleted or the shard is closed
return;
}
logger.warn(
new ParameterizedMessage("{} segment replication checkpoint publishing failed", indexShard.shardId()),
e
);
}
};
final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(
listener,
ReplicaResponse::new
);
transportService.sendChildRequest(node, transportReplicaAction, replicaRequest, task, transportOptions, handler);
}
}
);
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] Publishing replication checkpoint [{}]",
Expand All @@ -196,7 +190,7 @@ protected void shardOperationOnPrimary(
IndexShard primary,
ActionListener<PrimaryResult<PublishCheckpointRequest, ReplicationResponse>> listener
) {
throw new OpenSearchException("PublishCheckpointAction should not hit primary shards");
ActionListener.completeWith(listener, () -> new PrimaryResult<>(request, new ReplicationResponse()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.ReferenceManager;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.seqno.LocalCheckpointTracker;
Expand Down Expand Up @@ -182,6 +184,28 @@ public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOExcep
}
}

public void testRefreshOnNRTEngine() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());

ReferenceManager<OpenSearchDirectoryReader> referenceManager = nrtEngine.getReferenceManager(Engine.SearcherScope.EXTERNAL);
OpenSearchDirectoryReader readerBeforeRefresh = referenceManager.acquire();

nrtEngine.refresh("test refresh");
OpenSearchDirectoryReader readerAfterRefresh = referenceManager.acquire();

// Verify both readers before and after refresh are same and no change in segments
assertSame(readerBeforeRefresh, readerAfterRefresh);

}
}

public void testTrimTranslogOps() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.index.shard;

import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfos;
import org.junit.Assert;
import org.opensearch.ExceptionsHelper;
Expand Down Expand Up @@ -569,21 +568,15 @@ public void testReplicaReceivesLowerGeneration() throws Exception {
numDocs = randomIntBetween(numDocs + 1, numDocs + 10);
shards.indexDocs(numDocs);
flushShard(primary, false);
assertLatestCommitGen(4, primary);
replicateSegments(primary, List.of(replica_1));

assertEqualCommittedSegments(primary, replica_1);
assertLatestCommitGen(4, primary);
assertLatestCommitGen(5, replica_1);
assertLatestCommitGen(3, replica_2);

shards.promoteReplicaToPrimary(replica_2).get();
primary.close("demoted", false);
primary.store().close();
IndexShard oldPrimary = shards.addReplicaWithExistingPath(primary.shardPath(), primary.routingEntry().currentNodeId());
shards.recoverReplica(oldPrimary);
assertLatestCommitGen(5, oldPrimary);
assertLatestCommitGen(5, replica_2);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for doing this, this test is flaky and we really don't care about the generation only that the doc counts are equal.


numDocs = randomIntBetween(numDocs + 1, numDocs + 10);
shards.indexDocs(numDocs);
Expand Down Expand Up @@ -1078,14 +1071,6 @@ private IndexShard failAndPromoteRandomReplica(ReplicationGroup shards) throws I
return newPrimary;
}

private void assertLatestCommitGen(long expected, IndexShard... shards) throws IOException {
for (IndexShard indexShard : shards) {
try (final GatedCloseable<IndexCommit> commit = indexShard.acquireLastIndexCommit(false)) {
assertEquals(expected, commit.get().getGeneration());
}
}
}

private void assertEqualCommittedSegments(IndexShard primary, IndexShard... replicas) throws IOException {
for (IndexShard replica : replicas) {
final SegmentInfos replicaInfos = replica.store().readLastCommittedSegmentsInfo();
Expand Down
Loading