diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java index 62e4b00620299..0c35f91121059 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java @@ -41,6 +41,7 @@ public class MigrationBaseTestCase extends OpenSearchIntegTestCase { protected Path segmentRepoPath; protected Path translogRepoPath; boolean addRemote = false; + Settings extraSettings = Settings.EMPTY; private final List documentKeys = List.of( randomAlphaOfLength(5), @@ -59,6 +60,7 @@ protected Settings nodeSettings(int nodeOrdinal) { logger.info("Adding remote store node"); return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) + .put(extraSettings) .put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath)) .build(); } else { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualMigrationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualMigrationIT.java index 87cce29dbb5c1..e2318a0803e14 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualMigrationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualMigrationIT.java @@ -19,17 +19,21 @@ import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexService; import org.opensearch.index.remote.RemoteSegmentStats; +import org.opensearch.index.seqno.RetentionLease; import org.opensearch.index.seqno.RetentionLeases; +import org.opensearch.indices.IndexingMemoryController; import org.opensearch.plugins.Plugin; +import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; import org.opensearch.test.InternalSettingsPlugin; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; import java.util.Collection; -import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; -import static org.opensearch.indices.stats.IndexStatsIT.persistGlobalCheckpoint; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) @@ -40,7 +44,14 @@ public class RemoteDualMigrationIT extends MigrationBaseTestCase { @Override protected Collection> nodePlugins() { - return List.of(InternalSettingsPlugin.class); + /* Adding the following mock plugins: + - InternalSettingsPlugin : To override default intervals of retention lease and global ckp sync + - MockFsRepositoryPlugin and MockTransportService.TestPlugin: To ensure remote interactions are not no-op and retention leases are properly propagated + */ + return Stream.concat( + super.nodePlugins().stream(), + Stream.of(InternalSettingsPlugin.class, MockFsRepositoryPlugin.class, MockTransportService.TestPlugin.class) + ).collect(Collectors.toList()); } /* @@ -62,7 +73,11 @@ public void testRemotePrimaryDocRepReplica() throws Exception { assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0); logger.info("---> Creating index with 1 replica"); - Settings oneReplica = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build(); + Settings oneReplica = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s") + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s") + .build(); createIndex(REMOTE_PRI_DOCREP_REP, oneReplica); ensureGreen(REMOTE_PRI_DOCREP_REP); @@ -137,7 +152,8 @@ public void testRemotePrimaryDocRepAndRemoteReplica() throws Exception { logger.info("---> Creating index with 0 replica"); Settings zeroReplicas = Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s") + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s") .build(); createIndex(REMOTE_PRI_DOCREP_REMOTE_REP, zeroReplicas); ensureGreen(REMOTE_PRI_DOCREP_REMOTE_REP); @@ -145,6 +161,7 @@ public void testRemotePrimaryDocRepAndRemoteReplica() throws Exception { logger.info("---> Starting 1 remote enabled data node"); addRemote = true; + String remoteNodeName = internalCluster().startDataOnlyNode(); internalCluster().validateClusterFormed(); assertEquals( @@ -210,9 +227,10 @@ public void testRemotePrimaryDocRepAndRemoteReplica() throws Exception { Checks if retention leases are published on primary shard and it's docrep copies, but not on remote copies */ public void testRetentionLeasePresentOnDocrepReplicaButNotRemote() throws Exception { + // Reducing indices.memory.shard_inactive_time to force a flush and trigger translog sync, + // instead of relying on Global CKP Sync action which doesn't run on remote enabled copies + extraSettings = Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.getKey(), "3s").build(); testRemotePrimaryDocRepAndRemoteReplica(); - // Ensuring global checkpoint is synced across all the shard copies - persistGlobalCheckpoint(REMOTE_PRI_DOCREP_REMOTE_REP); DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); assertBusy(() -> { for (ShardStats shardStats : internalCluster().client() @@ -227,13 +245,15 @@ public void testRetentionLeasePresentOnDocrepReplicaButNotRemote() throws Except if (shardRouting.primary()) { // Primary copy should be on remote node and should have retention leases assertTrue(discoveryNode.isRemoteStoreNode()); + assertCheckpointsConsistency(shardStats); assertRetentionLeaseConsistency(shardStats, retentionLeases); } else { + // Checkpoints and Retention Leases are not synced to remote replicas if (discoveryNode.isRemoteStoreNode()) { - // Replica copy on remote node should not have retention leases assertTrue(shardStats.getRetentionLeaseStats().retentionLeases().leases().isEmpty()); } else { // Replica copy on docrep node should have retention leases + assertCheckpointsConsistency(shardStats); assertRetentionLeaseConsistency(shardStats, retentionLeases); } } @@ -478,6 +498,23 @@ private void assertReplicaAndPrimaryConsistency(String indexName, int firstBatch */ private static void assertRetentionLeaseConsistency(ShardStats shardStats, RetentionLeases retentionLeases) { long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo(); - assertTrue(retentionLeases.leases().stream().allMatch(l -> l.retainingSequenceNumber() == maxSeqNo + 1)); + for (RetentionLease rl : retentionLeases.leases()) { + assertEquals(maxSeqNo + 1, rl.retainingSequenceNumber()); + } + } + + /** + * For a docrep enabled shard copy or a primary shard copy, + * asserts that local and global checkpoints are up-to-date with maxSeqNo of doc operations + * + * @param shardStats ShardStats object from NodesStats API + */ + private static void assertCheckpointsConsistency(ShardStats shardStats) { + long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo(); + long localCkp = shardStats.getSeqNoStats().getLocalCheckpoint(); + long globalCkp = shardStats.getSeqNoStats().getGlobalCheckpoint(); + + assertEquals(maxSeqNo, localCkp); + assertEquals(maxSeqNo, globalCkp); } } diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 18cf4e33b8c0a..fc7738673852e 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -367,7 +367,7 @@ protected ReplicationOperation.Replicas primaryTermValidationR * Visible for tests * @opensearch.internal */ - public final class PrimaryTermValidationProxy extends WriteActionReplicasProxy { + private final class PrimaryTermValidationProxy extends WriteActionReplicasProxy { @Override public void performOn( diff --git a/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java b/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java index 83003d0cd931d..b61fc6c868b75 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java +++ b/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java @@ -9,6 +9,7 @@ package org.opensearch.action.support.replication; import org.opensearch.action.support.replication.ReplicationOperation.ReplicaResponse; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.core.action.ActionListener; @@ -34,16 +35,20 @@ public class ReplicationModeAwareProxy replicasProxy, - ReplicationOperation.Replicas primaryTermValidationProxy + ReplicationOperation.Replicas primaryTermValidationProxy, + boolean isRemoteStoreIndexSettingEnabled ) { super(replicasProxy); this.replicationModeOverride = Objects.requireNonNull(replicationModeOverride); this.primaryTermValidationProxy = Objects.requireNonNull(primaryTermValidationProxy); this.discoveryNodes = discoveryNodes; + this.isRemoteStoreIndexSettingEnabled = isRemoteStoreIndexSettingEnabled; } @Override @@ -74,11 +79,16 @@ ReplicationMode determineReplicationMode(ShardRouting shardRouting, ShardRouting return ReplicationMode.FULL_REPLICATION; } /* - Perform full replication if replica is hosted on a non-remote node. - Only applicable during remote migration + Only applicable during remote store migration. + During the migration process, remote based index settings will not be enabled, + thus we will rely on node attributes to figure out the replication mode */ - if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode() == false) { - return ReplicationMode.FULL_REPLICATION; + if (isRemoteStoreIndexSettingEnabled == false) { + DiscoveryNode targetNode = discoveryNodes.get(shardRouting.currentNodeId()); + if (targetNode != null && targetNode.isRemoteStoreNode() == false) { + // Perform full replication if replica is hosted on a non-remote node. + return ReplicationMode.FULL_REPLICATION; + } } return replicationModeOverride; } diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index 6451dae25b6d2..8d86128e36441 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -647,7 +647,8 @@ public void handleException(TransportException exp) { getReplicationMode(indexShard), clusterState.getNodes(), replicasProxy, - termValidationProxy + termValidationProxy, + indexShard.isRemoteTranslogEnabled() ) : new FanoutReplicationProxy<>(replicasProxy) ).execute(); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 646de7457dbfa..b679f5059411a 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -1233,11 +1233,7 @@ public boolean isSegRepEnabledOrRemoteNode() { } public boolean isSegRepLocalEnabled() { - return isSegRepEnabledOrRemoteNode() && !isRemoteStoreEnabled(); - } - - public boolean isSegRepWithRemoteEnabled() { - return isSegRepEnabledOrRemoteNode() && isRemoteNode(); + return ReplicationType.SEGMENT.equals(replicationType) && !isRemoteStoreEnabled(); } /** diff --git a/server/src/main/java/org/opensearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/opensearch/index/seqno/GlobalCheckpointSyncAction.java index ca1dfe2d5ad01..0c167d6d80b5c 100644 --- a/server/src/main/java/org/opensearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/opensearch/index/seqno/GlobalCheckpointSyncAction.java @@ -135,7 +135,7 @@ protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, private void maybeSyncTranslog(final IndexShard indexShard) throws IOException { if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST && indexShard.getLastSyncedGlobalCheckpoint() < indexShard.getLastKnownGlobalCheckpoint() - && indexShard.isRemoteTranslogEnabled() == false) { + && indexShard.indexSettings().isRemoteNode() == false) { indexShard.sync(); } } diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index 3bbf37917cf7a..1c01e1bf659e0 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -1094,7 +1094,7 @@ private ReplicationGroup calculateReplicationGroup() { } else { newVersion = replicationGroup.getVersion() + 1; } - assert indexSettings.isRemoteNode() + assert indexSettings.isRemoteTranslogStoreEnabled() // Handle migration cases. Ignore assertion if any of the shard copies in the replication group is assigned to a remote node || (replicationGroup != null && replicationGroup.getReplicationTargets() @@ -1457,9 +1457,8 @@ public synchronized void updateFromClusterManager( + " as in-sync but it does not exist locally"; final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; final long globalCheckpoint = localCheckpoint; - final boolean assignedToRemoteStoreNode = indexSettings.isRemoteNode() - || (routingTable.getByAllocationId(initializingId) != null - && isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(initializingId).currentNodeId())); + final boolean assignedToRemoteStoreNode = indexSettings.isRemoteStoreEnabled() + || onRemoteEnabledNode(routingTable, initializingId); checkpoints.put( initializingId, new CheckpointState( @@ -1479,9 +1478,8 @@ public synchronized void updateFromClusterManager( for (String initializingId : initializingAllocationIds) { final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; final long globalCheckpoint = localCheckpoint; - final boolean assignedToRemoteStoreNode = indexSettings.isRemoteNode() - || (routingTable.getByAllocationId(initializingId) != null - && isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(initializingId).currentNodeId())); + final boolean assignedToRemoteStoreNode = indexSettings().isRemoteStoreEnabled() + || onRemoteEnabledNode(routingTable, initializingId); checkpoints.put( initializingId, new CheckpointState( @@ -1496,10 +1494,8 @@ public synchronized void updateFromClusterManager( for (String inSyncId : inSyncAllocationIds) { final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; final long globalCheckpoint = localCheckpoint; - // Handling cases where node leaves the cluster but the insyncAids are not yet updated - final boolean assignedToRemoteStoreNode = indexSettings.isRemoteNode() - || (routingTable.getByAllocationId(inSyncId) != null - && isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(inSyncId).currentNodeId())); + final boolean assignedToRemoteStoreNode = indexSettings().isRemoteStoreEnabled() + || onRemoteEnabledNode(routingTable, inSyncId); checkpoints.put( inSyncId, new CheckpointState( @@ -1524,6 +1520,11 @@ public synchronized void updateFromClusterManager( assert invariant(); } + private boolean onRemoteEnabledNode(IndexShardRoutingTable routingTable, String initializingId) { + return routingTable.getByAllocationId(initializingId) != null + && isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(initializingId).currentNodeId()); + } + /** * Returns whether the requests are replicated considering the remote translog existence, current/primary/primary target allocation ids. * @@ -1541,8 +1542,7 @@ private boolean isReplicated( // If assigned to a remote node, returns true if given allocation id matches the primary or it's relocation target allocation // primary and primary target allocation id. if (assignedToRemoteStoreNode == true) { - boolean toReturn = allocationId.equals(primaryAllocationId) || allocationId.equals(primaryTargetAllocationId); - return toReturn; + return allocationId.equals(primaryAllocationId) || allocationId.equals(primaryTargetAllocationId); } // For other case which is local translog, return true as the requests are replicated to all shards in the replication group. return true; diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index 675d60ec2b63d..b47025d75282c 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -43,7 +43,7 @@ protected boolean performAfterRefreshWithPermit(boolean didRefresh) { if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode() - && !shard.indexSettings.isSegRepWithRemoteEnabled()) { + && shard.indexSettings.isRemoteNode() == false) { publisher.publish(shard, shard.getLatestReplicationCheckpoint()); } return true; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index e31690410b8cf..ec979e2ca6121 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -520,25 +520,15 @@ public boolean shouldSeedRemoteStore() { * checks does not fail during a cluster manager state update when the latest replication group * calculation is not yet done and the cached replication group details are available */ - public Function isShardOnRemoteEnabledNode = (shardId) -> { - DiscoveryNode discoveryNode = this.discoveryNodes.get(shardId); + public Function isShardOnRemoteEnabledNode = (nodeId) -> { + DiscoveryNode discoveryNode = this.discoveryNodes.get(nodeId); if (discoveryNode != null) { - logger.trace( - "ShardID {} is assigned to Node {} which has remote_enabled as {}", - shardId, - discoveryNode, - discoveryNode.isRemoteStoreNode() - ); + logger.trace("Node {} has remote_enabled as {}", nodeId, discoveryNode.isRemoteStoreNode()); return discoveryNode.isRemoteStoreNode(); } return false; }; - // Only to be used for Unit Tests - public void setDiscoveryNodes(DiscoveryNodes discoveryNodes) { - this.discoveryNodes = discoveryNodes; - } - public boolean isRemoteSeeded() { return shardMigrationState == REMOTE_MIGRATING_SEEDED; } @@ -4028,7 +4018,7 @@ private boolean isRemoteStoreEnabled() { } public boolean isRemoteTranslogEnabled() { - return indexSettings() != null && indexSettings().isRemoteTranslogStoreEnabled(); + return indexSettings() != null && (indexSettings().isRemoteTranslogStoreEnabled()); } /** diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 9779a2320d79f..cba1403ed03fc 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -192,7 +192,7 @@ void recoverFromLocalShards( // just trigger a merge to do housekeeping on the // copied segments - we will also see them in stats etc. indexShard.getEngine().forceMerge(false, -1, false, false, false, UUIDs.randomBase64UUID()); - if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) { + if ((indexShard.isRemoteTranslogEnabled() || indexShard.isMigratingToRemote()) && indexShard.shardRouting.primary()) { indexShard.waitForRemoteStoreSync(); if (indexShard.isRemoteSegmentStoreInSync() == false) { throw new IndexShardRecoveryException( @@ -435,7 +435,7 @@ void recoverFromSnapshotAndRemoteStore( } indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); indexShard.finalizeRecovery(); - if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) { + if ((indexShard.isRemoteTranslogEnabled() || indexShard.isMigratingToRemote()) && indexShard.shardRouting.primary()) { indexShard.waitForRemoteStoreSync(); if (indexShard.isRemoteSegmentStoreInSync() == false) { listener.onFailure(new IndexShardRestoreFailedException(shardId, "Failed to upload to remote segment store")); @@ -721,7 +721,7 @@ private void restore( } indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); indexShard.finalizeRecovery(); - if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) { + if ((indexShard.isRemoteTranslogEnabled() || indexShard.isMigratingToRemote()) && indexShard.shardRouting.primary()) { indexShard.waitForRemoteStoreSync(); if (indexShard.isRemoteSegmentStoreInSync() == false) { listener.onFailure(new IndexShardRestoreFailedException(shardId, "Failed to upload to remote segment store")); diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index e26aed52d563b..2c3ffcdd9e0ba 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -669,7 +669,6 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR try { final long primaryTerm = state.metadata().index(shardRouting.index()).primaryTerm(shardRouting.id()); logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm); - DiscoveryNode localNode = nodes.getLocalNode(); indicesService.createShard( shardRouting, checkpointPublisher, @@ -679,7 +678,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR failedShardHandler, globalCheckpointSyncer, retentionLeaseSyncer, - localNode, + nodes.getLocalNode(), sourceNode, remoteStoreStatsTrackerFactory, nodes diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java index fbaa7d94c3129..96e85154e6248 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java @@ -23,9 +23,7 @@ public static RecoverySourceHandler create( StartRecoveryRequest request, RecoverySettings recoverySettings ) { - boolean isReplicaRecoveryWithRemoteTranslog = request.isPrimaryRelocation() == false - && (shard.isRemoteTranslogEnabled() || shard.isMigratingToRemote()) - && request.targetNode().isRemoteStoreNode(); + boolean isReplicaRecoveryWithRemoteTranslog = request.isPrimaryRelocation() == false && request.targetNode().isRemoteStoreNode(); if (isReplicaRecoveryWithRemoteTranslog) { return new RemoteStorePeerRecoverySourceHandler( shard, diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index bfcfc6e9b1894..f47b082de3856 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -213,13 +213,17 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener { state().getIndex().setFileDetailsComplete(); // ops-based recoveries don't send the file details state().getTranslog().totalOperations(totalTranslogOps); - boolean shouldPerformMigrationBasedTasks = indexShard.shouldSeedRemoteStore() && indexShard.routingEntry().primary(); - if (shouldPerformMigrationBasedTasks) { + // Cleanup remote contents before opening new translog. + // This prevents reading from any old Translog UUIDs during re-seeding + // (situation in which primary fails over to docrep replica and is re-seeded to remote again) + // which might end up causing a TranslogCorruptedException + if (indexShard.shouldSeedRemoteStore()) { + assert indexShard.routingEntry().primary() : "Remote seeding should only true be for primary shard copy"; indexShard.deleteRemoteStoreContents(); } indexShard().openEngineAndSkipTranslogRecovery(); // upload to remote store in migration for primary shard - if (shouldPerformMigrationBasedTasks) { + if (indexShard.shouldSeedRemoteStore()) { // This cleans up remote translog's 0 generation, as we don't want to get that uploaded indexShard.sync(); threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { indexShard.refresh("remote store migration"); }); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java index 852003c9f3e4d..657705a8cd725 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java @@ -38,7 +38,7 @@ public SegmentReplicationSourceFactory( } public SegmentReplicationSource get(IndexShard shard) { - if (shard.indexSettings().isSegRepWithRemoteEnabled()) { + if (shard.indexSettings().isRemoteNode()) { return new RemoteStoreReplicationSource(shard); } else { return new PrimaryShardReplicationSource( diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index 2d5893d144e6a..4d7d0a8633b5c 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -199,8 +199,11 @@ protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexSh Objects.requireNonNull(replica); ActionListener.completeWith(listener, () -> { logger.trace(() -> new ParameterizedMessage("Checkpoint {} received on replica {}", request, replica.shardId())); - if (replica.indexSettings().isRemoteNode() == false) { + // Condition for ensuring that we ignore Segrep checkpoints received on Docrep shard copies. + // This case will hit iff the replica hosting node is not remote enabled and replication type != SEGMENT + if (replica.indexSettings().isRemoteNode() == false && replica.indexSettings().isSegRepLocalEnabled() == false) { logger.trace("Received segrep checkpoint on a docrep shard copy during an ongoing remote migration. NoOp."); + return new ReplicaResult(); } if (request.getCheckpoint().getShardId().equals(replica.shardId())) { replicationService.onNewCheckpoint(request.getCheckpoint(), replica); diff --git a/server/src/test/java/org/opensearch/action/support/replication/ReplicationModeAwareProxyTests.java b/server/src/test/java/org/opensearch/action/support/replication/ReplicationModeAwareProxyTests.java index b977cc741b731..626c2f74f09c4 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/ReplicationModeAwareProxyTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/ReplicationModeAwareProxyTests.java @@ -18,9 +18,17 @@ import org.opensearch.index.shard.IndexShardTestUtils; import org.opensearch.test.OpenSearchTestCase; +import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; public class ReplicationModeAwareProxyTests extends OpenSearchTestCase { + + /* + Replication action running on the same primary copy from which it originates. + Action should not run and proxy should return ReplicationMode.NO_REPLICATION + */ public void testDetermineReplicationModeTargetRoutingCurrentPrimary() { ShardRouting targetRouting = TestShardRouting.newShardRouting( new ShardId(new Index("test_index", "_na_"), 0), @@ -42,11 +50,16 @@ public void testDetermineReplicationModeTargetRoutingCurrentPrimary() { ReplicationMode.NO_REPLICATION, DiscoveryNodes.builder().add(IndexShardTestUtils.getFakeRemoteEnabledNode("dummy-node")).build(), mock(TransportReplicationAction.ReplicasProxy.class), - mock(TransportReplicationAction.ReplicasProxy.class) + mock(TransportReplicationAction.ReplicasProxy.class), + randomBoolean() ); assertEquals(ReplicationMode.NO_REPLICATION, replicationModeAwareProxy.determineReplicationMode(targetRouting, primaryRouting)); } + /* + Replication action originating from failing primary to replica being promoted to primary + Action should run and proxy should return ReplicationMode.FULL_REPLICATION + */ public void testDetermineReplicationModeTargetRoutingRelocatingPrimary() { AllocationId primaryId = AllocationId.newRelocation(AllocationId.newInitializing()); AllocationId relocationTargetId = AllocationId.newTargetRelocation(primaryId); @@ -73,22 +86,27 @@ public void testDetermineReplicationModeTargetRoutingRelocatingPrimary() { .add(IndexShardTestUtils.getFakeRemoteEnabledNode(primaryRouting.currentNodeId())) .build(), mock(TransportReplicationAction.ReplicasProxy.class), - mock(TransportReplicationAction.ReplicasProxy.class) + mock(TransportReplicationAction.ReplicasProxy.class), + randomBoolean() ); assertEquals(ReplicationMode.FULL_REPLICATION, replicationModeAwareProxy.determineReplicationMode(targetRouting, primaryRouting)); } + /* + Replication action originating from remote enabled primary to docrep replica during remote store migration + Action should run and proxy should return ReplicationMode.FULL_REPLICATION + */ public void testDetermineReplicationModeTargetRoutingDocrepShard() { ShardRouting primaryRouting = TestShardRouting.newShardRouting( new ShardId(new Index("test_index", "_na_"), 0), "dummy-node", - false, + true, ShardRoutingState.STARTED ); ShardRouting targetRouting = TestShardRouting.newShardRouting( new ShardId(new Index("test_index", "_na_"), 0), "dummy-node-2", - true, + false, ShardRoutingState.STARTED ); final ReplicationModeAwareProxy replicationModeAwareProxy = new ReplicationModeAwareProxy( @@ -98,11 +116,16 @@ public void testDetermineReplicationModeTargetRoutingDocrepShard() { .add(IndexShardTestUtils.getFakeDiscoNode(targetRouting.currentNodeId())) .build(), mock(TransportReplicationAction.ReplicasProxy.class), - mock(TransportReplicationAction.ReplicasProxy.class) + mock(TransportReplicationAction.ReplicasProxy.class), + false ); assertEquals(ReplicationMode.FULL_REPLICATION, replicationModeAwareProxy.determineReplicationMode(targetRouting, primaryRouting)); } + /* + Replication action originating from remote enabled primary to remote replica during remote store migration + Action should not run and proxy should return ReplicationMode.NO_REPLICATION + */ public void testDetermineReplicationModeTargetRoutingRemoteShard() { ShardRouting primaryRouting = TestShardRouting.newShardRouting( new ShardId(new Index("test_index", "_na_"), 0), @@ -123,8 +146,71 @@ public void testDetermineReplicationModeTargetRoutingRemoteShard() { .add(IndexShardTestUtils.getFakeRemoteEnabledNode(primaryRouting.currentNodeId())) .build(), mock(TransportReplicationAction.ReplicasProxy.class), - mock(TransportReplicationAction.ReplicasProxy.class) + mock(TransportReplicationAction.ReplicasProxy.class), + false ); assertEquals(ReplicationMode.NO_REPLICATION, replicationModeAwareProxy.determineReplicationMode(targetRouting, primaryRouting)); } + + /* + Replication action originating from remote enabled primary to remote enabled replica during remote store migration + with an explicit replication mode specified + Action should run and proxy should return the overridden Replication Mode + */ + public void testDetermineReplicationWithExplicitOverrideTargetRoutingRemoteShard() { + ReplicationMode replicationModeOverride = ReplicationMode.PRIMARY_TERM_VALIDATION; + ShardRouting primaryRouting = TestShardRouting.newShardRouting( + new ShardId(new Index("test_index", "_na_"), 0), + "dummy-node", + false, + ShardRoutingState.STARTED + ); + ShardRouting targetRouting = TestShardRouting.newShardRouting( + new ShardId(new Index("test_index", "_na_"), 0), + "dummy-node-2", + true, + ShardRoutingState.STARTED + ); + final ReplicationModeAwareProxy replicationModeAwareProxy = new ReplicationModeAwareProxy( + replicationModeOverride, + DiscoveryNodes.builder() + .add(IndexShardTestUtils.getFakeRemoteEnabledNode(targetRouting.currentNodeId())) + .add(IndexShardTestUtils.getFakeRemoteEnabledNode(primaryRouting.currentNodeId())) + .build(), + mock(TransportReplicationAction.ReplicasProxy.class), + mock(TransportReplicationAction.ReplicasProxy.class), + false + ); + assertEquals(replicationModeOverride, replicationModeAwareProxy.determineReplicationMode(targetRouting, primaryRouting)); + } + + /* + Replication action originating from remote enabled primary with remote enabled index settings enabled + Action should not query the DiscoveryNodes object + */ + public void testDetermineReplicationWithRemoteIndexSettingsEnabled() { + DiscoveryNodes mockDiscoveryNodes = mock(DiscoveryNodes.class); + ShardRouting primaryRouting = TestShardRouting.newShardRouting( + new ShardId(new Index("test_index", "_na_"), 0), + "dummy-node", + false, + ShardRoutingState.STARTED + ); + ShardRouting targetRouting = TestShardRouting.newShardRouting( + new ShardId(new Index("test_index", "_na_"), 0), + "dummy-node-2", + true, + ShardRoutingState.STARTED + ); + final ReplicationModeAwareProxy replicationModeAwareProxy = new ReplicationModeAwareProxy( + ReplicationMode.NO_REPLICATION, + mockDiscoveryNodes, + mock(TransportReplicationAction.ReplicasProxy.class), + mock(TransportReplicationAction.ReplicasProxy.class), + true + ); + replicationModeAwareProxy.determineReplicationMode(targetRouting, primaryRouting); + // Verify no interactions with the DiscoveryNodes object + verify(mockDiscoveryNodes, never()).get(anyString()); + } } diff --git a/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java index 4b54e05c5c859..d55766c75f862 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java @@ -244,7 +244,8 @@ public void testReplicationWithRemoteTranslogEnabled() throws Exception { ReplicationMode.NO_REPLICATION, buildRemoteStoreEnabledDiscoveryNodes(routingTable), replicasProxy, - replicasProxy + replicasProxy, + true ) ); op.execute(); @@ -314,7 +315,8 @@ public void testPrimaryToPrimaryReplicationWithRemoteTranslogEnabled() throws Ex ReplicationMode.NO_REPLICATION, buildRemoteStoreEnabledDiscoveryNodes(routingTable), replicasProxy, - replicasProxy + replicasProxy, + true ) ); op.execute(); @@ -443,7 +445,8 @@ public void testReplicationInDualModeWithDocrepReplica() throws Exception { ReplicationMode.NO_REPLICATION, buildMixedModeEnabledDiscoveryNodes(routingTable), replicasProxy, - replicasProxy + replicasProxy, + false ) ); op.execute(); diff --git a/server/src/test/java/org/opensearch/index/seqno/GlobalCheckpointSyncActionTests.java b/server/src/test/java/org/opensearch/index/seqno/GlobalCheckpointSyncActionTests.java index 47cf025d3ec68..a27f3476888eb 100644 --- a/server/src/test/java/org/opensearch/index/seqno/GlobalCheckpointSyncActionTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/GlobalCheckpointSyncActionTests.java @@ -111,6 +111,7 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception { final ShardId shardId = new ShardId(index, id); when(indexShard.shardId()).thenReturn(shardId); + when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); final Translog.Durability durability = randomFrom(Translog.Durability.ASYNC, Translog.Durability.REQUEST); when(indexShard.getTranslogDurability()).thenReturn(durability); @@ -192,6 +193,7 @@ public void testMayBeSyncTranslogWithRemoteTranslog() throws Exception { when(indexShard.getLastKnownGlobalCheckpoint()).thenReturn(globalCheckpoint); when(indexShard.getLastSyncedGlobalCheckpoint()).thenReturn(globalCheckpoint - 1); when(indexShard.getTranslogDurability()).thenReturn(Translog.Durability.REQUEST); + when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); action.shardOperationOnPrimary(primaryRequest, indexShard, ActionTestUtils.assertNoFailureListener(r -> {})); verify(indexShard, never()).sync(); @@ -206,6 +208,7 @@ public void testMayBeSyncTranslogWithLocalTranslog() throws Exception { when(indexShard.getLastKnownGlobalCheckpoint()).thenReturn(globalCheckpoint); when(indexShard.getLastSyncedGlobalCheckpoint()).thenReturn(globalCheckpoint - 1); when(indexShard.getTranslogDurability()).thenReturn(Translog.Durability.REQUEST); + when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); action.shardOperationOnPrimary(primaryRequest, indexShard, ActionTestUtils.assertNoFailureListener(r -> {})); verify(indexShard).sync(); diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java index 20a82218e70f4..85864eebd6d0d 100644 --- a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java @@ -45,7 +45,7 @@ public void testStartSequenceForReplicaRecovery() throws Exception { final Path remoteDir = createTempDir(); final String indexMapping = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": {} }"; try (ReplicationGroup shards = createGroup(0, settings, indexMapping, new NRTReplicationEngineFactory(), remoteDir)) { - shards.startPrimary(true); + shards.startPrimary(); final IndexShard primary = shards.getPrimary(); int numDocs = shards.indexDocs(randomIntBetween(10, 100)); shards.flush(); @@ -117,7 +117,7 @@ public void testNoTranslogHistoryTransferred() throws Exception { try (ReplicationGroup shards = createGroup(0, settings, indexMapping, new NRTReplicationEngineFactory(), remoteDir)) { // Step1 - Start primary, index docs, flush, index more docs, check translog in primary as expected - shards.startPrimary(true); + shards.startPrimary(); final IndexShard primary = shards.getPrimary(); int numDocs = shards.indexDocs(randomIntBetween(10, 100)); shards.flush(); diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index 7a67d7ffd1f5d..352f827c74cb2 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -15,6 +15,7 @@ import org.opensearch.action.support.replication.ReplicationMode; import org.opensearch.action.support.replication.TransportReplicationAction; import org.opensearch.cluster.action.shard.ShardStateAction; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; @@ -38,7 +39,9 @@ import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -129,7 +132,9 @@ public void testPublishCheckpointActionOnReplica() { final ShardId shardId = new ShardId(index, id); when(indexShard.shardId()).thenReturn(shardId); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); + when(indexShard.indexSettings()).thenReturn( + createIndexSettings(false, Settings.builder().put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), "SEGMENT").build()) + ); final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class); final PublishCheckpointAction action = new PublishCheckpointAction( @@ -161,6 +166,35 @@ public void testPublishCheckpointActionOnReplica() { } + public void testPublishCheckpointActionOnDocrepReplicaDuringMigration() { + final IndicesService indicesService = mock(IndicesService.class); + + final Index index = new Index("index", "uuid"); + final IndexService indexService = mock(IndexService.class); + when(indicesService.indexServiceSafe(index)).thenReturn(indexService); + final int id = randomIntBetween(0, 4); + final IndexShard indexShard = mock(IndexShard.class); + when(indexService.getShard(id)).thenReturn(indexShard); + + final ShardId shardId = new ShardId(index, id); + when(indexShard.shardId()).thenReturn(shardId); + when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); + final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class); + + final PublishCheckpointAction action = new PublishCheckpointAction( + Settings.EMPTY, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + new ActionFilters(Collections.emptySet()), + mockTargetService + ); + // no interaction with SegmentReplicationTargetService object + verify(mockTargetService, never()).onNewCheckpoint(any(), any()); + } + public void testGetReplicationModeWithRemoteTranslog() { final PublishCheckpointAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index 502ac9ccb4671..3226035bba97b 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -359,13 +359,6 @@ public synchronized DiscoveryNodes generateFakeDiscoveryNodes() { return builder.build(); } - public synchronized void updateDiscoveryNodesOnShards(DiscoveryNodes discoveryNodes) { - primary.setDiscoveryNodes(discoveryNodes); - for (IndexShard replica : replicas) { - replica.setDiscoveryNodes(discoveryNodes); - } - } - public synchronized int startReplicas(int numOfReplicasToStart) throws IOException { if (primary.routingEntry().initializing()) { startPrimary(); @@ -384,10 +377,6 @@ public synchronized int startReplicas(int numOfReplicasToStart) throws IOExcepti } public void startPrimary() throws IOException { - startPrimary(false); - } - - public void startPrimary(boolean remote) throws IOException { recoverPrimary(primary); computeReplicationTargets(); HashSet activeIds = new HashSet<>(); @@ -436,7 +425,6 @@ assert shardRoutings().stream().anyMatch(shardRouting -> shardRouting.isSameAllo if (replicationTargets != null) { replicationTargets.addReplica(replica); } - updateDiscoveryNodesOnShards(generateFakeDiscoveryNodes()); updateAllocationIDsOnPrimary(); }