Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge startFrame and reserveProc in a single transaction #1372

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,15 +199,12 @@ public void dispatch(DispatchFrame frame, VirtualProc proc) {
/*
* The frame is reserved, the proc is created, now update the frame to
* the running state.
*/
dispatchSupport.startFrame(proc, frame);

/*
*
* Creates a proc to run on the specified frame. Throws a
* ResourceReservationFailureException if the proc cannot be created due
* to lack of resources.
*/
dispatchSupport.reserveProc(proc, frame);
dispatchSupport.startFrameAndProc(proc, frame);

/*
* Communicate with RQD to run the frame.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
* limitations under the License.
*/



package com.imageworks.spcue.dispatcher;

import com.google.common.cache.Cache;
Expand All @@ -41,7 +39,7 @@ public class BookingQueue {
private HealthyThreadPool healthyThreadPool;

public BookingQueue(int healthThreshold, int minUnhealthyPeriodMin, int queueCapacity,
int corePoolSize, int maxPoolSize) {
int corePoolSize, int maxPoolSize) {
this.healthThreshold = healthThreshold;
this.minUnhealthyPeriodMin = minUnhealthyPeriodMin;
this.queueCapacity = queueCapacity;
Expand Down Expand Up @@ -118,4 +116,3 @@ public long getMaximumPoolSize() {
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -362,14 +362,7 @@ public void dispatch(DispatchFrame frame, VirtualProc proc) {
* The frame is reserved, the proc is created, now update
* the frame to the running state.
*/
dispatchSupport.startFrame(proc, frame);

/*
* Creates a proc to run on the specified frame. Throws
* a ResourceReservationFailureException if the proc
* cannot be created due to lack of resources.
*/
dispatchSupport.reserveProc(proc, frame);
dispatchSupport.startFrameAndProc(proc, frame);

/*
* Communicate with RQD to run the frame.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,6 @@ public interface DispatchSupport {
*/
boolean stopFrame(FrameInterface frame, FrameState state, int exitStatus);

/**
* Updates the frame to the Running state. This should
* be done after RQD has accepted the frame. Setting
* the frame's state to running will result in a
* new entry in the frame_history table for the
* running frame.
*
* @param proc
* @param frame
*/
void startFrame(VirtualProc proc, DispatchFrame frame);

/**
* Updates a frame with completed stats.
*
Expand All @@ -180,6 +168,12 @@ boolean stopFrame(FrameInterface frame, FrameState state,
int exitStatus, long maxrss);

/**
* Updates the frame to the Running state. This should
* be done after RQD has accepted the frame. Setting
* the frame's state to running will result in a
* new entry in the frame_history table for the
* running frame.
*
* Reserve the resources in the specified proc for the
* specified frame. If the proc does not exist, its
* inserted, otherwise its updated.
Expand All @@ -191,7 +185,7 @@ boolean stopFrame(FrameInterface frame, FrameState state,
* @param proc
* @param frame
*/
public void reserveProc(VirtualProc proc, DispatchFrame frame);
public void startFrameAndProc(VirtualProc proc, DispatchFrame frame);

/**
* This method clears out a proc that was lost track of.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,21 @@ public void runFrame(VirtualProc proc, DispatchFrame frame) {
try {
rqdClient.launchFrame(prepareRqdRunFrame(proc, frame), proc);
dispatchedProcs.getAndIncrement();
}
catch (Exception e) {
} catch (Exception e) {
throw new DispatcherException(proc.getName() +
" could not be booked on " + frame.getName() + ", " + e);
}
}

@Override
@Transactional(propagation = Propagation.REQUIRED)
public void startFrameAndProc(VirtualProc proc, DispatchFrame frame) {
logger.trace("starting frame: " + frame);

frameDao.updateFrameStarted(proc, frame);

reserveProc(proc, frame);
}

@Transactional(propagation = Propagation.REQUIRED, readOnly=true)
public boolean isCueBookable(FacilityInterface f) {
Expand Down Expand Up @@ -431,14 +440,6 @@ public RunFrame prepareRqdRunFrame(VirtualProc proc, DispatchFrame frame) {
return builder.build();
}


@Override
@Transactional(propagation = Propagation.REQUIRED)
public void startFrame(VirtualProc proc, DispatchFrame frame) {
logger.trace("starting frame: " + frame);
frameDao.updateFrameStarted(proc, frame);
}

@Override
@Transactional(propagation = Propagation.NOT_SUPPORTED)
public void fixFrame(DispatchFrame frame) {
Expand Down Expand Up @@ -477,9 +478,7 @@ public void updateUsageCounters(FrameInterface frame, int exitStatus) {
}
}

@Override
@Transactional(propagation = Propagation.REQUIRED)
public void reserveProc(VirtualProc proc, DispatchFrame frame) {
private void reserveProc(VirtualProc proc, DispatchFrame frame) {

proc.jobId = frame.getJobId();
proc.frameId = frame.getFrameId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ public int sleepTime() {
}
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
if (isShutdown()) {
Expand All @@ -201,6 +202,7 @@ protected void beforeExecute(Thread t, Runnable r) {
}
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);

Expand Down
Loading