Skip to content

Commit

Permalink
Add settings for allocator timeout and add additional logs
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Jul 22, 2024
1 parent 6d2a9d3 commit ef8232a
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -102,7 +103,6 @@ public class AllocationService {
private final ClusterInfoService clusterInfoService;
private SnapshotsInfoService snapshotsInfoService;
private final ClusterManagerMetrics clusterManagerMetrics;
private final long maxRunTimeoutInMillis = 5;

// only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator
public AllocationService(
Expand Down Expand Up @@ -568,8 +568,14 @@ private void reroute(RoutingAllocation allocation) {
long rerouteStartTimeNS = System.nanoTime();
removeDelayMarkers(allocation);

long startTime = System.nanoTime();
allocateExistingUnassignedShards(allocation); // try to allocate existing shard copies first
logger.info("Completing allocate unassigned, elapsed time: [{}]",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
startTime = System.nanoTime();
shardsAllocator.allocate(allocation);
logger.info("Shard allocator to allocate all shards, elapsed time: [{}]",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
clusterManagerMetrics.recordLatency(
clusterManagerMetrics.rerouteHistogram,
(double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - rerouteStartTimeNS))
Expand Down Expand Up @@ -620,22 +626,26 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {

// Drives batch allocation of existing unassigned shards through the allocator registered under
// ShardsBatchGatewayAllocator.ALLOCATOR_NAME: primary batches first, then the
// afterPrimariesBeforeReplicas hook, then replica batches.
// NOTE(review): the result of existingShardsAllocators.get(...) is dereferenced without a null
// check - confirm the batch allocator is always registered when this method is reached.
private void allocateAllUnassignedShards(RoutingAllocation allocation) {
ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME);
// Primaries Assignment
executeTimedRunnables(allocator.allocateAllUnassignedShards(allocation, true), () -> maxRunTimeoutInMillis);
executeTimedRunnables(allocator.allocateAllUnassignedShards(allocation, true), () -> allocator.getPrimaryBatchAllocatorTimeout().millis(), true);
allocator.afterPrimariesBeforeReplicas(allocation);
// Replicas Assignment
executeTimedRunnables(allocator.allocateAllUnassignedShards(allocation, false), () -> maxRunTimeoutInMillis);
executeTimedRunnables(allocator.allocateAllUnassignedShards(allocation, false), () -> allocator.getReplicaBatchAllocatorTimeout().millis(), false);
}

private void executeTimedRunnables(List<Consumer<Boolean>> runnables, Supplier<Long> maxRunTimeSupplier) {
private void executeTimedRunnables(List<Consumer<Boolean>> runnables, Supplier<Long> maxRunTimeSupplier, boolean primary) {
    // Runs every batch in `runnables`, in shuffled order, against a shared time budget.
    //
    // runnables          - one consumer per batch; it receives `false` to run normally and
    //                      `true` to signal that the budget has been exhausted.
    // maxRunTimeSupplier - budget in milliseconds; a negative value (e.g. the millis of
    //                      TimeValue.MINUS_ONE) means "no timeout".
    // primary            - only used to label the log lines.
    logger.info("Executing timed runnables for primary [{}] of size [{}]", primary, runnables.size());
    // Shuffle so that the batches at the tail of the list are not the ones timed out every cycle.
    Collections.shuffle(runnables);
    long startTime = System.nanoTime();
    for (Consumer<Boolean> workQueue : runnables) {
        // Re-read the budget for each batch so an updated value takes effect mid-cycle.
        long maxRunTimeMillis = maxRunTimeSupplier.get();
        // A negative budget disables the timeout. Without this guard the elapsed time is compared
        // against a negative number of nanos, which is never "within budget", so every batch
        // would be timed out immediately.
        boolean withinBudget = maxRunTimeMillis < 0
            || System.nanoTime() - startTime < TimeUnit.MILLISECONDS.toNanos(maxRunTimeMillis);
        if (withinBudget) {
            logger.info("Starting primary [{}] batch to allocate", primary);
            workQueue.accept(false);
        } else {
            logger.info("Timing out primary [{}] batch to allocate", primary);
            workQueue.accept(true);
        }
    }
    logger.info("Time taken to execute timed runnables in this cycle:[{}ms]", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
}

private void disassociateDeadNodes(RoutingAllocation allocation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;

Expand Down Expand Up @@ -122,6 +123,14 @@ default List<Consumer<Boolean>> allocateAllUnassignedShards(RoutingAllocation al
return runnables;
}

/**
 * Returns the timeout to apply when running the primary shard batches.
 * This default implementation returns {@link TimeValue#MINUS_ONE}.
 * NOTE(review): a negative value is presumably meant as "no timeout" - confirm that callers
 * treat it that way rather than as an already-expired budget.
 */
default TimeValue getPrimaryBatchAllocatorTimeout() {
return TimeValue.MINUS_ONE;
}

/**
 * Returns the timeout to apply when running the replica shard batches.
 * This default implementation returns {@link TimeValue#MINUS_ONE}.
 * NOTE(review): a negative value is presumably meant as "no timeout" - confirm that callers
 * treat it that way rather than as an already-expired budget.
 */
default TimeValue getReplicaBatchAllocatorTimeout() {
return TimeValue.MINUS_ONE;
}

/**
* Returns an explanation for a single unassigned shard.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -162,6 +163,13 @@ public class BalancedShardsAllocator implements ShardsAllocator {
Property.NodeScope
);

/**
 * Dynamic, node-scoped time setting {@code cluster.routing.allocation.allocate_unassigned_timeout}.
 * Its value is handed to the shards balancer as the allocate-unassigned timeout.
 * Defaults to {@link TimeValue#MINUS_ONE}.
 * NOTE(review): the default is negative - confirm that the code consuming this value treats a
 * negative timeout as "disabled" and not as "already expired".
 */
public static final Setting<TimeValue> ALLOCATE_UNASSIGNED_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.routing.allocation.allocate_unassigned_timeout",
TimeValue.MINUS_ONE,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private volatile boolean movePrimaryFirst;
private volatile ShardMovementStrategy shardMovementStrategy;

Expand All @@ -172,6 +180,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
private volatile float shardBalanceFactor;
private volatile WeightFunction weightFunction;
private volatile float threshold;
private volatile TimeValue allocateUnassignedTimeout;

public BalancedShardsAllocator(Settings settings) {
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
Expand All @@ -187,6 +196,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
setAllocateUnassignedTimeout(ALLOCATE_UNASSIGNED_TIMEOUT_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
Expand All @@ -195,6 +205,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer);
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
clusterSettings.addSettingsUpdateConsumer(ALLOCATE_UNASSIGNED_TIMEOUT_SETTING, this::setAllocateUnassignedTimeout);
}

/**
Expand Down Expand Up @@ -269,6 +280,10 @@ private void setThreshold(float threshold) {
this.threshold = threshold;
}

/**
 * Stores the allocate-unassigned timeout. Called once from the constructor with the initial
 * value and registered as the update consumer for {@code ALLOCATE_UNASSIGNED_TIMEOUT_SETTING}.
 *
 * @param allocateUnassignedTimeout the new timeout value
 */
private void setAllocateUnassignedTimeout(TimeValue allocateUnassignedTimeout) {
this.allocateUnassignedTimeout = allocateUnassignedTimeout;
}

@Override
public void allocate(RoutingAllocation allocation) {
if (allocation.routingNodes().size() == 0) {
Expand All @@ -282,7 +297,8 @@ public void allocate(RoutingAllocation allocation) {
weightFunction,
threshold,
preferPrimaryShardBalance,
preferPrimaryShardRebalance
preferPrimaryShardRebalance,
allocateUnassignedTimeout
);
localShardsBalancer.allocateUnassigned();
localShardsBalancer.moveShards();
Expand All @@ -297,19 +313,21 @@ public void allocate(RoutingAllocation allocation) {

@Override
public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, final RoutingAllocation allocation) {
long startTime = System.nanoTime();
ShardsBalancer localShardsBalancer = new LocalShardsBalancer(
logger,
allocation,
shardMovementStrategy,
weightFunction,
threshold,
preferPrimaryShardBalance,
preferPrimaryShardRebalance
preferPrimaryShardRebalance,
allocateUnassignedTimeout
);
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
if (shard.unassigned()) {
allocateUnassignedDecision = localShardsBalancer.decideAllocateUnassigned(shard);
allocateUnassignedDecision = localShardsBalancer.decideAllocateUnassigned(shard, startTime);
} else {
moveDecision = localShardsBalancer.decideMove(shard);
if (moveDecision.isDecisionTaken() && moveDecision.canRemain()) {
Expand Down Expand Up @@ -558,7 +576,7 @@ public Balancer(
float threshold,
boolean preferPrimaryBalance
) {
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false);
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, TimeValue.MINUS_ONE);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.gateway.PriorityComparator;

import java.util.ArrayList;
Expand All @@ -40,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand All @@ -60,6 +62,7 @@ public class LocalShardsBalancer extends ShardsBalancer {

private final boolean preferPrimaryBalance;
private final boolean preferPrimaryRebalance;
private final TimeValue allocateUnassignedTimeout;
private final BalancedShardsAllocator.WeightFunction weight;

private final float threshold;
Expand All @@ -77,7 +80,8 @@ public LocalShardsBalancer(
BalancedShardsAllocator.WeightFunction weight,
float threshold,
boolean preferPrimaryBalance,
boolean preferPrimaryRebalance
boolean preferPrimaryRebalance,
TimeValue allocateUnassignedTimeout
) {
this.logger = logger;
this.allocation = allocation;
Expand All @@ -94,6 +98,7 @@ public LocalShardsBalancer(
this.preferPrimaryBalance = preferPrimaryBalance;
this.preferPrimaryRebalance = preferPrimaryRebalance;
this.shardMovementStrategy = shardMovementStrategy;
this.allocateUnassignedTimeout = allocateUnassignedTimeout;
}

/**
Expand Down Expand Up @@ -742,6 +747,7 @@ private Map<String, BalancedShardsAllocator.ModelNode> buildModelFromAssigned()
*/
@Override
void allocateUnassigned() {
long startTime = System.nanoTime();
RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
assert !nodes.isEmpty();
if (logger.isTraceEnabled()) {
Expand Down Expand Up @@ -797,7 +803,7 @@ void allocateUnassigned() {
do {
for (int i = 0; i < primaryLength; i++) {
ShardRouting shard = primary[i];
final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard);
final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard, startTime);
final String assignedNodeId = allocationDecision.getTargetNode() != null
? allocationDecision.getTargetNode().getId()
: null;
Expand Down Expand Up @@ -870,6 +876,8 @@ void allocateUnassigned() {
secondaryLength = 0;
} while (primaryLength > 0);
// clear everything we have either added it or moved to ignoreUnassigned
logger.debug("Time taken in allocate unassigned [{}]",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
}

/**
Expand All @@ -879,12 +887,17 @@ void allocateUnassigned() {
* is of type {@link Decision.Type#NO}, then the assigned node will be null.
*/
@Override
AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) {
AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard, long startTime) {
if (shard.assignedToNode()) {
// we only make decisions for unassigned shards here
return AllocateUnassignedDecision.NOT_TAKEN;
}

if (System.nanoTime() - startTime > allocateUnassignedTimeout.nanos()) {
logger.info("Timed out while running Local shard balancer allocate unassigned - outer loop");
return AllocateUnassignedDecision.throttle(null);
}

final boolean explain = allocation.debugDecision();
Decision shardLevelDecision = allocation.deciders().canAllocate(shard, allocation);
if (shardLevelDecision.type() == Decision.Type.NO && explain == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ private Map<String, Integer> calculateNodePrimaryShardCount(List<RoutingNode> re
}

@Override
AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting) {
AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting, long startTime) {
// Decision operations are not implemented by this balancer: both arguments are ignored and
// the call always fails with UnsupportedOperationException.
throw new UnsupportedOperationException("remote shards balancer does not support decision operations");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public abstract class ShardsBalancer {
* @param shardRouting the shard for which the decision has to be made
* @return the allocation decision
*/
abstract AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting);
abstract AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting, long startTime);

/**
* Makes a decision on whether to move a started shard to another node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ public void apply(Settings value, Settings current, Settings previous) {
BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING,
BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING,
BalancedShardsAllocator.THRESHOLD_SETTING,
BalancedShardsAllocator.ALLOCATE_UNASSIGNED_TIMEOUT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_TYPE,
Expand Down Expand Up @@ -342,6 +343,8 @@ public void apply(Settings value, Settings current, Settings previous) {
GatewayService.RECOVER_AFTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_TIME_SETTING,
ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE,
ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING,
ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING,
PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD,
NetworkModule.HTTP_DEFAULT_TYPE_SETTING,
NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING,
Expand Down
Loading

0 comments on commit ef8232a

Please sign in to comment.