Skip to content

Commit

Permalink
Running Spotless checks
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com>
  • Loading branch information
shourya035 committed Mar 15, 2024
1 parent 791e2da commit f32ee23
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 127 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,29 @@

package org.opensearch.remotemigration;

<<<<<<< HEAD
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.cluster.metadata.RepositoryMetadata;
=======
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.UUIDs;
>>>>>>> Primary changes to support dual mode replication
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
<<<<<<< HEAD
import java.util.concurrent.ExecutionException;
=======
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
>>>>>>> Primary changes to support dual mode replication

import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

Expand Down Expand Up @@ -93,11 +86,19 @@ protected void setFailRate(String repoName, int value) throws ExecutionException
}

public void initDocRepToRemoteMigration() {
assertTrue(internalCluster().client().admin().cluster().prepareUpdateSettings().setPersistentSettings(
Settings.builder()
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
.put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")
).get().isAcknowledged());
assertTrue(
internalCluster().client()
.admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(
Settings.builder()
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
.put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")
)
.get()
.isAcknowledged()
);
}

public BulkResponse indexBulk(String indexName, int numDocs) {
Expand Down Expand Up @@ -152,9 +153,9 @@ public void startIndexing() {
public Thread getIndexingThread() {
return indexingThread;
}
}
}

public class SyncIndexingService {
public class SyncIndexingService {
private int maxDocs;
private int currentIndexedDocs;
private boolean forceStop;
Expand All @@ -170,24 +171,24 @@ public class SyncIndexingService {
this.forceStop = false;
}

public void forceStopIndexing() throws InterruptedException {
this.forceStop = true;
}

public int getCurrentIndexedDocs() {
return currentIndexedDocs;
}

public void startIndexing() {
while (currentIndexedDocs < maxDocs && forceStop == false) {
IndexResponse indexResponse = client().prepareIndex(indexName).setId("id").setSource("field", "value").get();
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
DeleteResponse deleteResponse = client().prepareDelete(indexName, "id").get();
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
client().prepareIndex(indexName).setSource("auto", true).get();
currentIndexedDocs += 1;
logger.info("Indexed {} docs here", currentIndexedDocs);
}
}
}
public void forceStopIndexing() throws InterruptedException {
this.forceStop = true;
}

public int getCurrentIndexedDocs() {
return currentIndexedDocs;
}

public void startIndexing() {
while (currentIndexedDocs < maxDocs && forceStop == false) {
IndexResponse indexResponse = client().prepareIndex(indexName).setId("id").setSource("field", "value").get();
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
DeleteResponse deleteResponse = client().prepareDelete(indexName, "id").get();
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
client().prepareIndex(indexName).setSource("auto", true).get();
currentIndexedDocs += 1;
logger.info("Indexed {} docs here", currentIndexedDocs);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ public void testMixedModeRelocation() throws Exception {
assertEquals(0, clusterHealthResponse.getRelocatingShards());
logger.info("--> relocation from remote to remote complete");
refresh("test");
OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), asyncIndexingService.totalIndexedDocs());
OpenSearchAssertions.assertHitCount(
client().prepareSearch("test").setTrackTotalHits(true).get(),
asyncIndexingService.totalIndexedDocs()
);
OpenSearchAssertions.assertHitCount(
client().prepareSearch("test")
.setTrackTotalHits(true)// extra paranoia ;)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.action.support.replication;

import org.opensearch.action.support.replication.ReplicationOperation.ReplicaResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.core.action.ActionListener;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@

package org.opensearch.index.remote;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.node.remotestore.RemoteStoreNodeService;

import java.util.Arrays;
Expand Down Expand Up @@ -108,7 +106,6 @@ public static void verifyNoMultipleWriters(List<String> mdFiles, Function<String
});
}


/**
* Helper method to check the values for the following cluster settings:
* - `remote_store.compatibility_mode` (should be `mixed`)
Expand All @@ -118,7 +115,9 @@ public static void verifyNoMultipleWriters(List<String> mdFiles, Function<String
*/
public static boolean isMigrationDirectionSet(ClusterService clusterService) {
RemoteStoreNodeService.Direction migrationDirection = clusterService.getClusterSettings().get(MIGRATION_DIRECTION_SETTING);
RemoteStoreNodeService.CompatibilityMode currentCompatiblityMode = clusterService.getClusterSettings().get(REMOTE_STORE_COMPATIBILITY_MODE_SETTING);
return currentCompatiblityMode.equals(RemoteStoreNodeService.CompatibilityMode.MIXED) == true && migrationDirection.equals(RemoteStoreNodeService.Direction.NONE) == false;
RemoteStoreNodeService.CompatibilityMode currentCompatiblityMode = clusterService.getClusterSettings()
.get(REMOTE_STORE_COMPATIBILITY_MODE_SETTING);
return currentCompatiblityMode.equals(RemoteStoreNodeService.CompatibilityMode.MIXED) == true
&& migrationDirection.equals(RemoteStoreNodeService.Direction.NONE) == false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1250,7 +1250,8 @@ && isPrimaryRelocation(allocationId) == false
- Segrep enabled without remote store
- Destination replica shard is hosted on a remote store enabled node (Remote store enabled nodes have segrep enabled implicitly)
*/
&& (indexSettings.isSegRepLocalEnabled() == true || routingTable.getByAllocationId(allocationId).isAssignedToRemoteStoreNode() == true)) {
&& (indexSettings.isSegRepLocalEnabled() == true
|| routingTable.getByAllocationId(allocationId).isAssignedToRemoteStoreNode() == true)) {
cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> new SegmentReplicationLagTimer());
logger.trace(
() -> new ParameterizedMessage(
Expand Down Expand Up @@ -1448,7 +1449,8 @@ public synchronized void updateFromClusterManager(
+ " as in-sync but it does not exist locally";
final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
final boolean assignedToRemoteStoreNode = routingTable.getByAllocationId(initializingId).isAssignedToRemoteStoreNode();
final boolean assignedToRemoteStoreNode = routingTable.getByAllocationId(initializingId)
.isAssignedToRemoteStoreNode();
checkpoints.put(
initializingId,
new CheckpointState(
Expand Down Expand Up @@ -1518,7 +1520,12 @@ public synchronized void updateFromClusterManager(
* @param primaryTargetAllocationId primary target allocation id
* @return the replication mode.
*/
private boolean isReplicated(String allocationId, String primaryAllocationId, String primaryTargetAllocationId, boolean assignedToRemoteStoreNode) {
private boolean isReplicated(
String allocationId,
String primaryAllocationId,
String primaryTargetAllocationId,
boolean assignedToRemoteStoreNode
) {
/*
- If remote translog is enabled, then returns replication mode checking current allocation id against the primary and primary target allocation id.
- If remote translog is enabled, then returns true if given allocation id matches the primary or it's relocation target allocation primary and primary target allocation id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.action.support.replication.*;
import org.opensearch.action.support.replication.ReplicationMode;
import org.opensearch.action.support.replication.ReplicationRequest;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.action.support.replication.ReplicationTask;
import org.opensearch.action.support.replication.TransportReplicationAction;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
Expand Down Expand Up @@ -87,6 +91,7 @@ protected Logger getLogger() {
}

private final ClusterService clusterService;

@Inject
public RetentionLeaseBackgroundSyncAction(
final Settings settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.action.support.WriteResponse;
import org.opensearch.action.support.replication.*;
import org.opensearch.action.support.replication.ReplicatedWriteRequest;
import org.opensearch.action.support.replication.ReplicationMode;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.action.support.replication.ReplicationTask;
import org.opensearch.action.support.replication.TransportWriteAction;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.service.ClusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3494,7 +3494,8 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final S
* update local checkpoint at replica, so the local checkpoint at replica can be less than globalCheckpoint.
*/
assert (state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED)
|| indexSettings.isRemoteTranslogStoreEnabled() || indexSettings.isRemoteNode() : "supposedly in-sync shard copy received a global checkpoint ["
|| indexSettings.isRemoteTranslogStoreEnabled()
|| indexSettings.isRemoteNode() : "supposedly in-sync shard copy received a global checkpoint ["
+ globalCheckpoint
+ "] "
+ "that is higher than its local checkpoint ["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.seqno.GlobalCheckpointSyncAction;
import org.opensearch.index.seqno.ReplicationTracker;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public static RecoverySourceHandler create(
RecoverySettings recoverySettings
) {
boolean isReplicaRecoveryWithRemoteTranslog = request.isPrimaryRelocation() == false
&& (shard.isRemoteTranslogEnabled() || shard.isMigratingToRemote()) && request.targetNode().isRemoteStoreNode();
&& (shard.isRemoteTranslogEnabled() || shard.isMigratingToRemote())
&& request.targetNode().isRemoteStoreNode();
if (isReplicaRecoveryWithRemoteTranslog) {
return new RemoteStorePeerRecoverySourceHandler(
shard,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,10 @@ protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexSh
Objects.requireNonNull(replica);
ActionListener.completeWith(listener, () -> {
logger.trace(() -> new ParameterizedMessage("Checkpoint {} received on replica {}", request, replica.shardId()));
// Ignore replica operation if there is an ongoing remote store migration and the replica copy is assigned to a docrep enabled node
if (RemoteStoreUtils.isMigrationDirectionSet(clusterService) == true && replica.routingEntry().isAssignedToRemoteStoreNode() == false) {
// Ignore replica operation if there is an ongoing remote store migration and the replica copy is assigned to a docrep enabled
// node
if (RemoteStoreUtils.isMigrationDirectionSet(clusterService) == true
&& replica.routingEntry().isAssignedToRemoteStoreNode() == false) {
logger.trace("Received segrep checkpoint on a docrep shard copy during an ongoing remote migration. NoOp.");
return new ReplicaResult();
}
Expand Down

0 comments on commit f32ee23

Please sign in to comment.