Skip to content

Commit

Permalink
Updating javadoc and RepoRegistration IT
Browse files Browse the repository at this point in the history
Signed-off-by: Dharmesh 💤 <sdharms@amazon.com>
  • Loading branch information
psychbot authored and Sachin Kale committed Aug 31, 2023
1 parent a7316a8 commit d97cc06
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.action.admin.cluster.remotestore.RemoteStoreNode.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.action.admin.cluster.remotestore.RemoteStoreNode.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.action.admin.cluster.remotestore.RemoteStoreNode.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.action.admin.cluster.remotestore.RemoteStoreNode.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING;
Expand All @@ -55,6 +60,7 @@ public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {

protected Path absolutePath;
protected Path absolutePath2;
protected Settings nodeAttributesSettings;
private final List<String> documentKeys = List.of(
randomAlphaOfLength(5),
randomAlphaOfLength(5),
Expand Down Expand Up @@ -108,9 +114,13 @@ protected boolean addMockInternalEngine() {

@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (nodeAttributesSettings == null) {
nodeAttributesSettings = remoteStoreNodeAttributes(REPOSITORY_NAME, REPOSITORY_2_NAME);
}
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, REPOSITORY_2_NAME, true))
.put(nodeAttributesSettings)
.build();
}

Expand Down Expand Up @@ -181,6 +191,36 @@ public static Settings remoteStoreClusterSettings(String segmentRepoName, String
return settingsBuilder.build();
}

public Settings remoteStoreNodeAttributes(String segmentRepoName, String translogRepoName) {
absolutePath = randomRepoPath().toAbsolutePath();
absolutePath2 = randomRepoPath().toAbsolutePath();
if (segmentRepoName.equals(translogRepoName)) {
absolutePath2 = absolutePath;
}
return Settings.builder()
.put("node.attr." + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName)
.put(
String.format(Locale.getDefault(), "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, segmentRepoName),
"fs"
)
.put(
String.format(Locale.getDefault(), "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, segmentRepoName)
+ "location",
absolutePath.toString()
)
.put("node.attr." + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName)
.put(
String.format(Locale.getDefault(), "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, translogRepoName),
"fs"
)
.put(
String.format(Locale.getDefault(), "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, translogRepoName)
+ "location",
absolutePath2.toString()
)
.build();
}

private Settings defaultIndexSettings() {
return Settings.builder()
.put(super.indexSettings())
Expand Down Expand Up @@ -236,6 +276,7 @@ protected void setupRepo(boolean startDedicatedClusterManager) {

@After
public void teardown() {
nodeAttributesSettings = null;
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_2_NAME));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.action.admin.cluster.remotestore.RemoteStoreNode.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.action.admin.cluster.remotestore.RemoteStoreNode.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0)
public class RemoteStoreRepositoryRegistrationIT extends RemoteStoreBaseIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

private RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) {
Map<String, String> nodeAttributes = node.getAttributes();
String type = nodeAttributes.get(String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name));

String settingsAttributeKeyPrefix = String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, name);
Map<String, String> settingsMap = node.getAttributes()
.keySet()
.stream()
.filter(key -> key.startsWith(settingsAttributeKeyPrefix))
.collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> node.getAttributes().get(key)));

Settings.Builder settings = Settings.builder();
settingsMap.entrySet().forEach(entry -> settings.put(entry.getKey(), entry.getValue()));

return new RepositoryMetadata(name, type, settings.build());
}

private void assertRemoteStoreRepositoryOnAllNodes() {
RepositoriesMetadata repositories = internalCluster().getInstance(ClusterService.class, internalCluster().getNodeNames()[0])
.state()
.metadata()
.custom(RepositoriesMetadata.TYPE);
RepositoryMetadata actualSegmentRepository = repositories.repository(REPOSITORY_NAME);
RepositoryMetadata actualTranslogRepository = repositories.repository(REPOSITORY_2_NAME);

for (String nodeName : internalCluster().getNodeNames()) {
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeName);
DiscoveryNode node = clusterService.localNode();
RepositoryMetadata expectedSegmentRepository = buildRepositoryMetadata(node, REPOSITORY_NAME);
RepositoryMetadata expectedTranslogRepository = buildRepositoryMetadata(node, REPOSITORY_2_NAME);
assertTrue(actualSegmentRepository.equalsIgnoreGenerations(expectedSegmentRepository));
assertTrue(actualTranslogRepository.equalsIgnoreGenerations(expectedTranslogRepository));
}
}

public void testSingleNodeClusterRepositoryRegistration() {
internalCluster().startClusterManagerOnlyNode(remoteStoreNodeAttributes(REPOSITORY_NAME, REPOSITORY_2_NAME));
ensureStableCluster(1);

assertRemoteStoreRepositoryOnAllNodes();
}

public void testMultiNodeClusterRepositoryRegistration() {
Settings clusterSettings = remoteStoreNodeAttributes(REPOSITORY_NAME, REPOSITORY_2_NAME);
internalCluster().startClusterManagerOnlyNode(clusterSettings);
internalCluster().startNodes(3, clusterSettings);
ensureStableCluster(4);

assertRemoteStoreRepositoryOnAllNodes();
}

public void testMultiNodeClusterOnlyDataRepositoryRegistration() {
Settings clusterSettings = remoteStoreNodeAttributes(REPOSITORY_NAME, REPOSITORY_2_NAME);
internalCluster().startNodes(3, clusterSettings);
ensureStableCluster(3);

assertRemoteStoreRepositoryOnAllNodes();
}

public void testMultiNodeClusterRepositoryRegistrationWithMultipleMasters() {
internalCluster().startClusterManagerOnlyNodes(3);
internalCluster().startNodes(3);
ensureStableCluster(6);

assertRemoteStoreRepositoryOnAllNodes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ private Map<String, String> validateSettingsAttributesNonNull(String settingsAtt
.collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> validateAttributeNonNull(key)));
}

// TODO: Add logic to mark these repository as System Repository once thats merged.
private RepositoryMetadata buildRepositoryMetadata(String name) {
String type = validateAttributeNonNull(String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name));
Map<String, String> settingsMap = validateSettingsAttributesNonNull(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public RemoteStoreService(Supplier<RepositoriesService> repositoriesService, Thr
this.threadPool = threadPool;
}

/**
* Performs repository verification during node startup post its creation by invoking verify method against
* repository mentioned. This verification will happen on a local node to validate if the node is able to connect
* to the repository.
*/
public void verifyRepository(List<Repository> repositories, DiscoveryNode localNode) {
for (Repository repository : repositories) {
String verificationToken = repository.startVerification();
Expand Down Expand Up @@ -105,6 +110,9 @@ public void verifyRepository(List<Repository> repositories, DiscoveryNode localN
}
}

/**
* Creates a repository during a node startup.
*/
public List<Repository> createRepositories(RemoteStoreNode node) {
List<Repository> repositories = new ArrayList<>();
for (RepositoryMetadata repositoryMetadata : node.getRepositoriesMetadata().repositories()) {
Expand Down Expand Up @@ -153,9 +161,14 @@ private ClusterState updateRepositoryMetadata(RepositoryMetadata newRepositoryMe
return ClusterState.builder(currentState).metadata(mdBuilder).build();
}

public ClusterState updateClusterStateRepositoriesMetadata(RemoteStoreNode node, ClusterState currentState) {
/**
* Updates repositories metadata in the cluster state if not already present. If a repository metadata for a
* repository is already present in the cluster state and if it's different then the joining remote store node
* repository metadata an exception will be thrown and the node will not be allowed to join the cluster.
*/
public ClusterState updateClusterStateRepositoriesMetadata(RemoteStoreNode joiningNode, ClusterState currentState) {
ClusterState newState = ClusterState.builder(currentState).build();
for (RepositoryMetadata newRepositoryMetadata : node.getRepositoriesMetadata().repositories()) {
for (RepositoryMetadata newRepositoryMetadata : joiningNode.getRepositoriesMetadata().repositories()) {
newState = updateRepositoryMetadata(newRepositoryMetadata, newState);
}
return newState;
Expand Down

0 comments on commit d97cc06

Please sign in to comment.