Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state #16763

Merged
merged 12 commits into from
Dec 10, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tiered Caching] Fix bug in cache stats API ([#16560](https://github.com/opensearch-project/OpenSearch/pull/16560))
- Bound the size of cache in deprecation logger ([16702](https://github.com/opensearch-project/OpenSearch/issues/16702))
- Ensure consistency of system flag on IndexMetadata after diff is applied ([#16644](https://github.com/opensearch-project/OpenSearch/pull/16644))
- Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state ([#16763](https://github.com/opensearch-project/OpenSearch/pull/16763))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,37 @@
package org.opensearch.discovery;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.FailedToCommitClusterStateException;
import org.opensearch.cluster.coordination.JoinHelper;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.coordination.PublicationTransportHandler;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Randomness;
import org.opensearch.common.settings.Settings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.disruption.ServiceDisruptionScheme;
import org.opensearch.test.disruption.SlowClusterStateProcessing;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportService;
import org.junit.Assert;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
Expand Down Expand Up @@ -250,4 +264,142 @@ public void testNodeNotReachableFromClusterManager() throws Exception {
ensureStableCluster(3);
}

/**
* Tests the scenario where-in a cluster-state containing new repository meta-data as part of a node-join from a
* repository-configured node fails on a commit stag and has a master switch. This would lead to master nodes
* doing another round of node-joins with the new cluster-state as the previous attempt had a successful publish.
*/
public void testElectClusterManagerRemotePublicationConfigurationNodeJoinCommitFails() throws Exception {
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
final String remoteStateRepoName = "remote-state-repo";
final String remoteRoutingTableRepoName = "routing-table-repo";

Settings remotePublicationSettings = buildRemotePublicationNodeAttributes(
remoteStateRepoName,
ReloadableFsRepository.TYPE,
remoteRoutingTableRepoName,
ReloadableFsRepository.TYPE
);
internalCluster().startClusterManagerOnlyNodes(3);
internalCluster().startDataOnlyNodes(3);

String clusterManagerNode = internalCluster().getClusterManagerName();
List<String> nonClusterManagerNodes = Arrays.stream(internalCluster().getNodeNames())
.filter(node -> !node.equals(clusterManagerNode))
.collect(Collectors.toList());

ensureStableCluster(6);

MockTransportService clusterManagerTransportService = (MockTransportService) internalCluster().getInstance(
TransportService.class,
clusterManagerNode
);
logger.info("Blocking Cluster Manager Commit Request on all nodes");
// This is to allow the new node to have commit failures on the nodes in the send path itself. This will lead to the
// nodes have a successful publish operation but failed commit operation. This will come into play once the new node joins
nonClusterManagerNodes.forEach(node -> {
TransportService targetTransportService = internalCluster().getInstance(TransportService.class, node);
clusterManagerTransportService.addSendBehavior(targetTransportService, (connection, requestId, action, request, options) -> {
if (action.equals(PublicationTransportHandler.COMMIT_STATE_ACTION_NAME)) {
logger.info("--> preventing {} request", PublicationTransportHandler.COMMIT_STATE_ACTION_NAME);
throw new FailedToCommitClusterStateException("Blocking Commit");
}
connection.sendRequest(requestId, action, request, options);
});
});

logger.info("Starting Node with remote publication settings");
// Start a node with remote-publication repositories configured. This will lead to the active cluster-manager create
// a new cluster-state event with the new node-join along with new repositories setup in the cluster meta-data.
internalCluster().startDataOnlyNodes(1, remotePublicationSettings, Boolean.TRUE);

Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
// Checking if publish succeeded in the nodes before shutting down the blocked cluster-manager
assertBusy(() -> {
String randomNode = nonClusterManagerNodes.get(Randomness.get().nextInt(nonClusterManagerNodes.size()));
PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, randomNode);

ClusterState state = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.LOCAL).getLastAcceptedState();
RepositoriesMetadata repositoriesMetadata = state.metadata().custom(RepositoriesMetadata.TYPE);
Boolean isRemoteStateRepoConfigured = Boolean.FALSE;
Boolean isRemoteRoutingTableRepoConfigured = Boolean.FALSE;

assertNotNull(repositoriesMetadata);
assertNotNull(repositoriesMetadata.repositories());

for (RepositoryMetadata repo : repositoriesMetadata.repositories()) {
if (repo.name().equals(remoteStateRepoName)) {
isRemoteStateRepoConfigured = Boolean.TRUE;
} else if (repo.name().equals(remoteRoutingTableRepoName)) {
isRemoteRoutingTableRepoConfigured = Boolean.TRUE;
}
}
// Asserting that the metadata is present in the persisted cluster-state
assertTrue(isRemoteStateRepoConfigured);
assertTrue(isRemoteRoutingTableRepoConfigured);
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved

RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, randomNode);

isRemoteStateRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteStateRepoName);
isRemoteRoutingTableRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteRoutingTableRepoName);

// Asserting that the metadata is not present in the repository service.
Assert.assertFalse(isRemoteStateRepoConfigured);
Assert.assertFalse(isRemoteRoutingTableRepoConfigured);
});

logger.info("Stopping current Cluster Manager");
// We stop the current cluster-manager whose outbound paths were blocked. This is to force a new election onto nodes
// we had the new cluster-state published but not commited.
internalCluster().stopCurrentClusterManagerNode();

// We expect that the repositories validations are skipped in this case and node-joins succeeds as expected. The
// repositories validations are skipped because even though the cluster-state is updated in the persisted registry,
// the repository service will not be updated as the commit attempt failed.
ensureStableCluster(6);

String randomNode = nonClusterManagerNodes.get(Randomness.get().nextInt(nonClusterManagerNodes.size()));

// Checking if the final cluster-state is updated.
RepositoriesMetadata repositoriesMetadata = internalCluster().getInstance(ClusterService.class, randomNode)
.state()
.metadata()
.custom(RepositoriesMetadata.TYPE);

Boolean isRemoteStateRepoConfigured = Boolean.FALSE;
Boolean isRemoteRoutingTableRepoConfigured = Boolean.FALSE;

for (RepositoryMetadata repo : repositoriesMetadata.repositories()) {
if (repo.name().equals(remoteStateRepoName)) {
isRemoteStateRepoConfigured = Boolean.TRUE;
} else if (repo.name().equals(remoteRoutingTableRepoName)) {
isRemoteRoutingTableRepoConfigured = Boolean.TRUE;
}
}

Assert.assertTrue("RemoteState Repo is not set in RepositoriesMetadata", isRemoteStateRepoConfigured);
Assert.assertTrue("RemoteRoutingTable Repo is not set in RepositoriesMetadata", isRemoteRoutingTableRepoConfigured);

RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, randomNode);

isRemoteStateRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteStateRepoName);
isRemoteRoutingTableRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteRoutingTableRepoName);

Assert.assertTrue("RemoteState Repo is not set in RepositoryService", isRemoteStateRepoConfigured);
Assert.assertTrue("RemoteRoutingTable Repo is not set in RepositoryService", isRemoteRoutingTableRepoConfigured);

logger.info("Stopping current Cluster Manager");
}

private Boolean isRepoPresentInRepositoryService(RepositoriesService repositoriesService, String repoName) {
try {
Repository remoteStateRepo = repositoriesService.repository(repoName);
if (Objects.nonNull(remoteStateRepo)) {
return Boolean.TRUE;
}
} catch (RepositoryMissingException e) {
return Boolean.FALSE;
}

return Boolean.FALSE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryException;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
Expand Down Expand Up @@ -183,6 +184,20 @@ public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode
boolean repositoryAlreadyPresent = false;
for (RepositoryMetadata existingRepositoryMetadata : existingRepositories.repositories()) {
if (newRepositoryMetadata.name().equals(existingRepositoryMetadata.name())) {
try {
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
// This is to handle cases where-in the during a previous node-join attempt if the publish operation succeeded
// but the commit operation failed, the cluster-state may have the repository metadata which is not applied
// into the repository service. This may lead to assertion failures down the line.
repositoriesService.get().repository(newRepositoryMetadata.name());
} catch (RepositoryMissingException e) {
logger.warn(
"Skipping repositories metadata checks: Remote repository [{}] is in the cluster state but not present "
+ "in the repository service.",
newRepositoryMetadata.name()
);
break;
}

try {
// This will help in handling two scenarios -
// 1. When a fresh cluster is formed and a node tries to join the cluster, the repository
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -904,6 +905,12 @@
Settings newRepositoryMetadataSettings = newRepositoryMetadata.settings();
Settings currentRepositoryMetadataSettings = currentRepositoryMetadata.settings();

assert Objects.nonNull(repository) : String.format(
Locale.ROOT,
"repository [%s] not present in RepositoryService",
currentRepositoryMetadata.name()

Check warning on line 911 in server/src/main/java/org/opensearch/repositories/RepositoriesService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/RepositoriesService.java#L911

Added line #L911 was not covered by tests
);

List<String> restrictedSettings = repository.getRestrictedSystemRepositorySettings()
.stream()
.map(setting -> setting.getKey())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;
Expand Down Expand Up @@ -1378,6 +1379,72 @@ public void testJoinRemoteStoreClusterWithRemotePublicationNodeInMixedMode() {
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
}

public void testUpdatesClusterStateWithRepositoryMetadataNotInSync() throws Exception {
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
Map<String, String> newNodeAttributes = new HashMap<>();
newNodeAttributes.putAll(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
newNodeAttributes.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO));

final AllocationService allocationService = mock(AllocationService.class);
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
RepositoriesService repositoriesService = mock(RepositoriesService.class);
when(repositoriesService.repository(any())).thenThrow(RepositoryMissingException.class);
final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService(new SetOnce<>(repositoriesService)::get, null);

final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(
Settings.EMPTY,
allocationService,
logger,
rerouteService,
remoteStoreNodeService
);

final DiscoveryNode clusterManagerNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
newNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);

final RepositoryMetadata clusterStateRepo = buildRepositoryMetadata(clusterManagerNode, CLUSTER_STATE_REPO);
final RepositoryMetadata routingTableRepo = buildRepositoryMetadata(clusterManagerNode, ROUTING_TABLE_REPO);
List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>() {
{
add(clusterStateRepo);
add(routingTableRepo);
}
};

final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(
DiscoveryNodes.builder()
.add(clusterManagerNode)
.localNodeId(clusterManagerNode.getId())
.clusterManagerNodeId(clusterManagerNode.getId())
)
.metadata(Metadata.builder().putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositoriesMetadata)))
.build();

final DiscoveryNode joiningNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
newNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);

final ClusterStateTaskExecutor.ClusterTasksResult<JoinTaskExecutor.Task> result = joinTaskExecutor.execute(
clusterState,
List.of(new JoinTaskExecutor.Task(joiningNode, "test"))
);
assertThat(result.executionResults.entrySet(), hasSize(1));
final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next();
assertTrue(taskResult.isSuccess());
validatePublicationRepositoryMetadata(result.resultingState, clusterManagerNode);

}

private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode existingNode, int expectedRepositories)
throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2322,10 +2322,24 @@ public List<String> startNodes(int numOfNodes, Settings settings) {
return startNodes(Collections.nCopies(numOfNodes, settings).toArray(new Settings[0]));
}

/**
* Starts multiple nodes with the given settings and returns their names
*/
public List<String> startNodes(int numOfNodes, Settings settings, Boolean waitForNodeJoin) {
return startNodes(waitForNodeJoin, Collections.nCopies(numOfNodes, settings).toArray(new Settings[0]));
}

/**
* Starts multiple nodes with the given settings and returns their names
*/
public synchronized List<String> startNodes(Settings... extraSettings) {
return startNodes(false, extraSettings);
}

/**
* Starts multiple nodes with the given settings and returns their names
*/
public synchronized List<String> startNodes(Boolean waitForNodeJoin, Settings... extraSettings) {
final int newClusterManagerCount = Math.toIntExact(Stream.of(extraSettings).filter(DiscoveryNode::isClusterManagerNode).count());
final int defaultMinClusterManagerNodes;
if (autoManageClusterManagerNodes) {
Expand Down Expand Up @@ -2377,7 +2391,7 @@ public synchronized List<String> startNodes(Settings... extraSettings) {
nodes.add(nodeAndClient);
}
startAndPublishNodesAndClients(nodes);
if (autoManageClusterManagerNodes) {
if (autoManageClusterManagerNodes && !waitForNodeJoin) {
validateClusterFormed();
}
return nodes.stream().map(NodeAndClient::getName).collect(Collectors.toList());
Expand Down Expand Up @@ -2422,6 +2436,10 @@ public List<String> startDataOnlyNodes(int numNodes, Settings settings) {
return startNodes(numNodes, Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.DATA_ROLE)).build());
}

public List<String> startDataOnlyNodes(int numNodes, Settings settings, Boolean ignoreNodeJoin) {
return startNodes(numNodes, Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.DATA_ROLE)).build(), ignoreNodeJoin);
}

public List<String> startSearchOnlyNodes(int numNodes) {
return startSearchOnlyNodes(numNodes, Settings.EMPTY);
}
Expand Down
Loading
Loading