Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Fix computation of excess workload and available capacity #371

Merged
merged 2 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,63 +30,64 @@ public NodeProvisioner.StrategyDecision apply(final NodeProvisioner.StrategyStat
final Label label = strategyState.getLabel();

final LoadStatistics.LoadStatisticsSnapshot snapshot = strategyState.getSnapshot();
final int availableCapacity =
snapshot.getAvailableExecutors() // live executors
+ snapshot.getConnectingExecutors() // executors present but not yet connected
+ strategyState.getPlannedCapacitySnapshot() // capacity added by previous strategies from previous rounds
+ strategyState.getAdditionalPlannedCapacity(); // capacity added by previous strategies _this round_

int currentDemand = snapshot.getQueueLength() - availableCapacity;
LOGGER.log(currentDemand < 1 ? Level.FINE : Level.INFO,
"label [{0}]: currentDemand {1} availableCapacity {2} (availableExecutors {3} connectingExecutors {4} plannedCapacitySnapshot {5} additionalPlannedCapacity {6})",
new Object[]{label, currentDemand, availableCapacity, snapshot.getAvailableExecutors(),
snapshot.getConnectingExecutors(), strategyState.getPlannedCapacitySnapshot(),
strategyState.getAdditionalPlannedCapacity()});

for (final Cloud cloud : getClouds()) {
if (currentDemand < 1) {
LOGGER.log(Level.FINE, "label [{0}]: currentDemand is less than 1, not provisioning", label);
final int availableCapacity = snapshot.getAvailableExecutors() // available executors
+ strategyState.getPlannedCapacitySnapshot() // capacity added by previous strategies from previous rounds
+ strategyState.getAdditionalPlannedCapacity(); // capacity added by previous strategies _this round_

int qLen = snapshot.getQueueLength();
int excessWorkload = qLen - availableCapacity;
LOGGER.log(Level.FINE, "label [{0}]: queueLength {1} availableCapacity {2} (availableExecutors {3} plannedCapacitySnapshot {4} additionalPlannedCapacity {5})",
new Object[]{label, qLen, availableCapacity, snapshot.getAvailableExecutors(),
strategyState.getPlannedCapacitySnapshot(), strategyState.getAdditionalPlannedCapacity()});

if (excessWorkload <= 0) {
LOGGER.log(Level.INFO, "label [{0}]: No excess workload, provisioning not needed.", label);
return NodeProvisioner.StrategyDecision.PROVISIONING_COMPLETED;
}

for (final Cloud c : getClouds()) {
if (excessWorkload < 1) {
break;
}

if (!(cloud instanceof EC2FleetCloud)) {
if (!(c instanceof EC2FleetCloud)) {
LOGGER.log(Level.FINE, "label [{0}]: cloud {1} is not an EC2FleetCloud, continuing...",
new Object[]{label, cloud.getDisplayName()});
new Object[]{label, c.getDisplayName()});
continue;
}

Cloud.CloudState cloudState = new Cloud.CloudState(label, strategyState.getAdditionalPlannedCapacity());
if (!cloud.canProvision(cloudState)) {
if (!c.canProvision(cloudState)) {
LOGGER.log(Level.INFO, "label [{0}]: cloud {1} can not provision for this label, continuing...",
new Object[]{label, cloud.getDisplayName()});
new Object[]{label, c.getDisplayName()});
continue;
}

final EC2FleetCloud ec2 = (EC2FleetCloud) cloud;
if (!ec2.isNoDelayProvision()) {
if (!((EC2FleetCloud) c).isNoDelayProvision()) {
LOGGER.log(Level.FINE, "label [{0}]: cloud {1} does not use No Delay Provision Strategy, continuing...",
new Object[]{label, cloud.getDisplayName()});
new Object[]{label, c.getDisplayName()});
continue;
}

LOGGER.log(Level.INFO, "label [{0}]: cloud {1} can provision for this label",
new Object[]{label, cloud.getDisplayName()});
final Collection<NodeProvisioner.PlannedNode> plannedNodes = cloud.provision(cloudState, currentDemand);
for (NodeProvisioner.PlannedNode plannedNode : plannedNodes) {
currentDemand -= plannedNode.numExecutors;
LOGGER.log(Level.FINE, "label [{0}]: cloud {1} can provision for this label",
new Object[]{label, c.getDisplayName()});
final Collection<NodeProvisioner.PlannedNode> plannedNodes = c.provision(cloudState, excessWorkload);
for (NodeProvisioner.PlannedNode pn : plannedNodes) {
excessWorkload -= pn.numExecutors;
LOGGER.log(Level.INFO, "Started provisioning {0} from {1} with {2,number,integer} "
+ "executors. Remaining excess workload: {3,number,#.###}",
new Object[]{pn.displayName, c.name, pn.numExecutors, excessWorkload});
}
LOGGER.log(Level.FINE, "Planned {0} new nodes", plannedNodes.size());
strategyState.recordPendingLaunches(plannedNodes);
LOGGER.log(Level.FINE, "After provisioning currentDemand={0}", new Object[]{currentDemand});
}

if (currentDemand < 1) {
LOGGER.log(Level.FINE, "Provisioning completed");
return NodeProvisioner.StrategyDecision.PROVISIONING_COMPLETED;
} else {
if (excessWorkload > 0) {
LOGGER.log(Level.FINE, "Provisioning not complete, consulting remaining strategies");
return NodeProvisioner.StrategyDecision.CONSULT_REMAINING_STRATEGIES;
}

LOGGER.log(Level.FINE, "Provisioning completed");
return NodeProvisioner.StrategyDecision.PROVISIONING_COMPLETED;
}

// Visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,4 +221,32 @@ public void givenEC2Clouds_shouldReduceAsAmountOfExecutors() {
verify(ec2FleetCloud1, times(1)).provision(any(Cloud.CloudState.class), eq(2));
}

@Test
public void givenPlannedCapacity_shouldComputeExcessWorkloadCorrectly() {
when(snapshot.getQueueLength()).thenReturn(6);
when(snapshot.getAvailableExecutors()).thenReturn(0);
when(state.getPlannedCapacitySnapshot()).thenReturn(3);
when(state.getLabel()).thenReturn(label);

final EC2FleetCloud ec2FleetCloud1 = mock(EC2FleetCloud.class);
clouds.add(ec2FleetCloud1);
when(ec2FleetCloud1.canProvision(any(Cloud.CloudState.class))).thenReturn(true);
when(ec2FleetCloud1.isNoDelayProvision()).thenReturn(true);
final NodeProvisioner.PlannedNode plannedNode1 = new NodeProvisioner.PlannedNode("fc1-0", new CompletableFuture<>(), 2);
when(ec2FleetCloud1.provision(any(Cloud.CloudState.class), anyInt())).thenReturn(Arrays.asList(plannedNode1));

final EC2FleetCloud ec2FleetCloud2 = mock(EC2FleetCloud.class);
clouds.add(ec2FleetCloud2);
when(ec2FleetCloud2.canProvision(any(Cloud.CloudState.class))).thenReturn(true);
when(ec2FleetCloud2.isNoDelayProvision()).thenReturn(true);
final NodeProvisioner.PlannedNode plannedNode2 = new NodeProvisioner.PlannedNode("fc2-0", new CompletableFuture<>(), 1);
when(ec2FleetCloud2.provision(any(Cloud.CloudState.class), anyInt())).thenReturn(Arrays.asList(plannedNode2));

final NodeProvisioner.StrategyDecision decision = strategy.apply(state);

Assert.assertEquals(NodeProvisioner.StrategyDecision.PROVISIONING_COMPLETED, decision);
verify(ec2FleetCloud1, times(1)).provision(any(Cloud.CloudState.class), eq(3));
verify(ec2FleetCloud2, times(1)).provision(any(Cloud.CloudState.class), eq(1));
}

}