Skip to content

Commit

Permalink
Updated ShardsBatch to take ShardRouting into account instead of Shar…
Browse files Browse the repository at this point in the history
…dIds
  • Loading branch information
Gaurav614 committed Jul 19, 2023
1 parent fb3e757 commit c47bc4d
Showing 1 changed file with 18 additions and 14 deletions.
32 changes: 18 additions & 14 deletions server/src/main/java/org/opensearch/gateway/GatewayAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,17 +199,17 @@ public void allocateUnassignedBatch(final RoutingAllocation allocation, boolean
createBatches(allocation, primary);

if (primary) {
asyncBatchFetchStarted.keySet().forEach(batch -> primaryBatchShardAllocator.allocateUnassignedBatch(batch.getBatchId(), allocation));
asyncBatchFetchStarted.keySet().forEach(batch -> primaryBatchShardAllocator.allocateUnassignedBatch(batch.getBatchedShards(), allocation));
}
else {
asyncBatchFetchStore.keySet().forEach(batch -> replicaBatchShardAllocator.allocateUnassignedBatch(batch.getBatchId(), allocation));
asyncBatchFetchStore.keySet().forEach(batch -> replicaBatchShardAllocator.allocateUnassignedBatch(batch.getBatchedShards(), allocation));
}
}

private void createBatches(RoutingAllocation allocation, boolean primary) {
RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
// fetch all current batched shards
Set<ShardId> currentBatchedShards;
Set<ShardRouting> currentBatchedShards;
if (primary) {
currentBatchedShards = asyncBatchFetchStarted.keySet().stream().flatMap(shardsBatch -> shardsBatch.getBatchedShards().stream()).collect(Collectors.toSet());
} else {
Expand All @@ -218,32 +218,36 @@ private void createBatches(RoutingAllocation allocation, boolean primary) {
Set<ShardRouting> shardsToBatch = Sets.newHashSet();
// add all unassigned shards to the batch if they are not already in a batch
unassigned.forEach(shardRouting -> {
if ((currentBatchedShards.contains(shardRouting.shardId()) == false) && (shardRouting.primary() == primary)) {
if ((currentBatchedShards.contains(shardRouting) == false) && (shardRouting.primary() == primary)) {
assert shardRouting.unassigned();
shardsToBatch.add(shardRouting);
}
});
Iterator<ShardRouting> iterator = shardsToBatch.iterator();
long batchSize = MAX_BATCH_SIZE;
Map<ShardId, String> addToCurrentBatch = new HashMap<>();
Map<ShardRouting, String> addToCurrentBatch = new HashMap<>();
while (iterator.hasNext()) {
ShardRouting currentShard = iterator.next();
if (batchSize > 0) {
addToCurrentBatch.put(currentShard.shardId(), IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(currentShard.index()).getSettings()));
addToCurrentBatch.put(currentShard, IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(currentShard.index()).getSettings()));
batchSize--;
iterator.remove();
}
// add to batch if batch size full or last shard in unassigned list
if (batchSize == 0 || iterator.hasNext() == false) {
String batchUUId = UUIDs.base64UUID();
ShardsBatch shardsBatch = new ShardsBatch(batchUUId, addToCurrentBatch);
Map<ShardId, String> shardIdsMap = addToCurrentBatch.entrySet().stream().collect(Collectors.toMap(
entry -> entry.getKey().shardId(),
Map.Entry::getValue
));
if(primary) {
asyncBatchFetchStarted.computeIfAbsent(
shardsBatch,
batch -> new InternalBatchAsyncFetch<>(
logger,
"batch_shards_started",
batch.getShardsToCustomDataPathMap(),
shardIdsMap,
this.batchStartedAction,
batch.getBatchId()
));
Expand All @@ -254,7 +258,7 @@ private void createBatches(RoutingAllocation allocation, boolean primary) {
batch -> new InternalBatchAsyncFetch<>(
logger,
"batch_shards_store",
batch.getShardsToCustomDataPathMap(),
shardIdsMap,
this.batchStoreAction,
batch.getBatchId()
));
Expand Down Expand Up @@ -502,20 +506,20 @@ protected boolean hasInitiatedFetching(ShardRouting shard) {
private class ShardsBatch {
private final String uuid;

public Map<ShardId, String> getShardsToCustomDataPathMap() {
public Map<ShardRouting, String> getShardsToCustomDataPathMap() {
return shardsToCustomDataPathMap;
}

private Map<ShardId,String> shardsToCustomDataPathMap;
private ShardsBatch(String uuid, Map<ShardId,String> shardsToCustomDataPathMap) {
private Map<ShardRouting,String> shardsToCustomDataPathMap;
private ShardsBatch(String uuid, Map<ShardRouting,String> shardsToCustomDataPathMap) {
this.uuid = uuid;
this.shardsToCustomDataPathMap = shardsToCustomDataPathMap;
}
void removeFromBatch(ShardId shardId) {
shardsToCustomDataPathMap.remove(shardId);
void removeFromBatch(ShardRouting shard) {
shardsToCustomDataPathMap.remove(shard);
}

Set<ShardId> getBatchedShards() {
Set<ShardRouting> getBatchedShards() {
return shardsToCustomDataPathMap.keySet();
}

Expand Down

0 comments on commit c47bc4d

Please sign in to comment.