From 7a760e17040189defbf525e07bb6b048270dbfda Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sun, 7 Jul 2024 15:37:52 +0530 Subject: [PATCH] Add timeout setting for batch allocator Fix minor bug Signed-off-by: Rishab Nahata --- .../routing/allocation/AllocationService.java | 8 ++++- .../allocation/ExistingShardsAllocator.java | 5 ++++ .../common/settings/ClusterSettings.java | 1 + .../gateway/ShardsBatchGatewayAllocator.java | 30 ++++++++++++++++++- 4 files changed, 42 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java index 0002ced09da63..9895ad7de65be 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java @@ -633,10 +633,11 @@ private void allocateAllUnassignedShards(RoutingAllocation allocation) { } private void processWorkItemQueue(RoutingAllocation allocation) { + ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME); long startTime = System.nanoTime(); //TODO replace with max time while (workQueue.isEmpty() == false) { - if (System.nanoTime() - startTime > TimeValue.timeValueSeconds(30).nanos()) { + if (System.nanoTime() - startTime > allocator.getAllocatorTimeout().nanos()) { logger.info("Timed out while running process work item queue"); return; } @@ -827,6 +828,11 @@ public void allocateUnassigned( unassignedAllocationHandler.removeAndIgnore(AllocationStatus.NO_VALID_SHARD_COPY, allocation.changes()); } + @Override + public TimeValue getAllocatorTimeout() { + return null; + } + @Override public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation allocation) { assert unassignedShard.unassigned(); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java index f44c4854f6de1..a56c4cf2de3f5 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java @@ -38,6 +38,7 @@ import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.common.Nullable; import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; import org.opensearch.gateway.GatewayAllocator; import org.opensearch.gateway.ShardsBatchGatewayAllocator; @@ -102,6 +103,10 @@ void allocateUnassigned( UnassignedAllocationHandler unassignedAllocationHandler ); + default TimeValue getAllocatorTimeout() { + return null; + } + /** * Allocate all unassigned shards in the given {@link RoutingAllocation} for which this {@link ExistingShardsAllocator} is responsible. * Default implementation calls {@link #allocateUnassigned(ShardRouting, RoutingAllocation, UnassignedAllocationHandler)} for each Unassigned shard diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 5dcf23ae52294..eee75efecb924 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -342,6 +342,7 @@ public void apply(Settings value, Settings current, Settings previous) { GatewayService.RECOVER_AFTER_NODES_SETTING, GatewayService.RECOVER_AFTER_TIME_SETTING, ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE, + ShardsBatchGatewayAllocator.BATCH_ALLOCATOR_TIMEOUT_SETTING, PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD, NetworkModule.HTTP_DEFAULT_TYPE_SETTING, NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING, diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 2a9f0ff552452..e13efb907abaf 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -27,8 +27,10 @@ import org.opensearch.common.UUIDs; import org.opensearch.common.inject.Inject; import org.opensearch.common.lease.Releasables; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.set.Sets; import org.opensearch.core.action.ActionListener; @@ -60,6 +62,9 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { private static final Logger logger = LogManager.getLogger(ShardsBatchGatewayAllocator.class); private final long maxBatchSize; private static final short DEFAULT_SHARD_BATCH_SIZE = 2000; + private static final String BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY = "cluster.routing.allocation.shards_batch_gateway_allocator.allocator_timeout"; + + private TimeValue shardsBatchGatewayAllocatorTimeout; /** * Number of shards we send in one batch to data nodes for fetching metadata @@ -72,6 +77,13 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { Setting.Property.NodeScope ); + public static final Setting BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting( + BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, + TimeValue.timeValueSeconds(60), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + private final RerouteService rerouteService; private final PrimaryShardBatchAllocator primaryShardBatchAllocator; private final ReplicaShardBatchAllocator replicaShardBatchAllocator; @@ -90,7 +102,8 @@ public ShardsBatchGatewayAllocator( RerouteService rerouteService, TransportNodesListGatewayStartedShardsBatch batchStartedAction, TransportNodesListShardStoreMetadataBatch batchStoreAction, - Settings settings + Settings settings, + ClusterSettings clusterSettings ) { this.rerouteService = rerouteService; this.primaryShardBatchAllocator = new InternalPrimaryBatchShardAllocator(); @@ -98,6 +111,11 @@ public ShardsBatchGatewayAllocator( this.batchStartedAction = batchStartedAction; this.batchStoreAction = batchStoreAction; this.maxBatchSize = GATEWAY_ALLOCATOR_BATCH_SIZE.get(settings); + this.shardsBatchGatewayAllocatorTimeout = BATCH_ALLOCATOR_TIMEOUT_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer( + BATCH_ALLOCATOR_TIMEOUT_SETTING, + this::setAllocatorTimeout + ); } @Override @@ -120,6 +138,7 @@ protected ShardsBatchGatewayAllocator(long batchSize) { this.batchStoreAction = null; this.replicaShardBatchAllocator = null; this.maxBatchSize = batchSize; + this.shardsBatchGatewayAllocatorTimeout = null; } // for tests @@ -179,6 +198,15 @@ public void allocateUnassigned( throw new UnsupportedOperationException("ShardsBatchGatewayAllocator does not support allocating unassigned shards"); } + @Override + public TimeValue getAllocatorTimeout() { + return this.shardsBatchGatewayAllocatorTimeout; + } + + public void setAllocatorTimeout(TimeValue shardsBatchGatewayAllocatorTimeout) { + this.shardsBatchGatewayAllocatorTimeout = shardsBatchGatewayAllocatorTimeout; + } + @Override public List allocateAllUnassignedShards(final RoutingAllocation allocation, boolean primary) {