Skip to content

Commit

Permalink
Async Batch shards changes for GatewayAllocator (opensearch-project#8746
Browse files Browse the repository at this point in the history
)

Changes for create/update/delete batches for batch mode for async fetch for both primary & replica. It also added the node scope setting to enable/ disable batch mode.

Signed-off-by: Gaurav Chandani <chngau@amazon.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
Signed-off-by: Aman Khare <amkhar@amazon.com>
  • Loading branch information
Gaurav614 authored Apr 26, 2024
1 parent db694a9 commit a69cd08
Show file tree
Hide file tree
Showing 17 changed files with 1,696 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))
- [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992))
- Batch mode for async fetching shard information in GatewayAllocator for unassigned shards ([#8746](https://github.com/opensearch-project/OpenSearch/pull/8746))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down

Large diffs are not rendered by default.

14 changes: 11 additions & 3 deletions server/src/main/java/org/opensearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import org.opensearch.core.common.io.stream.Writeable.Reader;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
import org.opensearch.ingest.IngestMetadata;
import org.opensearch.persistent.PersistentTasksCustomMetadata;
import org.opensearch.persistent.PersistentTasksNodeService;
Expand Down Expand Up @@ -153,7 +154,13 @@ public ClusterModule(
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext);
this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
this.allocationService = new AllocationService(
allocationDeciders,
shardsAllocator,
clusterInfoService,
snapshotsInfoService,
settings
);
}

public static List<Entry> getNamedWriteables() {
Expand Down Expand Up @@ -423,6 +430,7 @@ public AllocationService getAllocationService() {
@Override
protected void configure() {
bind(GatewayAllocator.class).asEagerSingleton();
bind(ShardsBatchGatewayAllocator.class).asEagerSingleton();
bind(AllocationService.class).toInstance(allocationService);
bind(ClusterService.class).toInstance(clusterService);
bind(NodeConnectionsService.class).asEagerSingleton();
Expand All @@ -442,10 +450,10 @@ protected void configure() {
bind(ShardsAllocator.class).toInstance(shardsAllocator);
}

public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator) {
public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, ShardsBatchGatewayAllocator shardsBatchGatewayAllocator) {
final Map<String, ExistingShardsAllocator> existingShardsAllocators = new HashMap<>();
existingShardsAllocators.put(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator);

existingShardsAllocators.put(ShardsBatchGatewayAllocator.ALLOCATOR_NAME, shardsBatchGatewayAllocator);
for (ClusterPlugin clusterPlugin : clusterPlugins) {
for (Map.Entry<String, ExistingShardsAllocator> existingShardsAllocatorEntry : clusterPlugin.getExistingShardsAllocators()
.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ public int size() {
return shards.size();
}

public Collection<ShardRouting> getInitializingShards() {
return initializingShards;
}

/**
* Add a new shard to this node
* @param shard Shard to create on this Node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.RestoreInProgress;
Expand All @@ -54,8 +55,10 @@
import org.opensearch.cluster.routing.allocation.command.AllocationCommands;
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.PriorityComparator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
import org.opensearch.snapshots.SnapshotsInfoService;

import java.util.ArrayList;
Expand All @@ -73,6 +76,7 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.cluster.routing.allocation.ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE;

/**
* This service manages the node allocation of a cluster. For this reason the
Expand All @@ -87,6 +91,7 @@ public class AllocationService {
private static final Logger logger = LogManager.getLogger(AllocationService.class);

private final AllocationDeciders allocationDeciders;
private Settings settings;
private Map<String, ExistingShardsAllocator> existingShardsAllocators;
private final ShardsAllocator shardsAllocator;
private final ClusterInfoService clusterInfoService;
Expand All @@ -109,11 +114,23 @@ public AllocationService(
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService
) {
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, Settings.EMPTY);
}

public AllocationService(
AllocationDeciders allocationDeciders,
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
Settings settings

) {
this.allocationDeciders = allocationDeciders;
this.shardsAllocator = shardsAllocator;
this.clusterInfoService = clusterInfoService;
this.snapshotsInfoService = snapshotsInfoService;
this.settings = settings;
}

/**
Expand Down Expand Up @@ -548,6 +565,20 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
existingShardsAllocator.beforeAllocation(allocation);
}

/*
Use batch mode if enabled and there is no custom allocator set for Allocation service
*/
Boolean batchModeEnabled = EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(settings);
if (batchModeEnabled && allocation.nodes().getMinNodeVersion().onOrAfter(Version.V_3_0_0) && existingShardsAllocators.size() == 2) {
/*
If we do not have any custom allocator set then we will be using ShardsBatchGatewayAllocator
Currently AllocationService will not run any custom Allocator that implements allocateAllUnassignedShards
*/
allocateAllUnassignedShards(allocation);
return;
}
logger.warn("Falling back to single shard assignment since batch mode disable or multiple custom allocators set");

final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
while (primaryIterator.hasNext()) {
final ShardRouting shardRouting = primaryIterator.next();
Expand All @@ -569,6 +600,14 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
}
}

private void allocateAllUnassignedShards(RoutingAllocation allocation) {
ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME);
allocator.allocateAllUnassignedShards(allocation, true);
allocator.afterPrimariesBeforeReplicas(allocation);
// Replicas Assignment
allocator.allocateAllUnassignedShards(allocation, false);
}

private void disassociateDeadNodes(RoutingAllocation allocation) {
for (Iterator<RoutingNode> it = allocation.routingNodes().mutableIterator(); it.hasNext();) {
RoutingNode node = it.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Setting;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;

import java.util.List;

/**
* Searches for, and allocates, shards for which there is an existing on-disk copy somewhere in the cluster. The default implementation is
* {@link GatewayAllocator}, but plugins can supply their own implementations too.
* {@link GatewayAllocator} and {@link ShardsBatchGatewayAllocator}, but plugins can supply their own implementations too.
*
* @opensearch.internal
*/
Expand All @@ -60,6 +61,26 @@ public interface ExistingShardsAllocator {
Setting.Property.PrivateIndex
);

/**
* Boolean setting to enable/disable batch allocation of unassigned shards already existing on disk.
* This will allow sending all Unassigned Shards to the ExistingShard Allocator to make decision to allocate
* in one or more go.
*
* Enable this setting if your ExistingShardAllocator is implementing the
* {@link ExistingShardsAllocator#allocateAllUnassignedShards(RoutingAllocation, boolean)} method.
* The default implementation of this method is not optimized and assigns shards one by one.
*
* If no plugin overrides {@link ExistingShardsAllocator} then default implementation will be use for it , i.e,
* {@link ShardsBatchGatewayAllocator}.
*
* This setting is experimental at this point.
*/
Setting<Boolean> EXISTING_SHARDS_ALLOCATOR_BATCH_MODE = Setting.boolSetting(
"cluster.allocator.existing_shards_allocator.batch_enabled",
false,
Setting.Property.NodeScope
);

/**
* Called before starting a round of allocation, allowing the allocator to invalidate some caches if appropriate.
*/
Expand All @@ -80,6 +101,23 @@ void allocateUnassigned(
UnassignedAllocationHandler unassignedAllocationHandler
);

/**
* Allocate all unassigned shards in the given {@link RoutingAllocation} for which this {@link ExistingShardsAllocator} is responsible.
* Default implementation calls {@link #allocateUnassigned(ShardRouting, RoutingAllocation, UnassignedAllocationHandler)} for each Unassigned shard
* and is kept here for backward compatibility.
*
* Allocation service will currently run the default implementation of it implemented by {@link ShardsBatchGatewayAllocator}
*/
default void allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting shardRouting = iterator.next();
if (shardRouting.primary() == primary) {
allocateUnassigned(shardRouting, allocation, iterator);
}
}
}

/**
* Returns an explanation for a single unassigned shard.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.opensearch.cluster.routing.OperationRouting;
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance;
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
Expand Down Expand Up @@ -102,6 +103,7 @@
import org.opensearch.gateway.DanglingIndicesState;
import org.opensearch.gateway.GatewayService;
import org.opensearch.gateway.PersistedClusterStateService;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.index.IndexModule;
Expand Down Expand Up @@ -268,6 +270,7 @@ public void apply(Settings value, Settings current, Settings previous) {
DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING,
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING,
EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING,
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE,
FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING,
FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING,
FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP_SETTING,
Expand Down Expand Up @@ -330,6 +333,7 @@ public void apply(Settings value, Settings current, Settings previous) {
GatewayService.RECOVER_AFTER_MASTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_TIME_SETTING,
ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE,
PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD,
NetworkModule.HTTP_DEFAULT_TYPE_SETTING,
NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ private void executeDecision(
}
}

public void allocateUnassignedBatch(String batchId, RoutingAllocation allocation) {}

protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation allocation) {
if (shardRouting.primary()) {
if (shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void processExistingRecoveries(RoutingAllocation allocation, List<List<Sh
if (shard != null && !shard.primary()) {
// need to iterate over all the nodes to find matching shard
if (shouldSkipFetchForRecovery(shard)) {
ineligibleShards.add(shard);
// shard should just be skipped for fetchData, no need to remove from batch
continue;
}
eligibleShards.add(shard);
Expand Down
Loading

0 comments on commit a69cd08

Please sign in to comment.