Skip to content

Commit

Permalink
Merge branch 'main' into publish-disable-it
Browse files Browse the repository at this point in the history
  • Loading branch information
shiv0408 committed Sep 5, 2024
2 parents c5920cf + 1935650 commit 6320bc0
Show file tree
Hide file tree
Showing 20 changed files with 619 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -60,19 +64,27 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;

@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class RemoteStatePublicationIT extends RemoteStoreBaseIntegTestCase {

private static String INDEX_NAME = "test-index";
private static final String INDEX_NAME = "test-index";
private static final String REMOTE_STATE_PREFIX = "!";
private static final String REMOTE_ROUTING_PREFIX = "_";
private boolean isRemoteStateEnabled = true;
private String isRemotePublicationEnabled = "true";
private boolean hasRemoteStateCharPrefix;
private boolean hasRemoteRoutingCharPrefix;

@Before
public void setup() {
asyncUploadMockFsRepo = false;
isRemoteStateEnabled = true;
isRemotePublicationEnabled = "true";
hasRemoteStateCharPrefix = randomBoolean();
hasRemoteRoutingCharPrefix = randomBoolean();
}

@Override
Expand All @@ -93,6 +105,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
routingTableRepoName
);

return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), isRemoteStateEnabled)
Expand All @@ -103,6 +116,19 @@ protected Settings nodeSettings(int nodeOrdinal) {
RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(),
RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE
)
.put(
RemoteClusterStateService.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.getKey(),
hasRemoteStateCharPrefix ? REMOTE_STATE_PREFIX : ""
)
.put(
RemoteRoutingTableBlobStore.CLUSTER_REMOTE_STORE_ROUTING_TABLE_PATH_PREFIX.getKey(),
hasRemoteRoutingCharPrefix ? REMOTE_ROUTING_PREFIX : ""
)
.put(RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX.toString())
.put(
RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING.getKey(),
PathHashAlgorithm.FNV_1A_COMPOSITE_1.toString()
)
.build();
}

Expand Down Expand Up @@ -146,6 +172,27 @@ public void testPublication() throws Exception {
Map<String, Integer> manifestFiles = getMetadataFiles(repository, RemoteClusterMetadataManifest.MANIFEST);
assertTrue(manifestFiles.containsKey(RemoteClusterMetadataManifest.MANIFEST));

RemoteClusterStateService remoteClusterStateService = internalCluster().getInstance(
RemoteClusterStateService.class,
internalCluster().getClusterManagerName()
);
ClusterMetadataManifest manifest = remoteClusterStateService.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().metadata().clusterUUID()
).get();
assertThat(manifest.getIndices().size(), is(1));
if (hasRemoteStateCharPrefix) {
for (UploadedIndexMetadata md : manifest.getIndices()) {
assertThat(md.getUploadedFilename(), startsWith(REMOTE_STATE_PREFIX));
}
}
assertThat(manifest.getIndicesRouting().size(), is(1));
if (hasRemoteRoutingCharPrefix) {
for (UploadedIndexMetadata md : manifest.getIndicesRouting()) {
assertThat(md.getUploadedFilename(), startsWith(REMOTE_ROUTING_PREFIX));
}
}

// get settings from each node and verify that it is updated
Settings settings = clusterService().getSettings();
logger.info("settings : {}", settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,17 @@ public void testFullClusterRestoreManifestFilePointsToInvalidIndexMetadataPathTh
internalCluster().stopAllNodes();
// Step - 3 Delete index metadata file in remote
try {
Files.move(
segmentRepoPath.resolve(encodeString(clusterName) + "/cluster-state/" + prevClusterUUID + "/index"),
segmentRepoPath.resolve("cluster-state/")
RemoteClusterStateService remoteClusterStateService = internalCluster().getInstance(
RemoteClusterStateService.class,
internalCluster().getClusterManagerName()
);
ClusterMetadataManifest manifest = remoteClusterStateService.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().metadata().clusterUUID()
).get();
for (UploadedIndexMetadata md : manifest.getIndices()) {
Files.move(segmentRepoPath.resolve(md.getUploadedFilename()), segmentRepoPath.resolve("cluster-state/"));
}
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.cluster.metadata.ViewMetadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.routing.DelayedAllocationService;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
Expand Down Expand Up @@ -479,4 +480,7 @@ public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, Shard
allocationService.setExistingShardsAllocators(existingShardsAllocators);
}

public void setRerouteServiceForAllocator(RerouteService rerouteService) {
shardsAllocator.setRerouteService(rerouteService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IntroSorter;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardMovementStrategy;
Expand All @@ -49,12 +50,14 @@
import org.opensearch.cluster.routing.allocation.RebalanceParameter;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.ShardAllocationDecision;
import org.opensearch.common.Priority;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;

import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -202,6 +205,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
private volatile boolean ignoreThrottleInRestore;
private volatile TimeValue allocatorTimeout;
private long startTime;
private RerouteService rerouteService;

public BalancedShardsAllocator(Settings settings) {
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
Expand Down Expand Up @@ -231,6 +235,12 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout);
}

@Override
public void setRerouteService(RerouteService rerouteService) {
assert this.rerouteService == null : "RerouteService is already set";
this.rerouteService = rerouteService;
}

/**
* Changes in deprecated setting SHARD_MOVE_PRIMARY_FIRST_SETTING affect value of its replacement setting SHARD_MOVEMENT_STRATEGY_SETTING.
*/
Expand Down Expand Up @@ -342,6 +352,7 @@ public void allocate(RoutingAllocation allocation) {
localShardsBalancer.allocateUnassigned();
localShardsBalancer.moveShards();
localShardsBalancer.balance();
scheduleRerouteIfAllocatorTimedOut();

final ShardsBalancer remoteShardsBalancer = new RemoteShardsBalancer(logger, allocation);
remoteShardsBalancer.allocateUnassigned();
Expand Down Expand Up @@ -404,6 +415,20 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) {
}
}

private void scheduleRerouteIfAllocatorTimedOut() {
if (allocatorTimedOut()) {
assert rerouteService != null : "RerouteService not set to schedule reroute after allocator time out";
rerouteService.reroute(
"reroute after balanced shards allocator timed out",
Priority.HIGH,
ActionListener.wrap(
r -> logger.trace("reroute after balanced shards allocator timed out completed"),
e -> logger.debug("reroute after balanced shards allocator timed out failed", e)
)
);
}
}

/**
* Returns the currently configured delta threshold
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.cluster.routing.allocation.allocator;

import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.MoveDecision;
Expand Down Expand Up @@ -73,4 +74,6 @@ public interface ShardsAllocator {
* the cluster explain API, then this method should throw a {@code UnsupportedOperationException}.
*/
ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation);

default void setRerouteService(RerouteService rerouteService) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ public String[] getBlobPathTokens() {

public abstract String generateBlobFileName();

/**
* Generate the blob path for the remote entity by adding a custom prefix.
* This custom prefix may be generated by any of the strategies defined in {@link org.opensearch.index.remote.RemoteStoreEnums}
* The default implementation returns the same path as passed in the argument.
* @param blobPath The remote path on which the remote entity is to be uploaded
* @return The modified remote path after adding a custom prefix at which the remote entity will be uploaded.
*/
public BlobPath getPrefixedPath(BlobPath blobPath) {
return blobPath;
}

public String clusterUUID() {
return clusterUUID;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public BlobPath getBlobPathForUpload(final RemoteWriteableBlobEntity<T> obj) {
for (String token : obj.getBlobPathParameters().getPathTokens()) {
blobPath = blobPath.add(token);
}
return blobPath;
return obj.getPrefixedPath(blobPath);
}

public BlobPath getBlobPathForDownload(final RemoteWriteableBlobEntity<T> obj) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteIndexMetadataManager;
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.index.IndexModule;
Expand Down Expand Up @@ -736,13 +737,17 @@ public void apply(Settings value, Settings current, Settings previous) {
GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.REMOTE_STATE_READ_TIMEOUT_SETTING,
RemoteClusterStateService.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX,
RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_TYPE_SETTING,
RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING,
RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING,
RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING,
RemoteRoutingTableBlobStore.CLUSTER_REMOTE_STORE_ROUTING_TABLE_PATH_PREFIX,

// Admission Control Settings
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,11 @@ public void cleanCaches() {

// for tests
protected ShardsBatchGatewayAllocator() {
this(DEFAULT_SHARD_BATCH_SIZE);
this(DEFAULT_SHARD_BATCH_SIZE, null);
}

protected ShardsBatchGatewayAllocator(long batchSize) {
this.rerouteService = null;
protected ShardsBatchGatewayAllocator(long batchSize, RerouteService rerouteService) {
this.rerouteService = rerouteService;
this.batchStartedAction = null;
this.primaryShardBatchAllocator = null;
this.batchStoreAction = null;
Expand Down Expand Up @@ -297,6 +297,18 @@ public void run() {
public void onComplete() {
logger.trace("Triggering oncomplete after timeout for [{}] primary shards", timedOutPrimaryShardIds.size());
primaryBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutPrimaryShardIds, allocation, true);
if (timedOutPrimaryShardIds.isEmpty() == false) {
logger.trace("scheduling reroute after existing shards allocator timed out for primary shards");
assert rerouteService != null;
rerouteService.reroute(
"reroute after existing shards allocator timed out",
Priority.HIGH,
ActionListener.wrap(
r -> logger.trace("reroute after existing shards allocator timed out completed"),
e -> logger.debug("reroute after existing shards allocator timed out failed", e)
)
);
}
}
};
} else {
Expand All @@ -320,6 +332,18 @@ public void run() {
public void onComplete() {
logger.trace("Triggering oncomplete after timeout for [{}] replica shards", timedOutReplicaShardIds.size());
replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutReplicaShardIds, allocation, false);
if (timedOutReplicaShardIds.isEmpty() == false) {
logger.trace("scheduling reroute after existing shards allocator timed out for replica shards");
assert rerouteService != null;
rerouteService.reroute(
"reroute after existing shards allocator timed out",
Priority.HIGH,
ActionListener.wrap(
r -> logger.trace("reroute after existing shards allocator timed out completed"),
e -> logger.debug("reroute after existing shards allocator timed out failed", e)
)
);
}
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,16 @@ public class RemoteClusterStateService implements Closeable {
Setting.Property.NodeScope
);

/**
* Controls the fixed prefix for the cluster state path on remote store.
*/
public static final Setting<String> CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX = Setting.simpleString(
"cluster.remote_store.state.path.prefix",
"",
Property.NodeScope,
Property.Final
);

/**
* Validation mode for cluster state checksum.
* None: Validation will be disabled.
Expand Down Expand Up @@ -211,6 +221,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) {
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
+ "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]";
private final boolean isPublicationEnabled;
private final String remotePathPrefix;

// ToXContent Params with gateway mode.
// We are using gateway context mode to persist all custom metadata.
Expand Down Expand Up @@ -252,6 +263,7 @@ public RemoteClusterStateService(
this.isPublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL)
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings);
this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings);
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
settings,
Expand Down Expand Up @@ -728,7 +740,10 @@ UploadedMetadataResults writeMetadataInParallel(
indexMetadata,
clusterState.metadata().clusterUUID(),
blobStoreRepository.getCompressor(),
blobStoreRepository.getNamedXContentRegistry()
blobStoreRepository.getNamedXContentRegistry(),
remoteIndexMetadataManager.getPathTypeSetting(),
remoteIndexMetadataManager.getPathHashAlgoSetting(),
remotePathPrefix
),
listener
);
Expand Down
Loading

0 comments on commit 6320bc0

Please sign in to comment.