Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GET api to get shard routing weights #4275

Merged
merged 87 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
162cbeb
Weighted round-robin scheduling policy for shard coordination traffic…
Aug 17, 2022
8513c4f
Add caching layer for wrr shard routing and moved wrr routing call to…
Aug 26, 2022
4cdbee3
Integrate ARS with weighted round robin,
Aug 27, 2022
7a49e5c
Remove ARS and add tests for zone with undefined weight
Sep 1, 2022
03c4d23
Add changelog for the commit
Sep 1, 2022
7c64537
Merge branch 'main' into feature/wrr-shard-routing-core
anshu1106 Sep 1, 2022
154a3b9
Fix java doc, add test of WeightedRoundRobinRouting metadata
Sep 1, 2022
3afb4e3
Fix minor change related to shuffling wrr shard routings
Sep 1, 2022
d9e2b08
Add PUT api for updating weighted round robin shard routing weights
Aug 22, 2022
c3b847a
Add more validations for request body for put wrr weights
Aug 29, 2022
9cc2cd5
Update thread pool executor and do code refactoring
Sep 1, 2022
f07a9ad
Fix missing java doc build failure
Sep 1, 2022
f3eda84
Add GET api to get weighted round robin shard routing weights
Aug 22, 2022
28ccce4
Add UTs for get local weight and some refactoring changes
Sep 1, 2022
c9e6237
Make get action to return empty response if weights are not set
Sep 2, 2022
63616e7
Update data type for weight
Sep 2, 2022
9948d97
Merge branch 'main' into feature/wrr-get-api
anshu1106 Sep 2, 2022
e12d247
Throw ActionRequestValidationException on invalid awareness attribute
Sep 2, 2022
3463b39
Add safety check for getting local node weight
Sep 2, 2022
13ddba2
Merge branch 'feature/wrr-get-api' of https://github.com/anshu1106/Op…
Sep 2, 2022
30e17bc
Merge branch 'main' into feature/wrr-shard-routing-core
anshu1106 Sep 2, 2022
3aa6fbb
Add default weight 1 for zones with undefined weight
Sep 7, 2022
9ae063c
Merge remote-tracking branch 'origin/main' into feature/wrr-shard-rou…
Sep 7, 2022
1112366
Inject WRRShardsCache on node start
Sep 12, 2022
cd3e16c
Remove extra new lines
Sep 12, 2022
0c7dbd9
Fix import
Sep 12, 2022
438fe9e
Add size for shard routing cache
Sep 12, 2022
694be4d
Invalidate shard routing cache on close
Sep 12, 2022
9236b8d
Refactor code
Sep 13, 2022
93e7587
Add test for Weighted routing iterator and some refactoring changes
Sep 13, 2022
a9e30d1
Merge remote-tracking branch 'origin/main' into feature/wrr-shard-rou…
Sep 13, 2022
17dc58c
Update metadata minimal supported version
Sep 13, 2022
ed0cc1b
Add cluster setting for default weight
Sep 14, 2022
5a5cb11
Fix tests due to the change
Sep 14, 2022
1c33964
Fix cache concurrency issue
Sep 14, 2022
f8638f2
Spotless check fix
Sep 14, 2022
4016d27
Fix weighted round robin logic case when there is an entity with weig…
Sep 15, 2022
c98606d
Changes weight data type to double
Sep 15, 2022
1bd83f3
Fix test
Sep 15, 2022
8a72dc0
Empty commit
Sep 15, 2022
bff61b9
Empty commit
Sep 15, 2022
9102616
Fix spotless check
Sep 15, 2022
a291634
Create in-memory cache for shard routings
Sep 16, 2022
bd6c9e7
Fix put operation for weighted shard routings
Sep 16, 2022
d9368ab
Add tests for shard routing in-memory store
Sep 17, 2022
a3f0290
Merge branch 'main' into feature/wrr-shard-routing-core
Sep 17, 2022
01239bf
Add java docs and some code refactoring
Sep 17, 2022
dbbb263
Add null check for discovery nodes and single mutex for weighted shar…
Sep 19, 2022
785437b
Merge branch 'feature/wrr-shard-routing-core' into feature/wrr-put-api
Sep 19, 2022
f06a039
Refactor code
Sep 19, 2022
0a4b829
Fix tests
Sep 19, 2022
5f451d4
Merge remote-tracking branch 'origin/main' into feature/wrr-put-api
Sep 19, 2022
4f349e9
Refactor code
Sep 19, 2022
e59cff2
Remove unwanted changes
Sep 19, 2022
dc79fec
Remove unwanted changes for Requests.java
Sep 19, 2022
7671d63
Refactore code
Sep 19, 2022
454387d
Add change log
Sep 19, 2022
86ba360
Merge branch 'feature/wrr-put-api' into feature/wrr-get-api
Sep 19, 2022
54eecdf
Refactor code
Sep 19, 2022
9eb1bc1
Modify request validation
Sep 20, 2022
027d73a
Fix test failure
Sep 19, 2022
19047e6
Make WeightedRoutingService not to imoplement ClusterStateApplier
Sep 20, 2022
663017f
Update metadata call
Sep 20, 2022
70790e9
Add tests for WeightedRoutingService
Sep 21, 2022
4829300
Merge branch 'feature/wrr-put-api' into feature/wrr-get-api
Sep 21, 2022
ba87256
Add string formatter
Sep 22, 2022
4057441
Fix locale in string formatter
Sep 22, 2022
bb16814
Merge branch 'feature/wrr-put-api' into feature/wrr-get-api
Sep 22, 2022
2ab72a8
Move logic to validate awareness attribute to service
Sep 22, 2022
83797f4
Refactor code based on review comments
Sep 23, 2022
006600c
Refactor code
Sep 26, 2022
0240ff0
Refactor code
Sep 26, 2022
7763067
Address review comments
Sep 26, 2022
3a0b0ff
Merge branch 'main' into feature/wrr-put-api
anshu1106 Sep 26, 2022
703d918
Address review comments
Sep 26, 2022
329c6f2
Add explicit count in validation error message
Sep 27, 2022
dbb6055
Merge branch 'feature/wrr-put-api' into feature/wrr-get-api
Sep 27, 2022
b9d4145
Refactor code
Sep 27, 2022
41c557b
Add validation for get request
Sep 27, 2022
d1ec28b
Merge branch 'main' into feature/wrr-get-api
Sep 27, 2022
e4f030e
Refactor code
Sep 27, 2022
e9aea90
Add unit tests
Sep 28, 2022
fb07086
Incorparate review comments and add integ tests
Sep 29, 2022
e152d39
Merge branch 'main' into feature/wrr-get-api
Oct 3, 2022
ee839f7
Remove return statement
Oct 4, 2022
373f41a
Made change to invoke response listener on all failures
Oct 6, 2022
defc1b4
Merge branch 'main' into feature/wrr-get-api
Oct 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))
- [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318))
- PUT api for weighted shard routing ([#4272](https://github.com/opensearch-project/OpenSearch/pull/4272))
- GET api for weighted shard routing([#4275](https://github.com/opensearch-project/OpenSearch/pull/4275/))
- Unmute test RelocationIT.testRelocationWhileIndexingRandom ([#4580](https://github.com/opensearch-project/OpenSearch/pull/4580))
- Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084))
- Further simplification of the ZIP publication implementation ([#4360](https://github.com/opensearch-project/OpenSearch/pull/4360))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,7 @@ public void testApiNamingConventions() throws Exception {
"search_shards",
"remote_store.restore",
"cluster.put_weighted_routing",
"cluster.get_weighted_routing",
"cluster.put_decommission_awareness",
"cluster.get_decommission_awareness", };
List<String> booleanReturnMethods = Arrays.asList("security.enable_user", "security.disable_user", "security.change_password");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"cluster.get_weighted_routing": {
"documentation": {
"url": "https://opensearch.org/docs/latest/opensearch/rest-api/weighted-routing/get",
"description": "Fetches weighted shard routing weights"
},
"stability": "stable",
"url": {
"paths": [
{
"path": "/_cluster/routing/awareness/{attribute}/weights",
"methods": [
"GET"
],
"parts": {
"attribute": {
"type": "string",
"description": "Awareness attribute name"
}
}
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3)
public class WeightedRoutingIT extends OpenSearchIntegTestCase {

public void testPutWeightedRouting() {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

logger.info("--> starting 6 nodes on different zones");
int nodeCountPerAZ = 2;

logger.info("--> starting a dedicated cluster manager node");
internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'");
List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
);
List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
);
List<String> nodes_in_zone_c = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("7").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertEquals(response.isAcknowledged(), true);

// put call made on a data node in zone a
response = internalCluster().client(randomFrom(nodes_in_zone_a.get(0), nodes_in_zone_a.get(1)))
.admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertEquals(response.isAcknowledged(), true);
}

public void testPutWeightedRouting_InvalidAwarenessAttribute() {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

internalCluster().startNodes(
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("3").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone1", weights);

assertThrows(
IllegalArgumentException.class,
() -> client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).get()
);
}

public void testPutWeightedRouting_MoreThanOneZoneHasZeroWeight() {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

internalCluster().startNodes(
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("3").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 0.0, "c", 0.0);
WeightedRouting weightedRouting = new WeightedRouting("zone1", weights);

assertThrows(
IllegalArgumentException.class,
() -> client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).get()
);
}

public void testGetWeightedRouting_WeightsNotSet() {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

internalCluster().startNodes(
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("3").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

ClusterGetWeightedRoutingResponse weightedRoutingResponse = client().admin()
.cluster()
.prepareGetWeightedRouting()
.setAwarenessAttribute("zone")
.get();
assertNull(weightedRoutingResponse.weights());
}

public void testGetWeightedRouting_WeightsAreSet() throws IOException {

Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

int nodeCountPerAZ = 2;

logger.info("--> starting a dedicated cluster manager node");
internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 2 nodes on zones 'a' & 'b' & 'c'");
List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
);
List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
);
List<String> nodes_in_zone_c = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("7").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
// put api call to set weights
ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertEquals(response.isAcknowledged(), true);

// get api call to fetch weights
ClusterGetWeightedRoutingResponse weightedRoutingResponse = client().admin()
.cluster()
.prepareGetWeightedRouting()
.setAwarenessAttribute("zone")
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());

// get api to fetch local node weight for a node in zone a
weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_a.get(0), nodes_in_zone_a.get(1)))
.admin()
.cluster()
.prepareGetWeightedRouting()
.setAwarenessAttribute("zone")
.setRequestLocal(true)
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertEquals("1.0", weightedRoutingResponse.getLocalNodeWeight());

// get api to fetch local node weight for a node in zone b
weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_b.get(0), nodes_in_zone_b.get(1)))
.admin()
.cluster()
.prepareGetWeightedRouting()
.setAwarenessAttribute("zone")
.setRequestLocal(true)
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertEquals("2.0", weightedRoutingResponse.getLocalNodeWeight());

// get api to fetch local node weight for a node in zone c
weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_c.get(0), nodes_in_zone_c.get(1)))
.admin()
.cluster()
.prepareGetWeightedRouting()
.setAwarenessAttribute("zone")
.setRequestLocal(true)
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertEquals("3.0", weightedRoutingResponse.getLocalNodeWeight());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@
import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.TransportGetWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterAddWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.TransportAddWeightedRoutingAction;
import org.opensearch.action.admin.cluster.snapshots.clone.CloneSnapshotAction;
Expand Down Expand Up @@ -299,6 +301,7 @@
import org.opensearch.rest.action.admin.cluster.RestCloneSnapshotAction;
import org.opensearch.rest.action.admin.cluster.RestClusterAllocationExplainAction;
import org.opensearch.rest.action.admin.cluster.RestClusterGetSettingsAction;
import org.opensearch.rest.action.admin.cluster.RestClusterGetWeightedRoutingAction;
import org.opensearch.rest.action.admin.cluster.RestClusterHealthAction;
import org.opensearch.rest.action.admin.cluster.RestClusterPutWeightedRoutingAction;
import org.opensearch.rest.action.admin.cluster.RestClusterRerouteAction;
Expand Down Expand Up @@ -573,6 +576,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);

actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class);
actions.register(ClusterGetWeightedRoutingAction.INSTANCE, TransportGetWeightedRoutingAction.class);
actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
Expand Down Expand Up @@ -759,6 +763,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestOpenIndexAction());
registerHandler.accept(new RestAddIndexBlockAction());
registerHandler.accept(new RestClusterPutWeightedRoutingAction());
registerHandler.accept(new RestClusterGetWeightedRoutingAction());

registerHandler.accept(new RestUpdateSettingsAction());
registerHandler.accept(new RestGetSettingsAction());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards.routing.weighted.get;

import org.opensearch.action.ActionType;

/**
* Action to get weights for weighted round-robin search routing policy.
*
* @opensearch.internal
*/
public class ClusterGetWeightedRoutingAction extends ActionType<ClusterGetWeightedRoutingResponse> {
public static final ClusterGetWeightedRoutingAction INSTANCE = new ClusterGetWeightedRoutingAction();
public static final String NAME = "cluster:admin/routing/awareness/weights/get";

private ClusterGetWeightedRoutingAction() {
super(NAME, ClusterGetWeightedRoutingResponse::new);
}
}
Loading