Skip to content

Commit

Permalink
Schedule reroute after allocator timed out
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Sep 2, 2024
1 parent cad81b0 commit 7cf09a5
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.cluster.metadata.ViewMetadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.routing.DelayedAllocationService;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
Expand Down Expand Up @@ -474,4 +475,9 @@ public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, Shard
allocationService.setExistingShardsAllocators(existingShardsAllocators);
}

/**
 * Passes the given {@link RerouteService} on to the configured shards allocator.
 * Only a {@link BalancedShardsAllocator} accepts a reroute service; for any other
 * allocator implementation this call does nothing.
 *
 * @param rerouteService the reroute service to hand to the balanced shards allocator
 */
public void setRerouteServiceForAllocator(RerouteService rerouteService) {
    if ((shardsAllocator instanceof BalancedShardsAllocator) == false) {
        // Other allocator implementations have no hook for a reroute service.
        return;
    }
    final BalancedShardsAllocator balancedAllocator = (BalancedShardsAllocator) shardsAllocator;
    balancedAllocator.setRerouteService(rerouteService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IntroSorter;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardMovementStrategy;
Expand All @@ -49,12 +50,14 @@
import org.opensearch.cluster.routing.allocation.RebalanceParameter;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.ShardAllocationDecision;
import org.opensearch.common.Priority;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;

import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -202,6 +205,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
private volatile boolean ignoreThrottleInRestore;
private volatile TimeValue allocatorTimeout;
private long startTime;
private RerouteService rerouteService;

public BalancedShardsAllocator(Settings settings) {
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
Expand Down Expand Up @@ -231,6 +235,11 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout);
}

/**
 * Stores the {@link RerouteService} on this allocator. With assertions enabled, the call
 * asserts that no reroute service has been stored yet; without assertions the field is
 * simply overwritten.
 *
 * @param rerouteService the reroute service to keep on this allocator
 */
public void setRerouteService(RerouteService rerouteService) {
assert this.rerouteService == null : "RerouteService is already set";
this.rerouteService = rerouteService;
}

/**
* Changes in deprecated setting SHARD_MOVE_PRIMARY_FIRST_SETTING affect value of its replacement setting SHARD_MOVEMENT_STRATEGY_SETTING.
*/
Expand Down Expand Up @@ -342,6 +351,7 @@ public void allocate(RoutingAllocation allocation) {
localShardsBalancer.allocateUnassigned();
localShardsBalancer.moveShards();
localShardsBalancer.balance();
scheduleRerouteIfAllocatorTimedOut();

final ShardsBalancer remoteShardsBalancer = new RemoteShardsBalancer(logger, allocation);
remoteShardsBalancer.allocateUnassigned();
Expand Down Expand Up @@ -404,6 +414,23 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) {
}
}

/**
 * Requests a follow-up reroute when {@code allocatorTimedOut()} reports that the allocator
 * timed out. If no {@link RerouteService} has been set, an info message is logged and no
 * reroute is requested.
 * <p>
 * The reroute is submitted with {@link Priority#HIGH}; its completion is logged at trace
 * level and its failure at debug level.
 */
private void scheduleRerouteIfAllocatorTimedOut() {
    if (allocatorTimedOut() == false) {
        return;
    }
    if (rerouteService == null) {
        // Nothing to submit the reroute to; record it and carry on.
        logger.info("RerouteService not set to schedule reroute after allocator time out");
        return;
    }
    rerouteService.reroute(
        "reroute after balanced shards allocator timed out",
        Priority.HIGH,
        ActionListener.wrap(
            response -> logger.trace("reroute after balanced shards allocator timed out completed"),
            failure -> logger.debug("reroute after balanced shards allocator timed out failed", failure)
        )
    );
}

/**
* Returns the currently configured delta threshold
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,18 @@ public void onTimeout() {
/**
 * Runs allocation for the unassigned primary shards of this batch and then, if any primary
 * shard ids are recorded as timed out, requests a high-priority follow-up reroute.
 * <p>
 * NOTE(review): {@code timedOutPrimaryShardIds} is filled outside this method; confirm that it
 * can already be non-empty at the time {@code run()} executes.
 */
@Override
public void run() {
    primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation);
    if (timedOutPrimaryShardIds.isEmpty()) {
        // No primary shard timed out, so no extra reroute round is needed.
        return;
    }
    logger.trace("scheduling reroute after existing shards allocator timed out for primary shards");
    assert rerouteService != null;
    rerouteService.reroute(
        "reroute after existing shards allocator timed out",
        Priority.HIGH,
        ActionListener.wrap(
            response -> logger.trace("reroute after existing shards allocator timed out completed"),
            failure -> logger.debug("reroute after existing shards allocator timed out failed", failure)
        )
    );
}
}));
return new BatchRunnableExecutor(runnables, () -> primaryShardsBatchGatewayAllocatorTimeout) {
Expand Down Expand Up @@ -320,6 +332,18 @@ public void run() {
/**
 * Completion hook for the replica batch executor: passes the timed-out replica shard ids to
 * {@code allocateUnassignedBatchOnTimeout} and, when that set is not empty, requests a
 * high-priority follow-up reroute.
 */
public void onComplete() {
    logger.trace("Triggering oncomplete after timeout for [{}] replica shards", timedOutReplicaShardIds.size());
    replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutReplicaShardIds, allocation, false);
    if (timedOutReplicaShardIds.isEmpty()) {
        // No replica shard timed out, so no extra reroute round is needed.
        return;
    }
    logger.trace("scheduling reroute after existing shards allocator timed out for replica shards");
    assert rerouteService != null;
    rerouteService.reroute(
        "reroute after existing shards allocator timed out",
        Priority.HIGH,
        ActionListener.wrap(
            response -> logger.trace("reroute after existing shards allocator timed out completed"),
            failure -> logger.debug("reroute after existing shards allocator timed out failed", failure)
        )
    );
}
};
}
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,7 @@ protected Node(
final RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
rerouteServiceReference.set(rerouteService);
clusterService.setRerouteService(rerouteService);
clusterModule.setRerouteServiceForAllocator(rerouteService);

final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());

Expand Down

0 comments on commit 7cf09a5

Please sign in to comment.