Skip to content

Commit

Permalink
Introduce BatchRunnableExecutor
Browse files Browse the repository at this point in the history
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
  • Loading branch information
Bukhtawar committed Jul 22, 2024
1 parent bf9cf16 commit de8f6aa
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.TimeoutAwareRunnable;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.PriorityComparator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
Expand All @@ -75,9 +74,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -627,26 +624,10 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {

private void allocateAllUnassignedShards(RoutingAllocation allocation) {
ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME);
executeTimedRunnables(allocator.allocateAllUnassignedShards(allocation, true), () -> allocator.getPrimaryBatchAllocatorTimeout().millis(), true);
allocator.allocateAllUnassignedShards(allocation, true).run();
allocator.afterPrimariesBeforeReplicas(allocation);
// Replicas Assignment
executeTimedRunnables(allocator.allocateAllUnassignedShards(allocation, false), () -> allocator.getReplicaBatchAllocatorTimeout().millis(), false);
}

private void executeTimedRunnables(List<TimeoutAwareRunnable> runnables, Supplier<Long> maxRunTimeSupplier, boolean primary) {
logger.info("Executing timed runnables for primary [{}] of size [{}]", primary, runnables.size());
Collections.shuffle(runnables);
long startTime = System.nanoTime();
for (TimeoutAwareRunnable workQueue : runnables) {
if (System.nanoTime() - startTime < TimeValue.timeValueMillis(maxRunTimeSupplier.get()).nanos()) {
logger.info("Starting primary [{}] batch to allocate", primary);
workQueue.run();
} else {
logger.info("Timing out primary [{}] batch to allocate", primary);
workQueue.onTimeout();
}
}
logger.info("Time taken to execute timed runnables in this cycle:[{}ms]", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
allocator.allocateAllUnassignedShards(allocation, false).run();
}

private void disassociateDeadNodes(RoutingAllocation allocation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BatchRunnableExecutor;
import org.opensearch.common.util.concurrent.TimeoutAwareRunnable;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
* Searches for, and allocates, shards for which there is an existing on-disk copy somewhere in the cluster. The default implementation is
Expand Down Expand Up @@ -112,7 +112,7 @@ void allocateUnassigned(
*
* Allocation service will currently run the default implementation of it implemented by {@link ShardsBatchGatewayAllocator}
*/
default List<TimeoutAwareRunnable> allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
default BatchRunnableExecutor allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
List<TimeoutAwareRunnable> runnables = new ArrayList<>();
while (iterator.hasNext()) {
Expand All @@ -131,15 +131,7 @@ public void run() {
});
}
}
return runnables;
}

default TimeValue getPrimaryBatchAllocatorTimeout() {
return TimeValue.MINUS_ONE;
}

default TimeValue getReplicaBatchAllocatorTimeout() {
return TimeValue.MINUS_ONE;
return new BatchRunnableExecutor(runnables, () -> TimeValue.MINUS_ONE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.TimeoutAwareRunnable;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
* The executor that executes a batch of {@link TimeoutAwareRunnable} and triggers a timeout based on {@link TimeValue} timeout
*/
public class BatchRunnableExecutor implements Runnable {

private final Supplier<TimeValue> timeoutSupplier;

private final List<TimeoutAwareRunnable> timeoutAwareRunnables;

private static final Logger logger = LogManager.getLogger(BatchRunnableExecutor.class);

public BatchRunnableExecutor(List<TimeoutAwareRunnable> timeoutAwareRunnables, Supplier<TimeValue> timeoutSupplier) {
this.timeoutSupplier = timeoutSupplier;
this.timeoutAwareRunnables = timeoutAwareRunnables;
}

@Override
public void run() {
logger.debug("Starting execution of runnable of size [{}]", timeoutAwareRunnables.size());
Collections.shuffle(timeoutAwareRunnables);
long startTime = System.nanoTime();
for (TimeoutAwareRunnable workQueue : timeoutAwareRunnables) {
if (System.nanoTime() - startTime > timeoutSupplier.get().nanos()) {
workQueue.run();
} else {
logger.debug("Executing timeout for runnable of size [{}]", timeoutAwareRunnables.size());
workQueue.onTimeout();
}
}
logger.debug("Time taken to execute timed runnables in this cycle:[{}ms]",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
package org.opensearch.common.util.concurrent;

/**
* Runnable that is aware of a timeout and can execute another {@link Runnable} when a timeout is reached
* Runnable that is aware of a timeout
*/
public interface TimeoutAwareRunnable extends Runnable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BatchRunnableExecutor;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.util.concurrent.TimeoutAwareRunnable;
import org.opensearch.common.util.set.Sets;
Expand All @@ -56,7 +57,6 @@
import java.util.Spliterators;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -168,24 +168,6 @@ protected ShardsBatchGatewayAllocator(long batchSize) {
this.replicaShardsBatchGatewayAllocatorTimeout = null;
}

@Override
public TimeValue getPrimaryBatchAllocatorTimeout() {
return this.primaryShardsBatchGatewayAllocatorTimeout;
}

public void setPrimaryBatchAllocatorTimeout(TimeValue primaryShardsBatchGatewayAllocatorTimeout) {
this.primaryShardsBatchGatewayAllocatorTimeout = primaryShardsBatchGatewayAllocatorTimeout;
}

@Override
public TimeValue getReplicaBatchAllocatorTimeout() {
return this.primaryShardsBatchGatewayAllocatorTimeout;
}

public void setReplicaBatchAllocatorTimeout(TimeValue replicaShardsBatchGatewayAllocatorTimeout) {
this.replicaShardsBatchGatewayAllocatorTimeout = replicaShardsBatchGatewayAllocatorTimeout;
}

// for tests

@Override
Expand Down Expand Up @@ -245,14 +227,14 @@ public void allocateUnassigned(
}

@Override
public List<TimeoutAwareRunnable> allocateAllUnassignedShards(final RoutingAllocation allocation, boolean primary) {
public BatchRunnableExecutor allocateAllUnassignedShards(final RoutingAllocation allocation, boolean primary) {

assert primaryShardBatchAllocator != null;
assert replicaShardBatchAllocator != null;
return innerAllocateUnassignedBatch(allocation, primaryShardBatchAllocator, replicaShardBatchAllocator, primary);
}

protected List<TimeoutAwareRunnable> innerAllocateUnassignedBatch(
protected BatchRunnableExecutor innerAllocateUnassignedBatch(
RoutingAllocation allocation,
PrimaryShardBatchAllocator primaryBatchShardAllocator,
ReplicaShardBatchAllocator replicaBatchShardAllocator,
Expand All @@ -261,7 +243,7 @@ protected List<TimeoutAwareRunnable> innerAllocateUnassignedBatch(
// create batches for unassigned shards
Set<String> batchesToAssign = createAndUpdateBatches(allocation, primary);
if (batchesToAssign.isEmpty()) {
return Collections.emptyList();
return null;
}
List<TimeoutAwareRunnable> runnables = new ArrayList<>();
if (primary) {
Expand All @@ -287,6 +269,7 @@ public void run() {

}
}));
return new BatchRunnableExecutor(runnables, () -> primaryShardsBatchGatewayAllocatorTimeout);
} else {
batchIdToStoreShardBatch.values()
.stream()
Expand All @@ -311,8 +294,8 @@ public void run() {

}
}));
return new BatchRunnableExecutor(runnables, () -> replicaShardsBatchGatewayAllocatorTimeout);
}
return runnables;
}

// visible for testing
Expand Down Expand Up @@ -816,4 +799,12 @@ public int getNumberOfStartedShardBatches() {
public int getNumberOfStoreShardBatches() {
return batchIdToStoreShardBatch.size();
}

private void setPrimaryBatchAllocatorTimeout(TimeValue primaryShardsBatchGatewayAllocatorTimeout) {
this.primaryShardsBatchGatewayAllocatorTimeout = primaryShardsBatchGatewayAllocatorTimeout;
}

private void setReplicaBatchAllocatorTimeout(TimeValue replicaShardsBatchGatewayAllocatorTimeout) {
this.replicaShardsBatchGatewayAllocatorTimeout = replicaShardsBatchGatewayAllocatorTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.util.concurrent.TimeoutAwareRunnable;
import org.opensearch.common.util.BatchRunnableExecutor;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.gateway.AsyncShardFetch;
import org.opensearch.gateway.PrimaryShardBatchAllocator;
Expand All @@ -29,7 +29,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

public class TestShardBatchGatewayAllocator extends ShardsBatchGatewayAllocator {

Expand Down Expand Up @@ -104,7 +103,7 @@ protected boolean hasInitiatedFetching(ShardRouting shard) {
};

@Override
public List<TimeoutAwareRunnable> allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
public BatchRunnableExecutor allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
currentNodes = allocation.nodes();
return innerAllocateUnassignedBatch(allocation, primaryBatchShardAllocator, replicaBatchShardAllocator, primary);
}
Expand Down

0 comments on commit de8f6aa

Please sign in to comment.