
[BUG] Commit Failures During Node Joins with Repositories Configured Result in Persistent NullPointerExceptions #16762

Closed
Pranshu-S opened this issue Dec 3, 2024 · 2 comments · Fixed by #16763

@Pranshu-S
Contributor

Describe the bug

Issue Overview

During a node join, a new node carrying new repository metadata joins the cluster. The cluster-manager then tries to publish an updated cluster state that includes the node and its metadata. Suppose the publish succeeds during this update but the commit fails for some other reason, such as a network disruption or a join that starts a new term. The cluster then enters a persistent cycle of NullPointerExceptions and cannot stabilize.

The root cause is that the publish updates the last accepted version and cluster state. Because the commit never runs, the cluster state appliers are not executed. As a result, the repositories service is out of sync with the repositories metadata in the cluster state. When the current cluster-manager (leader) steps down and another cluster-manager is elected:

  1. The newly elected cluster-manager attempts to verify repository metadata as part of its leadership transition.
  2. It checks the metadata in the cluster state for the presence of a specific repository. If the repository exists, it attempts to fetch its corresponding object from the repository service.
  3. Since the repository service was not updated earlier (due to the cluster appliers not being executed), this leads to a NullPointerException, causing instability in the cluster-manager election and transition process.
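The failure sequence above can be modeled with a small, self-contained simulation. This is a sketch under simplifying assumptions, not OpenSearch code. Node, acceptPublish, applyCommit and validate are hypothetical stand-ins for the last accepted state held by CoordinationState and for the registry inside RepositoriesService. The model shows that metadata updated on publish can name a repository that the registry, which is updated only when the commit is applied, does not yet hold.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the issue; none of these types exist in OpenSearch.
public class PublishWithoutCommit {

    // Stand-in for a registered Repository object.
    record Repo(String name) {
        String restrictedSettings() {
            return "restricted settings of " + name;
        }
    }

    static class Node {
        // Repository names in the last *accepted* cluster state (updated on publish).
        final Set<String> acceptedRepoMetadata = new HashSet<>();
        // Repositories registered in the "repositories service" (updated only when a commit is applied).
        final Map<String, Repo> registeredRepos = new HashMap<>();

        void acceptPublish(Set<String> repoNames) {
            acceptedRepoMetadata.clear();
            acceptedRepoMetadata.addAll(repoNames);
        }

        void applyCommit() {
            // Drop repositories no longer in metadata, register any that are missing.
            registeredRepos.keySet().retainAll(acceptedRepoMetadata);
            for (String name : acceptedRepoMetadata) {
                registeredRepos.putIfAbsent(name, new Repo(name));
            }
        }
    }

    // Mirrors the failing lookup: metadata says the repository exists,
    // so the code assumes the registry has it too.
    static String validate(Node node, String repoName) {
        if (!node.acceptedRepoMetadata.contains(repoName)) {
            return "not in metadata";
        }
        Repo repo = node.registeredRepos.get(repoName); // null if the commit was never applied
        return repo.restrictedSettings();               // NullPointerException when repo == null
    }

    public static void main(String[] args) {
        Node node = new Node();
        node.acceptPublish(Set.of("remote-store-repo"));
        // The commit is lost, so applyCommit() is never called.
        try {
            validate(node, "remote-store-repo");
        } catch (NullPointerException e) {
            System.out.println("NPE: metadata is ahead of the repositories registry");
        }
        node.applyCommit();
        System.out.println(validate(node, "remote-store-repo"));
    }
}
```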

Logs from the integration test (IT) used to reproduce the issue:

[2024-12-03T17:32:27,926][INFO ][o.o.c.c.JoinHelper       ] [node_t0] failed to join {node_t0}{HI4LDSiRQserbA7_FDq4Dw}{fWSEQusjSgiDULZF2oBNEw}{127.0.0.1}{127.0.0.1:50328}{m}{shard_indexing_pressure_enabled=true} with JoinRequest{sourceNode={node_t0}{HI4LDSiRQserbA7_FDq4Dw}{fWSEQusjSgiDULZF2oBNEw}{127.0.0.1}{127.0.0.1:50328}{m}{shard_indexing_pressure_enabled=true}, minimumTerm=1, optionalJoin=Optional[Join{term=2, lastAcceptedTerm=1, lastAcceptedVersion=8, sourceNode={node_t0}{HI4LDSiRQserbA7_FDq4Dw}{fWSEQusjSgiDULZF2oBNEw}{127.0.0.1}{127.0.0.1:50328}{m}{shard_indexing_pressure_enabled=true}, targetNode={node_t0}{HI4LDSiRQserbA7_FDq4Dw}{fWSEQusjSgiDULZF2oBNEw}{127.0.0.1}{127.0.0.1:50328}{m}{shard_indexing_pressure_enabled=true}}]}
org.opensearch.transport.RemoteTransportException: [node_t0][127.0.0.1:50328][internal:cluster/coordination/join]
Caused by: java.lang.NullPointerException: Cannot invoke "org.opensearch.repositories.Repository.getRestrictedSystemRepositorySettings()" because "repository" is null
	at org.opensearch.repositories.RepositoriesService.ensureValidSystemRepositoryUpdate(RepositoriesService.java:907) ~[main/:?]
	at org.opensearch.node.remotestore.RemoteStoreNodeService.updateRepositoriesMetadata(RemoteStoreNodeService.java:194) ~[main/:?]
	at org.opensearch.cluster.coordination.JoinTaskExecutor.execute(JoinTaskExecutor.java:210) ~[main/:?]
	at org.opensearch.cluster.coordination.JoinHelper$1.execute(JoinHelper.java:197) ~[main/:?]
	at org.opensearch.cluster.service.MasterService.executeTasks(MasterService.java:917) ~[main/:?]
	at org.opensearch.cluster.service.MasterService.calculateTaskOutputs(MasterService.java:469) ~[main/:?]
	at org.opensearch.cluster.service.MasterService.runTasks(MasterService.java:329) [main/:?]
	at org.opensearch.cluster.service.MasterService$Batcher.run(MasterService.java:229) [main/:?]
	at org.opensearch.cluster.service.TaskBatcher.runIfNotProcessed(TaskBatcher.java:206) [main/:?]
	at org.opensearch.cluster.service.TaskBatcher$BatchedTask.run(TaskBatcher.java:264) [main/:?]
	at org.opensearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:932) [main/:?]
	at org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedOpenSearchThreadPoolExecutor.java:283) [main/:?]
	at org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedOpenSearchThreadPoolExecutor.java:246) [main/:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) [?:?]
	at java.base/java.lang.Thread.run(Thread.java:1575) [?:?]

Related component

Cluster Manager

To Reproduce

  1. Create a cluster without repositories configured and let it become stable.
  2. Start a new node with repositories configured and have it join the cluster.
  3. Disrupt the cluster-manager so that the publish succeeds but the commit fails.

Expected behavior

  1. The repository service should remain in sync with the cluster state, even if a commit failure occurs during node joins.
  2. The newly elected cluster-manager should initialise without errors and correctly fetch repository objects.


@Pranshu-S
Contributor Author

Here is the issue in more depth. During a node join, we run the JoinTaskExecutor task to add the node and update the cluster state. If the joining node has remote store or remote publication configured, we update the repositories metadata. To do this, we read the node attributes of the joining node and merge them into the existing metadata. Note that the existing metadata is taken from the current state:

RepositoriesMetadata existingRepositoriesMetadata = currentState.getMetadata().custom(RepositoriesMetadata.TYPE);
Map<String, RepositoryMetadata> repositories = new LinkedHashMap<>();
if (existingRepositoriesMetadata != null) {
    existingRepositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r));
}
if (remoteDN.isPresent()) {
    RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
        remoteDN.get(),
        existingRepositoriesMetadata
    );
    repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r));
}
if (remotePublicationDN.isPresent()) {
    RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
        remotePublicationDN.get(),
        existingRepositoriesMetadata
    );
    repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r));
}
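The merge above relies on LinkedHashMap.putIfAbsent. For any repository name, the entry already present in the current state wins over the one derived from the joining node, and iteration follows first-insertion order. Below is a minimal standalone sketch of that behavior. RepoMeta is a hypothetical stand-in for RepositoryMetadata that carries only a name and a type.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RepositoryMerge {

    // Hypothetical stand-in for RepositoryMetadata: just a name and a type.
    record RepoMeta(String name, String type) {}

    // Existing entries are inserted first, so putIfAbsent keeps them when a
    // joining node reports a repository with the same name.
    static List<RepoMeta> merge(List<RepoMeta> existing, List<RepoMeta> fromJoiningNode) {
        Map<String, RepoMeta> byName = new LinkedHashMap<>();
        if (existing != null) {
            existing.forEach(r -> byName.putIfAbsent(r.name(), r));
        }
        fromJoiningNode.forEach(r -> byName.putIfAbsent(r.name(), r));
        return new ArrayList<>(byName.values());
    }

    public static void main(String[] args) {
        List<RepoMeta> existing = List.of(new RepoMeta("segments", "fs"));
        List<RepoMeta> joining = List.of(new RepoMeta("segments", "s3"), new RepoMeta("translog", "fs"));
        // Prints [RepoMeta[name=segments, type=fs], RepoMeta[name=translog, type=fs]]
        System.out.println(merge(existing, joining));
    }
}
```

In this issue, currentState is the last accepted state. A repository that was accepted on publish but never applied therefore already counts as "existing" here, which sends it down the validation path described next.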

Note: the current state used here is the last accepted state from CoordinationState:

ClusterState getStateForClusterManagerService() {
    synchronized (mutex) {
        // expose last accepted cluster state as base state upon which the cluster_manager service
        // speculatively calculates the next cluster state update
        final ClusterState clusterState = coordinationState.get().getLastAcceptedState();
        if (mode != Mode.LEADER || clusterState.term() != getCurrentTerm()) {
            // the cluster-manager service checks if the local node is the cluster-manager node in order to fail execution of the state
            // update early
            return clusterStateWithNoClusterManagerBlock(clusterState);
        }
        return clusterState;
    }
}

Next, RemoteStoreNodeService::updateRepositoriesMetadata reads the repositories metadata from the joining node. For each RepositoryMetadata coming from the joining node, it checks whether a repository with the same name already exists in the current cluster state. If one does, it calls RepositoriesService::ensureValidSystemRepositoryUpdate:

public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode, RepositoriesMetadata existingRepositories) {
    if (joiningNode.isRemoteStoreNode() || joiningNode.isRemoteStatePublicationEnabled()) {
        List<RepositoryMetadata> updatedRepositoryMetadataList = new ArrayList<>();
        List<RepositoryMetadata> newRepositoryMetadataList = new RemoteStoreNodeAttribute(joiningNode).getRepositoriesMetadata()
            .repositories();
        if (existingRepositories == null) {
            return new RepositoriesMetadata(newRepositoryMetadataList);
        } else {
            updatedRepositoryMetadataList.addAll(existingRepositories.repositories());
        }
        for (RepositoryMetadata newRepositoryMetadata : newRepositoryMetadataList) {
            boolean repositoryAlreadyPresent = false;
            for (RepositoryMetadata existingRepositoryMetadata : existingRepositories.repositories()) {
                if (newRepositoryMetadata.name().equals(existingRepositoryMetadata.name())) {
                    try {
                        // This will help in handling two scenarios -
                        // 1. When a fresh cluster is formed and a node tries to join the cluster, the repository
                        // metadata constructed from the node attributes of the joining node will be validated
                        // against the repository information provided by existing nodes in cluster state.
                        // 2. It's possible to update repository settings except the restricted ones post the
                        // creation of a system repository and if a node drops we will need to allow it to join
                        // even if the non-restricted system repository settings are now different.
                        repositoriesService.get().ensureValidSystemRepositoryUpdate(newRepositoryMetadata, existingRepositoryMetadata);

To validate system repositories, ensureValidSystemRepositoryUpdate fetches the corresponding repository from the repositories service. In our case, the repository metadata is ahead of the repositories registered in the repositories service, so repositories.get(...) returns null. The subsequent call to Repository::getRestrictedSystemRepositorySettings then throws an NPE.

public void ensureValidSystemRepositoryUpdate(RepositoryMetadata newRepositoryMetadata, RepositoryMetadata currentRepositoryMetadata) {
    if (isSystemRepositorySettingPresent(currentRepositoryMetadata.settings())) {
        try {
            isValueEqual("type", newRepositoryMetadata.type(), currentRepositoryMetadata.type());
            Repository repository = repositories.get(currentRepositoryMetadata.name());
            Settings newRepositoryMetadataSettings = newRepositoryMetadata.settings();
            Settings currentRepositoryMetadataSettings = currentRepositoryMetadata.settings();
            List<String> restrictedSettings = repository.getRestrictedSystemRepositorySettings()
                .stream()
                .map(setting -> setting.getKey())
                .collect(Collectors.toList());
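The NPE comes from dereferencing the result of repositories.get(...) without checking it. One way to make the inconsistent state visible is a guard that fails with a descriptive error instead of an NPE. The sketch below is illustrative only and is not necessarily what the fix in #16763 does. RepoRegistry, Repo, MissingRepositoryException and the setting key are all hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical registry illustrating a null-guarded lookup; not OpenSearch code.
public class RepoRegistry {

    // Hypothetical stand-in for Repository, exposing its restricted setting keys.
    record Repo(String name, List<String> restrictedSettingKeys) {}

    // Hypothetical exception that names the inconsistency instead of surfacing an NPE.
    static class MissingRepositoryException extends IllegalStateException {
        MissingRepositoryException(String name) {
            super("repository [" + name + "] is present in cluster state metadata but not registered in the repositories service");
        }
    }

    private final Map<String, Repo> repositories = new HashMap<>();

    void register(Repo repo) {
        repositories.put(repo.name(), repo);
    }

    // Looks up the repository and fails with a descriptive error rather than
    // letting the next method call throw NullPointerException.
    List<String> restrictedSettingKeys(String name) {
        Repo repo = repositories.get(name);
        if (repo == null) {
            throw new MissingRepositoryException(name);
        }
        return repo.restrictedSettingKeys();
    }

    public static void main(String[] args) {
        RepoRegistry registry = new RepoRegistry();
        try {
            registry.restrictedSettingKeys("remote-store-repo");
        } catch (MissingRepositoryException e) {
            System.out.println(e.getMessage());
        }
        registry.register(new Repo("remote-store-repo", List.of("hypothetical_restricted_setting")));
        System.out.println(registry.restrictedSettingKeys("remote-store-repo"));
    }
}
```

A guard like this only turns the NPE into a clearer failure. Per the expected behavior above, the real goal is to keep the repositories service in sync with the accepted metadata.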

To see why the repositories service lags behind the metadata, note that there are two ways a repository gets registered in the repositories service.

Method 1: updateRepositoriesMap
→ Called by createAndVerifyRepositories
During node bootstrap, if RemoteStoreAttributes are present (node attributes that declare repositories for remote publication or remote store), the node calls remoteStoreNodeService.createAndVerifyRepositories(discoveryNode), which:

  1. Gets the repositories from the node attributes
  2. Validates the repository name (it must be a valid file name)
  3. Creates the repository object
  4. Verifies the repository
  5. Registers the created repositories in the repositories map via updateRepositoriesMap

public void createAndVerifyRepositories(DiscoveryNode localNode) {
    RemoteStoreNodeAttribute nodeAttribute = new RemoteStoreNodeAttribute(localNode);
    RepositoriesService reposService = repositoriesService.get();
    Map<String, Repository> repositories = new HashMap<>();
    for (RepositoryMetadata repositoryMetadata : nodeAttribute.getRepositoriesMetadata().repositories()) {
        String repositoryName = repositoryMetadata.name();
        Repository repository;
        RepositoriesService.validate(repositoryName);
        // Create Repository
        repository = reposService.createRepository(repositoryMetadata);
        logger.info(
            "remote backed storage repository with name [{}] and type [{}] created",
            repository.getMetadata().name(),
            repository.getMetadata().type()
        );
        // Verify Repository
        String verificationToken = repository.startVerification();
        repository.verify(verificationToken, localNode);
        repository.endVerification(verificationToken);
        logger.info(() -> new ParameterizedMessage("successfully verified [{}] repository", repositoryName));
        repositories.put(repositoryName, repository);
    }
    // Updating the repositories map in RepositoriesService
    reposService.updateRepositoriesMap(repositories);
}

Method 2: applyClusterState

→ Called from ClusterApplierService::runTask, which in turn is triggered by the apply task submitted when a node receives an ApplyCommitRequest

Checks if new repositories appeared in or disappeared from cluster metadata and updates current list of repositories accordingly.

  1. Gets the old metadata (current state) and the new metadata (incoming state).
  2. For the repositories registered locally, updates them with the metadata of the same repositories in the new state.
  3. For each locally registered repository, if the new metadata no longer contains it, closes the repository; otherwise adds it to the list of survivors.
  4. For each repository in the new state, if it is in the survivor list, validates and updates it; otherwise creates it.

Finally, the newly built map replaces the registered repositories:

repositories = Collections.unmodifiableMap(builder);
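The reconciliation steps above follow a common diff pattern. Registered repositories that disappeared from the new metadata are closed, the survivors are kept, and any missing repositories are created. Below is a simplified sketch that assumes repositories are keyed by name and compared only by type. Repo, reconcile and the closed/created logs are hypothetical. The real applier also handles settings changes and other details this sketch omits.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified version of the survivors-based reconciliation; not OpenSearch code.
public class RepositoryReconciler {

    // Hypothetical stand-in for a registered repository and its type.
    record Repo(String name, String type) {}

    final List<String> closed = new ArrayList<>();
    final List<String> created = new ArrayList<>();

    // current: repositories registered locally; newMetadata: name -> type from the incoming state.
    Map<String, Repo> reconcile(Map<String, Repo> current, Map<String, String> newMetadata) {
        // Close repositories that are no longer in the metadata; keep the rest as survivors.
        Map<String, Repo> survivors = new HashMap<>();
        for (Map.Entry<String, Repo> entry : current.entrySet()) {
            if (!newMetadata.containsKey(entry.getKey())) {
                closed.add(entry.getKey());
            } else {
                survivors.put(entry.getKey(), entry.getValue());
            }
        }
        // Reuse survivors whose type is unchanged; create everything else.
        Map<String, Repo> builder = new HashMap<>();
        for (Map.Entry<String, String> meta : newMetadata.entrySet()) {
            Repo repo = survivors.get(meta.getKey());
            if (repo == null || !repo.type().equals(meta.getValue())) {
                if (repo != null) {
                    closed.add(repo.name()); // type changed: close the old instance before recreating
                }
                repo = new Repo(meta.getKey(), meta.getValue());
                created.add(repo.name());
            }
            builder.put(meta.getKey(), repo);
        }
        return Collections.unmodifiableMap(builder);
    }

    public static void main(String[] args) {
        RepositoryReconciler r = new RepositoryReconciler();
        Map<String, Repo> current = Map.of("old", new Repo("old", "fs"), "kept", new Repo("kept", "fs"));
        Map<String, Repo> next = r.reconcile(current, Map.of("kept", "fs", "new", "s3"));
        System.out.println(next.keySet() + " closed=" + r.closed + " created=" + r.created);
    }
}
```

The property that matters for this issue is that this code runs only when a commit is applied. A node that accepted a publish but never applied it keeps its old map.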

The existing nodes do not have these node attributes. The only way for the existing cluster-manager node's repositories service to be updated is therefore Method 2.

The cluster state in CoordinationState is updated on the publish path, in handlePublishRequest. As a result, the cluster state can get ahead of the repositories service whenever the cluster-manager node fails to send the commit request:

public PublishResponse handlePublishRequest(PublishRequest publishRequest) {
    final ClusterState clusterState = publishRequest.getAcceptedState();
    if (clusterState.term() != getCurrentTerm()) {
        logger.debug(
            "handlePublishRequest: ignored publish request due to term mismatch (expected: [{}], actual: [{}])",
            getCurrentTerm(),
            clusterState.term()
        );
        throw new CoordinationStateRejectedException(
            "incoming term " + clusterState.term() + " does not match current term " + getCurrentTerm()
        );
    }
    if (clusterState.term() == getLastAcceptedTerm() && clusterState.version() <= getLastAcceptedVersion()) {
        if (clusterState.term() == ZEN1_BWC_TERM
            && clusterState.nodes().getClusterManagerNode().equals(getLastAcceptedState().nodes().getClusterManagerNode()) == false) {
            logger.debug(
                "handling publish request in compatibility mode despite version mismatch (expected: >[{}], actual: [{}])",
                getLastAcceptedVersion(),
                clusterState.version()
            );
        } else {
            logger.debug(
                "handlePublishRequest: ignored publish request due to version mismatch (expected: >[{}], actual: [{}])",
                getLastAcceptedVersion(),
                clusterState.version()
            );
            throw new CoordinationStateRejectedException(
                "incoming version " + clusterState.version() + " lower or equal to current version " + getLastAcceptedVersion()
            );
        }
    }
    logger.trace(
        "handlePublishRequest: accepting publish request for version [{}] and term [{}]",
        clusterState.version(),
        clusterState.term()
    );
    persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState);
    if (shouldUpdateRemotePersistedState(publishRequest)) {
        updateRemotePersistedStateOnPublishRequest(publishRequest);
    }
    assert getLastAcceptedState() == clusterState;
    return new PublishResponse(clusterState.term(), clusterState.version());
}
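Without persistence and the Zen1 compatibility branch, the acceptance check above comes down to two rules. First, the incoming term must equal the current term. Second, if the incoming term equals the last accepted term, the version must be strictly greater than the last accepted version. The sketch below models these rules along with the separation that matters for this issue: accepting moves only the last accepted state, while appliers see a state only after a commit. ToyCoordinationState, State, handleCommit and appliersLagBehind are hypothetical simplifications, not OpenSearch types.

```java
// Hypothetical two-phase (publish, then commit) model; not OpenSearch code.
public class ToyCoordinationState {

    // Simplified cluster state: only term, version and a repository count.
    record State(long term, long version, int repositoryCount) {}

    private final long currentTerm;
    private State lastAccepted;
    private State lastApplied;

    ToyCoordinationState(long currentTerm, State initial) {
        this.currentTerm = currentTerm;
        this.lastAccepted = initial;
        this.lastApplied = initial;
    }

    // Phase 1: mirrors the term/version checks in handlePublishRequest (Zen1 branch omitted).
    void handlePublish(State incoming) {
        if (incoming.term() != currentTerm) {
            throw new IllegalStateException(
                "incoming term " + incoming.term() + " does not match current term " + currentTerm);
        }
        if (incoming.term() == lastAccepted.term() && incoming.version() <= lastAccepted.version()) {
            throw new IllegalStateException(
                "incoming version " + incoming.version() + " lower or equal to current version " + lastAccepted.version());
        }
        lastAccepted = incoming; // appliers (e.g. the repositories service) have NOT seen this state yet
    }

    // Phase 2: only a commit makes the accepted state visible to appliers.
    void handleCommit() {
        lastApplied = lastAccepted;
    }

    boolean appliersLagBehind() {
        return !lastApplied.equals(lastAccepted);
    }

    public static void main(String[] args) {
        ToyCoordinationState node = new ToyCoordinationState(2, new State(2, 8, 0));
        node.handlePublish(new State(2, 9, 1)); // the join adds a repository
        System.out.println("lagging after publish: " + node.appliersLagBehind()); // true
        node.handleCommit();
        System.out.println("lagging after commit: " + node.appliersLagBehind());  // false
    }
}
```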

@rajiv-kv
Contributor

rajiv-kv commented Dec 5, 2024

Triage Attendees 1 2

Thanks @Pranshu-S for filing the issue. Please feel free to raise a fix.

Projects
Status: ✅ Done
2 participants