From 3fc08d8d0f48d55c9ace8b92fee3e699f0352f9a Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Fri, 5 Jul 2024 13:12:43 +0530 Subject: [PATCH] Make setting dynamic and make batch size also dynamic Signed-off-by: Rishab Nahata --- .../gateway/ShardsBatchGatewayAllocator.java | 47 ++++++++++++++----- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 6962f9496949e..6a7492561134b 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -27,6 +27,7 @@ import org.opensearch.common.UUIDs; import org.opensearch.common.inject.Inject; import org.opensearch.common.lease.Releasables; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -52,6 +53,7 @@ import java.util.Set; import java.util.Spliterators; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -66,8 +68,8 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { public static final String ALLOCATOR_NAME = "shards_batch_gateway_allocator"; private static final Logger logger = LogManager.getLogger(ShardsBatchGatewayAllocator.class); - private final long maxBatchSize; - private final TimeValue batchAllocatorRerouteTimeout; + private volatile long maxBatchSize; + private volatile TimeValue batchAllocatorRerouteTimeout; private static final short DEFAULT_SHARD_BATCH_SIZE = 2000; private static final String BATCH_ALLOCATOR_REROUTE_TIMEOUT_SETTING_KEY = "cluster.routing.allocation.shards_batch_gateway_allocator.reroute_timeout"; @@ -79,13 +81,15 @@ 
public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { DEFAULT_SHARD_BATCH_SIZE, 1, 10000, - Setting.Property.NodeScope + Setting.Property.NodeScope, + Setting.Property.Dynamic ); public static final Setting BATCH_ALLOCATOR_REROUTE_TIMEOUT_SETTING = Setting.timeSetting( BATCH_ALLOCATOR_REROUTE_TIMEOUT_SETTING_KEY, TimeValue.timeValueSeconds(30), - Setting.Property.NodeScope + Setting.Property.NodeScope, + Setting.Property.Dynamic ); private final RerouteService rerouteService; @@ -106,7 +110,8 @@ public ShardsBatchGatewayAllocator( RerouteService rerouteService, TransportNodesListGatewayStartedShardsBatch batchStartedAction, TransportNodesListShardStoreMetadataBatch batchStoreAction, - Settings settings + Settings settings, + ClusterSettings clusterSettings ) { this.rerouteService = rerouteService; this.primaryShardBatchAllocator = new InternalPrimaryBatchShardAllocator(); @@ -115,6 +120,22 @@ public ShardsBatchGatewayAllocator( this.batchStoreAction = batchStoreAction; this.maxBatchSize = GATEWAY_ALLOCATOR_BATCH_SIZE.get(settings); this.batchAllocatorRerouteTimeout = BATCH_ALLOCATOR_REROUTE_TIMEOUT_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer( + GATEWAY_ALLOCATOR_BATCH_SIZE, + this::setGatewayAllocatorBatchSize + ); + clusterSettings.addSettingsUpdateConsumer( + BATCH_ALLOCATOR_REROUTE_TIMEOUT_SETTING, + this::setBatchAllocatorRerouteTimeout + ); + } + + private void setBatchAllocatorRerouteTimeout(TimeValue batchAllocatorRerouteTimeout) { + this.batchAllocatorRerouteTimeout = batchAllocatorRerouteTimeout; + } + + private void setGatewayAllocatorBatchSize(long maxBatchSize) { + this.maxBatchSize = maxBatchSize; } @Override @@ -221,30 +242,34 @@ protected void innerAllocateUnassignedBatch( if (primary) { for (ShardsBatch shardsBatch: batchIdToStartedShardBatch.values()) { if (System.currentTimeMillis() - startTime > batchAllocatorRerouteTimeout.millis()) { + logger.info("Batch allocation timed out after executing [{}] 
batches out of total [{}] batch to assign", + totalBatchSuccessfullyExecuted, batchesToAssign.size()); break; } if (batchesToAssign.contains(shardsBatch.batchId)) { logger.info("Allocating unassigned primary shards batch with total [{}] shards in the batch with id [{}]", shardsBatch.getBatchedShardRoutings().size(), shardsBatch.batchId); - long startTimeOfBatch = System.currentTimeMillis(); + long startTimeOfBatch = System.nanoTime(); primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation); - logger.info("Time taken to allocate unassigned primary batch with id [{}] in this cycle:[{}ms]", - shardsBatch.batchId, System.currentTimeMillis() - startTimeOfBatch); + logger.info("Time taken to allocate unassigned primary batch with id [{}] in this cycle:[{} ms]", + shardsBatch.batchId, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeOfBatch)); totalBatchSuccessfullyExecuted++; } } } else { for (ShardsBatch shardsBatch: batchIdToStoreShardBatch.values()) { if (System.currentTimeMillis() - startTime > batchAllocatorRerouteTimeout.millis()) { + logger.info("Batch allocation timed out after executing [{}] batches out of total [{}] batch to assign", + totalBatchSuccessfullyExecuted, batchesToAssign.size()); break; } if (batchesToAssign.contains(shardsBatch.batchId)) { logger.info("Allocating unassigned replica shards batch with total [{}] shards in the batch with id [{}]", shardsBatch.getBatchedShardRoutings().size(), shardsBatch.batchId); - long startTimeOfBatch = System.currentTimeMillis(); + long startTimeOfBatch = System.nanoTime(); replicaBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation); - logger.info("Time taken to allocate unassigned replica batch with id [{}] in this cycle:[{}ms]", - shardsBatch.batchId, System.currentTimeMillis() - startTimeOfBatch); + logger.info("Time taken to allocate unassigned replica batch with id [{}] in this cycle:[{} ms]", + 
shardsBatch.batchId, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeOfBatch)); totalBatchSuccessfullyExecuted++; } }