diff --git a/CHANGELOG.md b/CHANGELOG.md index 6480944a42ebc..7d9c71b5401bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ + # CHANGELOG All notable changes to this project are documented in this file. @@ -15,6 +16,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Test] Add IAE test for deprecated edgeNGram analyzer name ([#5040](https://github.com/opensearch-project/OpenSearch/pull/5040)) - Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151)) - Add feature flag for extensions ([#5211](https://github.com/opensearch-project/OpenSearch/pull/5211)) +- Support to fail open requests on search shard failures with weighted traffic routing ([#5072](https://github.com/opensearch-project/OpenSearch/pull/5072)) +- Added jackson dependency to server ([#5366](https://github.com/opensearch-project/OpenSearch/pull/5366)) - Adding support to register settings dynamically ([#5495](https://github.com/opensearch-project/OpenSearch/pull/5495)) - Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518)), ([#5597](https://github.com/opensearch-project/OpenSearch/pull/5597)), ([#5615](https://github.com/opensearch-project/OpenSearch/pull/5615)) - Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348)) @@ -24,7 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for discovered cluster manager and remove local weights ([#5680](https://github.com/opensearch-project/OpenSearch/pull/5680)) - Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959)) - Add query for initialized extensions ([#5658](https://github.com/opensearch-project/OpenSearch/pull/5658)) -- Remove two permissions from server security policy and change extension reading ([#5768](https://github.com/opensearch-project/OpenSearch/pull/5768)) +- Revert 'Added jackson dependency to server' and change extension reading ([#5768](https://github.com/opensearch-project/OpenSearch/pull/5768)) ### Dependencies - Bumps `log4j-core` from 2.18.0 to 2.19.0 @@ -67,6 +70,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Changed http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773)) - Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283)) - Pre conditions check before updating weighted routing metadata ([#4955](https://github.com/opensearch-project/OpenSearch/pull/4955)) +- Gracefully handle concurrent zone decommission action ([#5542](https://github.com/opensearch-project/OpenSearch/pull/5542)) ### Deprecated @@ -102,6 +106,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add max_shard_size parameter for shrink API ([#5229](https://github.com/opensearch-project/OpenSearch/pull/5229)) - Added support to apply index create block ([#4603](https://github.com/opensearch-project/OpenSearch/issues/4603)) - Support request level durability for remote-backed indexes ([#5671](https://github.com/opensearch-project/OpenSearch/issues/5671)) +- Added new level to get health per awareness
attribute in _cluster/health ([#5694](https://github.com/opensearch-project/OpenSearch/pull/5694)) ### Dependencies - Bumps `bcpg-fips` from 1.0.5.1 to 1.0.7.1 diff --git a/release-notes/opensearch.release-notes-2.4.0.md b/release-notes/opensearch.release-notes-2.4.0.md index f6a1378a55ee1..fb52656a1eb82 100644 --- a/release-notes/opensearch.release-notes-2.4.0.md +++ b/release-notes/opensearch.release-notes-2.4.0.md @@ -97,4 +97,5 @@ - Fix error handling while reading analyzer mapping rules ([6d20423](https://github.com/opensearch-project/OpenSearch/commit/6d20423f5920745463b1abc5f1daf6a786c41aa0)) ### Security +- Fixes CVE-2022-41917 ([#5141](https://github.com/opensearch-project/OpenSearch/pull/5141)) - CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341)) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.health.json b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.health.json index b3fc958891dfe..2fc9a2d4fd716 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.health.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.health.json @@ -45,7 +45,8 @@ "options":[ "cluster", "indices", - "shards" + "shards", + "awareness_attributes" ], "default":"cluster", "description":"Specify the level of detail for returned information" @@ -106,6 +107,10 @@ "red" ], "description":"Wait until cluster is in a specific state" + }, + "awareness_attribute":{ + "type":"string", + "description":"The awareness attribute for which the health is required" } } } diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/clusterAwarenessHealthIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/clusterAwarenessHealthIT.java new file mode 100644 index 0000000000000..ba759b3dd52d1 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/clusterAwarenessHealthIT.java @@ -0,0 +1,220 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.cluster; + +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.cluster.awarenesshealth.ClusterAwarenessAttributesHealth; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.opensearch.test.NodeRoles.onlyRole; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class clusterAwarenessHealthIT extends OpenSearchIntegTestCase { + + public void testAwarenessAttributeHealthSucceeded() { + createIndex("test"); + ensureGreen(); + + for (final String node : internalCluster().getNodeNames()) { + // a very high time out, which should never fire due to the local flag + logger.info("--> getting cluster health on [{}]", node); + final ClusterHealthResponse health = client(node).admin() + .cluster() + .prepareHealth() + .setTimeout("30s") + .setLevel("awareness_attributes") + .setAwarenessAttribute("zone") + .get("10s"); + + assertFalse("timed out on " + node, health.isTimedOut()); + assertThat( + "health status on " + node, + health.getClusterAwarenessHealth().getClusterAwarenessAttributesHealthMap().size(), + equalTo(1) + ); + } + } + + public void testAwarenessAttributeHealthValidationFailed() { + createIndex("test"); + ensureGreen(); + for (final String node : internalCluster().getNodeNames()) { + // a very high time out, which should never fire due to the local flag + logger.info("--> getting cluster health on [{}]", node); + try { + final ClusterHealthResponse health = client(node).admin() + .cluster() + .prepareHealth() + .setTimeout("30s") + .setAwarenessAttribute("zone") + .get("10s"); + } catch (Exception exception) { + assertThat( + exception.getMessage(), + equalTo("Validation Failed: 1: level=awareness_attributes is required with awareness_attribute parameter;") + ); + } + } + } + + public void testAwarenessAttributeHealthValidationFailedOnIndexHealth() { + createIndex("test"); + ensureGreen(); + for (final String node : internalCluster().getNodeNames()) { + // a very high time out, which should never fire due to the local flag + logger.info("--> getting cluster health on [{}]", node); + try { + final ClusterHealthResponse health = client(node).admin() + .cluster() + .prepareHealth("test") + .setTimeout("30s") + .setLevel("awareness_attributes") + .setAwarenessAttribute("zone") + .get("10s"); + } catch (Exception exception) { + assertThat( + exception.getMessage(), + equalTo("Validation Failed: 1: awareness_attribute is not a supported parameter with index health;") + ); + } + } + } + + public void testAwarenessAttributeHealth() { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> start 3 cluster manager nodes on zones 'd' & 'e' & 'f'"); + List clusterManagerNodes = internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "d") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "e") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", 
"f") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build() + ); + + logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'"); + List dataNodes = internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build() + ); + + final ClusterHealthResponse health = client(dataNodes.get(0)).admin() + .cluster() + .prepareHealth() + .setTimeout("30s") + .setLevel("awareness_attributes") + .setAwarenessAttribute("zone") + .get("10s"); + + ensureStableCluster(6); + assertThat(health.getClusterAwarenessHealth().getClusterAwarenessAttributesHealthMap().size(), equalTo(1)); + Map attributes = health.getClusterAwarenessHealth() + .getClusterAwarenessAttributesHealthMap(); + for (String attribute : attributes.keySet()) { + String attributeName = attributes.get(attribute).getAwarenessAttributeName(); + assertThat(attributeName, equalTo("zone")); + assertThat(attributes.get(attribute).getAwarenessAttributeHealthMap().size(), equalTo(3)); + } + } + + public void testAwarenessAttributeHealthAttributeDoesNotExists() { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> start 3 cluster manager nodes on zones 'd' & 'e' & 'f'"); + List clusterManagerNodes = internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "d") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "e") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "f") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build() + ); + + logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'"); + List dataNodes = internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build() + ); + + final ClusterHealthResponse health = client(dataNodes.get(0)).admin() + .cluster() + .prepareHealth() + .setTimeout("30s") + .setLevel("awareness_attributes") + .setAwarenessAttribute("rack") + .get("10s"); + + ensureStableCluster(6); + assertThat(health.getClusterAwarenessHealth().getClusterAwarenessAttributesHealthMap().size(), equalTo(1)); + Map attributes = health.getClusterAwarenessHealth() + .getClusterAwarenessAttributesHealthMap(); + for (String attribute : attributes.keySet()) { + String attributeName = attributes.get(attribute).getAwarenessAttributeName(); + assertThat(attributeName, equalTo("rack")); + assertThat(attributes.get(attribute).getAwarenessAttributeHealthMap().size(), equalTo(0)); + 
} + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java index 07580f17a67bc..e2eb08bd0969c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -44,6 +44,8 @@ import org.opensearch.test.MockLogAppender; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportService; @@ -59,6 +61,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import static org.opensearch.test.NodeRoles.onlyRole; @@ -961,6 +964,114 @@ public void testDecommissionAcknowledgedIfWeightsNotSetForNonRoutingNode() throw ensureStableCluster(6, TimeValue.timeValueMinutes(2)); } + public void testConcurrentDecommissionAction() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'"); + internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build() + ); + logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'"); + internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build() + ); + + ensureStableCluster(6); + ClusterHealthResponse health = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(Integer.toString(6)) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 0.0, "b", 1.0, "c", 1.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .setVersion(-1) + .get(); + assertTrue(weightedRoutingResponse.isAcknowledged()); + + AtomicInteger 
numRequestAcknowledged = new AtomicInteger(); + AtomicInteger numRequestUnAcknowledged = new AtomicInteger(); + AtomicInteger numRequestFailed = new AtomicInteger(); + int concurrentRuns = randomIntBetween(5, 10); + TestThreadPool testThreadPool = null; + logger.info("--> starting {} concurrent decommission action in zone {}", concurrentRuns, 'a'); + try { + testThreadPool = new TestThreadPool(AwarenessAttributeDecommissionIT.class.getName()); + List operationThreads = new ArrayList<>(); + CountDownLatch countDownLatch = new CountDownLatch(concurrentRuns); + for (int i = 0; i < concurrentRuns; i++) { + Runnable thread = () -> { + logger.info("Triggering decommission action"); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "a"); + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + decommissionRequest.setNoDelay(true); + try { + DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest) + .get(); + if (decommissionResponse.isAcknowledged()) { + numRequestAcknowledged.incrementAndGet(); + } else { + numRequestUnAcknowledged.incrementAndGet(); + } + } catch (Exception e) { + numRequestFailed.incrementAndGet(); + } + countDownLatch.countDown(); + }; + operationThreads.add(thread); + } + TestThreadPool finalTestThreadPool = testThreadPool; + operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable)); + countDownLatch.await(); + } finally { + ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); + } + assertEquals(concurrentRuns, numRequestAcknowledged.get() + numRequestUnAcknowledged.get() + numRequestFailed.get()); + assertEquals(concurrentRuns - 1, numRequestFailed.get()); + assertEquals(1, numRequestAcknowledged.get() + numRequestUnAcknowledged.get()); + } + private static class WaitForFailedDecommissionState implements ClusterStateObserver.Listener { final CountDownLatch doneLatch; diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java index 80e7f22c47c58..b0afbc6983c95 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -14,38 +14,60 @@ import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; +import org.opensearch.action.get.MultiGetRequest; +import org.opensearch.action.get.MultiGetResponse; +import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; +import org.opensearch.common.collect.ImmutableOpenMap; import org.opensearch.common.settings.Settings; import org.opensearch.index.query.QueryBuilders; import org.opensearch.index.search.stats.SearchStats; +import org.opensearch.plugins.Plugin; +import org.opensearch.search.aggregations.Aggregations; +import org.opensearch.search.aggregations.bucket.terms.Terms; +import 
org.opensearch.snapshots.mockstore.MockRepository; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.disruption.NetworkDisruption; +import org.opensearch.test.transport.MockTransportService; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.search.aggregations.AggregationBuilders.terms; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3) public class SearchWeightedRoutingIT extends OpenSearchIntegTestCase { + @Override - protected int numberOfReplicas() { - return 2; + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class, MockRepository.Plugin.class); } public void testSearchWithWRRShardRouting() throws IOException { Settings commonSettings = Settings.builder() .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone" + ".values", "a,b,c") .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone") + .put("cluster.routing.weighted.fail_open", false) .build(); logger.info("--> starting 6 nodes on different zones"); @@ -102,7 +124,8 @@ public void testSearchWithWRRShardRouting() throws IOException { hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); } } - // search should not go to nodes in zone c + // search should not go to nodes in zone c with weight zero in case + // shard copies are available in other zones assertThat(hitNodes.size(), lessThanOrEqualTo(4)); DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); List nodeIdsFromZoneWithWeightZero = new ArrayList<>(); @@ -159,4 +182,618 @@ public void testSearchWithWRRShardRouting() throws IOException { } } + private Map> setupCluster(int nodeCountPerAZ, Settings commonSettings) { + + logger.info("--> starting a dedicated cluster manager node"); + internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build()); + + logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'"); + Map> nodeMap = new HashMap<>(); + List nodes_in_zone_a = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build() + ); + nodeMap.put("a", nodes_in_zone_a); + List nodes_in_zone_b = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build() + ); + nodeMap.put("b", nodes_in_zone_b); + + List nodes_in_zone_c = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + nodeMap.put("c", nodes_in_zone_c); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + 
ensureGreen(); + return nodeMap; + + } + + private void setUpIndexing(int numShards, int numReplicas) { + assertAcked( + prepareCreate("test").setSettings( + Settings.builder().put("index.number_of_shards", numShards).put("index.number_of_replicas", numReplicas) + ) + ); + ensureGreen(); + + logger.info("--> creating indices for test"); + for (int i = 0; i < 100; i++) { + client().prepareIndex("test").setId("" + i).setSource("field_" + i, "value_" + i).get(); + } + refresh("test"); + } + + private void setShardRoutingWeights(Map weights) { + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse response = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .setVersion(-1) + .get(); + assertEquals(response.isAcknowledged(), true); + } + + /** + * Shard routing requests fail without fail-open if there are no healthy nodes in the active az to serve the request. + * This is tested by setting up a 3 node cluster with one data node per az. + * Weighted shard routing weight is set as 0 for az-c. + * Data nodes in zone a and b are stopped, + * assertions are put to check that search requests fail. + * @throws Exception throws Exception + */ + public void testShardRoutingByStoppingDataNodes_FailOpenDisabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", false) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 1; + int numReplicas = 2; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> data nodes in zone a and b are stopped"); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("a").get(0))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("b").get(0))); + ensureStableCluster(2); + + Set hitNodes = new HashSet<>(); + + // Make Search Requests + Future[] responses = new Future[50]; + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().smartClient().prepareSearch("test").setQuery(QueryBuilders.matchAllQuery()).execute(); + } + int failedCount = 0; + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertEquals(0, searchResponse.getFailedShards()); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + failedCount++; + } + } + + Assert.assertTrue(failedCount > 0); + logger.info("--> failed request count is [{}]", failedCount); + assertNoSearchInAZ("c"); + } + + /** + * Shard routing requests with fail open enabled are served by data nodes in az with weight set as 0, + * in case shard copies are not available in other azs. + * This is tested by setting up a 3 node cluster with one data node per az. + * Weighted shard routing weight is set as 0 for az-c. + * Data nodes in zone a and b are stopped, + * assertions are put to make sure shard search requests do not fail.
+ * @throws IOException throws exception + */ + public void testShardRoutingByStoppingDataNodes_FailOpenEnabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 1; + int numReplicas = 2; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> data nodes in zone a and b are stopped"); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("a").get(0))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("b").get(0))); + ensureStableCluster(2); + + Set hitNodes = new HashSet<>(); + + // Make Search Requests + Future[] responses = new Future[50]; + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().smartClient().prepareSearch("test").setQuery(QueryBuilders.matchAllQuery()).execute(); + } + int failedCount = 0; + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertEquals(0, searchResponse.getFailedShards()); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + failedCount++; + } + } + + Assert.assertTrue(failedCount == 0); + assertSearchInAZ("c"); + } + + /** + * Shard routing request with fail open disabled is not served by data nodes in az with weight set as 0, + * in case shard copies are not available in other azs. + * This is tested by setting up a 3 node cluster with one data node per az. + * Weighted shard routing weight is set as 0 for az-c. + * Indices are created with one replica copy and network disruption is introduced, + * which makes data node in zone-a unresponsive. + * Since there are two copies of a shard, there can be few shards for which copy doesn't exist in zone b. + * Assertions are put to make sure such shard search requests are not served by data node in zone c. 
+ * @throws IOException throws exception + */ + public void testShardRoutingWithNetworkDisruption_FailOpenDisabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", false) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0), nodeMap.get("c").get(0)) + .collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Set hitNodes = new HashSet<>(); + Future[] responses = new Future[50]; + logger.info("--> making search requests"); + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch("test") + .setQuery(QueryBuilders.matchAllQuery()) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + int failedShardCount = 0; + + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + failedShardCount += searchResponse.getFailedShards(); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + fail("search should not fail"); + } + } + Assert.assertTrue(failedShardCount > 0); + // assert that no search request goes to az with weight zero + assertNoSearchInAZ("c"); + } + + /** + * Shard routing request is served by data nodes in az with weight set as 0, + * in case shard copies are not available in other azs.(with fail open enabled) + * This is tested by setting up a 3 node cluster with one data node per az. + * Weighted shard routing weight is set as 0 for az-c. + * Indices are created with one replica copy and network disruption is introduced, + * which makes data node in zone-a unresponsive. + * Since there are two copies of a shard, there can be few shards for which copy doesn't exist in zone b. + * Assertions are put to make sure such shard search requests are served by data node in zone c. 
+ * @throws IOException throws exception + */ + public void testShardRoutingWithNetworkDisruption_FailOpenEnabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0)).collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Set hitNodes = new HashSet<>(); + Future[] responses = new Future[50]; + logger.info("--> making search requests"); + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch("test") + .setSize(100) + .setQuery(QueryBuilders.matchAllQuery()) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertEquals(searchResponse.getFailedShards(), 0); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + fail("search should not fail"); + } + } + + assertSearchInAZ("b"); + assertSearchInAZ("c"); + assertNoSearchInAZ("a"); + } + + private void assertNoSearchInAZ(String az) { + ImmutableOpenMap dataNodes = internalCluster().clusterService().state().nodes().getDataNodes(); + String dataNodeId = null; + + for (Iterator it = dataNodes.valuesIt(); it.hasNext();) { + DiscoveryNode node = it.next(); + if (node.getAttributes().get("zone").equals(az)) { + dataNodeId = node.getId(); + break; + } + } + + NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet(); + for (NodeStats stat : nodeStats.getNodes()) { + SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal(); + if (stat.getNode().isDataNode()) { + if (stat.getNode().getId().equals(dataNodeId)) { + assertEquals(0, searchStats.getQueryCount()); + assertEquals(0, searchStats.getFetchCount()); + } + } + } + } + + private void assertSearchInAZ(String az) { + ImmutableOpenMap dataNodes = internalCluster().clusterService().state().nodes().getDataNodes(); + String dataNodeId = null; + + for (Iterator it = dataNodes.valuesIt(); it.hasNext();) { + DiscoveryNode node = it.next(); + if (node.getAttributes().get("zone").equals(az)) { + dataNodeId = node.getId(); + break; + } + } + + NodesStatsResponse nodeStats = 
client().admin().cluster().prepareNodesStats().execute().actionGet(); + for (NodeStats stat : nodeStats.getNodes()) { + SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal(); + if (stat.getNode().isDataNode()) { + if (stat.getNode().getId().equals(dataNodeId)) { + Assert.assertTrue(searchStats.getFetchCount() > 0L); + Assert.assertTrue(searchStats.getQueryCount() > 0L); + } + } + } + } + + /** + * Shard routing request is served by data nodes in az with weight set as 0, + * in case shard copies are not available in other azs. + * This is tested by setting up a 3 node cluster with one data node per az. + * Weighted shard routing weight is set as 0 for az-c. + * Indices are created with one replica copy and network disruption is introduced, + * which makes node in zone-a unresponsive. + * Since there are two copies of a shard, there can be few shards for which copy doesn't exist in zone b. + * Assertions are put to make sure such shard search requests are served by data node in zone c. + * @throws IOException throws exception + */ + public void testSearchAggregationWithNetworkDisruption_FailOpenEnabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + assertAcked( + prepareCreate("index").setMapping("f", "type=keyword") + .setSettings(Settings.builder().put("index" + ".number_of_shards", 10).put("index" + ".number_of_replicas", 1)) + ); + + int numDocs = 10; + List docs = new ArrayList<>(); + for (int i = 0; i < numDocs; ++i) { + docs.add(client().prepareIndex("index").setSource("f", Integer.toString(i / 3))); + } + indexRandom(true, docs); + ensureGreen(); + refresh("index"); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0), nodeMap.get("c").get(0)) + .collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Set hitNodes = new HashSet<>(); + Future[] responses = new Future[51]; + int size = 17; + logger.info("--> making search requests"); + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch("index") + .setSize(20) + .addAggregation(terms("f").field("f")) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + Aggregations aggregations = searchResponse.getAggregations(); + assertNotNull(aggregations); + Terms terms = aggregations.get("f"); + assertEquals(0, searchResponse.getFailedShards()); 
+ assertEquals(Math.min(numDocs, 3L), terms.getBucketByKey("0").getDocCount()); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + fail("search should not fail"); + } + } + assertSearchInAZ("b"); + assertSearchInAZ("c"); + assertNoSearchInAZ("a"); + + assertBusy( + () -> assertThat(client().admin().indices().prepareStats().get().getTotal().getSearch().getOpenContexts(), equalTo(0L)), + 60, + TimeUnit.SECONDS + ); + } + + /** + * MultiGet with fail open enabled. No request failure on network disruption + * @throws IOException throws exception + */ + public void testMultiGetWithNetworkDisruption_FailOpenEnabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0), nodeMap.get("c").get(0)) + .collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Future[] responses = new Future[50]; + logger.info("--> making search requests"); + int index1, index2; + for (int i = 0; i < 50; i++) { + index1 = randomIntBetween(0, 9); + index2 = randomIntBetween(0, 9); + responses[i] = client().prepareMultiGet() + .add(new MultiGetRequest.Item("test", "" + index1)) + .add(new MultiGetRequest.Item("test", "" + index2)) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + for (int i = 0; i < 50; i++) { + try { + MultiGetResponse multiGetResponse = responses[i].get(); + assertThat(multiGetResponse.getResponses().length, equalTo(2)); + assertThat(multiGetResponse.getResponses()[0].isFailed(), equalTo(false)); + assertThat(multiGetResponse.getResponses()[1].isFailed(), equalTo(false)); + } catch (Exception t) { + fail("search should not fail"); + } + } + + assertBusy( + () -> assertThat(client().admin().indices().prepareStats().get().getTotal().getSearch().getOpenContexts(), equalTo(0L)), + 60, + TimeUnit.SECONDS + ); + } + + /** + * MultiGet with fail open disabled. Assert that some requests do fail. 
+ * @throws IOException throws exception + */ + public void testMultiGetWithNetworkDisruption_FailOpenDisabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", false) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0), nodeMap.get("c").get(0)) + .collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Future[] responses = new Future[50]; + logger.info("--> making search requests"); + int index1, index2; + for (int i = 0; i < 50; i++) { + index1 = randomIntBetween(0, 9); + index2 = randomIntBetween(0, 9); + responses[i] = client().prepareMultiGet() + .add(new MultiGetRequest.Item("test", "" + index1)) + .add(new MultiGetRequest.Item("test", "" + index2)) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + int failedCount = 0; + for (int i = 0; i < 50; i++) { + try { + MultiGetResponse multiGetResponse = responses[i].get(); + assertThat(multiGetResponse.getResponses().length, equalTo(2)); + if (multiGetResponse.getResponses()[0].isFailed() || multiGetResponse.getResponses()[1].isFailed()) { + failedCount++; + } + } catch (Exception t) { + fail("search should not fail"); + } + } + + Assert.assertTrue(failedCount > 0); + + assertBusy( + () -> assertThat(client().admin().indices().prepareStats().get().getTotal().getSearch().getOpenContexts(), equalTo(0L)), + 60, + TimeUnit.SECONDS + ); + } + } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index 7ec2cea769069..44f047137eece 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -21,8 +21,6 @@ import static org.opensearch.action.ValidateActions.addValidationError; /** - * Register decommission request. - *

* Registers a decommission request with decommission attribute and timeout * * @opensearch.internal @@ -32,7 +30,7 @@ public class DecommissionRequest extends ClusterManagerNodeRequest 0) { + return addValidationError("awareness_attribute is not a supported parameter with index health", null); + } else if (!level.equals(Level.AWARENESS_ATTRIBUTES) && awarenessAttribute != null) { + return addValidationError("level=awareness_attributes is required with awareness_attribute parameter", null); + } return null; } @@ -290,6 +332,7 @@ public ActionRequestValidationException validate() { public enum Level { CLUSTER, INDICES, - SHARDS + SHARDS, + AWARENESS_ATTRIBUTES } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequestBuilder.java index 3874bf31e1e23..ac9697e06a5d7 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequestBuilder.java @@ -151,4 +151,14 @@ public ClusterHealthRequestBuilder setWaitForEvents(Priority waitForEvents) { request.waitForEvents(waitForEvents); return this; } + + public ClusterHealthRequestBuilder setAwarenessAttribute(String awarenessAttribute) { + request.setAwarenessAttribute(awarenessAttribute); + return this; + } + + public ClusterHealthRequestBuilder setLevel(String level) { + request.setLevel(level); + return this; + } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthResponse.java index 8cabdff02390c..b2b6c26c596b6 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthResponse.java @@ -32,8 +32,10 @@ package org.opensearch.action.admin.cluster.health; +import org.opensearch.Version; import org.opensearch.action.ActionResponse; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.awarenesshealth.ClusterAwarenessHealth; import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.health.ClusterIndexHealth; import org.opensearch.cluster.health.ClusterStateHealth; @@ -41,6 +43,7 @@ import org.opensearch.common.Strings; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.ConstructingObjectParser; import org.opensearch.common.xcontent.ObjectParser; @@ -187,6 +190,7 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo private boolean timedOut = false; private ClusterStateHealth clusterStateHealth; private ClusterHealthStatus clusterHealthStatus; + private ClusterAwarenessHealth clusterAwarenessHealth; public ClusterHealthResponse() {} @@ -200,6 +204,11 @@ public ClusterHealthResponse(StreamInput in) throws IOException { numberOfInFlightFetch = in.readInt(); delayedUnassignedShards = in.readInt(); taskMaxWaitingTime = in.readTimeValue(); + if (in.getVersion().onOrAfter(Version.V_2_5_0)) { + if (in.readBoolean()) { + clusterAwarenessHealth = new ClusterAwarenessHealth(in); + } + } } /** needed for plugins BWC */ @@ -225,6 +234,30 @@ public 
ClusterHealthResponse( this.clusterHealthStatus = clusterStateHealth.getStatus(); } + // Awareness Attribute health + public ClusterHealthResponse( + String clusterName, + ClusterState clusterState, + ClusterSettings clusterSettings, + String[] concreteIndices, + String awarenessAttributeName, + int numberOfPendingTasks, + int numberOfInFlightFetch, + int delayedUnassignedShards, + TimeValue taskMaxWaitingTime + ) { + this( + clusterName, + concreteIndices, + clusterState, + numberOfPendingTasks, + numberOfInFlightFetch, + delayedUnassignedShards, + taskMaxWaitingTime + ); + this.clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, awarenessAttributeName); + } + /** * For XContent Parser and serialization tests */ @@ -357,6 +390,10 @@ public double getActiveShardsPercent() { return clusterStateHealth.getActiveShardsPercent(); } + public ClusterAwarenessHealth getClusterAwarenessHealth() { + return clusterAwarenessHealth; + } + public static ClusterHealthResponse readResponseFrom(StreamInput in) throws IOException { return new ClusterHealthResponse(in); } @@ -371,6 +408,14 @@ public void writeTo(StreamOutput out) throws IOException { out.writeInt(numberOfInFlightFetch); out.writeInt(delayedUnassignedShards); out.writeTimeValue(taskMaxWaitingTime); + if (out.getVersion().onOrAfter(Version.V_2_5_0)) { + if (clusterAwarenessHealth != null) { + out.writeBoolean(true); + clusterAwarenessHealth.writeTo(out); + } else { + out.writeBoolean(false); + } + } } @Override @@ -406,6 +451,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws String level = params.param("level", "cluster"); boolean outputIndices = "indices".equals(level) || "shards".equals(level); + boolean outputAwarenessHealth = "awareness_attributes".equals(level); if (outputIndices) { builder.startObject(INDICES); @@ -414,6 +460,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.endObject(); } + + if (outputAwarenessHealth && clusterAwarenessHealth != null) { + clusterAwarenessHealth.toXContent(builder, params); + } + builder.endObject(); return builder; } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java index d6b9f5f40f32c..a0b760d0dce28 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -457,6 +457,22 @@ private ClusterHealthResponse clusterHealth( } String[] concreteIndices; + if (request.level().equals(ClusterHealthRequest.Level.AWARENESS_ATTRIBUTES)) { + String awarenessAttribute = request.getAwarenessAttribute(); + concreteIndices = clusterState.getMetadata().getConcreteAllIndices(); + return new ClusterHealthResponse( + clusterState.getClusterName().value(), + clusterState, + clusterService.getClusterSettings(), + concreteIndices, + awarenessAttribute, + numberOfPendingTasks, + numberOfInFlightFetch, + UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), + pendingTaskTimeInQueue + ); + } + try { concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, request); } catch (IndexNotFoundException e) { diff --git a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java 
b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java index 7d9ab4ff93f59..d57bbed59fe3e 100644 --- a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java +++ b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.FailAwareWeightedRouting; import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.routing.ShardRouting; @@ -261,16 +262,17 @@ private void onFailure(ShardRouting shardRouting, Exception e) { tryNext(e, false); } - private ShardRouting nextRoutingOrNull() { + private ShardRouting nextRoutingOrNull(Exception failure) { if (shardsIt.size() == 0 || shardIndex >= shardsIt.size()) { return null; } - ShardRouting next = shardsIt.get(shardIndex).nextOrNull(); + ShardRouting next = FailAwareWeightedRouting.getInstance().findNext(shardsIt.get(shardIndex), clusterService.state(), failure); + if (next != null) { return next; } moveToNextShard(); - return nextRoutingOrNull(); + return nextRoutingOrNull(failure); } private void moveToNextShard() { @@ -278,7 +280,7 @@ private void moveToNextShard() { } private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShard) { - ShardRouting shardRouting = nextRoutingOrNull(); + ShardRouting shardRouting = nextRoutingOrNull(lastFailure); if (shardRouting == null) { if (canMatchShard == false) { listener.onResponse(new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false)); diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index 0876bf93a557b..1a37406e19f14 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -43,6 +43,7 @@ import org.opensearch.action.ShardOperationFailedException; import org.opensearch.action.support.TransportActions; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.FailAwareWeightedRouting; import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.common.Nullable; import org.opensearch.common.lease.Releasable; @@ -449,7 +450,8 @@ private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget sh // we always add the shard failure for a specific shard instance // we do make sure to clean it on a successful response from a shard onShardFailure(shardIndex, shard, e); - final SearchShardTarget nextShard = shardIt.nextOrNull(); + SearchShardTarget nextShard = FailAwareWeightedRouting.getInstance().findNext(shardIt, clusterState, e); + final boolean lastShard = nextShard == null; logger.debug( () -> new ParameterizedMessage( diff --git a/server/src/main/java/org/opensearch/action/search/SearchShardIterator.java b/server/src/main/java/org/opensearch/action/search/SearchShardIterator.java index 72951f60c286e..45e4c1a54eeba 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchShardIterator.java +++ b/server/src/main/java/org/opensearch/action/search/SearchShardIterator.java @@ -119,7 +119,7 @@ public String getClusterAlias() { 
return clusterAlias; } - SearchShardTarget nextOrNull() { + public SearchShardTarget nextOrNull() { final String nodeId = targetNodesIterator.nextOrNull(); if (nodeId != null) { return new SearchShardTarget(nodeId, shardId, clusterAlias, originalIndices); diff --git a/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java index a69853dc6a3c0..10645c744b2f3 100644 --- a/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java @@ -44,6 +44,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.FailAwareWeightedRouting; import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.routing.ShardRouting; @@ -250,7 +251,8 @@ void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int // we set the shard failure always, even if its the first in the replication group, and the next one // will work (it will just override it...) setFailure(shardIt, shardIndex, e); - ShardRouting nextShard = shardIt.nextOrNull(); + ShardRouting nextShard = FailAwareWeightedRouting.getInstance().findNext(shardIt, clusterService.state(), e); + if (nextShard != null) { if (e != null) { if (logger.isTraceEnabled()) { diff --git a/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java index df39bd29493dd..d8c4913e595a4 100644 --- a/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java +++ b/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java @@ -47,6 +47,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.FailAwareWeightedRouting; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardsIterator; import org.opensearch.cluster.service.ClusterService; @@ -244,7 +245,8 @@ private void perform(@Nullable final Exception currentFailure) { lastFailure = currentFailure; this.lastFailure = currentFailure; } - final ShardRouting shardRouting = shardIt.nextOrNull(); + ShardRouting shardRouting = FailAwareWeightedRouting.getInstance().findNext(shardIt, clusterService.state(), currentFailure); + if (shardRouting == null) { Exception failure = lastFailure; if (failure == null || isShardNotAvailableException(failure)) { @@ -273,6 +275,7 @@ private void perform(@Nullable final Exception currentFailure) { ); } final Writeable.Reader reader = getResponseReader(); + ShardRouting finalShardRouting = shardRouting; transportService.sendRequest( node, transportShardAction, @@ -296,7 +299,7 @@ public void handleResponse(final Response response) { @Override public void handleException(TransportException exp) { - onFailure(shardRouting, exp); + onFailure(finalShardRouting, exp); } } ); diff --git a/server/src/main/java/org/opensearch/cluster/awarenesshealth/ClusterAwarenessHealth.java 
b/server/src/main/java/org/opensearch/cluster/awarenesshealth/ClusterAwarenessHealth.java index 7e7dbd8ad6502..1bbc5404c99be 100644 --- a/server/src/main/java/org/opensearch/cluster/awarenesshealth/ClusterAwarenessHealth.java +++ b/server/src/main/java/org/opensearch/cluster/awarenesshealth/ClusterAwarenessHealth.java @@ -16,17 +16,25 @@ import org.opensearch.common.io.stream.Writeable; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.ToXContentFragment; +import org.opensearch.common.xcontent.XContentBuilder; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Objects; /** * Cluster state Awareness health information */ -public class ClusterAwarenessHealth implements Writeable { +public class ClusterAwarenessHealth implements Writeable, ToXContentFragment, Iterable { - private final ClusterAwarenessAttributesHealth clusterAwarenessAttributesHealth; + private static final String AWARENESS_ATTRIBUTE = "awareness_attributes"; + private final Map clusterAwarenessAttributesHealthMap; /** * Creates cluster awareness health from cluster state. @@ -37,20 +45,47 @@ public class ClusterAwarenessHealth implements Writeable { */ public ClusterAwarenessHealth(ClusterState clusterState, ClusterSettings clusterSettings, String awarenessAttributeName) { // This property will govern if we need to show unassigned shard info or not - boolean displayUnassignedShardLevelInfo = canCalcUnassignedShards(clusterSettings, awarenessAttributeName); - clusterAwarenessAttributesHealth = new ClusterAwarenessAttributesHealth( - awarenessAttributeName, - displayUnassignedShardLevelInfo, - clusterState - ); + boolean displayUnassignedShardLevelInfo; + ClusterAwarenessAttributesHealth clusterAwarenessAttributesHealth; + clusterAwarenessAttributesHealthMap = new HashMap<>(); + List awarenessAttributeList = getAwarenessAttributeList(awarenessAttributeName, clusterSettings); + for (String awarenessAttribute : awarenessAttributeList) { + displayUnassignedShardLevelInfo = canCalcUnassignedShards(clusterSettings, awarenessAttribute); + clusterAwarenessAttributesHealth = new ClusterAwarenessAttributesHealth( + awarenessAttribute, + displayUnassignedShardLevelInfo, + clusterState + ); + clusterAwarenessAttributesHealthMap.put(awarenessAttribute, clusterAwarenessAttributesHealth); + } } public ClusterAwarenessHealth(final StreamInput in) throws IOException { - clusterAwarenessAttributesHealth = new ClusterAwarenessAttributesHealth(in); + int size = in.readVInt(); + if (size > 0) { + clusterAwarenessAttributesHealthMap = new HashMap<>(size); + for (int i = 0; i < size; i++) { + ClusterAwarenessAttributesHealth clusterAwarenessAttributesHealth = new ClusterAwarenessAttributesHealth(in); + clusterAwarenessAttributesHealthMap.put( + clusterAwarenessAttributesHealth.getAwarenessAttributeName(), + clusterAwarenessAttributesHealth + ); + } + } else { + clusterAwarenessAttributesHealthMap = Collections.emptyMap(); + } } - public ClusterAwarenessHealth(String awarenessAttribute) { - this.clusterAwarenessAttributesHealth = new ClusterAwarenessAttributesHealth(awarenessAttribute, Collections.emptyMap()); + private List getAwarenessAttributeList(String awarenessAttributeName, ClusterSettings clusterSettings) { + // Helper function to check if we need health for all or for one awareness attribute. 
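The stream constructor above reads a vint count and then one `ClusterAwarenessAttributesHealth` per entry, keying the resulting `Map<String, ClusterAwarenessAttributesHealth>` by attribute name; the matching `writeTo` further down writes the same layout. A minimal, test-style sketch of the round trip, assuming OpenSearch's `BytesStreamOutput` helper (`roundTrip` is an illustrative name):

```java
import java.io.IOException;

import org.opensearch.cluster.awarenesshealth.ClusterAwarenessHealth;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.StreamInput;

// Test-style sketch only: roundTrip is a hypothetical helper, mirroring the serializeResponse
// pattern used by the unit tests later in this patch.
class ClusterAwarenessHealthWireSketch {
    static ClusterAwarenessHealth roundTrip(ClusterAwarenessHealth health) throws IOException {
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            health.writeTo(out);                        // vint size, then one entry per awareness attribute
            try (StreamInput in = out.bytes().streamInput()) {
                return new ClusterAwarenessHealth(in);  // rebuilds the map keyed by attribute name, e.g. "zone"
            }
        }
    }
}
```

The deserialized object exposes the per-attribute health through `getClusterAwarenessAttributesHealthMap()` and the new `Iterable` implementation, which is also what the `awareness_attributes` block of `toXContent` iterates over when rendering `_cluster/health`.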
+ boolean displayAllAwarenessAttribute = awarenessAttributeName == null || awarenessAttributeName.isBlank(); + List awarenessAttributeList = new ArrayList<>(); + if (!displayAllAwarenessAttribute) { + awarenessAttributeList.add(awarenessAttributeName); + } else { + awarenessAttributeList = clusterSettings.get(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING); + } + return awarenessAttributeList; } private boolean canCalcUnassignedShards(ClusterSettings clusterSettings, String awarenessAttributeName) { @@ -73,22 +108,26 @@ private boolean canCalcUnassignedShards(ClusterSettings clusterSettings, String return allocationAwarenessBalance && forcedZoneSettingsExists; } - public ClusterAwarenessAttributesHealth getAwarenessAttributeHealth() { - return this.clusterAwarenessAttributesHealth; + public Map getClusterAwarenessAttributesHealthMap() { + return clusterAwarenessAttributesHealthMap; } @Override public void writeTo(final StreamOutput out) throws IOException { - clusterAwarenessAttributesHealth.writeTo(out); + int size = clusterAwarenessAttributesHealthMap.size(); + out.writeVInt(size); + if (size > 0) { + for (ClusterAwarenessAttributesHealth awarenessAttributeValueHealth : this) { + awarenessAttributeValueHealth.writeTo(out); + } + } } @Override public String toString() { - return "ClusterStateHealth{" - + ", clusterAwarenessAttributeHealth.awarenessAttributeName" - + (clusterAwarenessAttributesHealth == null ? "null" : clusterAwarenessAttributesHealth.getAwarenessAttributeName()) - + ", clusterAwarenessAttributeHealth.size=" - + (clusterAwarenessAttributesHealth == null ? "null" : clusterAwarenessAttributesHealth.getAwarenessAttributeHealthMap().size()) + return "ClusterAwarenessHealth{" + + "clusterAwarenessHealth.clusterAwarenessAttributesHealthMap.size=" + + (clusterAwarenessAttributesHealthMap == null ? 
"null" : clusterAwarenessAttributesHealthMap.size()) + '}'; } @@ -97,11 +136,26 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ClusterAwarenessHealth that = (ClusterAwarenessHealth) o; - return Objects.equals(clusterAwarenessAttributesHealth, that.clusterAwarenessAttributesHealth); + return clusterAwarenessAttributesHealthMap.size() == that.clusterAwarenessAttributesHealthMap.size(); } @Override public int hashCode() { - return Objects.hash(clusterAwarenessAttributesHealth); + return Objects.hash(clusterAwarenessAttributesHealthMap); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(AWARENESS_ATTRIBUTE); + for (ClusterAwarenessAttributesHealth awarenessAttributeValueHealth : this) { + awarenessAttributeValueHealth.toXContent(builder, params); + } + builder.endObject(); + return builder; + } + + @Override + public Iterator iterator() { + return clusterAwarenessAttributesHealthMap.values().iterator(); } } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java index 395d733b8f1b1..30ef361b0c13e 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java @@ -37,6 +37,7 @@ public class DecommissionAttributeMetadata extends AbstractNamedDiffable private final DecommissionAttribute decommissionAttribute; private DecommissionStatus status; + private String requestID; public static final String attributeType = "awareness"; /** @@ -45,18 +46,19 @@ public class DecommissionAttributeMetadata extends AbstractNamedDiffable * @param decommissionAttribute attribute details * @param status current status of the attribute decommission */ - public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, DecommissionStatus status) { + public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, DecommissionStatus status, String requestId) { this.decommissionAttribute = decommissionAttribute; this.status = status; + this.requestID = requestId; } /** - * Constructs new decommission attribute metadata with status as {@link DecommissionStatus#INIT} + * Constructs new decommission attribute metadata with status as {@link DecommissionStatus#INIT} and request id * * @param decommissionAttribute attribute details */ - public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute) { - this(decommissionAttribute, DecommissionStatus.INIT); + public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, String requestID) { + this(decommissionAttribute, DecommissionStatus.INIT, requestID); } /** @@ -77,6 +79,15 @@ public DecommissionStatus status() { return this.status; } + /** + * Returns the request id of the decommission + * + * @return request id + */ + public String requestID() { + return this.requestID; + } + /** * Returns instance of the metadata with updated status * @param newStatus status to be updated with @@ -128,12 +139,13 @@ public boolean equals(Object o) { DecommissionAttributeMetadata that = (DecommissionAttributeMetadata) o; if (!status.equals(that.status)) return false; + if (!requestID.equals(that.requestID)) return false; return 
decommissionAttribute.equals(that.decommissionAttribute); } @Override public int hashCode() { - return Objects.hash(attributeType, decommissionAttribute, status); + return Objects.hash(attributeType, decommissionAttribute, status, requestID); } /** @@ -152,6 +164,7 @@ public Version getMinimalSupportedVersion() { public DecommissionAttributeMetadata(StreamInput in) throws IOException { this.decommissionAttribute = new DecommissionAttribute(in); this.status = DecommissionStatus.fromString(in.readString()); + this.requestID = in.readString(); } public static NamedDiff readDiffFrom(StreamInput in) throws IOException { @@ -165,12 +178,14 @@ public static NamedDiff readDiffFrom(StreamInput in) throws IOException public void writeTo(StreamOutput out) throws IOException { decommissionAttribute.writeTo(out); out.writeString(status.status()); + out.writeString(requestID); } public static DecommissionAttributeMetadata fromXContent(XContentParser parser) throws IOException { XContentParser.Token token; DecommissionAttribute decommissionAttribute = null; DecommissionStatus status = null; + String requestID = null; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { String currentFieldName = parser.currentName(); @@ -210,6 +225,13 @@ public static DecommissionAttributeMetadata fromXContent(XContentParser parser) ); } status = DecommissionStatus.fromString(parser.text()); + } else if ("requestID".equals(currentFieldName)) { + if (parser.nextToken() != XContentParser.Token.VALUE_STRING) { + throw new OpenSearchParseException( + "failed to parse status of decommissioning, expected string but found unknown type" + ); + } + requestID = parser.text(); } else { throw new OpenSearchParseException( "unknown field found [{}], failed to parse the decommission attribute", @@ -218,7 +240,7 @@ public static DecommissionAttributeMetadata fromXContent(XContentParser parser) } } } - return new DecommissionAttributeMetadata(decommissionAttribute, status); + return new DecommissionAttributeMetadata(decommissionAttribute, status, requestID); } /** @@ -226,7 +248,7 @@ public static DecommissionAttributeMetadata fromXContent(XContentParser parser) */ @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { - toXContent(decommissionAttribute, status, attributeType, builder, params); + toXContent(decommissionAttribute, status, requestID, attributeType, builder, params); return builder; } @@ -245,6 +267,7 @@ public EnumSet context() { public static void toXContent( DecommissionAttribute decommissionAttribute, DecommissionStatus status, + String requestID, String attributeType, XContentBuilder builder, ToXContent.Params params @@ -253,6 +276,7 @@ public static void toXContent( builder.field(decommissionAttribute.attributeName(), decommissionAttribute.attributeValue()); builder.endObject(); builder.field("status", status.status()); + builder.field("requestID", requestID); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index 1ff2fb52175c7..79a91ee85f016 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -175,7 +175,8 @@ public ClusterState execute(ClusterState currentState) { 
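With the hunks above, the decommission request ID now travels with `DecommissionAttributeMetadata` through construction, equality, stream serialization, and XContent. A small sketch of creating the metadata the way `DecommissionService` does; the `zone`/`zone-1` values are examples only:

```java
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.common.UUIDs;

class DecommissionMetadataSketch {
    static DecommissionAttributeMetadata newInitMetadata() {
        // DecommissionService generates the ID with UUIDs.randomBase64UUID() when the request carries none.
        DecommissionAttribute attribute = new DecommissionAttribute("zone", "zone-1");
        String requestID = UUIDs.randomBase64UUID();

        // The two-argument constructor defaults the status to INIT and records the request ID.
        DecommissionAttributeMetadata metadata = new DecommissionAttributeMetadata(attribute, requestID);
        assert metadata.status().equals(DecommissionStatus.INIT);
        assert metadata.requestID().equals(requestID);
        return metadata;
    }
}
```

The three-argument constructor is used whenever an existing status and its original request ID have to be preserved, as in the `DecommissionController` status update shown above.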
decommissionAttributeMetadata.validateNewStatus(decommissionStatus); decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttributeMetadata.decommissionAttribute(), - decommissionStatus + decommissionStatus, + decommissionAttributeMetadata.requestID() ); ClusterState newState = ClusterState.builder(currentState) .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java index 8305bda545998..83589a2f55685 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java @@ -32,9 +32,10 @@ public class DecommissionHelper { static ClusterState registerDecommissionAttributeInClusterState( ClusterState currentState, - DecommissionAttribute decommissionAttribute + DecommissionAttribute decommissionAttribute, + String requestID ) { - DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute); + DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute, requestID); return ClusterState.builder(currentState) .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) .build(); diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 4139ad8d36ed0..2e27898dd413c 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -28,6 +28,7 @@ import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; +import org.opensearch.common.UUIDs; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -128,7 +129,7 @@ private void setMaxVotingConfigExclusions(int maxVotingConfigExclusions) { * Starts the new decommission request and registers the metadata with status as {@link DecommissionStatus#INIT} * Once the status is updated, it tries to exclude to-be-decommissioned cluster manager eligible nodes from Voting Configuration * - * @param decommissionRequest decommission request Object + * @param decommissionRequest request for decommission action * @param listener register decommission listener */ public void startDecommissionAction( @@ -144,12 +145,19 @@ public void startDecommissionAction( public ClusterState execute(ClusterState currentState) { // validates if correct awareness attributes and forced awareness attribute set to the cluster before starting action validateAwarenessAttribute(decommissionAttribute, awarenessAttributes, forcedAwarenessAttributes); + if (decommissionRequest.requestID() == null) { + decommissionRequest.setRequestID(UUIDs.randomBase64UUID()); + } DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata(); // check that request is eligible to proceed and attribute is weighed away - ensureEligibleRequest(decommissionAttributeMetadata, 
decommissionAttribute); + ensureEligibleRequest(decommissionAttributeMetadata, decommissionRequest); ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute); - ClusterState newState = registerDecommissionAttributeInClusterState(currentState, decommissionAttribute); + ClusterState newState = registerDecommissionAttributeInClusterState( + currentState, + decommissionAttribute, + decommissionRequest.requestID() + ); // add all 'to-be-decommissioned' cluster manager eligible nodes to voting config exclusion nodeIdsToBeExcluded = filterNodesWithDecommissionAttribute(currentState, decommissionAttribute, true).stream() .map(DiscoveryNode::getId) @@ -188,6 +196,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS DecommissionAttributeMetadata decommissionAttributeMetadata = newState.metadata().decommissionAttributeMetadata(); assert decommissionAttribute.equals(decommissionAttributeMetadata.decommissionAttribute()); assert decommissionAttributeMetadata.status().equals(DecommissionStatus.INIT); + assert decommissionAttributeMetadata.requestID().equals(decommissionRequest.requestID()); assert newState.getVotingConfigExclusions() .stream() .map(CoordinationMetadata.VotingConfigExclusion::getNodeId) @@ -294,6 +303,7 @@ public void onTimeout(TimeValue timeout) { // the action again (retry) void drainNodesWithDecommissionedAttribute(DecommissionRequest decommissionRequest) { ClusterState state = clusterService.getClusterApplierService().state(); + assert state.metadata().decommissionAttributeMetadata().requestID().equals(decommissionRequest.requestID()); Set decommissionedNodes = filterNodesWithDecommissionAttribute( state, decommissionRequest.getDecommissionAttribute(), @@ -456,22 +466,31 @@ private static void ensureToBeDecommissionedAttributeWeighedAway(ClusterState st private static void ensureEligibleRequest( DecommissionAttributeMetadata decommissionAttributeMetadata, - DecommissionAttribute requestedDecommissionAttribute + DecommissionRequest decommissionRequest ) { - String msg = null; + String msg; + DecommissionAttribute requestedDecommissionAttribute = decommissionRequest.getDecommissionAttribute(); if (decommissionAttributeMetadata != null) { // check if the same attribute is registered and handle it accordingly if (decommissionAttributeMetadata.decommissionAttribute().equals(requestedDecommissionAttribute)) { switch (decommissionAttributeMetadata.status()) { - // for INIT and FAILED - we are good to process it again + // for INIT - check if it is eligible internal retry case INIT: + if (decommissionRequest.requestID().equals(decommissionAttributeMetadata.requestID()) == false) { + throw new DecommissioningFailedException( + requestedDecommissionAttribute, + "same request is already in status [INIT]" + ); + } + break; + // for FAILED - we are good to process it again case FAILED: break; case DRAINING: case IN_PROGRESS: case SUCCESSFUL: msg = "same request is already in status [" + decommissionAttributeMetadata.status() + "]"; - break; + throw new DecommissioningFailedException(requestedDecommissionAttribute, msg); default: throw new IllegalStateException( "unknown status [" + decommissionAttributeMetadata.status() + "] currently registered in metadata" @@ -484,7 +503,7 @@ private static void ensureEligibleRequest( msg = "one awareness attribute [" + decommissionAttributeMetadata.decommissionAttribute().toString() + "] already successfully decommissioned, recommission before triggering another decommission"; - break; + throw new 
DecommissioningFailedException(requestedDecommissionAttribute, msg); case DRAINING: case IN_PROGRESS: case INIT: @@ -492,7 +511,7 @@ private static void ensureEligibleRequest( msg = "there's an inflight decommission request for attribute [" + decommissionAttributeMetadata.decommissionAttribute().toString() + "] is in progress, cannot process this request"; - break; + throw new DecommissioningFailedException(requestedDecommissionAttribute, msg); case FAILED: break; default: @@ -502,10 +521,6 @@ private static void ensureEligibleRequest( } } } - - if (msg != null) { - throw new DecommissioningFailedException(requestedDecommissionAttribute, msg); - } } private ActionListener statusUpdateListener() { diff --git a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java index 320f75a9f2ada..c7136785606f6 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java @@ -39,6 +39,7 @@ public class WeightedRoutingMetadata extends AbstractNamedDiffable internalErrorRestStatusList = List.of( + RestStatus.INTERNAL_SERVER_ERROR, + RestStatus.BAD_GATEWAY, + RestStatus.SERVICE_UNAVAILABLE, + RestStatus.GATEWAY_TIMEOUT + ); + + public static FailAwareWeightedRouting getInstance() { + return INSTANCE; + } + + /** + * * + * @return true if exception is due to cluster availability issues + */ + private boolean isInternalFailure(Exception exception) { + if (exception instanceof OpenSearchException) { + // checking for 5xx failures + return internalErrorRestStatusList.contains(((OpenSearchException) exception).status()); + } + return false; + } + + /** + * This function checks if the shard is present in data node with weighted routing weight set to 0, + * In such cases we fail open, if shard search request for the shard from other shard copies fail with non + * retryable exception. + * + * @param nodeId the node with the shard copy + * @return true if the node has attribute value with shard routing weight set to zero, else false + */ + private boolean isWeighedAway(String nodeId, ClusterState clusterState) { + DiscoveryNode node = clusterState.nodes().get(nodeId); + WeightedRoutingMetadata weightedRoutingMetadata = clusterState.metadata().weightedRoutingMetadata(); + if (weightedRoutingMetadata != null) { + WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting(); + if (weightedRouting != null && weightedRouting.isSet()) { + // Fetch weighted routing attributes with weight set as zero + Stream keys = weightedRouting.weights() + .entrySet() + .stream() + .filter(entry -> entry.getValue().intValue() == WeightedRoutingMetadata.WEIGHED_AWAY_WEIGHT) + .map(Map.Entry::getKey); + + for (Object key : keys.toArray()) { + if (node.getAttributes().get(weightedRouting.attributeName()).equals(key.toString())) { + return true; + } + } + } + } + return false; + } + + /** + * This function returns next shard copy to retry search request in case of failure from previous copy returned + * by the iterator. 
It has the logic to fail open ie request shard copies present in nodes with weighted shard + * routing weight set to zero + * + * @param shardIt Shard Iterator containing order in which shard copies for a shard need to be requested + * @return the next shard copy + */ + public SearchShardTarget findNext(final SearchShardIterator shardIt, ClusterState clusterState, Exception exception) { + SearchShardTarget next = shardIt.nextOrNull(); + while (next != null && isWeighedAway(next.getNodeId(), clusterState)) { + SearchShardTarget nextShard = next; + if (canFailOpen(nextShard.getShardId(), exception, clusterState)) { + logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.getShardId()), exception); + break; + } + next = shardIt.nextOrNull(); + } + return next; + } + + /** + * This function returns next shard copy to retry search request in case of failure from previous copy returned + * by the iterator. It has the logic to fail open ie request shard copies present in nodes with weighted shard + * routing weight set to zero + * + * @param shardsIt Shard Iterator containing order in which shard copies for a shard need to be requested + * @return the next shard copy + */ + public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState clusterState, Exception exception) { + ShardRouting next = shardsIt.nextOrNull(); + + while (next != null && isWeighedAway(next.currentNodeId(), clusterState)) { + ShardRouting nextShard = next; + if (canFailOpen(nextShard.shardId(), exception, clusterState)) { + logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.shardId()), exception); + break; + } + next = shardsIt.nextOrNull(); + } + return next; + } + + /** + * * + * @return true if can fail open ie request shard copies present in nodes with weighted shard + * routing weight set to zero + */ + private boolean canFailOpen(ShardId shardId, Exception exception, ClusterState clusterState) { + return isInternalFailure(exception) || hasInActiveShardCopies(clusterState, shardId); + } + + private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardId) { + List shards = clusterState.routingTable().shardRoutingTable(shardId).shards(); + for (ShardRouting shardRouting : shards) { + if (!shardRouting.active()) { + return true; + } + } + return false; + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index 9026e7068e9fe..207570c1d56b2 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -32,6 +32,9 @@ package org.opensearch.cluster.routing; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.Nullable; @@ -57,6 +60,8 @@ import java.util.Optional; import java.util.Set; import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Collections.emptyMap; @@ -89,6 +94,8 @@ public class IndexShardRoutingTable implements Iterable { private volatile Map> activeShardsByWeight = emptyMap(); private volatile Map> initializingShardsByWeight = emptyMap(); + private static 
final Logger logger = LogManager.getLogger(IndexShardRoutingTable.class); + /** * The initializing list, including ones that are initializing on a target node because of relocation. * If we can come up with a better variable name, it would be nice... @@ -305,19 +312,50 @@ public ShardIterator activeInitializingShardsRankedIt( * * @param weightedRouting entity * @param nodes discovered nodes in the cluster + * @param isFailOpenEnabled if true, shards search requests in case of failures are tried on shard copies present + * in node attribute value with weight zero * @return an iterator over active and initializing shards, ordered by weighted round-robin * scheduling policy. Making sure that initializing shards are the last to iterate through. */ - public ShardIterator activeInitializingShardsWeightedIt(WeightedRouting weightedRouting, DiscoveryNodes nodes, double defaultWeight) { + public ShardIterator activeInitializingShardsWeightedIt( + WeightedRouting weightedRouting, + DiscoveryNodes nodes, + double defaultWeight, + boolean isFailOpenEnabled + ) { final int seed = shuffler.nextSeed(); List ordered = new ArrayList<>(); List orderedActiveShards = getActiveShardsByWeight(weightedRouting, nodes, defaultWeight); + List orderedListWithDistinctShards; ordered.addAll(shuffler.shuffle(orderedActiveShards, seed)); if (!allInitializingShards.isEmpty()) { List orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight); ordered.addAll(orderedInitializingShards); } - return new PlainShardIterator(shardId, ordered); + + // append shards for attribute value with weight zero, so that shard search requests can be tried on + // shard copies in case of request failure from other attribute values. + if (isFailOpenEnabled) { + try { + Stream keys = weightedRouting.weights() + .entrySet() + .stream() + .filter(entry -> entry.getValue().intValue() == WeightedRoutingMetadata.WEIGHED_AWAY_WEIGHT) + .map(Map.Entry::getKey); + keys.forEach(key -> { + ShardIterator iterator = onlyNodeSelectorActiveInitializingShardsIt(weightedRouting.attributeName() + ":" + key, nodes); + while (iterator.remaining() > 0) { + ordered.add(iterator.nextOrNull()); + } + }); + } catch (IllegalArgumentException e) { + // this exception is thrown by {@link onlyNodeSelectorActiveInitializingShardsIt} in case count of shard + // copies found is zero + logger.debug("no shard copies found for shard id [{}] for node attribute with weight zero", shardId); + } + } + orderedListWithDistinctShards = ordered.stream().distinct().collect(Collectors.toList()); + return new PlainShardIterator(shardId, orderedListWithDistinctShards); } /** diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index a4b4cc961fade..d7df1a2c2181b 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -83,10 +83,18 @@ public class OperationRouting { Setting.Property.Dynamic, Setting.Property.NodeScope ); + + public static final Setting WEIGHTED_ROUTING_FAILOPEN_ENABLED = Setting.boolSetting( + "cluster.routing.weighted.fail_open", + true, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); private volatile List awarenessAttributes; private volatile boolean useAdaptiveReplicaSelection; private volatile boolean ignoreAwarenessAttr; private volatile double weightedRoutingDefaultWeight; + private volatile 
boolean isFailOpenEnabled; public OperationRouting(Settings settings, ClusterSettings clusterSettings) { // whether to ignore awareness attributes when routing requests @@ -98,9 +106,11 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) { ); this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings); this.weightedRoutingDefaultWeight = WEIGHTED_ROUTING_DEFAULT_WEIGHT.get(settings); + this.isFailOpenEnabled = WEIGHTED_ROUTING_FAILOPEN_ENABLED.get(settings); clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection); clusterSettings.addSettingsUpdateConsumer(IGNORE_AWARENESS_ATTRIBUTES_SETTING, this::setIgnoreAwarenessAttributes); clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_DEFAULT_WEIGHT, this::setWeightedRoutingDefaultWeight); + clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled); } void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) { @@ -115,6 +125,10 @@ void setWeightedRoutingDefaultWeight(double weightedRoutingDefaultWeight) { this.weightedRoutingDefaultWeight = weightedRoutingDefaultWeight; } + void setFailOpenEnabled(boolean isFailOpenEnabled) { + this.isFailOpenEnabled = isFailOpenEnabled; + } + public boolean isIgnoreAwarenessAttr() { return ignoreAwarenessAttr; } @@ -328,7 +342,8 @@ private ShardIterator shardRoutings( return indexShard.activeInitializingShardsWeightedIt( weightedRoutingMetadata.getWeightedRouting(), nodes, - getWeightedRoutingDefaultWeight() + getWeightedRoutingDefaultWeight(), + isFailOpenEnabled ); } else if (ignoreAwarenessAttributes()) { if (useAdaptiveReplicaSelection) { diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index ea27cb82b8c0f..55e88d2d09d8c 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -540,6 +540,7 @@ public void apply(Settings value, Settings current, Settings previous) { OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, OperationRouting.IGNORE_AWARENESS_ATTRIBUTES_SETTING, OperationRouting.WEIGHTED_ROUTING_DEFAULT_WEIGHT, + OperationRouting.WEIGHTED_ROUTING_FAILOPEN_ENABLED, IndexGraveyard.SETTING_MAX_TOMBSTONES, PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, diff --git a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java index f6c5bf7640a73..b796db8d4ca44 100644 --- a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java @@ -226,6 +226,9 @@ public void trimUnreferencedTranslogFiles() throws TranslogException { store.decRef(); } } + + @Override + public void setMinSeqNoToKeep(long seqNo) {} }; } catch (IOException ex) { throw new RuntimeException(ex); diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index a40048e7b9781..bc135a41898a6 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -712,7 +712,7 @@ public 
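`cluster.routing.weighted.fail_open` is registered above as a dynamic, node-scoped boolean that defaults to `true`, with `setFailOpenEnabled` wired in as the settings-update consumer. A hedged sketch of toggling it at runtime from Java, using the standard cluster update-settings builder (shown purely as an example of consuming the new setting):

```java
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;

// Sketch: disabling fail-open at runtime. The setting key comes from
// OperationRouting.WEIGHTED_ROUTING_FAILOPEN_ENABLED above; the admin-client builder is the
// usual cluster update-settings API and is used here only for illustration.
class FailOpenSettingSketch {
    static void disableFailOpen(Client client) {
        client.admin()
            .cluster()
            .prepareUpdateSettings()
            .setTransientSettings(Settings.builder().put("cluster.routing.weighted.fail_open", false))
            .get();
    }
}
```

When the flag is on, `activeInitializingShardsWeightedIt` appends the copies from weight-zero attribute values at the tail of the de-duplicated iterator, so they are only attempted after the weighted copies have been exhausted.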
CheckpointState(StreamInput in) throws IOException { this.globalCheckpoint = in.readZLong(); this.inSync = in.readBoolean(); this.tracked = in.readBoolean(); - if (in.getVersion().onOrAfter(Version.CURRENT)) { + if (in.getVersion().onOrAfter(Version.V_2_5_0)) { this.replicated = in.readBoolean(); } else { this.replicated = true; @@ -725,7 +725,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeZLong(globalCheckpoint); out.writeBoolean(inSync); out.writeBoolean(tracked); - out.writeBoolean(replicated); + if (out.getVersion().onOrAfter(Version.V_2_5_0)) { + out.writeBoolean(replicated); + } } /** diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 838cbc7f25e09..9263bd0c894bd 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -144,6 +144,9 @@ public void afterRefresh(boolean didRefresh) { .filter(file -> !localSegmentsPostRefresh.contains(file)) .collect(Collectors.toSet()) .forEach(localSegmentChecksumMap::remove); + final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()) + .lastRefreshedCheckpoint(); + indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); } } } catch (EngineException e) { diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 574fcc54bafa6..80966fbd5bd96 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -289,6 +289,11 @@ public void ensureCanFlush() { } } + @Override + public void setMinSeqNoToKeep(long seqNo) { + translog.setMinSeqNoToKeep(seqNo); + } + /** * Reads operations from the translog * @param location location of translog diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index 4e46ca4b3e79e..cea38b4fbc781 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -95,6 +95,9 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws T @Override public void ensureCanFlush() {} + @Override + public void setMinSeqNoToKeep(long seqNo) {} + @Override public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException { return 0; diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 629b1bebccac8..e7d6b509b1c4c 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -47,6 +47,8 @@ public class RemoteFsTranslog extends Translog { private final FileTransferTracker fileTransferTracker; private volatile long maxRemoteTranslogGenerationUploaded; + private volatile long minSeqNoToKeep; + public RemoteFsTranslog( TranslogConfig config, String translogUUID, @@ -282,4 +284,42 @@ public void close() throws IOException { } } } + + protected long 
getMinReferencedGen() throws IOException { + assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); + long minReferencedGen = Math.min( + deletionPolicy.minTranslogGenRequired(readers, current), + minGenerationForSeqNo(Math.min(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1, minSeqNoToKeep), current, readers) + ); + assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of [" + + minReferencedGen + + "] but the lowest gen available is [" + + getMinFileGeneration() + + "]"; + assert minReferencedGen <= currentFileGeneration() : "deletion policy requires a minReferenceGen of [" + + minReferencedGen + + "] which is higher than the current generation [" + + currentFileGeneration() + + "]"; + return minReferencedGen; + } + + protected void setMinSeqNoToKeep(long seqNo) { + if (seqNo < this.minSeqNoToKeep) { + throw new IllegalArgumentException( + "min seq number required can't go backwards: " + "current [" + this.minSeqNoToKeep + "] new [" + seqNo + "]" + ); + } + this.minSeqNoToKeep = seqNo; + } + + @Override + void deleteReaderFiles(TranslogReader reader) { + try { + translogTransferManager.deleteTranslog(primaryTermSupplier.getAsLong(), reader.generation); + } catch (IOException ignored) { + logger.error("Exception {} while deleting generation {}", ignored, reader.generation); + } + super.deleteReaderFiles(reader); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index f5a9faff8bfff..3318f6858dc82 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -1684,7 +1684,7 @@ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) { } } - private static long minGenerationForSeqNo(long seqNo, TranslogWriter writer, List readers) { + static long minGenerationForSeqNo(long seqNo, TranslogWriter writer, List readers) { long minGen = writer.generation; for (final TranslogReader reader : readers) { if (seqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) { @@ -1781,7 +1781,7 @@ public void trimUnreferencedReaders() throws IOException { } } - private long getMinReferencedGen() throws IOException { + protected long getMinReferencedGen() throws IOException { assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); long minReferencedGen = Math.min( deletionPolicy.minTranslogGenRequired(readers, current), @@ -1800,6 +1800,12 @@ private long getMinReferencedGen() throws IOException { return minReferencedGen; } + /* + Min Seq number required in translog to restore the complete data . + This might be required when segments are persisted via other mechanism than flush. + */ + protected void setMinSeqNoToKeep(long seqNo) {} + /** * deletes all files associated with a reader. 
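`RemoteStoreRefreshListener` above advances the translog retention floor to `lastRefreshedCheckpoint + 1` once segments are uploaded to the remote store, and `RemoteFsTranslog` both folds that floor into `getMinReferencedGen` and refuses to move it backwards. A brief sketch of the caller side through the `TranslogManager` interface (declared a little further down); `advanceRetentionFloor` is an illustrative name:

```java
import org.opensearch.index.translog.TranslogManager;

// Sketch of the new retention contract; advanceRetentionFloor is not a method from the patch.
class TranslogRetentionSketch {
    static void advanceRetentionFloor(TranslogManager translogManager, long lastRefreshedCheckpoint) {
        // Operations at or below the last refreshed checkpoint are already durable in remote
        // segments, so the translog only needs to keep everything strictly above it.
        translogManager.setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
        // RemoteFsTranslog rejects any later attempt to lower this value with an
        // IllegalArgumentException ("min seq number required can't go backwards ...").
    }
}
```

When readers are trimmed, the overridden `deleteReaderFiles` also removes the corresponding translog and checkpoint blobs from the remote store via `TranslogTransferManager.deleteTranslog`, shown in the transfer-layer hunks below.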
package-private to be able to simulate node failures at this point */ diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index 39f819da4b018..e2818dd702d87 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -119,4 +119,11 @@ public interface TranslogManager { * Checks if the translog has a pending recovery */ void ensureCanFlush(); + + /** + * + * @param seqNo : operations greater or equal to seqNo should be persisted + * This might be required when segments are persisted via other mechanism than flush. + */ + void setMinSeqNoToKeep(long seqNo); } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java b/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java index 178cdc110ec3b..1250184ff656b 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java @@ -359,7 +359,7 @@ synchronized boolean assertNoSeqAbove(long belowTerm, long aboveSeqNo) { * * Note: any exception during the sync process will be interpreted as a tragic exception and the writer will be closed before * raising the exception. - * @return + * @return true if this call caused an actual sync operation */ public boolean sync() throws IOException { return syncUpTo(Long.MAX_VALUE); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 3a8e77d4cc1fc..78a26baa052ef 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.io.InputStream; +import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -75,6 +76,11 @@ public InputStream downloadBlob(Iterable path, String fileName) throws I return blobStore.blobContainer((BlobPath) path).readBlob(fileName); } + @Override + public void deleteBlobs(Iterable path, List fileNames) throws IOException { + blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames); + } + @Override public Set listAll(Iterable path) throws IOException { return blobStore.blobContainer((BlobPath) path).listBlobs().keySet(); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java index e950be0993e83..5338142afed33 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java @@ -55,6 +55,11 @@ public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { add(fileSnapshot.getName(), TransferState.FAILED); } + @Override + public void onDelete(String name) { + fileTransferTracker.remove(name); + } + public Set exclusionFilter(Set original) { return original.stream() .filter(fileSnapshot -> fileTransferTracker.get(fileSnapshot.getName()) != TransferState.SUCCESS) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java 
b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 6a67de99287fd..5745d0838efb3 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -13,6 +13,7 @@ import java.io.IOException; import java.io.InputStream; +import java.util.List; import java.util.Set; /** @@ -42,6 +43,8 @@ void uploadBlobAsync( */ void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remotePath) throws IOException; + void deleteBlobs(Iterable path, List fileNames) throws IOException; + /** * Lists the files * @param path : the path to list @@ -52,8 +55,8 @@ void uploadBlobAsync( /** * - * @param path - * @param fileName + * @param path the remote path from where download should be made + * @param fileName the name of the file * @return inputstream of the remote file * @throws IOException the exception while reading the data */ diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 6750eedd86180..35ccb4ccf17db 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -194,4 +194,15 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) translogTransferMetadata.getPrimaryTerm() ); } + + public void deleteTranslog(long primaryTerm, long generation) throws IOException { + String ckpFileName = Translog.getCommitCheckpointFileName(generation); + String translogFilename = Translog.getFilename(generation); + // ToDo - Take care of metadata file cleanup + // https://github.com/opensearch-project/OpenSearch/issues/5677 + fileTransferTracker.onDelete(ckpFileName); + fileTransferTracker.onDelete(translogFilename); + List files = List.of(ckpFileName, translogFilename); + transferService.deleteBlobs(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java index 939b56f109a36..c489e4b9a5809 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java @@ -11,7 +11,7 @@ import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; /** - * The listener to be invoked on the completion or failure of a {@link TransferFileSnapshot} + * The listener to be invoked on the completion or failure of a {@link TransferFileSnapshot} or deletion of file * * @opensearch.internal */ @@ -29,4 +29,6 @@ public interface FileTransferListener { * @param e the exception while processing the {@link TransferFileSnapshot} */ void onFailure(TransferFileSnapshot fileSnapshot, Exception e); + + void onDelete(String name); } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterHealthAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterHealthAction.java index cf85e5aa4e902..d8a629821954b 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterHealthAction.java +++ 
b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterHealthAction.java @@ -94,6 +94,14 @@ public static ClusterHealthRequest fromRequest(final RestRequest request) { ); parseDeprecatedMasterTimeoutParameter(clusterHealthRequest, request, deprecationLogger, "cluster_health"); clusterHealthRequest.timeout(request.paramAsTime("timeout", clusterHealthRequest.timeout())); + String awarenessAttribute = request.param("awareness_attribute"); + if (awarenessAttribute != null) { + clusterHealthRequest.setAwarenessAttribute(awarenessAttribute); + } + String level = request.param("level"); + if (level != null) { + clusterHealthRequest.setLevel(level); + } String waitForStatus = request.param("wait_for_status"); if (waitForStatus != null) { clusterHealthRequest.waitForStatus(ClusterHealthStatus.valueOf(waitForStatus.toUpperCase(Locale.ROOT))); diff --git a/server/src/test/java/org/opensearch/cluster/awarenesshealth/ClusterAwarenessHealthTests.java b/server/src/test/java/org/opensearch/cluster/awarenesshealth/ClusterAwarenessHealthTests.java index a5a4799043496..8d69480d42d23 100644 --- a/server/src/test/java/org/opensearch/cluster/awarenesshealth/ClusterAwarenessHealthTests.java +++ b/server/src/test/java/org/opensearch/cluster/awarenesshealth/ClusterAwarenessHealthTests.java @@ -43,6 +43,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -130,9 +131,11 @@ public void testClusterHealthWithNoAwarenessAttribute() throws IOException { ClusterAwarenessHealth clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, "zone"); clusterAwarenessHealth = serializeResponse(clusterAwarenessHealth); - ClusterAwarenessAttributesHealth attributeHealth = clusterAwarenessHealth.getAwarenessAttributeHealth(); - assertEquals(0, attributeHealth.getAwarenessAttributeHealthMap().size()); - assertEquals("zone", attributeHealth.getAwarenessAttributeName()); + Map attributeHealthMap = clusterAwarenessHealth.getClusterAwarenessAttributesHealthMap(); + for (String attributesName : attributeHealthMap.keySet()) { + assertEquals("zone", attributesName); + assertEquals(0, attributeHealthMap.get(attributesName).getAwarenessAttributeHealthMap().size()); + } } ClusterAwarenessHealth serializeResponse(ClusterAwarenessHealth clusterAwarenessHealth) throws IOException { @@ -178,9 +181,11 @@ public void testClusterHealthWithAwarenessAttributeAndNoNodeAttribute() throws I ClusterAwarenessHealth clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, "zone"); clusterAwarenessHealth = serializeResponse(clusterAwarenessHealth); - ClusterAwarenessAttributesHealth attributeHealth = clusterAwarenessHealth.getAwarenessAttributeHealth(); - assertEquals(0, attributeHealth.getAwarenessAttributeHealthMap().size()); - assertEquals("zone", attributeHealth.getAwarenessAttributeName()); + Map attributeHealthMap = clusterAwarenessHealth.getClusterAwarenessAttributesHealthMap(); + for (String attributesName : attributeHealthMap.keySet()) { + assertEquals("zone", attributesName); + assertEquals(0, attributeHealthMap.get(attributesName).getAwarenessAttributeHealthMap().size()); + } } public void testClusterHealthWithAwarenessAttributeAndNodeAttribute() throws IOException { @@ -246,12 +251,15 @@ public void testClusterHealthWithAwarenessAttributeAndNodeAttribute() throws IOE ClusterAwarenessHealth clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, 
clusterSettings, "zone"); clusterAwarenessHealth = serializeResponse(clusterAwarenessHealth); - ClusterAwarenessAttributesHealth attributeHealth = clusterAwarenessHealth.getAwarenessAttributeHealth(); - assertEquals("zone", attributeHealth.getAwarenessAttributeName()); - assertEquals(3, attributeHealth.getAwarenessAttributeHealthMap().size()); - for (ClusterAwarenessAttributeValueHealth clusterAwarenessAttributeValueHealth : attributeHealth.getAwarenessAttributeHealthMap() - .values()) { - assertEquals(-1, clusterAwarenessAttributeValueHealth.getUnassignedShards()); + Map attributeHealthMap = clusterAwarenessHealth.getClusterAwarenessAttributesHealthMap(); + for (String attributesName : attributeHealthMap.keySet()) { + assertEquals("zone", attributesName); + assertEquals(3, attributeHealthMap.get(attributesName).getAwarenessAttributeHealthMap().size()); + for (ClusterAwarenessAttributeValueHealth clusterAwarenessAttributeValueHealth : attributeHealthMap.get(attributesName) + .getAwarenessAttributeHealthMap() + .values()) { + assertEquals(-1, clusterAwarenessAttributeValueHealth.getUnassignedShards()); + } } } @@ -320,12 +328,15 @@ public void testClusterHealthWithAwarenessAttributeAndReplicaEnforcementEnabled( ClusterAwarenessHealth clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, "zone"); clusterAwarenessHealth = serializeResponse(clusterAwarenessHealth); - ClusterAwarenessAttributesHealth attributeHealth = clusterAwarenessHealth.getAwarenessAttributeHealth(); - assertEquals("zone", attributeHealth.getAwarenessAttributeName()); - assertEquals(3, attributeHealth.getAwarenessAttributeHealthMap().size()); - for (ClusterAwarenessAttributeValueHealth clusterAwarenessAttributeValueHealth : attributeHealth.getAwarenessAttributeHealthMap() - .values()) { - assertEquals("1.0", String.valueOf(clusterAwarenessAttributeValueHealth.getWeight())); + Map attributeHealthMap = clusterAwarenessHealth.getClusterAwarenessAttributesHealthMap(); + for (String attributesName : attributeHealthMap.keySet()) { + assertEquals("zone", attributesName); + assertEquals(3, attributeHealthMap.get(attributesName).getAwarenessAttributeHealthMap().size()); + for (ClusterAwarenessAttributeValueHealth clusterAwarenessAttributeValueHealth : attributeHealthMap.get(attributesName) + .getAwarenessAttributeHealthMap() + .values()) { + assertEquals(1.0, clusterAwarenessAttributeValueHealth.getWeight(), 0.0); + } } } @@ -393,12 +404,16 @@ public void testClusterHealthWithAwarenessAttributeAndReplicaEnforcementNotEnabl ClusterAwarenessHealth clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, "zone"); clusterAwarenessHealth = serializeResponse(clusterAwarenessHealth); - ClusterAwarenessAttributesHealth attributeHealth = clusterAwarenessHealth.getAwarenessAttributeHealth(); - assertEquals(3, attributeHealth.getAwarenessAttributeHealthMap().size()); - for (ClusterAwarenessAttributeValueHealth clusterAwarenessAttributeValueHealth : attributeHealth.getAwarenessAttributeHealthMap() - .values()) { - assertEquals(-1, clusterAwarenessAttributeValueHealth.getUnassignedShards()); - assertEquals("1.0", String.valueOf(clusterAwarenessAttributeValueHealth.getWeight())); + Map attributeHealthMap = clusterAwarenessHealth.getClusterAwarenessAttributesHealthMap(); + for (String attributesName : attributeHealthMap.keySet()) { + assertEquals("zone", attributesName); + assertEquals(3, attributeHealthMap.get(attributesName).getAwarenessAttributeHealthMap().size()); + for 
(ClusterAwarenessAttributeValueHealth clusterAwarenessAttributeValueHealth : attributeHealthMap.get(attributesName) + .getAwarenessAttributeHealthMap() + .values()) { + assertEquals(-1, clusterAwarenessAttributeValueHealth.getUnassignedShards()); + assertEquals(1.0, clusterAwarenessAttributeValueHealth.getWeight(), 0.0); + } } } } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java index d7253e6f57b38..f6401558221b0 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java @@ -240,7 +240,8 @@ public void testPreventJoinClusterWithDecommission() { ); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - decommissionStatus + decommissionStatus, + randomAlphaOfLength(10) ); Metadata metadata = Metadata.builder().decommissionAttributeMetadata(decommissionAttributeMetadata).build(); DiscoveryNode discoveryNode = newDiscoveryNode(Collections.singletonMap("zone", "zone-1")); @@ -257,7 +258,8 @@ public void testJoinClusterWithDifferentDecommission() { ); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - decommissionStatus + decommissionStatus, + randomAlphaOfLength(10) ); Metadata metadata = Metadata.builder().decommissionAttributeMetadata(decommissionAttributeMetadata).build(); @@ -277,7 +279,8 @@ public void testJoinFailedForDecommissionedNode() throws Exception { DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone1"); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - DecommissionStatus.SUCCESSFUL + DecommissionStatus.SUCCESSFUL, + randomAlphaOfLength(10) ); final ClusterState clusterManagerClusterState = ClusterState.builder(ClusterName.DEFAULT) .nodes( @@ -315,7 +318,8 @@ public void testJoinClusterWithDecommissionFailed() { DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-1"); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - DecommissionStatus.FAILED + DecommissionStatus.FAILED, + randomAlphaOfLength(10) ); Metadata metadata = Metadata.builder().decommissionAttributeMetadata(decommissionAttributeMetadata).build(); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index 18a7b892a424c..ec8d5bcf1c687 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -847,7 +847,8 @@ private static ClusterState initialStateWithDecommissionedAttribute( ) { DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - DecommissionStatus.SUCCESSFUL + DecommissionStatus.SUCCESSFUL, + randomAlphaOfLength(10) ); return ClusterState.builder(clusterState) .metadata(Metadata.builder(clusterState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java 
b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java index cf92130095e12..9736355629fd9 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java @@ -19,8 +19,6 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.ClusterStateObserver; -import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; @@ -55,7 +53,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.sameInstance; import static org.opensearch.cluster.ClusterState.builder; import static org.opensearch.cluster.OpenSearchAllocationTestCase.createAllocationService; import static org.opensearch.test.ClusterServiceUtils.createClusterService; @@ -237,7 +234,8 @@ private void verifyDecommissionStatusTransition(DecommissionStatus currentStatus final CountDownLatch countDownLatch = new CountDownLatch(1); DecommissionAttributeMetadata oldMetadata = new DecommissionAttributeMetadata( new DecommissionAttribute("zone", "zone-1"), - currentStatus + currentStatus, + randomAlphaOfLength(10) ); ClusterState state = clusterService.state(); Metadata metadata = state.metadata(); @@ -265,60 +263,6 @@ public void onFailure(Exception e) { assertEquals(decommissionAttributeMetadata.status(), newStatus); } - private static class AdjustConfigurationForExclusions implements ClusterStateObserver.Listener { - - final CountDownLatch doneLatch; - - AdjustConfigurationForExclusions(CountDownLatch latch) { - this.doneLatch = latch; - } - - @Override - public void onNewClusterState(ClusterState state) { - clusterService.getClusterManagerService().submitStateUpdateTask("reconfiguration", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - assertThat(currentState, sameInstance(state)); - final Set votingNodeIds = new HashSet<>(); - currentState.nodes().forEach(n -> votingNodeIds.add(n.getId())); - currentState.getVotingConfigExclusions().forEach(t -> votingNodeIds.remove(t.getNodeId())); - final CoordinationMetadata.VotingConfiguration votingConfiguration = new CoordinationMetadata.VotingConfiguration( - votingNodeIds - ); - return builder(currentState).metadata( - Metadata.builder(currentState.metadata()) - .coordinationMetadata( - CoordinationMetadata.builder(currentState.coordinationMetadata()) - .lastAcceptedConfiguration(votingConfiguration) - .lastCommittedConfiguration(votingConfiguration) - .build() - ) - ).build(); - } - - @Override - public void onFailure(String source, Exception e) { - throw new AssertionError("unexpected failure", e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - doneLatch.countDown(); - } - }); - } - - @Override - public void onClusterServiceClose() { - throw new AssertionError("unexpected close"); - } - - @Override - public void onTimeout(TimeValue timeout) { - throw new AssertionError("unexpected timeout"); - } - } - private ClusterState addNodes(ClusterState clusterState, String zone, String... 
nodeIds) { DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.add(newNode(nodeId, singletonMap("zone", zone)))); diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionHelperTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionHelperTests.java index ab2d8218ec97d..94833e15f55d0 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionHelperTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionHelperTests.java @@ -38,7 +38,11 @@ public class DecommissionHelperTests extends OpenSearchTestCase { public void testRegisterAndDeleteDecommissionAttributeInClusterState() { DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone2"); - ClusterState updatedState = registerDecommissionAttributeInClusterState(initialClusterState, decommissionAttribute); + ClusterState updatedState = registerDecommissionAttributeInClusterState( + initialClusterState, + decommissionAttribute, + randomAlphaOfLength(10) + ); assertEquals(decommissionAttribute, updatedState.metadata().decommissionAttributeMetadata().decommissionAttribute()); updatedState = deleteDecommissionAttributeInClusterState(updatedState); assertNull(updatedState.metadata().decommissionAttributeMetadata()); @@ -79,13 +83,14 @@ public void testNodeCommissioned() { ); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - decommissionStatus + decommissionStatus, + randomAlphaOfLength(10) ); Metadata metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); assertTrue(nodeCommissioned(node2, metadata)); assertFalse(nodeCommissioned(node1, metadata)); DecommissionStatus commissionStatus = randomFrom(DecommissionStatus.FAILED, DecommissionStatus.INIT); - decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute, commissionStatus); + decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute, commissionStatus, randomAlphaOfLength(10)); metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); assertTrue(nodeCommissioned(node2, metadata)); assertTrue(nodeCommissioned(node1, metadata)); diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index a942c62bd05eb..51cd0e6eb23ed 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -8,6 +8,7 @@ package org.opensearch.cluster.decommission; +import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; @@ -49,6 +50,9 @@ import static java.util.Collections.emptySet; import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; import static org.opensearch.cluster.ClusterState.builder; import static org.opensearch.cluster.OpenSearchAllocationTestCase.createAllocationService; import static org.opensearch.test.ClusterServiceUtils.createClusterService; @@ -195,13 +199,49 @@ 
public void onFailure(Exception e) { assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } + public void testExternalDecommissionRetryNotAllowed() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + DecommissionStatus oldStatus = DecommissionStatus.INIT; + DecommissionAttributeMetadata oldMetadata = new DecommissionAttributeMetadata( + new DecommissionAttribute("zone", "zone_1"), + oldStatus, + randomAlphaOfLength(10) + ); + final ClusterState.Builder builder = builder(clusterService.state()); + setState( + clusterService, + builder.metadata(Metadata.builder(clusterService.state().metadata()).decommissionAttributeMetadata(oldMetadata).build()) + ); + AtomicReference exceptionReference = new AtomicReference<>(); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(DecommissionResponse decommissionResponse) { + fail("on response shouldn't have been called"); + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + exceptionReference.set(e); + countDownLatch.countDown(); + } + }; + DecommissionRequest request = new DecommissionRequest(new DecommissionAttribute("zone", "zone_1")); + decommissionService.startDecommissionAction(request, listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + MatcherAssert.assertThat("Expected onFailure to be called", exceptionReference.get(), notNullValue()); + MatcherAssert.assertThat(exceptionReference.get(), instanceOf(DecommissioningFailedException.class)); + MatcherAssert.assertThat(exceptionReference.get().getMessage(), containsString("same request is already in status [INIT]")); + } + @SuppressWarnings("unchecked") public void testDecommissioningFailedWhenAnotherAttributeDecommissioningSuccessful() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); DecommissionStatus oldStatus = randomFrom(DecommissionStatus.SUCCESSFUL, DecommissionStatus.IN_PROGRESS, DecommissionStatus.INIT); DecommissionAttributeMetadata oldMetadata = new DecommissionAttributeMetadata( new DecommissionAttribute("zone", "zone_1"), - oldStatus + oldStatus, + randomAlphaOfLength(10) ); final ClusterState.Builder builder = builder(clusterService.state()); setState( @@ -233,6 +273,40 @@ public void onFailure(Exception e) { assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } + public void testDecommissioningFailedWhenAnotherRequestForSameAttributeIsExecuted() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + DecommissionStatus oldStatus = DecommissionStatus.INIT; + DecommissionAttributeMetadata oldMetadata = new DecommissionAttributeMetadata( + new DecommissionAttribute("zone", "zone_1"), + oldStatus, + randomAlphaOfLength(10) + ); + final ClusterState.Builder builder = builder(clusterService.state()); + setState( + clusterService, + builder.metadata(Metadata.builder(clusterService.state().metadata()).decommissionAttributeMetadata(oldMetadata).build()) + ); + AtomicReference exceptionReference = new AtomicReference<>(); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(DecommissionResponse decommissionResponse) { + fail("on response shouldn't have been called"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof DecommissioningFailedException); + exceptionReference.set(e); + countDownLatch.countDown(); + } + }; + DecommissionRequest request = new DecommissionRequest(new DecommissionAttribute("zone", 
"zone_1")); + decommissionService.startDecommissionAction(request, listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertTrue(exceptionReference.get() instanceof DecommissioningFailedException); + assertThat(exceptionReference.get().getMessage(), Matchers.endsWith("same request is already in status [INIT]")); + } + public void testScheduleNodesDecommissionOnTimeout() { TransportService mockTransportService = Mockito.mock(TransportService.class); ThreadPool mockThreadPool = Mockito.mock(ThreadPool.class); @@ -249,7 +323,8 @@ public void testScheduleNodesDecommissionOnTimeout() { DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2"); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - DecommissionStatus.DRAINING + DecommissionStatus.DRAINING, + randomAlphaOfLength(10) ); Metadata metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); @@ -268,9 +343,11 @@ public void testScheduleNodesDecommissionOnTimeout() { public void testDrainNodesWithDecommissionedAttributeWithNoDelay() { DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2"); + String requestID = randomAlphaOfLength(10); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - DecommissionStatus.INIT + DecommissionStatus.INIT, + requestID ); Metadata metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); @@ -278,6 +355,7 @@ public void testDrainNodesWithDecommissionedAttributeWithNoDelay() { DecommissionRequest request = new DecommissionRequest(decommissionAttribute); request.setNoDelay(true); + request.setRequestID(requestID); setState(clusterService, state); decommissionService.drainNodesWithDecommissionedAttribute(request); @@ -289,7 +367,8 @@ public void testRecommissionAction() throws InterruptedException { DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2"); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - DecommissionStatus.SUCCESSFUL + DecommissionStatus.SUCCESSFUL, + randomAlphaOfLength(10) ); final ClusterState.Builder builder = builder(clusterService.state()); setState( diff --git a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java index 60b3a03848830..284a0fe77edfb 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java @@ -33,7 +33,7 @@ protected Metadata.Custom createTestInstance() { String attributeValue = randomAlphaOfLength(6); DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); DecommissionStatus decommissionStatus = randomFrom(DecommissionStatus.values()); - return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus); + return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus, randomAlphaOfLength(10)); } @Override @@ -57,7 +57,11 @@ 
protected Metadata.Custom makeTestChanges(Metadata.Custom testInstance) { if (randomBoolean()) { attributeValue = randomAlphaOfLength(6); } - return new DecommissionAttributeMetadata(new DecommissionAttribute(attributeName, attributeValue), decommissionStatus); + return new DecommissionAttributeMetadata( + new DecommissionAttribute(attributeName, attributeValue), + decommissionStatus, + randomAlphaOfLength(10) + ); } @Override @@ -77,7 +81,8 @@ protected Metadata.Custom doParseInstance(XContentParser parser) throws IOExcept assertEquals(XContentParser.Token.END_OBJECT, parser.currentToken()); return new DecommissionAttributeMetadata( decommissionAttributeMetadata.decommissionAttribute(), - decommissionAttributeMetadata.status() + decommissionAttributeMetadata.status(), + decommissionAttributeMetadata.requestID() ); } } diff --git a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataTests.java b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataTests.java index 746d4565b0db3..98cecd8439413 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataTests.java @@ -24,7 +24,7 @@ protected DecommissionAttributeMetadata createTestInstance() { String attributeValue = randomAlphaOfLength(6); DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); DecommissionStatus decommissionStatus = randomFrom(DecommissionStatus.values()); - return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus); + return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus, randomAlphaOfLength(10)); } @Override diff --git a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataXContentTests.java b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataXContentTests.java index 030946f4510a1..a83914cec23c0 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataXContentTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataXContentTests.java @@ -23,7 +23,7 @@ protected DecommissionAttributeMetadata createTestInstance() { String attributeValue = randomAlphaOfLength(6); DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); DecommissionStatus decommissionStatus = randomFrom(DecommissionStatus.values()); - return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus); + return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus, randomAlphaOfLength(10)); } @Override diff --git a/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java new file mode 100644 index 0000000000000..c9c616dab0dbc --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java @@ -0,0 +1,266 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.cluster.routing; + +import org.opensearch.Version; +import org.opensearch.action.OriginalIndicesTests; +import org.opensearch.action.search.SearchShardIterator; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; +import org.opensearch.index.shard.ShardId; +import org.opensearch.search.SearchShardTarget; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.transport.NodeNotConnectedException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.singletonMap; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED; + +public class FailAwareWeightedRoutingTests extends OpenSearchTestCase { + + private ClusterState setUpCluster() { + ClusterState clusterState = ClusterState.builder(new ClusterName("test")).build(); + + // set up nodes + DiscoveryNode nodeA = new DiscoveryNode( + "node_zone_a", + buildNewFakeTransportAddress(), + singletonMap("zone", "a"), + Collections.singleton(DiscoveryNodeRole.DATA_ROLE), + Version.CURRENT + ); + DiscoveryNode nodeB = new DiscoveryNode( + "node_zone_b", + buildNewFakeTransportAddress(), + singletonMap("zone", "b"), + Collections.singleton(DiscoveryNodeRole.DATA_ROLE), + Version.CURRENT + ); + DiscoveryNode nodeC = new DiscoveryNode( + "node_zone_c", + buildNewFakeTransportAddress(), + singletonMap("zone", "c"), + Collections.singleton(DiscoveryNodeRole.DATA_ROLE), + Version.CURRENT + ); + + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); + + nodeBuilder.add(nodeA); + nodeBuilder.add(nodeB); + nodeBuilder.add(nodeC); + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + + // set up weighted routing weights + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); + clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); + + return clusterState; + + } + + public void testFindNextWithoutFailOpen() throws IOException { + + ClusterState clusterState = setUpCluster(); + + // set up index + IndexMetadata indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 2) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + ) + .build(); + + ShardRouting 
shardRoutingA = TestShardRouting.newShardRouting("test", 0, "node_zone_a", true, ShardRoutingState.STARTED); + ShardRouting shardRoutingB = TestShardRouting.newShardRouting("test", 0, "node_zone_b", false, ShardRoutingState.STARTED); + ShardRouting shardRoutingC = TestShardRouting.newShardRouting("test", 0, "node_zone_c", false, ShardRoutingState.STARTED); + + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + + final ShardId shardId = new ShardId("test", "_na_", 0); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingBuilder.addShard(shardRoutingA); + indexShardRoutingBuilder.addShard(shardRoutingB); + indexShardRoutingBuilder.addShard(shardRoutingC); + + indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + clusterState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build()).build(); + + List<ShardRouting> shardRoutings = new ArrayList<>(); + shardRoutings.add(shardRoutingA); + shardRoutings.add(shardRoutingB); + shardRoutings.add(shardRoutingC); + + String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); + SearchShardIterator searchShardIterator = new SearchShardIterator( + clusterAlias, + shardId, + shardRoutings, + OriginalIndicesTests.randomOriginalIndices() + ); + + searchShardIterator.nextOrNull(); + searchShardIterator.nextOrNull(); + + // fail open is not executed since the fail open conditions are not met + SearchShardTarget next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException()); + assertNull(next); + } + + public void testFindNextWithFailOpenDueTo5xx() throws IOException { + + ClusterState clusterState = setUpCluster(); + + // set up index + IndexMetadata indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 2) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + ) + .build(); + + ShardRouting shardRoutingA = TestShardRouting.newShardRouting("test", 0, "node_zone_a", true, ShardRoutingState.STARTED); + ShardRouting shardRoutingB = TestShardRouting.newShardRouting("test", 0, "node_zone_b", false, ShardRoutingState.STARTED); + ShardRouting shardRoutingC = TestShardRouting.newShardRouting("test", 0, "node_zone_c", false, ShardRoutingState.STARTED); + + List<ShardRouting> shardRoutings = new ArrayList<>(); + shardRoutings.add(shardRoutingA); + shardRoutings.add(shardRoutingB); + shardRoutings.add(shardRoutingC); + + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + + final ShardId shardId = new ShardId("test", "_na_", 0); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + + indexShardRoutingBuilder.addShard(shardRoutingA); + indexShardRoutingBuilder.addShard(shardRoutingB); + indexShardRoutingBuilder.addShard(shardRoutingC); + +
indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + + routingTableBuilder.add(indexRoutingTableBuilder.build()); + clusterState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build()).build(); + + String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); + + SearchShardIterator searchShardIterator = new SearchShardIterator( + clusterAlias, + shardId, + shardRoutings, + OriginalIndicesTests.randomOriginalIndices() + ); + + searchShardIterator.nextOrNull(); + searchShardIterator.nextOrNull(); + + // Node in zone b is disconnected + DiscoveryNode node = clusterState.nodes().get("node_zone_b"); + // fail open is executed and shard present in node with weighted routing weight zero is returned + SearchShardTarget next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new NodeNotConnectedException(node, "Node is not " + "connected")); + assertNotNull(next); + assertEquals("node_zone_c", next.getNodeId()); + } + + public void testFindNextWithFailOpenDueToUnassignedShard() throws IOException { + + ClusterState clusterState = setUpCluster(); + + IndexMetadata indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 2) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + ) + .build(); + ShardRouting shardRoutingB = TestShardRouting.newShardRouting("test", 0, "node_zone_b", true, ShardRoutingState.STARTED); + + ShardRouting shardRoutingA = TestShardRouting.newShardRouting("test", 0, null, false, ShardRoutingState.UNASSIGNED); + + ShardRouting shardRoutingC = TestShardRouting.newShardRouting("test", 0, "node_zone_c", false, ShardRoutingState.STARTED); + + List shardRoutings = new ArrayList<>(); + shardRoutings.add(shardRoutingA); + shardRoutings.add(shardRoutingB); + shardRoutings.add(shardRoutingC); + + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + + final ShardId shardId = new ShardId("test", "_na_", 0); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + + indexShardRoutingBuilder.addShard(shardRoutingA); + indexShardRoutingBuilder.addShard(shardRoutingB); + indexShardRoutingBuilder.addShard(shardRoutingC); + + indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + + routingTableBuilder.add(indexRoutingTableBuilder.build()); + clusterState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build()).build(); + + String clusterAlias = randomBoolean() ? 
null : randomAlphaOfLengthBetween(5, 10); + + SearchShardIterator searchShardIterator = new SearchShardIterator( + clusterAlias, + shardId, + shardRoutings, + OriginalIndicesTests.randomOriginalIndices() + ); + + searchShardIterator.nextOrNull(); + searchShardIterator.nextOrNull(); + + // since there is an unassigned shard in the cluster, fail open is executed and shard present in node with + // weighted routing weight zero is returned + SearchShardTarget next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException()); + assertNotNull(next); + assertEquals("node_zone_c", next.getNodeId()); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java index d64402a74fba2..ffdb2d39fb817 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java @@ -904,6 +904,7 @@ public void testWeightedOperationRoutingWeightUndefinedForOneZone() throws Excep Settings setting = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", false) .build(); threadPool = new TestThreadPool("testThatOnlyNodesSupport"); diff --git a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java index 1f892b993d4d6..65fc1b902f9a4 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java @@ -184,7 +184,11 @@ private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map 0) { - shardRouting = shardIterator.nextOrNull(); - assertNotNull(shardRouting); - assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); - } + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); weights = Map.of("zone1", 1.0, "zone2", 1.0, "zone3", 1.0); weightedRouting = new WeightedRouting("zone", weights); shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(3, shardIterator.size()); weights = Map.of("zone1", -1.0, "zone2", 0.0, "zone3", 1.0); @@ -573,21 +571,21 @@ public void testWeightedRoutingWithDifferentWeights() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(1, shardIterator.size()); - while (shardIterator.remaining() > 0) { - shardRouting = shardIterator.nextOrNull(); - assertNotNull(shardRouting); - assertFalse(Arrays.asList("node2", "node1").contains(shardRouting.currentNodeId())); - } + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node2", "node1").contains(shardRouting.currentNodeId())); - weights = Map.of("zone1", 0.0, "zone2", 0.0, "zone3", 0.0); + weights = 
Map.of("zone1", 3.0, "zone2", 2.0, "zone3", 0.0); weightedRouting = new WeightedRouting("zone", weights); shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); - assertEquals(0, shardIterator.size()); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + assertEquals(3, shardIterator.size()); + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); } finally { terminate(threadPool); } @@ -646,14 +644,12 @@ public void testWeightedRoutingInMemoryStore() { ShardIterator shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(2, shardIterator.size()); ShardRouting shardRouting; - while (shardIterator.remaining() > 0) { - shardRouting = shardIterator.nextOrNull(); - assertNotNull(shardRouting); - assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); - } + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); // Make iterator call with same WeightedRouting instance assertNotNull( @@ -662,13 +658,11 @@ public void testWeightedRoutingInMemoryStore() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(2, shardIterator.size()); - while (shardIterator.remaining() > 0) { - shardRouting = shardIterator.nextOrNull(); - assertNotNull(shardRouting); - assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); - } + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); // Make iterator call with new instance of WeightedRouting but same weights Map weights1 = Map.of("zone1", 1.0, "zone2", 1.0, "zone3", 0.0); @@ -679,13 +673,11 @@ public void testWeightedRoutingInMemoryStore() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(2, shardIterator.size()); - while (shardIterator.remaining() > 0) { - shardRouting = shardIterator.nextOrNull(); - assertNotNull(shardRouting); - assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); - } + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); // Make iterator call with different weights Map weights2 = Map.of("zone1", 1.0, "zone2", 0.0, "zone3", 1.0); @@ -696,13 +688,82 @@ public void testWeightedRoutingInMemoryStore() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(2, shardIterator.size()); - while (shardIterator.remaining() > 0) { + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node2").contains(shardRouting.currentNodeId())); + 
+ } finally { + terminate(threadPool); + } + } + + /** + * Test to validate that shard routing state is maintained across requests + */ + public void testWeightedRoutingShardState() { + TestThreadPool threadPool = null; + try { + Settings.Builder settings = Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.awareness.attributes", "zone"); + AllocationService strategy = createAllocationService(settings.build()); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) + .build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .build(); + + threadPool = new TestThreadPool("testThatOnlyNodesSupport"); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + + Map node1Attributes = new HashMap<>(); + node1Attributes.put("zone", "zone1"); + Map node2Attributes = new HashMap<>(); + node2Attributes.put("zone", "zone2"); + Map node3Attributes = new HashMap<>(); + node3Attributes.put("zone", "zone3"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("node1", unmodifiableMap(node1Attributes))) + .add(newNode("node2", unmodifiableMap(node2Attributes))) + .add(newNode("node3", unmodifiableMap(node3Attributes))) + .localNodeId("node1") + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + Map weights = Map.of("zone1", 3.0, "zone2", 2.0, "zone3", 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + Map requestCount = new HashMap<>(); + + for (int i = 0; i < 5; i++) { + ShardIterator shardIterator = clusterState.routingTable() + .index("test") + .shard(0) + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + + assertEquals(3, shardIterator.size()); + ShardRouting shardRouting; shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); - assertFalse(Arrays.asList("node2").contains(shardRouting.currentNodeId())); + requestCount.put(shardRouting.currentNodeId(), requestCount.getOrDefault(shardRouting.currentNodeId(), 0) + 1); } + assertEquals(3, requestCount.get("node1").intValue()); + assertEquals(2, requestCount.get("node2").intValue()); + } finally { terminate(threadPool); } diff --git a/server/src/test/java/org/opensearch/common/settings/WriteableSettingTests.java b/server/src/test/java/org/opensearch/common/settings/WriteableSettingTests.java index 59b0e51cbbc5f..85c182a74fd42 100644 --- a/server/src/test/java/org/opensearch/common/settings/WriteableSettingTests.java +++ b/server/src/test/java/org/opensearch/common/settings/WriteableSettingTests.java @@ -462,23 +462,4 @@ public void testVersionSetting() throws IOException { } } } - - @Ignore - @SuppressForbidden(reason = "The only way to test these is via reflection") - public void testExceptionHandling() throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException { - // abuse reflection to change default value, no way to do this with given Setting class - Setting setting = 
Setting.simpleString(""); - Field dv = setting.getClass().getDeclaredField("defaultValue"); - dv.setAccessible(true); - Field p = setting.getClass().getDeclaredField("parser"); - p.setAccessible(true); - - // test default value type not in enum - Function dvfi = s -> ""; - dv.set(setting, dvfi); - Function pfi = s -> new WriteableSettingTests(); - p.set(setting, pfi); - IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> new WriteableSetting(setting)); - assertTrue(iae.getMessage().contains("generic type: WriteableSettingTests")); - } } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index bd9a608e6d85e..b016c6a151cce 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -448,13 +448,13 @@ public void testSimpleOperationsUpload() throws IOException { assertThat(snapshot.totalOperations(), equalTo(ops.size())); } - assertEquals(translog.allUploaded().size(), 4); + assertEquals(translog.allUploaded().size(), 2); addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 })); - assertEquals(translog.allUploaded().size(), 6); + assertEquals(translog.allUploaded().size(), 4); translog.rollGeneration(); - assertEquals(translog.allUploaded().size(), 6); + assertEquals(translog.allUploaded().size(), 4); Set mdFiles = blobStoreTransferService.listAll( repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add("metadata") @@ -495,6 +495,38 @@ public void testSimpleOperationsUpload() throws IOException { assertArrayEquals(ckp, content); } } + + // expose the new checkpoint (simulating a commit), before we trim the translog + translog.deletionPolicy.setLocalCheckpointOfSafeCommit(0); + // simulating the remote segment upload . 
+ translog.setMinSeqNoToKeep(0); + // This should not trim anything + translog.trimUnreferencedReaders(); + assertEquals(translog.allUploaded().size(), 4); + assertEquals( + blobStoreTransferService.listAll( + repository.basePath() + .add(shardId.getIndex().getUUID()) + .add(String.valueOf(shardId.id())) + .add(String.valueOf(primaryTerm.get())) + ).size(), + 4 + ); + + // This should trim tlog-2.* files as it contains seq no 0 + translog.setMinSeqNoToKeep(1); + translog.trimUnreferencedReaders(); + assertEquals(translog.allUploaded().size(), 2); + assertEquals( + blobStoreTransferService.listAll( + repository.basePath() + .add(shardId.getIndex().getUUID()) + .add(String.valueOf(shardId.id())) + .add(String.valueOf(primaryTerm.get())) + ).size(), + 2 + ); + } private Long populateTranslogOps(boolean withMissingOps) throws IOException { @@ -620,8 +652,8 @@ public void testConcurrentWritesWithVaryingSize() throws Throwable { public void testConcurrentWriteViewsAndSnapshot() throws Throwable { final Thread[] writers = new Thread[randomIntBetween(1, 3)]; final Thread[] readers = new Thread[randomIntBetween(1, 3)]; - final int flushEveryOps = randomIntBetween(5, 100); - final int maxOps = randomIntBetween(200, 1000); + final int flushEveryOps = randomIntBetween(5, 10); + final int maxOps = randomIntBetween(20, 100); final Object signalReaderSomeDataWasIndexed = new Object(); final AtomicLong idGenerator = new AtomicLong(); final CyclicBarrier barrier = new CyclicBarrier(writers.length + readers.length + 1); @@ -684,6 +716,7 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep // expose the new checkpoint (simulating a commit), before we trim the translog lastCommittedLocalCheckpoint.set(localCheckpoint); deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); + translog.setMinSeqNoToKeep(localCheckpoint + 1); translog.trimUnreferencedReaders(); } } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 1a8827ac797a8..8c465a17c2bb2 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -84,6 +84,9 @@ public void onSuccess(TransferFileSnapshot fileSnapshot) { public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { fileTransferFailed.incrementAndGet(); } + + @Override + public void onDelete(String name) {} } );