Skip to content

Commit

Permalink
Handling translog metadata update during remote store migration
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com>
  • Loading branch information
shourya035 committed May 24, 2024
1 parent 8ac6806 commit e09d1f8
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;
Expand All @@ -35,6 +36,9 @@
import java.util.concurrent.atomic.AtomicLong;

import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
Expand Down Expand Up @@ -118,6 +122,36 @@ public void initDocRepToRemoteMigration() {
);
}

public void enableEnhancedPrefixPath() {
assertAcked(
internalCluster().client()
.admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(
Settings.builder()
.put(
CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.getKey(),
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1.name()
)
.put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.HASHED_PREFIX.name())
.build()
)
.get()
);
}

public void enableRemoteTranslogMetadata() {
assertAcked(
internalCluster().client()
.admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), "true"))
.get()
);
}

public BulkResponse indexBulk(String indexName, int numDocs) {
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numDocs; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -407,6 +408,8 @@ public void testRemotePathMetadataAddedWithFirstPrimaryMovingToRemote() throws E

logger.info("---> Adding 3 remote enabled nodes");
initDocRepToRemoteMigration();
enableEnhancedPrefixPath();
enableRemoteTranslogMetadata();
addRemote = true;
List<String> remoteEnabledNodes = internalCluster().startDataOnlyNodes(
3,
Expand Down Expand Up @@ -512,6 +515,15 @@ private void assertCustomIndexMetadata(String index) {
logger.info("---> Asserting custom index metadata");
IndexMetadata iMd = internalCluster().client().admin().cluster().prepareState().get().getState().metadata().index(index);
assertNotNull(iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY));
assertEquals(
iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY).get(RemoteStoreEnums.PathHashAlgorithm.NAME),
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1.name()
);
assertEquals(
iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY).get(RemoteStoreEnums.PathType.NAME),
RemoteStoreEnums.PathType.HASHED_PREFIX.name()
);
assertNotNull(iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY).get(IndexMetadata.TRANSLOG_METADATA_KEY));
assertEquals(iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY).get(IndexMetadata.TRANSLOG_METADATA_KEY), "false");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ protected void clusterManagerOperation(
routingNodes,
state,
clusterInfo,
snapshotsInfoService,
snapshotsInfoService.snapshotShardSizes(),
System.nanoTime()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRout
routingNodes,
clusterState,
clusterInfoService.getClusterInfo(),
snapshotsInfoService,
snapshotsInfoService.snapshotShardSizes(),
currentNanoTime()
);
Expand Down Expand Up @@ -258,6 +259,7 @@ public ClusterState applyFailedShards(
routingNodes,
tmpState,
clusterInfoService.getClusterInfo(),
snapshotsInfoService,
snapshotsInfoService.snapshotShardSizes(),
currentNanoTime
);
Expand Down Expand Up @@ -333,6 +335,7 @@ public ClusterState disassociateDeadNodes(ClusterState clusterState, boolean rer
routingNodes,
clusterState,
clusterInfoService.getClusterInfo(),
snapshotsInfoService,
snapshotsInfoService.snapshotShardSizes(),
currentNanoTime()
);
Expand Down Expand Up @@ -360,6 +363,7 @@ public ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
clusterState.getRoutingNodes(),
clusterState,
clusterInfoService.getClusterInfo(),
snapshotsInfoService,
snapshotsInfoService.snapshotShardSizes(),
currentNanoTime()
);
Expand Down Expand Up @@ -493,6 +497,7 @@ public CommandsResult reroute(final ClusterState clusterState, AllocationCommand
routingNodes,
clusterState,
clusterInfoService.getClusterInfo(),
snapshotsInfoService,
snapshotsInfoService.snapshotShardSizes(),
currentNanoTime()
);
Expand Down Expand Up @@ -530,6 +535,7 @@ public ClusterState reroute(ClusterState clusterState, String reason) {
routingNodes,
fixedClusterState,
clusterInfoService.getClusterInfo(),
snapshotsInfoService,
snapshotsInfoService.snapshotShardSizes(),
currentNanoTime()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater;
import org.opensearch.repositories.RepositoriesService;

import java.util.Collections;
import java.util.Comparator;
Expand All @@ -73,6 +74,7 @@ public class IndexMetadataUpdater extends RoutingChangesObserver.AbstractRouting
private final Logger logger = LogManager.getLogger(IndexMetadataUpdater.class);
private final Map<ShardId, Updates> shardChanges = new HashMap<>();
private boolean ongoingRemoteStoreMigration = false;
private RepositoriesService repositoriesService;

@Override
public void shardInitialized(ShardRouting unassignedShard, ShardRouting initializedShard) {
Expand Down Expand Up @@ -176,7 +178,7 @@ public Metadata applyChanges(Metadata oldMetadata, RoutingTable newRoutingTable,
oldMetadata.settings(),
logger
);
migrationImdUpdater.maybeUpdateRemoteStoreCustomMetadata(indexMetadataBuilder, index.getName());
migrationImdUpdater.maybeUpdateRemoteStoreCustomMetadata(indexMetadataBuilder, index.getName(), repositoriesService);
migrationImdUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, index.getName());
}
}
Expand Down Expand Up @@ -408,6 +410,10 @@ public void setOngoingRemoteStoreMigration(boolean ongoingRemoteStoreMigration)
this.ongoingRemoteStoreMigration = ongoingRemoteStoreMigration;
}

public void setRepositoriesService(RepositoriesService repositoriesService) {
this.repositoriesService = repositoriesService;
}

private static class Updates {
private boolean increaseTerm; // whether primary term should be increased
private Set<String> addedAllocationIds = new HashSet<>(); // allocation ids that should be added to the in-sync set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.snapshots.InternalSnapshotsInfoService;
import org.opensearch.snapshots.RestoreService.RestoreInProgressUpdater;
import org.opensearch.snapshots.SnapshotShardSizeInfo;
import org.opensearch.snapshots.SnapshotsInfoService;

import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -102,6 +105,18 @@ public class RoutingAllocation {
restoreInProgressUpdater
);

// Used for tests
public RoutingAllocation(
AllocationDeciders deciders,
RoutingNodes routingNodes,
ClusterState clusterState,
ClusterInfo clusterInfo,
SnapshotShardSizeInfo shardSizeInfo,
long currentNanoTime
) {
this(deciders, routingNodes, clusterState, clusterInfo, null, shardSizeInfo, currentNanoTime);
}

/**
* Creates a new {@link RoutingAllocation}
* @param deciders {@link AllocationDeciders} to used to make decisions for routing allocations
Expand All @@ -114,6 +129,7 @@ public RoutingAllocation(
RoutingNodes routingNodes,
ClusterState clusterState,
ClusterInfo clusterInfo,
SnapshotsInfoService snapshotsInfoService,
SnapshotShardSizeInfo shardSizeInfo,
long currentNanoTime
) {
Expand All @@ -128,6 +144,7 @@ public RoutingAllocation(
this.currentNanoTime = currentNanoTime;
if (isMigratingToRemoteStore(metadata)) {
indexMetadataUpdater.setOngoingRemoteStoreMigration(true);
indexMetadataUpdater.setRepositoriesService(determineRepositoriesService(snapshotsInfoService));
}
}

Expand All @@ -136,6 +153,21 @@ public long getCurrentNanoTime() {
return currentNanoTime;
}

/**
* Extracts instance of {@link RepositoriesService} to be used during the remote store migration flow
* Uses the {@link InternalSnapshotsInfoService} implementation of {@link SnapshotsInfoService}
* which is wired directly from {@link org.opensearch.cluster.ClusterModule} and the reference is passed through {@link AllocationService}
*
* @param snapshotsInfoService SnapshotsInfoService passed from ClusterModule through AllocationService
* @return RepositoriesService reference to be used in the remote migration flow
*/
private RepositoriesService determineRepositoriesService(SnapshotsInfoService snapshotsInfoService) {
assert snapshotsInfoService != null : "Cannot have null snapshotsInfo during remote store migration";
RepositoriesService repositoriesService = ((InternalSnapshotsInfoService) snapshotsInfoService).getRepositoriesService().get();
assert repositoriesService != null : "Cannot have null repositoriesService during remote store migration";
return repositoriesService;
}

/**
* Get {@link AllocationDeciders} used for allocation
* @return {@link AllocationDeciders} used for allocation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.RepositoriesService;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -127,12 +128,16 @@ private boolean needsRemoteIndexSettingsUpdate(
* @param indexMetadataBuilder Mutated {@link IndexMetadata.Builder} having the previous state updates
* @param index index name
*/
public void maybeUpdateRemoteStoreCustomMetadata(IndexMetadata.Builder indexMetadataBuilder, String index) {
public void maybeUpdateRemoteStoreCustomMetadata(
IndexMetadata.Builder indexMetadataBuilder,
String index,
RepositoriesService repositoriesService
) {
if (indexHasRemoteCustomMetadata(indexMetadata) == false) {
logger.info("Adding remote store custom data for index [{}] during migration", index);
indexMetadataBuilder.putCustom(
REMOTE_STORE_CUSTOM_KEY,
determineRemoteStoreCustomMetadataDuringMigration(clusterSettings, discoveryNodes)
determineRemoteStoreCustomMetadataDuringMigration(clusterSettings, discoveryNodes, repositoriesService)
);
} else {
logger.debug("Index {} already has remote store custom data", index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.nio.ByteBuffer;
import java.util.Arrays;
Expand All @@ -33,6 +35,7 @@
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;

/**
* Utils for remote store
Expand Down Expand Up @@ -203,22 +206,25 @@ public static boolean determineTranslogMetadataEnabled(IndexMetadata indexMetada
*/
public static Map<String, String> determineRemoteStoreCustomMetadataDuringMigration(
Settings clusterSettings,
DiscoveryNodes discoveryNodes
DiscoveryNodes discoveryNodes,
RepositoriesService repositoriesService
) {
Map<String, String> remoteCustomData = new HashMap<>();
Version minNodeVersion = discoveryNodes.getMinNodeVersion();
boolean blobStoreMetadataClusterSettingsEnabled = Version.V_2_15_0.compareTo(minNodeVersion) <= 0
&& CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.get(clusterSettings);

// TODO: During the document replication to a remote store migration, there should be a check to determine if the registered
// translog blobstore supports custom metadata or not.
// Currently, the blobStoreMetadataEnabled flag is set to false because the integration tests run on the local file system, which
// does not support custom metadata.
// https://github.com/opensearch-project/OpenSearch/issues/13745
boolean blobStoreMetadataEnabled = false;
boolean translogMetadata = Version.CURRENT.compareTo(minNodeVersion) <= 0
&& CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.get(clusterSettings)
&& blobStoreMetadataEnabled;

remoteCustomData.put(IndexMetadata.TRANSLOG_METADATA_KEY, Boolean.toString(translogMetadata));
if (blobStoreMetadataClusterSettingsEnabled) {
String translogRepositoryName = RemoteStoreUtils.getRemoteStoreRepoName(discoveryNodes)
.get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
if (((BlobStoreRepository) repositoriesService.repository(translogRepositoryName)).blobStore().isBlobMetadataEnabled()) {
logger.debug("Repository {} supports object metadata. Setting translog_metadata to true", translogRepositoryName);
remoteCustomData.put(IndexMetadata.TRANSLOG_METADATA_KEY, Boolean.toString(true));
} else {
logger.debug("Repository {} does not support object metadata. Setting translog_metadata to false", translogRepositoryName);
remoteCustomData.put(IndexMetadata.TRANSLOG_METADATA_KEY, Boolean.toString(false));
}
}

RemoteStoreEnums.PathType pathType = Version.CURRENT.compareTo(minNodeVersion) <= 0
? CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.get(clusterSettings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ public class InternalSnapshotsInfoService implements ClusterStateListener, Snaps
);

private final ThreadPool threadPool;
private final Supplier<RepositoriesService> repositoriesService;

public final Supplier<RepositoriesService> repositoriesService;

private final Supplier<RerouteService> rerouteService;

/** contains the snapshot shards for which the size is known **/
Expand Down Expand Up @@ -362,6 +364,10 @@ private static Set<SnapshotShard> listOfSnapshotShards(final ClusterState state)
return Collections.unmodifiableSet(snapshotShards);
}

public Supplier<RepositoriesService> getRepositoriesService() {
return repositoriesService;
}

/**
* A snapshot of a shard
*
Expand Down
Loading

0 comments on commit e09d1f8

Please sign in to comment.