Skip to content

Commit

Permalink
Code changes to do the repository verification at the time of node bootstrap.
Browse files Browse the repository at this point in the history

Signed-off-by: Dharmesh 💤 <sdharms@amazon.com>
  • Loading branch information
psychbot authored and Sachin Kale committed Aug 31, 2023
1 parent d6c7866 commit f1409b1
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,55 +22,49 @@
import java.util.stream.Collectors;

/**
* This is an extension of {@Code DiscoveryNode} which provides an abstraction for validating and storing information
* specific to remote backed storage nodes.
* This is an abstraction for validating and storing information specific to remote backed storage nodes.
*
* @opensearch.internal
*/
public class RemoteStoreNode extends DiscoveryNode {
public class RemoteStoreNode {

public static final String REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX = "remote_store";
public static final String REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.segment.repository";
public static final String REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.translog.repository";
public static final String REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT = "remote_store.repository.%s.type";
public static final String REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX = "remote_store.repository.%s.settings.";
private final DiscoveryNode node;
private final RepositoriesMetadata repositoriesMetadata;
private final DiscoveryNode node;

/**
* Creates a new {@link RemoteStoreNode}
*/
public RemoteStoreNode(DiscoveryNode node) {
super(node.getName(), node.getId(), node.getAddress(), node.getAttributes(), node.getRoles(), node.getVersion());
this.node = node;
this.repositoriesMetadata = buildRepositoriesMetadata(node);
this.repositoriesMetadata = buildRepositoriesMetadata();
}

private String validateAttributeNonNull(DiscoveryNode node, String attributeKey) {
private String validateAttributeNonNull(String attributeKey) {
String attributeValue = node.getAttributes().get(attributeKey);
if (attributeValue == null || attributeValue.isEmpty()) {
throw new IllegalStateException("joining node [" + node + "] doesn't have the node attribute [" + attributeKey + "].");
throw new IllegalStateException("joining node [" + this + "] doesn't have the node attribute [" + attributeKey + "].");
}

return attributeValue;
}

private Map<String, String> validateSettingsAttributesNonNull(DiscoveryNode node, String settingsAttributeKeyPrefix) {
private Map<String, String> validateSettingsAttributesNonNull(String settingsAttributeKeyPrefix) {
return node.getAttributes()
.keySet()
.stream()
.filter(key -> key.startsWith(settingsAttributeKeyPrefix))
.collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> validateAttributeNonNull(node, key)));
.collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> validateAttributeNonNull(key)));
}

// TODO: Add logic to mark these repository as System Repository once thats merged.
private RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) {
String type = validateAttributeNonNull(
node,
String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name)
);
private RepositoryMetadata buildRepositoryMetadata(String name) {
String type = validateAttributeNonNull(String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name));
Map<String, String> settingsMap = validateSettingsAttributesNonNull(
node,
String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, name)
);

Expand All @@ -80,15 +74,15 @@ private RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String na
return new RepositoryMetadata(name, type, settings.build());
}

private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) {
private RepositoriesMetadata buildRepositoriesMetadata() {
String segmentRepositoryName = node.getAttributes().get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
String translogRepositoryName = node.getAttributes().get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
if (segmentRepositoryName.equals(translogRepositoryName)) {
return new RepositoriesMetadata(Collections.singletonList(buildRepositoryMetadata(node, segmentRepositoryName)));
return new RepositoriesMetadata(Collections.singletonList(buildRepositoryMetadata(segmentRepositoryName)));
} else {
List<RepositoryMetadata> repositoryMetadataList = new ArrayList<>();
repositoryMetadataList.add(buildRepositoryMetadata(node, segmentRepositoryName));
repositoryMetadataList.add(buildRepositoryMetadata(node, translogRepositoryName));
repositoryMetadataList.add(buildRepositoryMetadata(segmentRepositoryName));
repositoryMetadataList.add(buildRepositoryMetadata(translogRepositoryName));
return new RepositoriesMetadata(repositoryMetadataList);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -35,37 +34,37 @@ public class RemoteStoreService {

private static final Logger logger = LogManager.getLogger(RemoteStoreService.class);
private final Supplier<RepositoriesService> repositoriesService;
public static final Setting<String> REMOTE_STORE_MIGRATION_SETTING = Setting.simpleString(
"remote_store.migration",
MigrationTypes.NOT_MIGRATING.value,
MigrationTypes::validate,
public static final Setting<String> REMOTE_STORE_COMPATIBILITY_MODE_SETTING = Setting.simpleString(
"remote_store.compatibility_mode",
CompatibilityMode.ALLOW_ONLY_REMOTE_STORE_NODES.value,
CompatibilityMode::validate,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public enum MigrationTypes {
NOT_MIGRATING("not_migrating"),
MIGRATING_TO_REMOTE_STORE("migrating_to_remote_store"),
public enum CompatibilityMode {
ALLOW_ONLY_REMOTE_STORE_NODES("allow_only_remote_store_nodes"),
ALLOW_ALL_NODES("allow_all_nodes"),
MIGRATING_TO_HOT("migrating_to_hot");

public static MigrationTypes validate(String migrationType) {
public static CompatibilityMode validate(String compatibilityMode) {
try {
return MigrationTypes.valueOf(migrationType.toUpperCase(Locale.ROOT));
return CompatibilityMode.valueOf(compatibilityMode.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"["
+ migrationType
+ "] migration type is not supported. "
+ "Supported migration types are ["
+ MigrationTypes.values().toString()
+ compatibilityMode
+ "] compatibility mode is not supported. "
+ "supported modes are ["
+ CompatibilityMode.values().toString()
+ "]"
);
}
}

public final String value;

MigrationTypes(String value) {
CompatibilityMode(String value) {
this.value = value;
}
}
Expand All @@ -74,7 +73,7 @@ public RemoteStoreService(Supplier<RepositoriesService> repositoriesService) {
this.repositoriesService = repositoriesService;
}

public void verifyRepository(RepositoryMetadata repositoryMetadata) {
public void verifyRepository(List<Repository> repositories) {
ActionListener<VerifyRepositoryResponse> listener = new ActionListener<>() {

@Override
Expand All @@ -88,31 +87,40 @@ public void onFailure(Exception e) {
}
};

repositoriesService.get()
.verifyRepository(
repositoryMetadata.name(),
ActionListener.delegateFailure(
listener,
(delegatedListener, verifyResponse) -> delegatedListener.onResponse(
new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0]))
for (Repository repository : repositories) {
repositoriesService.get()
.verifyRepository(
repository.getMetadata().name(),
ActionListener.delegateFailure(
listener,
(delegatedListener, verifyResponse) -> delegatedListener.onResponse(
new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0]))
)
)
)
);
);
}
}

private ClusterState createRepository(RepositoryMetadata newRepositoryMetadata, ClusterState currentState) {
RepositoriesService.validate(newRepositoryMetadata.name());

Metadata metadata = currentState.metadata();
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE);
if (repositories == null) {
Repository repository = repositoriesService.get().createRepository(newRepositoryMetadata);
public List<Repository> createRepositories(RemoteStoreNode node) {
List<Repository> repositories = new ArrayList<>();
for (RepositoryMetadata repositoryMetadata : node.getRepositoriesMetadata().repositories()) {
RepositoriesService.validate(repositoryMetadata.name());
Repository repository = repositoriesService.get().createRepository(repositoryMetadata);
logger.info(
"Remote store repository with name {} and type {} created.",
repository.getMetadata().name(),
repository.getMetadata().type()
);
repositories.add(repository);
}
return repositories;
}

private ClusterState updateRepositoryMetadata(RepositoryMetadata newRepositoryMetadata, ClusterState currentState) {
Metadata metadata = currentState.metadata();
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE);
if (repositories == null) {
repositories = new RepositoriesMetadata(Collections.singletonList(newRepositoryMetadata));
} else {
List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1);
Expand All @@ -134,71 +142,18 @@ private ClusterState createRepository(RepositoryMetadata newRepositoryMetadata,
repositoriesMetadata.add(repositoryMetadata);
}
}
Repository repository = repositoriesService.get().createRepository(newRepositoryMetadata);
logger.info(
"Remote store repository with name {} and type {} created",
repository.getMetadata().name(),
repository.getMetadata().type()
);
repositoriesMetadata.add(newRepositoryMetadata);
repositories = new RepositoriesMetadata(repositoriesMetadata);
}
mdBuilder.putCustom(RepositoriesMetadata.TYPE, repositories);
return ClusterState.builder(currentState).metadata(mdBuilder).build();
}

private boolean isRepositoryCreated(RepositoryMetadata repositoryMetadata) {
try {
repositoriesService.get().repository(repositoryMetadata.name());
return true;
} catch (RepositoryMissingException e) {
return false;
}
}

private boolean isRepositoryAddedInClusterState(RepositoryMetadata repositoryMetadata, ClusterState currentState) {
RepositoriesMetadata repositoriesMetadata = currentState.metadata().custom(RepositoriesMetadata.TYPE);
if (repositoriesMetadata == null) {
return false;
}
for (RepositoryMetadata existingRepositoryMetadata : repositoriesMetadata.repositories()) {
existingRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata);
return true;
}
return false;
}

private ClusterState createOrVerifyRepository(RepositoriesMetadata repositoriesMetadata, ClusterState currentState) {
public ClusterState updateClusterStateRepositoriesMetadata(RemoteStoreNode node, ClusterState currentState) {
ClusterState newState = ClusterState.builder(currentState).build();
for (RepositoryMetadata repositoryMetadata : repositoriesMetadata.repositories()) {
if (isRepositoryCreated(repositoryMetadata)) {
verifyRepository(repositoryMetadata);
} else {
if (!isRepositoryAddedInClusterState(repositoryMetadata, currentState)) {
newState = ClusterState.builder(createRepository(repositoryMetadata, newState)).build();
// verifyRepository(repositoryMetadata);
}
}
for (RepositoryMetadata newRepositoryMetadata : node.getRepositoriesMetadata().repositories()) {
newState = updateRepositoryMetadata(newRepositoryMetadata, newState);
}
return newState;
}

public ClusterState joinCluster(RemoteStoreNode joiningRemoteStoreNode, ClusterState currentState) {
List<DiscoveryNode> existingNodes = new ArrayList<>(currentState.nodes().getNodes().values());
if (existingNodes.isEmpty()) {
return currentState;
}
ClusterState.Builder newState = ClusterState.builder(currentState);
if (existingNodes.get(0).isRemoteStoreNode()) {
RemoteStoreNode existingRemoteStoreNode = new RemoteStoreNode(existingNodes.get(0));
if (existingRemoteStoreNode.equals(joiningRemoteStoreNode)) {
newState = ClusterState.builder(createOrVerifyRepository(joiningRemoteStoreNode.getRepositoriesMetadata(), currentState));
}
} else {
throw new IllegalStateException(
"a remote store node [" + joiningRemoteStoreNode + "] is trying to join a non remote store cluster."
);
}
return newState.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static org.opensearch.action.admin.cluster.remotestore.RemoteStoreService.CompatibilityMode.ALLOW_ONLY_REMOTE_STORE_NODES;
import static org.opensearch.action.admin.cluster.remotestore.RemoteStoreService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;

Expand Down Expand Up @@ -191,12 +193,11 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
* present in join task as well as current node. We want the repositories to be registered during
* first node join. See
* {@link org.opensearch.gateway.GatewayMetaState#prepareInitialClusterState(TransportService, ClusterService, ClusterState)} **/
newState = ClusterState.builder(remoteStoreService.joinCluster(new RemoteStoreNode(node), currentState));
newState = ClusterState.builder(
remoteStoreService.updateClusterStateRepositoriesMetadata(new RemoteStoreNode(node), currentState)
);
}
} else {
if (node.isRemoteStoreNode()) {
newState = ClusterState.builder(remoteStoreService.joinCluster(new RemoteStoreNode(node), currentState));
}
try {
if (enforceMajorVersion) {
ensureMajorVersionBarrier(node.getVersion(), minClusterNodeVersion);
Expand All @@ -217,6 +218,11 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
if (node.isClusterManagerNode()) {
joiniedNodeNameIds.put(node.getName(), node.getId());
}
if (node.isRemoteStoreNode()) {
newState = ClusterState.builder(
remoteStoreService.updateClusterStateRepositoriesMetadata(new RemoteStoreNode(node), currentState)
);
}
} catch (IllegalArgumentException | IllegalStateException | NodeDecommissionedException e) {
results.failure(joinTask, e);
continue;
Expand Down Expand Up @@ -466,10 +472,9 @@ public static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode
}

// TODO: The below check is valid till we support migration, once we start supporting migration a remote
// store node will be able to join a non remote store cluster and vice versa. #7986
if (RemoteStoreService.MigrationTypes.NOT_MIGRATING.value.equals(
RemoteStoreService.REMOTE_STORE_MIGRATION_SETTING.get(currentState.metadata().settings())
)) {
// store node will be able to join a non remote store cluster and vice versa. #7986
String remoteStoreCompatibilityMode = REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(currentState.metadata().settings());
if (ALLOW_ONLY_REMOTE_STORE_NODES.value.equals(remoteStoreCompatibilityMode)) {
DiscoveryNode existingNode = existingNodes.get(0);
if (joiningNode.isRemoteStoreNode()) {
if (existingNode.isRemoteStoreNode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
package org.opensearch.cluster.node;

import org.opensearch.Version;
import org.opensearch.action.admin.cluster.remotestore.RemoteStoreNode;
import org.opensearch.action.admin.cluster.remotestore.RemoteStoreService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.Setting;
Expand All @@ -44,11 +46,13 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.node.Node;
import org.opensearch.repositories.Repository;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -282,6 +286,29 @@ public static DiscoveryNode createLocal(Settings settings, TransportAddress publ
return new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), nodeId, publishAddress, attributes, roles, Version.CURRENT);
}

/** Creates a DiscoveryNode representing the local node and verifies the repository. */
public static DiscoveryNode createLocal(
Settings settings,
TransportAddress publishAddress,
String nodeId,
RemoteStoreService remoteStoreService
) {
Map<String, String> attributes = Node.NODE_ATTRIBUTES.getAsMap(settings);
Set<DiscoveryNodeRole> roles = getRolesFromSettings(settings);
DiscoveryNode discoveryNode = new DiscoveryNode(
Node.NODE_NAME_SETTING.get(settings),
nodeId,
publishAddress,
attributes,
roles,
Version.CURRENT
);
RemoteStoreNode remoteStoreNode = new RemoteStoreNode(discoveryNode);
List<Repository> repositories = remoteStoreService.createRepositories(remoteStoreNode);
remoteStoreService.verifyRepository(repositories);
return discoveryNode;
}

/** extract node roles from the given settings */
public static Set<DiscoveryNodeRole> getRolesFromSettings(final Settings settings) {
if (NODE_ROLES_SETTING.exists(settings)) {
Expand Down
Loading

0 comments on commit f1409b1

Please sign in to comment.