Skip to content

Commit

Permalink
Remove markers and handle after primary before replica directly
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Jul 9, 2024
1 parent 14f29ea commit 60630d8
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,7 @@ public class AllocationService {
private SnapshotsInfoService snapshotsInfoService;
private final ClusterManagerMetrics clusterManagerMetrics;

private static final long MAX_ALLOCATION_BATCH_CAPACITY = 100;
private final Queue<Runnable> workQueue = new LinkedBlockingQueue<>();
AtomicBoolean primaryBatchExecution = new AtomicBoolean(true);
AtomicBoolean replicaBatchExecution = new AtomicBoolean(true);
AtomicBoolean replicaAfterPrimaryBatchExecution = new AtomicBoolean(true);

// only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator
public AllocationService(
Expand Down Expand Up @@ -624,7 +620,6 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
}

private void allocateAllUnassignedShards(RoutingAllocation allocation) {
replicaAfterPrimaryBatchExecution.set(true);
if (workQueue.isEmpty()) {
logger.info("Producing work item at allocateAllUnassignedShards");
produceWorkItem(allocation);
Expand All @@ -644,16 +639,18 @@ private void processWorkItemQueue(RoutingAllocation allocation) {
logger.info("attempting to start work queue with size [{}], elapsed time [{}]",
workQueue.size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
Runnable workItem = workQueue.poll();
if (workItem != null) {
workItem.run();
} else {
// null is marker to avoid hitting hot loops of after primary before replica
logger.info("Returning as there is no work item pending");
return;
}
assert workItem != null;
workItem.run();
if (workQueue.isEmpty()) {
logger.info("Producing work item at processWorkItemQueue");
produceWorkItem(allocation);
if (workQueue.size() == 1) {
workQueue.poll().run();
if (workQueue.isEmpty()) {
logger.info("Returning as there is no work item pending");
return;
}
}
}
}
}
Expand All @@ -662,16 +659,7 @@ private void produceWorkItem(RoutingAllocation allocation) {
ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME);
long startTime = System.nanoTime();
List<Runnable> primaryRunnable = allocator.allocateAllUnassignedShards(allocation, true);
primaryBatchExecution.set(CollectionUtils.isEmpty(primaryRunnable) == false);
workQueue.addAll(primaryRunnable);
workQueue.add(
primaryBatchExecution.get() || replicaBatchExecution.get() || replicaAfterPrimaryBatchExecution.get() ? () -> {
long startTimeAPBR = System.nanoTime();
allocator.afterPrimariesBeforeReplicas(allocation);
logger.info("Adding after primary before replica runnable with elapsed time [{}]",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeAPBR));
} : null);
replicaAfterPrimaryBatchExecution.set(false);
workQueue.add(() -> produceReplicaWorkItem(allocation));
logger.info("Produced work item with time taken [{}], with queue size [{}]",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime), workQueue.size());
Expand All @@ -680,7 +668,6 @@ private void produceWorkItem(RoutingAllocation allocation) {
private void produceReplicaWorkItem(RoutingAllocation allocation) {
ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME);
List<Runnable> replicaBatchRunnable = allocator.allocateAllUnassignedShards(allocation, false);
replicaBatchExecution.set(CollectionUtils.isEmpty(replicaBatchRunnable) == false);
logger.info("Inflating replica runnables now with size [{}]", replicaBatchRunnable.size());
workQueue.addAll(replicaBatchRunnable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ public void apply(Settings value, Settings current, Settings previous) {
BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING,
BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING,
BalancedShardsAllocator.THRESHOLD_SETTING,
BalancedShardsAllocator.ALLOCATE_UNASSIGNED_TIMEOUT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_TYPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ protected List<Runnable> innerAllocateUnassignedBatch(
ReplicaShardBatchAllocator replicaBatchShardAllocator,
boolean primary
) {
if (primary == false) {
afterPrimariesBeforeReplicas(allocation);
}
// create batches for unassigned shards
Set<String> batchesToAssign = createAndUpdateBatches(allocation, primary);
if (batchesToAssign.isEmpty()) {
Expand Down

0 comments on commit 60630d8

Please sign in to comment.