Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Segment Replication] Backport all PR's containing remaining segment replication changes #4243

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@
import org.opensearch.action.support.WriteRequest.RefreshPolicy;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.rest.RestStatus;
import org.opensearch.script.MockScriptPlugin;
Expand Down Expand Up @@ -74,7 +77,11 @@ public class WaitUntilRefreshIT extends OpenSearchIntegTestCase {
@Override
public Settings indexSettings() {
// Use a shorter refresh interval to speed up the tests. We'll be waiting on this interval several times.
return Settings.builder().put(super.indexSettings()).put("index.refresh_interval", "40ms").build();
final Settings.Builder builder = Settings.builder().put(super.indexSettings()).put("index.refresh_interval", "40ms");
if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) {
builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
}
return builder.build();
}

@Before
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
this.readerManager = readerManager;
this.readerManager.addListener(completionStatsCache);
for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) {
this.readerManager.addListener(listener);
}
for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) {
this.readerManager.addListener(listener);
}
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
translogManagerRef = new WriteOnlyTranslogManager(
Expand Down Expand Up @@ -199,6 +205,18 @@ protected ReferenceManager<OpenSearchDirectoryReader> getReferenceManager(Search
return readerManager;
}

/**
* Refreshing of this engine will only happen internally when a new set of segments is received. The engine will ignore external
* refresh attempts so we can return false here. Further Engine's existing implementation reads DirectoryReader.isCurrent after acquiring a searcher.
* With this Engine's NRTReplicationReaderManager, This will use StandardDirectoryReader's implementation which determines if the reader is current by
* comparing the on-disk SegmentInfos version against the one in the reader, which at refresh points will always return isCurrent false and then refreshNeeded true.
* Even if this method returns refresh as needed, we ignore it and only ever refresh with incoming SegmentInfos.
*/
@Override
public boolean refreshNeeded() {
return false;
}

@Override
public boolean isTranslogSyncNeeded() {
return translogManager.getTranslog().syncNeeded();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void beforeRefresh() throws IOException {

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh) {
if (didRefresh && shard.getReplicationTracker().isPrimaryMode()) {
publisher.publish(shard);
}
}
Expand Down
27 changes: 17 additions & 10 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1382,9 +1382,16 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
}

/**
* Returns the lastest Replication Checkpoint that shard received
* Returns the latest ReplicationCheckpoint that shard received.
* @return EMPTY checkpoint before the engine is opened and null for non-segrep enabled indices
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
if (indexSettings.isSegRepEnabled() == false) {
return null;
}
if (getEngineOrNull() == null) {
return ReplicationCheckpoint.empty(shardId);
}
try (final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot()) {
return Optional.ofNullable(snapshot.get())
.map(
Expand All @@ -1396,15 +1403,7 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
segmentInfos.getVersion()
)
)
.orElse(
new ReplicationCheckpoint(
shardId,
getOperationPrimaryTerm(),
SequenceNumbers.NO_OPS_PERFORMED,
getProcessedLocalCheckpoint(),
SequenceNumbers.NO_OPS_PERFORMED
)
);
.orElse(ReplicationCheckpoint.empty(shardId));
} catch (IOException ex) {
throw new OpenSearchException("Error Closing SegmentInfos Snapshot", ex);
}
Expand All @@ -1421,6 +1420,10 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state()));
return false;
}
if (getReplicationTracker().isPrimaryMode()) {
logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints.");
return false;
}
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
logger.trace(
Expand Down Expand Up @@ -3774,6 +3777,10 @@ public boolean scheduledRefresh() {
if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it
&& isSearchIdle()
&& indexSettings.isExplicitRefresh() == false
&& indexSettings.isSegRepEnabled() == false
// Indices with segrep enabled will never wait on a refresh and ignore shard idle. Primary shards push out new segments only
// after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas will only refresh after receiving
// a new set of segments.
&& active.get()) { // it must be active otherwise we might not free up segment memory once the shard became inactive
// lets skip this refresh since we are search idle and
// don't necessarily need to refresh. the next searcher access will register a refreshListener and that will
Expand Down
95 changes: 77 additions & 18 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -1003,7 +1003,12 @@ static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory director
// version is written since 3.1+: we should have already hit IndexFormatTooOld.
throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
}
if (version.onOrAfter(maxVersion)) {
// With segment replication enabled, we compute metadata snapshots from the latest in memory infos.
// In this case we will have SegmentInfos objects fetched from the primary's reader
// where the minSegmentLuceneVersion can be null even though there are segments.
// This is because the SegmentInfos object is not read from a commit/IndexInput, which sets
// minSegmentLuceneVersion.
if (maxVersion == null || version.onOrAfter(maxVersion)) {
maxVersion = version;
}
for (String file : info.files()) {
Expand Down Expand Up @@ -1097,6 +1102,30 @@ public Map<String, StoreFileMetadata> asMap() {
private static final String LIV_FILE_EXTENSION = "liv"; // lucene 5 delete file
private static final String SEGMENT_INFO_EXTENSION = "si";

/**
* Helper method used to group store files according to segment and commit.
*
* @see MetadataSnapshot#recoveryDiff(MetadataSnapshot)
* @see MetadataSnapshot#segmentReplicationDiff(MetadataSnapshot)
*/
private Iterable<List<StoreFileMetadata>> getGroupedFilesIterable() {
final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>();
for (StoreFileMetadata meta : this) {
final String segmentId = IndexFileNames.parseSegmentName(meta.name());
final String extension = IndexFileNames.getExtension(meta.name());
if (IndexFileNames.SEGMENTS.equals(segmentId)
|| DEL_FILE_EXTENSION.equals(extension)
|| LIV_FILE_EXTENSION.equals(extension)) {
// only treat del files as per-commit files fnm files are generational but only for upgradable DV
perCommitStoreFiles.add(meta);
} else {
perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta);
}
}
return Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles));
}

/**
* Returns a diff between the two snapshots that can be used for recovery. The given snapshot is treated as the
* recovery target and this snapshot as the source. The returned diff will hold a list of files that are:
Expand Down Expand Up @@ -1134,23 +1163,8 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
final List<StoreFileMetadata> identical = new ArrayList<>();
final List<StoreFileMetadata> different = new ArrayList<>();
final List<StoreFileMetadata> missing = new ArrayList<>();
final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>();

for (StoreFileMetadata meta : this) {
final String segmentId = IndexFileNames.parseSegmentName(meta.name());
final String extension = IndexFileNames.getExtension(meta.name());
if (IndexFileNames.SEGMENTS.equals(segmentId)
|| DEL_FILE_EXTENSION.equals(extension)
|| LIV_FILE_EXTENSION.equals(extension)) {
// only treat del files as per-commit files fnm files are generational but only for upgradable DV
perCommitStoreFiles.add(meta);
} else {
perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta);
}
}
final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
for (List<StoreFileMetadata> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) {
identicalFiles.clear();
boolean consistent = true;
for (StoreFileMetadata meta : segmentFiles) {
Expand Down Expand Up @@ -1185,6 +1199,51 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
return recoveryDiff;
}

/**
* Segment Replication method
* Returns a diff between the two snapshots that can be used for getting list of files to copy over to a replica for segment replication. The given snapshot is treated as the
* target and this snapshot as the source. The returned diff will hold a list of files that are:
* <ul>
* <li>identical: they exist in both snapshots and they can be considered the same ie. they don't need to be recovered</li>
* <li>different: they exist in both snapshots but their they are not identical</li>
* <li>missing: files that exist in the source but not in the target</li>
* </ul>
*/
public RecoveryDiff segmentReplicationDiff(MetadataSnapshot recoveryTargetSnapshot) {
final List<StoreFileMetadata> identical = new ArrayList<>();
final List<StoreFileMetadata> different = new ArrayList<>();
final List<StoreFileMetadata> missing = new ArrayList<>();
final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) {
identicalFiles.clear();
boolean consistent = true;
for (StoreFileMetadata meta : segmentFiles) {
StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name());
if (storeFileMetadata == null) {
// Do not consider missing files as inconsistent in SegRep as replicas may lag while primary updates
// documents and generate new files specific to a segment
missing.add(meta);
} else if (storeFileMetadata.isSame(meta) == false) {
consistent = false;
different.add(meta);
} else {
identicalFiles.add(meta);
}
}
if (consistent) {
identical.addAll(identicalFiles);
} else {
different.addAll(identicalFiles);
}
}
RecoveryDiff recoveryDiff = new RecoveryDiff(
Collections.unmodifiableList(identical),
Collections.unmodifiableList(different),
Collections.unmodifiableList(missing)
);
return recoveryDiff;
}

/**
* Returns the number of files in this snapshot
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.env.ShardLockObtainFailedException;
Expand All @@ -80,6 +81,7 @@
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.RepositoriesService;
Expand All @@ -90,6 +92,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -134,7 +137,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private final FailedShardHandler failedShardHandler = new FailedShardHandler();

private final boolean sendRefreshMapping;
private final List<IndexEventListener> buildInIndexListener;
private final List<IndexEventListener> builtInIndexListener;
private final PrimaryReplicaSyncer primaryReplicaSyncer;
private final Consumer<ShardId> globalCheckpointSyncer;
private final RetentionLeaseSyncer retentionLeaseSyncer;
Expand All @@ -148,6 +151,7 @@ public IndicesClusterStateService(
final ClusterService clusterService,
final ThreadPool threadPool,
final PeerRecoveryTargetService recoveryTargetService,
final SegmentReplicationTargetService segmentReplicationTargetService,
final ShardStateAction shardStateAction,
final NodeMappingRefreshAction nodeMappingRefreshAction,
final RepositoriesService repositoriesService,
Expand All @@ -165,6 +169,7 @@ public IndicesClusterStateService(
clusterService,
threadPool,
checkpointPublisher,
segmentReplicationTargetService,
recoveryTargetService,
shardStateAction,
nodeMappingRefreshAction,
Expand All @@ -185,6 +190,7 @@ public IndicesClusterStateService(
final ClusterService clusterService,
final ThreadPool threadPool,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final SegmentReplicationTargetService segmentReplicationTargetService,
final PeerRecoveryTargetService recoveryTargetService,
final ShardStateAction shardStateAction,
final NodeMappingRefreshAction nodeMappingRefreshAction,
Expand All @@ -198,7 +204,15 @@ public IndicesClusterStateService(
) {
this.settings = settings;
this.checkpointPublisher = checkpointPublisher;
this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService);

final List<IndexEventListener> indexEventListeners = new ArrayList<>(
Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService)
);
// if segrep feature flag is not enabled, don't wire the target serivce as an IndexEventListener.
if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) {
indexEventListeners.add(segmentReplicationTargetService);
}
this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.threadPool = threadPool;
Expand Down Expand Up @@ -514,7 +528,7 @@ private void createIndices(final ClusterState state) {

AllocatedIndex<? extends Shard> indexService = null;
try {
indexService = indicesService.createIndex(indexMetadata, buildInIndexListener, true);
indexService = indicesService.createIndex(indexMetadata, builtInIndexListener, true);
if (indexService.updateMapping(null, indexMetadata) && sendRefreshMapping) {
nodeMappingRefreshAction.nodeMappingRefresh(
state.nodes().getClusterManagerNode(),
Expand Down
Loading