Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dual Replication - Primary changes #152

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.blobstore.BlobStoreTestUtil;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.DummyShardLock;
import org.opensearch.test.IndexSettingsModule;
Expand Down Expand Up @@ -714,7 +715,8 @@ public static final IndexShard newIndexShard(
() -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
nodeId,
null,
false
false,
BlobStoreTestUtil.mockClusterService()
);
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,26 @@
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.concurrent.ExecutionException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

Expand All @@ -28,9 +40,16 @@ public class MigrationBaseTestCase extends OpenSearchIntegTestCase {

protected Path segmentRepoPath;
protected Path translogRepoPath;

boolean addRemote = false;

private final List<String> documentKeys = List.of(
randomAlphaOfLength(5),
randomAlphaOfLength(5),
randomAlphaOfLength(5),
randomAlphaOfLength(5),
randomAlphaOfLength(5)
);

protected Settings nodeSettings(int nodeOrdinal) {
if (segmentRepoPath == null || translogRepoPath == null) {
segmentRepoPath = randomRepoPath().toAbsolutePath();
Expand Down Expand Up @@ -64,4 +83,111 @@ protected void setFailRate(String repoName, int value) throws ExecutionException
client().admin().cluster().preparePutRepository(repoName).setType(ReloadableFsRepository.TYPE).setSettings(settings).get()
);
}

/**
 * Switches the cluster into docrep-to-remote-store migration by persisting two cluster settings:
 * the remote store compatibility mode is set to {@code mixed} and the migration direction is set
 * to {@code remote_store}. Asserts that the settings update was acknowledged.
 */
public void initDocRepToRemoteMigration() {
    // Build the settings payload up front so the request chain below stays readable.
    final Settings.Builder migrationSettings = Settings.builder()
        .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
        .put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store");

    final boolean acknowledged = internalCluster().client()
        .admin()
        .cluster()
        .prepareUpdateSettings()
        .setPersistentSettings(migrationSettings)
        .get()
        .isAcknowledged();

    assertTrue(acknowledged);
}

/**
 * Indexes {@code numDocs} documents into {@code indexName} with a single bulk request.
 * Every document gets a random base64 UUID as its id and one field whose name is picked at random
 * from {@code documentKeys} and whose value is a random 5-character string.
 *
 * @param indexName index to write the documents to
 * @param numDocs   number of index requests to put into the bulk request
 * @return the response of the bulk call
 */
public BulkResponse indexBulk(String indexName, int numDocs) {
    final BulkRequest batch = new BulkRequest();
    final int lastKeyIndex = documentKeys.size() - 1;
    for (int remaining = numDocs; remaining > 0; remaining--) {
        // The builder chain is kept intact so client() and the random helpers run in their original order.
        batch.add(
            client().prepareIndex(indexName)
                .setId(UUIDs.randomBase64UUID())
                .setSource(documentKeys.get(randomIntBetween(0, lastKeyIndex)), randomAlphaOfLength(5))
                .request()
        );
    }
    final BulkResponse response = client().bulk(batch).actionGet();
    return response;
}

/**
 * Test helper that indexes into a single index from a dedicated background thread.
 * <p>
 * Each loop iteration indexes a document with the fixed id {@code "id"}, asserts that it was
 * created, deletes it again from the same index, asserts the deletion, and finally indexes one
 * document with an auto-generated id. Only the auto-generated documents are counted.
 * The thread is created in the constructor but not started until {@link #startIndexing()} is called.
 */
public class AsyncIndexingService {
    private final AtomicBoolean finished = new AtomicBoolean();
    private final AtomicInteger numAutoGenDocs = new AtomicInteger();
    private final Thread indexingThread;
    private final String indexName;

    /**
     * Creates a service that keeps indexing until {@link #stopIndexing()} is called.
     *
     * @param indexName index to write to
     */
    AsyncIndexingService(String indexName) {
        this(indexName, Integer.MAX_VALUE);
    }

    /**
     * Creates a service that stops on its own once {@code maxDocs} auto-generated documents were indexed.
     *
     * @param indexName index to write to
     * @param maxDocs   upper bound on the number of auto-generated documents
     */
    AsyncIndexingService(String indexName, int maxDocs) {
        // Previously the field was declared but never assigned.
        this.indexName = indexName;
        indexingThread = new Thread(() -> {
            while (finished.get() == false && numAutoGenDocs.get() < maxDocs) {
                IndexResponse indexResponse = client().prepareIndex(indexName).setId("id").setSource("field", "value").get();
                assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
                // Delete from the index that was just written to. This used to target the hard-coded
                // index "test", which left the "id" document in place for any other index name and
                // made the CREATED assertion above fail on the next iteration.
                DeleteResponse deleteResponse = client().prepareDelete(indexName, "id").get();
                assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
                client().prepareIndex(indexName).setSource("auto", true).get();
                final int indexedSoFar = numAutoGenDocs.incrementAndGet();
                logger.info("Indexed {} docs here", indexedSoFar);
            }
        });
    }

    /**
     * Signals the indexing loop to finish and waits for the background thread to terminate.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void stopIndexing() throws InterruptedException {
        finished.set(true);
        indexingThread.join();
    }

    /**
     * @return the number of auto-generated documents indexed so far
     */
    public int totalIndexedDocs() {
        return numAutoGenDocs.get();
    }

    /**
     * Starts the background indexing thread.
     */
    public void startIndexing() {
        indexingThread.start();
    }

    /**
     * @return the background thread that runs the indexing loop
     */
    public Thread getIndexingThread() {
        return indexingThread;
    }
}

/**
 * Test helper that indexes into a single index on the calling thread.
 * <p>
 * {@link #startIndexing()} loops until either {@code maxDocs} iterations have completed or the
 * force-stop flag has been set. Each iteration indexes a document with the fixed id {@code "id"},
 * deletes it again, and then indexes one document with an auto-generated id.
 */
public class SyncIndexingService {
    private final String indexName;
    private final int maxDocs;
    private int currentIndexedDocs;
    private boolean forceStop = false;

    /**
     * Creates a service without a practical document limit.
     *
     * @param indexName index to write to
     */
    SyncIndexingService(String indexName) {
        this(indexName, Integer.MAX_VALUE);
    }

    /**
     * @param indexName index to write to
     * @param maxDocs   number of loop iterations after which {@link #startIndexing()} returns
     */
    SyncIndexingService(String indexName, int maxDocs) {
        this.maxDocs = maxDocs;
        this.indexName = indexName;
    }

    /**
     * Runs the index / delete / index cycle until the document limit is reached or a stop was requested.
     */
    public void startIndexing() {
        while (forceStop == false && currentIndexedDocs < maxDocs) {
            final IndexResponse created = client().prepareIndex(indexName).setId("id").setSource("field", "value").get();
            assertEquals(DocWriteResponse.Result.CREATED, created.getResult());

            final DeleteResponse deleted = client().prepareDelete(indexName, "id").get();
            assertEquals(DocWriteResponse.Result.DELETED, deleted.getResult());

            client().prepareIndex(indexName).setSource("auto", true).get();
            currentIndexedDocs++;
            logger.info("Indexed {} docs here", currentIndexedDocs);
        }
    }

    /**
     * @return the number of completed loop iterations
     */
    public int getCurrentIndexedDocs() {
        return currentIndexedDocs;
    }

    /**
     * Sets the flag that makes {@link #startIndexing()} leave its loop before the next iteration.
     */
    public void forceStopIndexing() throws InterruptedException {
        forceStop = true;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ protected long primaryOperationSize(BulkShardRequest request) {

@Override
public ReplicationMode getReplicationMode(IndexShard indexShard) {
if (indexShard.isRemoteTranslogEnabled()) {
if (indexShard.isRemoteTranslogEnabled() || indexShard.routingEntry().isAssignedToRemoteStoreNode()) {
return ReplicationMode.PRIMARY_TERM_VALIDATION;
}
return super.getReplicationMode(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,21 @@ protected void performOnReplicaProxy(

/**
 * Picks the replication mode for one shard copy relative to the primary.
 * The checks run in this order: the primary itself gets no replication; the relocation target of a
 * relocating primary and any copy not assigned to a remote store node get full replication;
 * everything else uses {@code replicationModeOverride}.
 */
@Override
ReplicationMode determineReplicationMode(ShardRouting shardRouting, ShardRouting primaryRouting) {
    // The primary's own routing entry is never a replication target.
    if (shardRouting.isSameAllocation(primaryRouting)) {
        return ReplicationMode.NO_REPLICATION;
    }

    // Case 1: this copy is the target of an ongoing primary relocation.
    final boolean isPrimaryRelocationTarget = primaryRouting.relocating()
        && shardRouting.isSameAllocation(primaryRouting.getTargetRelocatingShard());

    // Case 2 (only applicable during remote migration): the replica is hosted on a non-remote node.
    // Short-circuiting keeps this check from running when case 1 already matched.
    if (isPrimaryRelocationTarget || shardRouting.isAssignedToRemoteStoreNode() == false) {
        return ReplicationMode.FULL_REPLICATION;
    }

    return replicationModeOverride;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public void performOn(
* @return the overridden replication mode.
*/
public ReplicationMode getReplicationMode(IndexShard indexShard) {
if (indexShard.isRemoteTranslogEnabled()) {
if (indexShard.isRemoteTranslogEnabled() || indexShard.routingEntry().isAssignedToRemoteStoreNode()) {
return ReplicationMode.NO_REPLICATION;
}
return ReplicationMode.FULL_REPLICATION;
Expand Down Expand Up @@ -642,7 +642,7 @@ public void handleException(TransportException exp) {
primaryRequest.getPrimaryTerm(),
initialRetryBackoffBound,
retryTimeout,
indexShard.isRemoteTranslogEnabled()
indexShard.isRemoteTranslogEnabled() || indexShard.indexSettings().isRemoteNode()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's subsume the `indexShard.indexSettings().isRemoteNode()` check inside the `indexShard.isRemoteTranslogEnabled()` check itself?

? new ReplicationModeAwareProxy<>(getReplicationMode(indexShard), replicasProxy, termValidationProxy)
: new FanoutReplicationProxy<>(replicasProxy)
).execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ public class ShardRouting implements Writeable, ToXContentObject {
@Nullable
private final ShardRouting targetRelocatingShard;

/*
Local flag denoting whether this shard copy is assigned to a remote-store-enabled node.
It is not serialized and is meant to be accessed from data nodes only;
it always returns `false` when accessed from cluster manager nodes.
It is set in the `createShard` and `updateShard` flows of the IndicesClusterStateService state applier.
*/
private Boolean assignedToRemoteStoreNode = Boolean.FALSE;

/**
* A constructor to internally create shard routing instances, note, the internal flag should only be set to true
* by either this class or tests. Visible for testing.
Expand Down Expand Up @@ -878,4 +886,12 @@ public boolean unassignedReasonIndexCreated() {
}
return false;
}

/**
 * Returns the local flag that records whether this shard copy is assigned to a remote store node.
 * The backing field is initialised to {@code false} and only changes through
 * {@link #setAssignedToRemoteStoreNode(boolean)}.
 *
 * @return the current value of the flag
 */
public boolean isAssignedToRemoteStoreNode() {
return assignedToRemoteStoreNode;
}

/**
 * Sets the local flag that records whether this shard copy is assigned to a remote store node.
 *
 * @param assignedToRemoteStoreNode new value of the flag
 */
public void setAssignedToRemoteStoreNode(boolean assignedToRemoteStoreNode) {
this.assignedToRemoteStoreNode = assignedToRemoteStoreNode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,8 @@ public synchronized IndexShard createShard(
clusterRemoteTranslogBufferIntervalSupplier,
nodeEnv.nodeId(),
recoverySettings,
seedRemote
seedRemote,
clusterService
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,19 @@

package org.opensearch.index.remote;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.Tuple;
import org.opensearch.node.remotestore.RemoteStoreNodeService;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;

/**
* Utils for remote store
*
Expand Down Expand Up @@ -101,4 +106,18 @@ public static void verifyNoMultipleWriters(List<String> mdFiles, Function<String
});
}

/**
 * Tells whether a remote store migration is currently in progress, judged by two cluster settings:
 * <ul>
 *   <li>{@code remote_store.compatibility_mode} must be {@code mixed}</li>
 *   <li>{@code migration.direction} must NOT be {@code none}</li>
 * </ul>
 *
 * @param clusterService current clusterService reference used to read the cluster settings
 * @return {@code true} only when both conditions above hold
 */
public static boolean isMigrationDirectionSet(ClusterService clusterService) {
    final RemoteStoreNodeService.Direction direction = clusterService.getClusterSettings().get(MIGRATION_DIRECTION_SETTING);
    final RemoteStoreNodeService.CompatibilityMode compatibilityMode = clusterService.getClusterSettings()
        .get(REMOTE_STORE_COMPATIBILITY_MODE_SETTING);

    // Outside of mixed mode no migration can be running, whatever the direction says.
    if (compatibilityMode.equals(RemoteStoreNodeService.CompatibilityMode.MIXED) == false) {
        return false;
    }
    return direction.equals(RemoteStoreNodeService.Direction.NONE) == false;
}
}
Loading
Loading