Skip to content

Commit

Permalink
Make setting dynamic and make batch size also dynamic
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Jul 5, 2024
1 parent 853fd93 commit 3fc08d8
Showing 1 changed file with 36 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.common.UUIDs;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -52,6 +53,7 @@
import java.util.Set;
import java.util.Spliterators;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -66,8 +68,8 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {

public static final String ALLOCATOR_NAME = "shards_batch_gateway_allocator";
private static final Logger logger = LogManager.getLogger(ShardsBatchGatewayAllocator.class);
private final long maxBatchSize;
private final TimeValue batchAllocatorRerouteTimeout;
private long maxBatchSize;
private TimeValue batchAllocatorRerouteTimeout;
private static final short DEFAULT_SHARD_BATCH_SIZE = 2000;
private static final String BATCH_ALLOCATOR_REROUTE_TIMEOUT_SETTING_KEY = "cluster.routing.allocation.shards_batch_gateway_allocator.reroute_timeout";

Expand All @@ -79,13 +81,15 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
DEFAULT_SHARD_BATCH_SIZE,
1,
10000,
Setting.Property.NodeScope
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<TimeValue> BATCH_ALLOCATOR_REROUTE_TIMEOUT_SETTING = Setting.timeSetting(
BATCH_ALLOCATOR_REROUTE_TIMEOUT_SETTING_KEY,
TimeValue.timeValueSeconds(30),
Setting.Property.NodeScope
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private final RerouteService rerouteService;
Expand All @@ -106,7 +110,8 @@ public ShardsBatchGatewayAllocator(
RerouteService rerouteService,
TransportNodesListGatewayStartedShardsBatch batchStartedAction,
TransportNodesListShardStoreMetadataBatch batchStoreAction,
Settings settings
Settings settings,
ClusterSettings clusterSettings
) {
this.rerouteService = rerouteService;
this.primaryShardBatchAllocator = new InternalPrimaryBatchShardAllocator();
Expand All @@ -115,6 +120,22 @@ public ShardsBatchGatewayAllocator(
this.batchStoreAction = batchStoreAction;
this.maxBatchSize = GATEWAY_ALLOCATOR_BATCH_SIZE.get(settings);
this.batchAllocatorRerouteTimeout = BATCH_ALLOCATOR_REROUTE_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(
GATEWAY_ALLOCATOR_BATCH_SIZE,
this::setGatewayAllocatorBatchSize
);
clusterSettings.addSettingsUpdateConsumer(
BATCH_ALLOCATOR_REROUTE_TIMEOUT_SETTING,
this::setBatchAllocatorRerouteTimeout
);
}

private void setBatchAllocatorRerouteTimeout(TimeValue batchAllocatorRerouteTimeout) {
this.batchAllocatorRerouteTimeout = batchAllocatorRerouteTimeout;
}

private void setGatewayAllocatorBatchSize(long maxBatchSize) {
this.maxBatchSize = maxBatchSize;
}

@Override
Expand Down Expand Up @@ -221,30 +242,34 @@ protected void innerAllocateUnassignedBatch(
if (primary) {
for (ShardsBatch shardsBatch: batchIdToStartedShardBatch.values()) {
if (System.currentTimeMillis() - startTime > batchAllocatorRerouteTimeout.millis()) {
logger.info("Batch allocation timed out after executing [{}] batches out of total [{}] batch to assign",
totalBatchSuccessfullyExecuted, batchesToAssign.size());
break;
}
if (batchesToAssign.contains(shardsBatch.batchId)) {
logger.info("Allocating unassigned primary shards batch with total [{}] shards in the batch with id [{}]",
shardsBatch.getBatchedShardRoutings().size(), shardsBatch.batchId);
long startTimeOfBatch = System.currentTimeMillis();
long startTimeOfBatch = System.nanoTime();
primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation);
logger.info("Time taken to allocate unassigned primary batch with id [{}] in this cycle:[{}ms]",
shardsBatch.batchId, System.currentTimeMillis() - startTimeOfBatch);
logger.info("Time taken to allocate unassigned primary batch with id [{}] in this cycle:[{} ms]",
shardsBatch.batchId, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeOfBatch));
totalBatchSuccessfullyExecuted++;
}
}
} else {
for (ShardsBatch shardsBatch: batchIdToStoreShardBatch.values()) {
if (System.currentTimeMillis() - startTime > batchAllocatorRerouteTimeout.millis()) {
logger.info("Batch allocation timed out after executing [{}] batches out of total [{}] batch to assign",
totalBatchSuccessfullyExecuted, batchesToAssign.size());
break;
}
if (batchesToAssign.contains(shardsBatch.batchId)) {
logger.info("Allocating unassigned replica shards batch with total [{}] shards in the batch with id [{}]",
shardsBatch.getBatchedShardRoutings().size(), shardsBatch.batchId);
long startTimeOfBatch = System.currentTimeMillis();
long startTimeOfBatch = System.nanoTime();
replicaBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation);
logger.info("Time taken to allocate unassigned replica batch with id [{}] in this cycle:[{}ms]",
shardsBatch.batchId, System.currentTimeMillis() - startTimeOfBatch);
logger.info("Time taken to allocate unassigned replica batch with id [{}] in this cycle:[{} ms]",
shardsBatch.batchId, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeOfBatch));
totalBatchSuccessfullyExecuted++;
}
}
Expand Down

0 comments on commit 3fc08d8

Please sign in to comment.