Skip to content

Commit

Permalink
Modify GA for running RSBA
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
shiv0408 committed Nov 30, 2023
1 parent f03e99e commit b776483
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions server/src/main/java/org/opensearch/gateway/GatewayAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public GatewayAllocator(
this.batchStoreAction = batchStoreAction;
this.replicaBatchShardAllocator = new InternalReplicaBatchShardAllocator();
this.maxBatchSize = GATEWAY_ALLOCATOR_BATCH_SIZE.get(settings);
this.batchMode = EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED.get(settings);
}

@Override
Expand Down Expand Up @@ -216,10 +217,21 @@ public void beforeAllocation(final RoutingAllocation allocation) {

@Override
public void afterPrimariesBeforeReplicas(RoutingAllocation allocation) {
assert replicaShardAllocator != null;
if (allocation.routingNodes().hasInactiveShards()) {
// cancel existing recoveries if we have a better match
replicaShardAllocator.processExistingRecoveries(allocation);
if (this.batchMode) {
assert replicaBatchShardAllocator != null;
List<Set<ShardRouting>> storedShardBatches = batchIdToStoreShardBatch.values().stream()
.map(ShardsBatch::getBatchedShardRoutings)
.collect(Collectors.toList());
if (allocation.routingNodes().hasInactiveShards()) {
// cancel existing recoveries if we have a better match
replicaBatchShardAllocator.processExistingRecoveries(allocation, storedShardBatches);
}
} else {
assert replicaShardAllocator != null;
if (allocation.routingNodes().hasInactiveShards()) {
// cancel existing recoveries if we have a better match
replicaShardAllocator.processExistingRecoveries(allocation);
}
}
}

Expand Down

0 comments on commit b776483

Please sign in to comment.