Skip to content

Commit

Permalink
[Backport to 2.x] [Segment Replication] - Update replicas to commit S…
Browse files Browse the repository at this point in the history
…egmentInfos instead of relying on segments_N from primary shards. (#4450)

* Segment Replication - Fix NoSuchFileException errors caused when computing metadata snapshot on primary shards. (#4366)

* Segment Replication - Fix NoSuchFileException errors caused when computing metadata snapshot on primary shards.

This change fixes the errors that occur when computing metadata snapshots on primary shards from the latest in-memory SegmentInfos.  The error occurs when a segments_N file that is referenced by the in-memory infos is deleted as part of a concurrent commit.  The segments themselves are incref'd by IndexWriter.incRefDeleter but the commit file (Segments_N) is not.  This change resolves this by ignoring the segments_N file when computing metadata for CopyState and only sending incref'd segment files to replicas.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix spotless.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Update StoreTests.testCleanupAndPreserveLatestCommitPoint to assert additional segments are deleted.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Rename snapshot to metadataMap in CheckpointInfoResponse.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Refactor segmentReplicationDiff method to compute off two maps instead of MetadataSnapshots.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix spotless.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Revert catchall in SegmentReplicationSourceService.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Revert log lvl change.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix SegmentReplicationTargetTests

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Cleanup unused logger.

Signed-off-by: Marc Handalian <handalm@amazon.com>

Signed-off-by: Marc Handalian <handalm@amazon.com>
Co-authored-by: Suraj Singh <surajrider@gmail.com>

* [Segment Replication] - Update replicas to commit SegmentInfos instead of relying on segments_N from primary shards. (#4402)

* Segment Replication - Update replicas to commit SegmentInfos instead of relying on segments_N from primary shards.

This change updates replicas to commit SegmentInfos before the shard is closed, on receiving a new commit point from a primary, and when a new primary is detected. This change also makes the public commitSegmentInfos on NRTEngine obsolete, refactoring IndexShard to simply call reset on the engine.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Remove noise & extra log statement.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* PR feedback.

Signed-off-by: Marc Handalian <handalm@amazon.com>

Signed-off-by: Marc Handalian <handalm@amazon.com>

* [Segment Replicaiton] Fix merge conflict. Update EngineTestCase.getTranslog()

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Spotless check apply fixes

Signed-off-by: Suraj Singh <surajrider@gmail.com>

Signed-off-by: Marc Handalian <handalm@amazon.com>
Signed-off-by: Suraj Singh <surajrider@gmail.com>
Co-authored-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
dreamer-89 and mch2 authored Sep 7, 2022
1 parent ceb0e17 commit 4170d37
Show file tree
Hide file tree
Showing 10 changed files with 292 additions and 148 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308))
- Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240))
- Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253))
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402))

### Deprecated

Expand Down Expand Up @@ -55,6 +56,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386))
- [Segment Replication] Fix NoSuchFileExceptions with segment replication when computing primary metadata snapshots ([#4366](https://github.com/opensearch-project/OpenSearch/pull/4366))
- [Segment Replication] Fix timeout issue by calculating time needed to process getSegmentFiles ([#4434](https://github.com/opensearch-project/OpenSearch/pull/4434))
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on segments_N from primary shards.

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public class NRTReplicationEngine extends Engine implements LifecycleAware {
private final LocalCheckpointTracker localCheckpointTracker;
private final WriteOnlyTranslogManager translogManager;

private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED;

private static final int SI_COUNTER_INCREMENT = 10;

public NRTReplicationEngine(EngineConfig engineConfig) {
Expand Down Expand Up @@ -120,14 +122,16 @@ public TranslogManager translogManager() {

public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
// Update the current infos reference on the Engine's reader.
final long incomingGeneration = infos.getGeneration();
readerManager.updateSegments(infos);

// only update the persistedSeqNo and "lastCommitted" infos reference if the incoming segments have a higher
// generation. We can still refresh with incoming SegmentInfos that are not part of a commit point.
if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) {
this.lastCommittedSegmentInfos = infos;
// Commit and roll the xlog when we receive a different generation than what was last received.
// lower/higher gens are possible from a new primary that was just elected.
if (incomingGeneration != lastReceivedGen) {
commitSegmentInfos();
translogManager.rollTranslogGeneration();
}
lastReceivedGen = incomingGeneration;
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
}

Expand All @@ -141,20 +145,16 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
*
* @throws IOException - When there is an IO error committing the SegmentInfos.
*/
public void commitSegmentInfos() throws IOException {
// TODO: This method should wait for replication events to finalize.
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
*/
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
store.commitSegmentInfos(latestSegmentInfos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
private void commitSegmentInfos(SegmentInfos infos) throws IOException {
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
translogManager.syncTranslog();
}

protected void commitSegmentInfos() throws IOException {
commitSegmentInfos(getLatestSegmentInfos());
}

@Override
public String getHistoryUUID() {
return loadHistoryUUID(lastCommittedSegmentInfos.userData);
Expand Down Expand Up @@ -405,7 +405,16 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
: "Either the write lock must be held or the engine must be currently be failing itself";
try {
IOUtils.close(readerManager, translogManager.getTranslog(), store::decRef);
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
*/
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
commitSegmentInfos(latestSegmentInfos);
IOUtils.close(readerManager, translogManager, store::decRef);
} catch (Exception e) {
logger.warn("failed to close engine", e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
* @throws IOException - When Refresh fails with an IOException.
*/
public synchronized void updateSegments(SegmentInfos infos) throws IOException {
// roll over the currentInfo's generation, this ensures the on-disk gen
// is always increased.
infos.updateGeneration(currentInfos);
currentInfos = infos;
maybeRefresh();
}
Expand Down
28 changes: 4 additions & 24 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ public void updateShardState(
if (indexSettings.isSegRepEnabled()) {
// this Shard's engine was read only, we need to update its engine before restoring local history from xlog.
assert newRouting.primary() && currentRouting.primary() == false;
promoteNRTReplicaToPrimary();
resetEngineToGlobalCheckpoint();
}
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
ensurePeerRecoveryRetentionLeasesExist();
Expand Down Expand Up @@ -3571,7 +3571,9 @@ private void innerAcquireReplicaOperationPermit(
currentGlobalCheckpoint,
maxSeqNo
);
if (currentGlobalCheckpoint < maxSeqNo) {
// With Segment Replication enabled, we never want to reset a replica's engine unless
// it is promoted to primary.
if (currentGlobalCheckpoint < maxSeqNo && indexSettings.isSegRepEnabled() == false) {
resetEngineToGlobalCheckpoint();
} else {
getEngine().rollTranslogGeneration();
Expand Down Expand Up @@ -4132,26 +4134,4 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() {
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
return getEngine().getSegmentInfosSnapshot();
}

/**
* With segment replication enabled - prepare the shard's engine to be promoted as the new primary.
*
* If this shard is currently using a replication engine, this method:
* 1. Invokes {@link NRTReplicationEngine#commitSegmentInfos()} to ensure the engine can be reopened as writeable from the latest refresh point.
* InternalEngine opens its IndexWriter from an on-disk commit point, but this replica may have recently synced from a primary's refresh point, meaning it has documents searchable in its in-memory SegmentInfos
* that are not part of a commit point. This ensures that those documents are made part of a commit and do not need to be reindexed after promotion.
* 2. Invokes resetEngineToGlobalCheckpoint - This call performs the engine swap, opening up as a writeable engine and replays any operations in the xlog. The operations indexed from xlog here will be
* any ack'd writes that were not copied to this replica before promotion.
*/
private void promoteNRTReplicaToPrimary() {
assert shardRouting.primary() && indexSettings.isSegRepEnabled();
getReplicationEngine().ifPresentOrElse(engine -> {
try {
engine.commitSegmentInfos();
resetEngineToGlobalCheckpoint();
} catch (IOException e) {
throw new EngineException(shardId, "Unable to update replica to writeable engine, failing shard", e);
}
}, () -> { throw new EngineException(shardId, "Expected replica engine to be of type NRTReplicationEngine"); });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ private void innerWriteFileChunk(StoreFileMetadata fileMetadata, long position,
+ temporaryFileName
+ "] in "
+ Arrays.toString(store.directory().listAll());
store.directory().sync(Collections.singleton(temporaryFileName));
// With Segment Replication, we will fsync after a full commit has been received.
if (store.indexSettings().isSegRepEnabled() == false) {
store.directory().sync(Collections.singleton(temporaryFileName));
}
IndexOutput remove = removeOpenIndexOutputs(name);
assert remove == null || remove == indexOutput; // remove maybe null if we got finished
}
Expand Down
Loading

0 comments on commit 4170d37

Please sign in to comment.