Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async Batch shards changes for GatewayAllocator #8746

Merged
merged 46 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
7f13975
Draft changes for creation of batches
Gaurav614 Jul 14, 2023
637fca0
WIP changes for GA changes for fetching
Gaurav614 Jul 18, 2023
49f7ce1
Batcher changes in GA
Gaurav614 Jul 18, 2023
9b51dfb
Binding of Trasnport Classes to Injector
Gaurav614 Jul 18, 2023
0075bb2
Updated ShardsBatch to take ShardRouting into account instead of Shar…
Gaurav614 Jul 19, 2023
38205cf
Cleaned Shards from batches after allocation attempt and moved keys t…
Gaurav614 Jul 25, 2023
9bbba2e
Allocation service changes for batch assignment
Gaurav614 Jul 26, 2023
818ef1d
Added in-flight fetches for batchMode
shiv0408 Aug 30, 2023
1eabedf
Fixed bugs
Gaurav614 Sep 6, 2023
80afedf
Fixed typos and added tests
Gaurav614 Sep 28, 2023
9238ebf
Added batch mode setting
Gaurav614 Sep 28, 2023
728167c
Added missed clusterModule changes
Gaurav614 Nov 29, 2023
7fe6709
Modify GA for running RSBA
shiv0408 Nov 30, 2023
214aaa4
Addressed PR comments
Gaurav614 Dec 5, 2023
5992235
Addressed PR comments
Gaurav614 Dec 13, 2023
7eb12b9
Added Changes for ShardBatchGatewayAllocator
Gaurav614 Dec 14, 2023
d741081
Fixed culprit shard test
Gaurav614 Dec 14, 2023
809c99f
Cosmetic changes in AllocationService
Gaurav614 Dec 20, 2023
340abe4
PR comments
Gaurav614 Jan 12, 2024
4ee2040
Changes to implement interface for ShardBatchGatewayAllocator
Gaurav614 Jan 15, 2024
9f6284c
Fixed PR comments around documentation
Gaurav614 Feb 12, 2024
38692a0
Renamed allocateUnassignedBatch to allocateAllUnassignedShards
Gaurav614 Jan 12, 2024
20312bd
Removed batchMode variable from ShardsBatchGatewayAllocator
Gaurav614 Feb 6, 2024
8880e80
Fix TestShardBatchGatewayAllocator
shiv0408 Feb 16, 2024
79a99e0
Renamed variables and added documentations
Gaurav614 Feb 23, 2024
c2da82c
Use new AsyncShardFetchBatch class for creating cache for batch trans…
Mar 13, 2024
d2e5522
Correct version check with 3.0.0
Mar 13, 2024
4e4fa06
Local lookup map for shardId to batchId, more code comments
Mar 18, 2024
3b851b9
Remove shards from batches if they are not present in unassigned list…
Apr 4, 2024
edc66dd
Resolved PR comments
Gaurav614 Apr 10, 2024
26a90ce
Incorporate AsyncShardBatchFetch class changes
Apr 12, 2024
3da0af3
Fixed GatewayAllocatorTests
Gaurav614 Apr 12, 2024
dbd51b5
Delete batch if empty when removing shards from batch
Apr 12, 2024
eff1d2e
Fix index deletion test
Apr 12, 2024
6f52f9f
Spotless Apply
Gaurav614 Apr 15, 2024
7298a08
Refactored to reuse duplicate code
Gaurav614 Apr 16, 2024
1afa3f4
Keep initializing shards in batch
shiv0408 Apr 22, 2024
3b5751e
Spotless apply
shiv0408 Apr 23, 2024
8ae90fc
Update ensureAsyncFetchStorePrimaryRecency and refresh batch flow
shiv0408 Apr 23, 2024
16d6c59
Revert ensureAsyncFetchStorePrimaryRecency update
shiv0408 Apr 23, 2024
0693b9a
Enable batch mode temporarily for gradle check run
shiv0408 Apr 23, 2024
1ff4588
Disabled batch mode by default
shiv0408 Apr 25, 2024
95cec33
Merge branch 'main' into GA-batcher-PR
shiv0408 Apr 25, 2024
3f103c1
Spotless apply
shiv0408 Apr 25, 2024
154d13b
Fix for missed merge conflict
shiv0408 Apr 25, 2024
080c3ba
Added changelog
shiv0408 Apr 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))
- [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992))
- Batch mode for async fetching shard information in GatewayAllocator for unassigned shards ([#8746](https://github.com/opensearch-project/OpenSearch/pull/8746))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down

Large diffs are not rendered by default.

14 changes: 11 additions & 3 deletions server/src/main/java/org/opensearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import org.opensearch.core.common.io.stream.Writeable.Reader;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
import org.opensearch.ingest.IngestMetadata;
import org.opensearch.persistent.PersistentTasksCustomMetadata;
import org.opensearch.persistent.PersistentTasksNodeService;
Expand Down Expand Up @@ -153,7 +154,13 @@ public ClusterModule(
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext);
this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
this.allocationService = new AllocationService(
allocationDeciders,
shardsAllocator,
clusterInfoService,
snapshotsInfoService,
settings
);
}

public static List<Entry> getNamedWriteables() {
Expand Down Expand Up @@ -423,6 +430,7 @@ public AllocationService getAllocationService() {
@Override
protected void configure() {
bind(GatewayAllocator.class).asEagerSingleton();
bind(ShardsBatchGatewayAllocator.class).asEagerSingleton();
bind(AllocationService.class).toInstance(allocationService);
bind(ClusterService.class).toInstance(clusterService);
bind(NodeConnectionsService.class).asEagerSingleton();
Expand All @@ -442,10 +450,10 @@ protected void configure() {
bind(ShardsAllocator.class).toInstance(shardsAllocator);
}

public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator) {
public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, ShardsBatchGatewayAllocator shardsBatchGatewayAllocator) {
final Map<String, ExistingShardsAllocator> existingShardsAllocators = new HashMap<>();
existingShardsAllocators.put(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator);

existingShardsAllocators.put(ShardsBatchGatewayAllocator.ALLOCATOR_NAME, shardsBatchGatewayAllocator);
for (ClusterPlugin clusterPlugin : clusterPlugins) {
for (Map.Entry<String, ExistingShardsAllocator> existingShardsAllocatorEntry : clusterPlugin.getExistingShardsAllocators()
.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@
return shards.size();
}

public Collection<ShardRouting> getInitializingShards() {
return initializingShards;

Check warning on line 208 in server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java#L208

Added line #L208 was not covered by tests
}

/**
* Add a new shard to this node
* @param shard Shard to create on this Node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.RestoreInProgress;
Expand All @@ -54,8 +55,10 @@
import org.opensearch.cluster.routing.allocation.command.AllocationCommands;
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.PriorityComparator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
import org.opensearch.snapshots.SnapshotsInfoService;

import java.util.ArrayList;
Expand All @@ -73,6 +76,7 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.cluster.routing.allocation.ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE;

/**
* This service manages the node allocation of a cluster. For this reason the
Expand All @@ -87,6 +91,7 @@
private static final Logger logger = LogManager.getLogger(AllocationService.class);

private final AllocationDeciders allocationDeciders;
private Settings settings;
private Map<String, ExistingShardsAllocator> existingShardsAllocators;
private final ShardsAllocator shardsAllocator;
private final ClusterInfoService clusterInfoService;
Expand All @@ -109,11 +114,23 @@
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService
) {
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, Settings.EMPTY);
}

public AllocationService(
AllocationDeciders allocationDeciders,
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
Settings settings

) {
this.allocationDeciders = allocationDeciders;
this.shardsAllocator = shardsAllocator;
this.clusterInfoService = clusterInfoService;
this.snapshotsInfoService = snapshotsInfoService;
this.settings = settings;
}

/**
Expand Down Expand Up @@ -548,6 +565,20 @@
existingShardsAllocator.beforeAllocation(allocation);
}

/*
Use batch mode if enabled and there is no custom allocator set for Allocation service
*/
Boolean batchModeEnabled = EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(settings);
if (batchModeEnabled && allocation.nodes().getMinNodeVersion().onOrAfter(Version.V_3_0_0) && existingShardsAllocators.size() == 2) {
/*
If we do not have any custom allocator set then we will be using ShardsBatchGatewayAllocator
Currently AllocationService will not run any custom Allocator that implements allocateAllUnassignedShards
*/
allocateAllUnassignedShards(allocation);
return;

Check warning on line 578 in server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java#L577-L578

Added lines #L577 - L578 were not covered by tests
}
logger.warn("Falling back to single shard assignment since batch mode disable or multiple custom allocators set");

final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
while (primaryIterator.hasNext()) {
final ShardRouting shardRouting = primaryIterator.next();
Expand All @@ -569,6 +600,14 @@
}
}

private void allocateAllUnassignedShards(RoutingAllocation allocation) {
ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME);
allocator.allocateAllUnassignedShards(allocation, true);
allocator.afterPrimariesBeforeReplicas(allocation);

Check warning on line 606 in server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java#L604-L606

Added lines #L604 - L606 were not covered by tests
// Replicas Assignment
allocator.allocateAllUnassignedShards(allocation, false);
}

Check warning on line 609 in server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java#L608-L609

Added lines #L608 - L609 were not covered by tests

private void disassociateDeadNodes(RoutingAllocation allocation) {
for (Iterator<RoutingNode> it = allocation.routingNodes().mutableIterator(); it.hasNext();) {
RoutingNode node = it.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Setting;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;

import java.util.List;

/**
* Searches for, and allocates, shards for which there is an existing on-disk copy somewhere in the cluster. The default implementation is
* {@link GatewayAllocator}, but plugins can supply their own implementations too.
* {@link GatewayAllocator} and {@link ShardsBatchGatewayAllocator}, but plugins can supply their own implementations too.
*
* @opensearch.internal
*/
Expand All @@ -60,6 +61,26 @@
Setting.Property.PrivateIndex
);

/**
* Boolean setting to enable/disable batch allocation of unassigned shards already existing on disk.
* This will allow sending all Unassigned Shards to the ExistingShard Allocator to make decision to allocate
* in one or more go.
*
* Enable this setting if your ExistingShardAllocator is implementing the
* {@link ExistingShardsAllocator#allocateAllUnassignedShards(RoutingAllocation, boolean)} method.
* The default implementation of this method is not optimized and assigns shards one by one.
*
* If no plugin overrides {@link ExistingShardsAllocator} then default implementation will be use for it , i.e,
* {@link ShardsBatchGatewayAllocator}.
*
* This setting is experimental at this point.
*/
Setting<Boolean> EXISTING_SHARDS_ALLOCATOR_BATCH_MODE = Setting.boolSetting(
"cluster.allocator.existing_shards_allocator.batch_enabled",
false,
Setting.Property.NodeScope
);

/**
* Called before starting a round of allocation, allowing the allocator to invalidate some caches if appropriate.
*/
Expand All @@ -80,6 +101,23 @@
UnassignedAllocationHandler unassignedAllocationHandler
);

/**
* Allocate all unassigned shards in the given {@link RoutingAllocation} for which this {@link ExistingShardsAllocator} is responsible.
* Default implementation calls {@link #allocateUnassigned(ShardRouting, RoutingAllocation, UnassignedAllocationHandler)} for each Unassigned shard
* and is kept here for backward compatibility.
*
* Allocation service will currently run the default implementation of it implemented by {@link ShardsBatchGatewayAllocator}
*/
default void allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();

Check warning on line 112 in server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java#L112

Added line #L112 was not covered by tests
while (iterator.hasNext()) {
ShardRouting shardRouting = iterator.next();

Check warning on line 114 in server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java#L114

Added line #L114 was not covered by tests
if (shardRouting.primary() == primary) {
allocateUnassigned(shardRouting, allocation, iterator);

Check warning on line 116 in server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java#L116

Added line #L116 was not covered by tests
}
}
}

Check warning on line 119 in server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java#L118-L119

Added lines #L118 - L119 were not covered by tests

/**
* Returns an explanation for a single unassigned shard.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.opensearch.cluster.routing.OperationRouting;
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance;
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
Expand Down Expand Up @@ -102,6 +103,7 @@
import org.opensearch.gateway.DanglingIndicesState;
import org.opensearch.gateway.GatewayService;
import org.opensearch.gateway.PersistedClusterStateService;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.index.IndexModule;
Expand Down Expand Up @@ -268,6 +270,7 @@ public void apply(Settings value, Settings current, Settings previous) {
DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING,
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING,
EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING,
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE,
Gaurav614 marked this conversation as resolved.
Show resolved Hide resolved
FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING,
FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING,
FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP_SETTING,
Expand Down Expand Up @@ -330,6 +333,7 @@ public void apply(Settings value, Settings current, Settings previous) {
GatewayService.RECOVER_AFTER_MASTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_TIME_SETTING,
ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE,
PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD,
NetworkModule.HTTP_DEFAULT_TYPE_SETTING,
NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@
}
}

public void allocateUnassignedBatch(String batchId, RoutingAllocation allocation) {}

Check warning on line 138 in server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java#L138

Added line #L138 was not covered by tests
Gaurav614 marked this conversation as resolved.
Show resolved Hide resolved

protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation allocation) {
if (shardRouting.primary()) {
if (shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void processExistingRecoveries(RoutingAllocation allocation, List<List<Sh
if (shard != null && !shard.primary()) {
// need to iterate over all the nodes to find matching shard
if (shouldSkipFetchForRecovery(shard)) {
ineligibleShards.add(shard);
// shard should just be skipped for fetchData, no need to remove from batch
continue;
}
eligibleShards.add(shard);
Expand Down
Loading
Loading