Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Segment Replication - Fix NoSuchFileException errors caused when computing metadata snapshot on primary shards. #4366

Merged
merged 12 commits into from
Sep 6, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fix flaky random test `NRTReplicationEngineTests.testUpdateSegments` ([#4352](https://github.com/opensearch-project/OpenSearch/pull/4352))
- [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386))
- [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https://github.com/opensearch-project/OpenSearch/pull/4363))
- Fix NoSuchFileExceptions with segment replication when computing primary metadata snapshots ([#4366](https://github.com/opensearch-project/OpenSearch/pull/4366))
- [Segment Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test ([#4414](https://github.com/opensearch-project/OpenSearch/pull/4414))
- Fixed the `_cat/shards/10_basic.yml` test cases fix.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.apache.lucene.index.SegmentInfos;
import org.junit.BeforeClass;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
Expand Down Expand Up @@ -586,13 +585,56 @@ private void assertSegmentStats(int numberOfReplicas) throws IOException {
ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState();
final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId());
IndexShard indexShard = getIndexShard(replicaNode.getName());
final String lastCommitSegmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(indexShard.store().directory());
// calls to readCommit will fail if a valid commit point and all its segments are not in the store.
SegmentInfos.readCommit(indexShard.store().directory(), lastCommitSegmentsFileName);
indexShard.store().readLastCommittedSegmentsInfo();
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

public void testDropPrimaryDuringReplication() throws Exception {
final Settings settings = Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode(Settings.EMPTY);
createIndex(INDEX_NAME, settings);
internalCluster().startDataOnlyNodes(6);
ensureGreen(INDEX_NAME);

int initialDocCount = scaledRandomIntBetween(100, 200);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);
refresh(INDEX_NAME);
// don't wait for replication to complete, stop the primary immediately.
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
ensureYellow(INDEX_NAME);

// start another replica.
internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

// index another doc and refresh - without this the new replica won't catch up.
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get();

flushAndRefresh(INDEX_NAME);
waitForReplicaUpdate();
assertSegmentStats(6);
}
}

/**
* Waits until the replica is caught up to the latest primary segments gen.
* @throws Exception if assertion fails
Expand All @@ -611,10 +653,12 @@ private void waitForReplicaUpdate() throws Exception {
final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);
// if we don't have any segments yet, proceed.
final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
logger.debug("Primary Segments: {}", primaryShardSegments.getSegments());
if (primaryShardSegments.getSegments().isEmpty() == false) {
final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
for (ShardSegments shardSegments : replicaShardSegments) {
logger.debug("Replica {} Segments: {}", shardSegments.getShardRouting(), shardSegments.getSegments());
final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments()
.stream()
.anyMatch(segment -> segment.getGeneration() == latestPrimaryGen);
Expand Down
133 changes: 68 additions & 65 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -122,6 +123,7 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
import static org.opensearch.index.store.Store.MetadataSnapshot.loadMetadata;

/**
* A Store provides plain access to files written by an opensearch index shard. Each shard
Expand Down Expand Up @@ -334,6 +336,51 @@ public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOExceptio
return new MetadataSnapshot(segmentInfos, directory, logger);
}

/**
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
* Segment Replication method - Fetch a map of StoreFileMetadata for segments, ignoring Segment_N files.
* @param segmentInfos {@link SegmentInfos} from which to compute metadata.
* @return {@link Map} map file name to {@link StoreFileMetadata}.
*/
public Map<String, StoreFileMetadata> getSegmentMetadataMap(SegmentInfos segmentInfos) throws IOException {
assert indexSettings.isSegRepEnabled();
return loadMetadata(segmentInfos, directory, logger, true).fileMetadata;
}

/**
* Segment Replication method
* Returns a diff between the Maps of StoreFileMetadata that can be used for getting list of files to copy over to a replica for segment replication. The returned diff will hold a list of files that are:
* <ul>
* <li>identical: they exist in both maps and they can be considered the same ie. they don't need to be recovered</li>
* <li>different: they exist in both maps but their they are not identical</li>
* <li>missing: files that exist in the source but not in the target</li>
* </ul>
*/
public static RecoveryDiff segmentReplicationDiff(Map<String, StoreFileMetadata> source, Map<String, StoreFileMetadata> target) {
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
final List<StoreFileMetadata> identical = new ArrayList<>();
final List<StoreFileMetadata> different = new ArrayList<>();
final List<StoreFileMetadata> missing = new ArrayList<>();
for (StoreFileMetadata value : source.values()) {
if (value.name().startsWith(IndexFileNames.SEGMENTS)) {
continue;
}
if (target.containsKey(value.name()) == false) {
missing.add(value);
} else {
final StoreFileMetadata fileMetadata = target.get(value.name());
if (fileMetadata.isSame(value)) {
identical.add(value);
} else {
different.add(value);
}
}
}
return new RecoveryDiff(
Collections.unmodifiableList(identical),
Collections.unmodifiableList(different),
Collections.unmodifiableList(missing)
);
}

/**
* Renames all the given files from the key of the map to the
* value of the map. All successfully renamed files are removed from the map in-place.
Expand Down Expand Up @@ -709,31 +756,34 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
}

/**
* This method deletes every file in this store that is not contained in either the remote or local metadata snapshots.
* Segment Replication method -
* This method deletes every file in this store that is not referenced by the passed in SegmentInfos or
* part of the latest on-disk commit point.
* This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file.
* In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk.
* @param reason the reason for this cleanup operation logged for each deleted file
* @param localSnapshot The local snapshot from in memory SegmentInfos.
* @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present.
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndPreserveLatestCommitPoint(String reason, MetadataSnapshot localSnapshot) throws IOException {
public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos) throws IOException {
assert indexSettings.isSegRepEnabled();
// fetch a snapshot from the latest on disk Segments_N file. This can be behind
// the passed in local in memory snapshot, so we want to ensure files it references are not removed.
metadataLock.writeLock().lock();
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
cleanupFiles(reason, localSnapshot, getMetadata(readLastCommittedSegmentsInfo()));
cleanupFiles(reason, getMetadata(readLastCommittedSegmentsInfo()), infos.files(true));
} finally {
metadataLock.writeLock().unlock();
}
}

private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable MetadataSnapshot additionalSnapshot)
private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable Collection<String> additionalFiles)
throws IOException {
assert metadataLock.isWriteLockedByCurrentThread();
for (String existingFile : directory.listAll()) {
if (Store.isAutogenerated(existingFile)
|| localSnapshot.contains(existingFile)
|| (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) {
|| (additionalFiles != null && additionalFiles.contains(existingFile))) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
Expand Down Expand Up @@ -825,17 +875,9 @@ public void commitSegmentInfos(SegmentInfos latestSegmentInfos, long maxSeqNo, l
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
latestSegmentInfos.setUserData(userData, true);
latestSegmentInfos.commit(directory());

// similar to TrimUnsafeCommits, create a commit with an appending IW, this will delete old commits and ensure all files
// associated with the SegmentInfos.commit are fsynced.
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(directory);
assert existingCommits.isEmpty() == false : "Expected at least one commit but none found";
final IndexCommit lastIndexCommit = existingCommits.get(existingCommits.size() - 1);
assert latestSegmentInfos.getSegmentsFileName().equals(lastIndexCommit.getSegmentsFileName());
try (IndexWriter writer = newAppendingIndexWriter(directory, lastIndexCommit)) {
writer.setLiveCommitData(lastIndexCommit.getUserData().entrySet());
writer.commit();
}
directory.sync(latestSegmentInfos.files(true));
directory.syncMetaData();
cleanupAndPreserveLatestCommitPoint("After commit", latestSegmentInfos);
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
} finally {
metadataLock.writeLock().unlock();
}
Expand Down Expand Up @@ -1033,6 +1075,11 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
}

static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
return loadMetadata(segmentInfos, directory, logger, false);
}

static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger, boolean ignoreSegmentsFile)
throws IOException {
long numDocs = Lucene.getNumDocs(segmentInfos);
Map<String, String> commitUserDataBuilder = new HashMap<>();
commitUserDataBuilder.putAll(segmentInfos.getUserData());
Expand Down Expand Up @@ -1067,8 +1114,10 @@ static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory director
if (maxVersion == null) {
maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
}
final String segmentsFile = segmentInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
if (ignoreSegmentsFile == false) {
final String segmentsFile = segmentInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
}
return new LoadedMetadata(unmodifiableMap(builder), unmodifiableMap(commitUserDataBuilder), numDocs);
}

Expand Down Expand Up @@ -1148,7 +1197,6 @@ public Map<String, StoreFileMetadata> asMap() {
* Helper method used to group store files according to segment and commit.
*
* @see MetadataSnapshot#recoveryDiff(MetadataSnapshot)
* @see MetadataSnapshot#segmentReplicationDiff(MetadataSnapshot)
*/
private Iterable<List<StoreFileMetadata>> getGroupedFilesIterable() {
final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
Expand Down Expand Up @@ -1241,51 +1289,6 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
return recoveryDiff;
}

/**
* Segment Replication method
* Returns a diff between the two snapshots that can be used for getting list of files to copy over to a replica for segment replication. The given snapshot is treated as the
* target and this snapshot as the source. The returned diff will hold a list of files that are:
* <ul>
* <li>identical: they exist in both snapshots and they can be considered the same ie. they don't need to be recovered</li>
* <li>different: they exist in both snapshots but their they are not identical</li>
* <li>missing: files that exist in the source but not in the target</li>
* </ul>
*/
public RecoveryDiff segmentReplicationDiff(MetadataSnapshot recoveryTargetSnapshot) {
final List<StoreFileMetadata> identical = new ArrayList<>();
final List<StoreFileMetadata> different = new ArrayList<>();
final List<StoreFileMetadata> missing = new ArrayList<>();
final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) {
identicalFiles.clear();
boolean consistent = true;
for (StoreFileMetadata meta : segmentFiles) {
StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name());
if (storeFileMetadata == null) {
// Do not consider missing files as inconsistent in SegRep as replicas may lag while primary updates
// documents and generate new files specific to a segment
missing.add(meta);
} else if (storeFileMetadata.isSame(meta) == false) {
consistent = false;
different.add(meta);
} else {
identicalFiles.add(meta);
}
}
if (consistent) {
identical.addAll(identicalFiles);
} else {
different.addAll(identicalFiles);
}
}
RecoveryDiff recoveryDiff = new RecoveryDiff(
Collections.unmodifiableList(identical),
Collections.unmodifiableList(different),
Collections.unmodifiableList(missing)
);
return recoveryDiff;
}

/**
* Returns the number of files in this snapshot
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.transport.TransportResponse;

import java.io.IOException;
import java.util.Set;
import java.util.Map;

/**
* Response returned from a {@link SegmentReplicationSource} that includes the file metadata, and SegmentInfos
Expand All @@ -28,52 +27,41 @@
public class CheckpointInfoResponse extends TransportResponse {

private final ReplicationCheckpoint checkpoint;
private final Store.MetadataSnapshot snapshot;
private final Map<String, StoreFileMetadata> metadataMap;
private final byte[] infosBytes;
// pendingDeleteFiles are segments that have been merged away in the latest in memory SegmentInfos
// but are still referenced by the latest commit point (Segments_N).
private final Set<StoreFileMetadata> pendingDeleteFiles;

public CheckpointInfoResponse(
final ReplicationCheckpoint checkpoint,
final Store.MetadataSnapshot snapshot,
final byte[] infosBytes,
final Set<StoreFileMetadata> additionalFiles
final Map<String, StoreFileMetadata> metadataMap,
final byte[] infosBytes
) {
this.checkpoint = checkpoint;
this.snapshot = snapshot;
this.metadataMap = metadataMap;
this.infosBytes = infosBytes;
this.pendingDeleteFiles = additionalFiles;
}

public CheckpointInfoResponse(StreamInput in) throws IOException {
this.checkpoint = new ReplicationCheckpoint(in);
this.snapshot = new Store.MetadataSnapshot(in);
this.metadataMap = in.readMap(StreamInput::readString, StoreFileMetadata::new);
this.infosBytes = in.readByteArray();
this.pendingDeleteFiles = in.readSet(StoreFileMetadata::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
checkpoint.writeTo(out);
snapshot.writeTo(out);
out.writeMap(metadataMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
out.writeByteArray(infosBytes);
out.writeCollection(pendingDeleteFiles);
}

public ReplicationCheckpoint getCheckpoint() {
return checkpoint;
}

public Store.MetadataSnapshot getSnapshot() {
return snapshot;
public Map<String, StoreFileMetadata> getMetadataMap() {
return metadataMap;
}

public byte[] getInfosBytes() {
return infosBytes;
}

public Set<StoreFileMetadata> getPendingDeleteFiles() {
return pendingDeleteFiles;
}
}
Loading