Skip to content

Commit

Permalink
[cuebot] Introduce depend.satisfy_only_on_frame_success setting. (#1082)
Browse files Browse the repository at this point in the history
  • Loading branch information
splhack authored Jan 18, 2022
1 parent 310af8a commit f7c12f7
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.dao.EmptyResultDataAccessException;

import com.imageworks.spcue.DispatchFrame;
Expand Down Expand Up @@ -92,6 +94,25 @@ public class FrameCompleteHandler {
*/
private boolean shutdown = false;

/**
* Whether or not to satisfy dependents (*_ON_FRAME and *_ON_LAYER) only on Frame success
*/
private boolean satisfyDependOnlyOnFrameSuccess;

public boolean getSatisfyDependOnlyOnFrameSuccess() {
return satisfyDependOnlyOnFrameSuccess;
}

public void setSatisfyDependOnlyOnFrameSuccess(boolean satisfyDependOnlyOnFrameSuccess) {
this.satisfyDependOnlyOnFrameSuccess = satisfyDependOnlyOnFrameSuccess;
}

@Autowired
public FrameCompleteHandler(Environment env) {
satisfyDependOnlyOnFrameSuccess = env.getProperty(
"depend.satisfy_only_on_frame_success", Boolean.class, true);
}

/**
* Handle the given FrameCompleteReport from RQD.
*
Expand Down Expand Up @@ -235,21 +256,28 @@ public void handlePostFrameCompleteOperations(VirtualProc proc,

dispatchSupport.updateUsageCounters(frame, report.getExitStatus());

if (newFrameState.equals(FrameState.SUCCEEDED)) {
boolean isLayerComplete = false;

if (newFrameState.equals(FrameState.SUCCEEDED)
|| (!satisfyDependOnlyOnFrameSuccess
&& newFrameState.equals(FrameState.EATEN))) {
jobManagerSupport.satisfyWhatDependsOn(frame);
if (jobManager.isLayerComplete(frame)) {
isLayerComplete = jobManager.isLayerComplete(frame);
if (isLayerComplete) {
jobManagerSupport.satisfyWhatDependsOn((LayerInterface) frame);
} else {
/*
* If the layer meets some specific criteria then try to
* update the minimum memory and tags so it can run on a
* wider variety of cores, namely older hardware.
*/
jobManager.optimizeLayer(frame, report.getFrame().getNumCores(),
report.getFrame().getMaxRss(), report.getRunTime());
}
}

if (newFrameState.equals(FrameState.SUCCEEDED) && !isLayerComplete) {
/*
* If the layer meets some specific criteria then try to
* update the minimum memory and tags so it can run on a
* wider variety of cores, namely older hardware.
*/
jobManager.optimizeLayer(frame, report.getFrame().getNumCores(),
report.getFrame().getMaxRss(), report.getRunTime());
}

/*
* The final frame can either be Succeeded or Eaten. If you only
* check if the frame is Succeeded before doing an isJobComplete
Expand Down
3 changes: 3 additions & 0 deletions cuebot/src/main/resources/opencue.properties
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ dispatcher.booking_queue.max_pool_size=6
# Queue capacity for booking.
dispatcher.booking_queue.queue_capacity=1000

# Whether or not to satisfy dependents (*_ON_FRAME and *_ON_LAYER) only on Frame success
depend.satisfy_only_on_frame_success=true

# Jobs will be archived to the history tables after being completed for this long.
history.archive_jobs_cutoff_hours=72

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,23 @@
import org.junit.Test;
import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import com.imageworks.spcue.DispatchFrame;
import com.imageworks.spcue.DispatchHost;
import com.imageworks.spcue.FrameInterface;
import com.imageworks.spcue.DispatchJob;
import com.imageworks.spcue.FrameDetail;
import com.imageworks.spcue.JobDetail;
import com.imageworks.spcue.LayerDetail;
import com.imageworks.spcue.VirtualProc;
import com.imageworks.spcue.dao.FrameDao;
import com.imageworks.spcue.dao.LayerDao;
import com.imageworks.spcue.dispatcher.Dispatcher;
import com.imageworks.spcue.dispatcher.DispatchSupport;
import com.imageworks.spcue.dispatcher.FrameCompleteHandler;
import com.imageworks.spcue.grpc.host.HardwareState;
import com.imageworks.spcue.grpc.job.FrameState;
import com.imageworks.spcue.grpc.report.FrameCompleteReport;
import com.imageworks.spcue.grpc.report.RenderHost;
import com.imageworks.spcue.grpc.report.RunningFrameInfo;
Expand Down Expand Up @@ -70,12 +76,18 @@ public class FrameCompleteHandlerTests extends TransactionalTest {
@Resource
JobManager jobManager;

@Resource
FrameDao frameDao;

@Resource
LayerDao layerDao;

@Resource
Dispatcher dispatcher;

@Resource
DispatchSupport dispatchSupport;

private static final String HOSTNAME = "beta";

@Before
Expand Down Expand Up @@ -232,5 +244,88 @@ public void testGpuReportOver() {
(jobManager.isJobComplete(job1) ? 1 : 0) +
(jobManager.isJobComplete(job2) ? 1 : 0));
}

private void executeDepend(
FrameState frameState, int exitStatus, int dependCount, FrameState dependState) {
JobDetail job = jobManager.findJobDetail("pipe-default-testuser_test_depend");
LayerDetail layerFirst = layerDao.findLayerDetail(job, "layer_first");
LayerDetail layerSecond = layerDao.findLayerDetail(job, "layer_second");
FrameDetail frameFirst = frameDao.findFrameDetail(job, "0000-layer_first");
FrameDetail frameSecond = frameDao.findFrameDetail(job, "0000-layer_second");

assertEquals(1, frameSecond.dependCount);
assertEquals(FrameState.DEPEND, frameSecond.state);

jobManager.setJobPaused(job, false);

DispatchHost host = getHost();
List<VirtualProc> procs = dispatcher.dispatchHost(host);
assertEquals(1, procs.size());
VirtualProc proc = procs.get(0);
assertEquals(job.getId(), proc.getJobId());
assertEquals(layerFirst.getId(), proc.getLayerId());
assertEquals(frameFirst.getId(), proc.getFrameId());

RunningFrameInfo info = RunningFrameInfo.newBuilder()
.setJobId(proc.getJobId())
.setLayerId(proc.getLayerId())
.setFrameId(proc.getFrameId())
.setResourceId(proc.getProcId())
.build();
FrameCompleteReport report = FrameCompleteReport.newBuilder()
.setFrame(info)
.setExitStatus(exitStatus)
.build();

DispatchJob dispatchJob = jobManager.getDispatchJob(proc.getJobId());
DispatchFrame dispatchFrame = jobManager.getDispatchFrame(report.getFrame().getFrameId());
dispatchSupport.stopFrame(dispatchFrame, frameState, report.getExitStatus(),
report.getFrame().getMaxRss());
frameCompleteHandler.handlePostFrameCompleteOperations(proc,
report, dispatchJob, dispatchFrame, frameState);

assertTrue(jobManager.isLayerComplete(layerFirst));
assertFalse(jobManager.isLayerComplete(layerSecond));

frameSecond = frameDao.findFrameDetail(job, "0000-layer_second");
assertEquals(dependCount, frameSecond.dependCount);
assertEquals(dependState, frameSecond.state);
}

@Test
@Transactional
@Rollback(true)
public void testDependOnSuccess() {
assertTrue(frameCompleteHandler.getSatisfyDependOnlyOnFrameSuccess());
executeDepend(FrameState.SUCCEEDED, 0, 0, FrameState.WAITING);
}

@Test
@Transactional
@Rollback(true)
public void testDependOnFailure() {
assertTrue(frameCompleteHandler.getSatisfyDependOnlyOnFrameSuccess());
executeDepend(FrameState.EATEN, -1, 1, FrameState.DEPEND);
}

@Test
@Transactional
@Rollback(true)
public void testDependOnSuccessSatifyOnAny() {
frameCompleteHandler.setSatisfyDependOnlyOnFrameSuccess(false);
assertFalse(frameCompleteHandler.getSatisfyDependOnlyOnFrameSuccess());
executeDepend(FrameState.SUCCEEDED, 0, 0, FrameState.WAITING);
frameCompleteHandler.setSatisfyDependOnlyOnFrameSuccess(true);
}

@Test
@Transactional
@Rollback(true)
public void testDependOnFailureSatisfyOnAny() {
frameCompleteHandler.setSatisfyDependOnlyOnFrameSuccess(false);
assertFalse(frameCompleteHandler.getSatisfyDependOnlyOnFrameSuccess());
executeDepend(FrameState.EATEN, -1, 0, FrameState.WAITING);
frameCompleteHandler.setSatisfyDependOnlyOnFrameSuccess(true);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void testEnabled() {

launchAndDeleteJob();

assertEquals(Integer.valueOf(3), jdbcTemplate.queryForObject(
assertEquals(Integer.valueOf(4), jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM job_history", Integer.class));
assertEquals(Integer.valueOf(1), jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM frame_history", Integer.class));
Expand Down
35 changes: 35 additions & 0 deletions cuebot/src/test/resources/conf/jobspec/jobspec_gpus_test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,39 @@
</layer>
</layers>
</job>

<job name="test_depend">
<paused>True</paused>
<layers>
<layer name="layer_first" type="Render">
<cmd>true</cmd>
<range>0</range>
<chunk>1</chunk>
<gpus>1</gpus>
<gpu_memory>1</gpu_memory>
<services>
<service>shell</service>
</services>
</layer>
<layer name="layer_second" type="Render">
<cmd>true</cmd>
<range>0</range>
<chunk>1</chunk>
<gpus>1</gpus>
<gpu_memory>1</gpu_memory>
<services>
<service>shell</service>
</services>
</layer>
</layers>
</job>

<depends>
<depend type="LAYER_ON_LAYER" anyframe="False">
<depjob>test_depend</depjob>
<deplayer>layer_second</deplayer>
<onjob>test_depend</onjob>
<onlayer>layer_first</onlayer>
</depend>
</depends>
</spec>

0 comments on commit f7c12f7

Please sign in to comment.