Skip to content

Commit

Permalink
Added execution canceling. Fixes #481
Browse files Browse the repository at this point in the history
WIP #481 Added buttons and jobId, secret setting

WIP #481 Added 'jobId', 'secret' to Job

WIP #481 Canceling job exec support

WIP #481 Added canceling executing pipelines

WIP #481 Fixed canceling pipelines

WIP #481 Improved result messages from executions

WIP #481 Updated decorator and status setting for ExecJob

WIP #481 Updated job colors in lists

WIP #481 Updated pipeline library

WIP #481 Fixed code climate issues
  • Loading branch information
brollb committed Aug 3, 2016
1 parent a8e5876 commit 035d71d
Show file tree
Hide file tree
Showing 13 changed files with 177 additions and 43 deletions.
4 changes: 4 additions & 0 deletions src/common/styles/global.css
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@
.create-node text {
font-style: italic;
}

/* Background highlight for elements carrying the `job-canceled` class
   (the class that viz/Utils.js maps the 'canceled' status to). */
.job-canceled {
background-color: #ffe0b2;
}
7 changes: 7 additions & 0 deletions src/common/viz/Utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,12 @@ define({
date = `Today (${new Date(timestamp).toLocaleTimeString()})`;
}
return date;
},
ClassForJobStatus: {
success: 'success',
canceled: 'job-canceled',
failed: 'danger',
pending: '',
running: 'warning'
}
});
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
/*globals define, _*/
/*jshint browser: true, camelcase: false*/

/**
* @author brollb / https://github.com/brollb
*/

define([
'decorators/EllipseDecorator/EasyDAG/EllipseDecorator.EasyDAGWidget',
'css!./JobDecorator.EasyDAGWidget.css'
Expand All @@ -20,6 +16,7 @@ define([
pending: '#9e9e9e',
queued: '#cfd8dc',
running: '#fff59d',
canceled: '#ffcc80',
success: '#66bb6a',
fail: '#e57373'
};
Expand All @@ -35,6 +32,8 @@ define([
status: true,
execFiles: true,
stdout: true,
secret: true,
jobId: true,
debug: true
};
EllipseDecorator.call(this, options);
Expand Down
61 changes: 53 additions & 8 deletions src/plugins/ExecuteJob/ExecuteJob.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ define([
this._markForDeletion = {}; // id -> node
this._oldMetadataByName = {}; // name -> id
this.lastAppliedCmd = {};
this.canceled = false;
};

/**
Expand Down Expand Up @@ -212,6 +213,24 @@ define([
}
delete this.lastAppliedCmd[nodeId];
delete this._markForDeletion[nodeId];

this.core.delAttribute(job, 'jobId');
this.core.delAttribute(job, 'secret');
};

/**
 * Report a result message twice: once through `sendNotification` and once
 * through `createMessage` (with no node attached to the message).
 *
 * @param {String} msg - text to report
 * @return {undefined}
 */
ExecuteJob.prototype.resultMsg = function(msg) {
    var node = null;  // no node is passed along with the message

    this.sendNotification(msg);
    this.createMessage(node, msg);
};

/**
 * Handle an operation whose run was canceled: flag the parent job as
 * 'canceled', report the cancellation and finish the operation without an
 * error.
 *
 * @param {Object} op - the operation node that was canceled
 * @return {undefined}
 */
ExecuteJob.prototype.onOperationCanceled = function(op) {
    var core = this.core,
        parentJob = core.getParent(op),
        opName = core.getAttribute(op, 'name');

    core.setAttribute(parentJob, 'status', 'canceled');
    this.resultMsg(`"${opName}" canceled!`);

    // A null error tells onComplete that this was not a failure
    this.onComplete(op, null);
};

ExecuteJob.prototype.onOperationFail =
Expand All @@ -221,7 +240,7 @@ define([
exec = this.core.getParent(job),
name = this.core.getAttribute(job, 'name'),
jobId = this.core.getPath(job),
status = err ? 'fail' : 'success',
status = err ? 'fail' : (this.canceled ? 'canceled' : 'success'),
msg = err ? `${name} execution failed: ${err}` :
`${name} executed successfully!`,
promise = Q();
Expand All @@ -236,6 +255,9 @@ define([
}
if (err) {
this.core.setAttribute(exec, 'status', 'failed');
} else if (this.canceled) {
// Should I set this to 'canceled'?
this.core.setAttribute(exec, 'status', 'canceled');
} else {
// Check if all the other jobs are successful. If so, set the
// execution status to 'success'
Expand All @@ -261,6 +283,7 @@ define([
});
}

this.createMessage(null, msg);
promise
.then(() => this.save(msg))
.then(() => {
Expand Down Expand Up @@ -421,7 +444,13 @@ define([
this.logger.debug(`Making a commit from ${this.currentHash}`);
this.save(`Queued "${name}" operation in ${this.pipelineName}`)
.then(() => executor.createJob({hash}))
.then(() => this.watchOperation(executor, hash, opNode, job))
.then(info => {
this.core.setAttribute(job, 'jobId', info.hash);
if (info.secret) { // o.w. it is a cached job!
this.core.setAttribute(job, 'secret', info.secret);
}
return this.watchOperation(executor, hash, opNode, job);
})
.catch(err => this.logger.error(`Could not execute "${name}": ${err}`));

};
Expand Down Expand Up @@ -742,7 +771,19 @@ define([
var jobId = this.core.getPath(job),
opId = this.core.getPath(op),
info,
name;
secret,
name = this.core.getAttribute(job, 'name');

// If canceled, stop the operation
if (this.canceled) {
secret = this.core.getAttribute(job, 'secret');
if (secret) {
executor.cancelJob(hash, secret);
this.core.delAttribute(job, 'secret');
this.canceled = true;
return this.onOperationCanceled(op);
}
}

return executor.getInfo(hash)
.then(_info => { // Update the job's stdout
Expand All @@ -756,16 +797,15 @@ define([
return executor.getOutput(hash, currentLine, actualLine+1)
.then(outputLines => {
var stdout = this.core.getAttribute(job, 'stdout'),
output = outputLines.map(o => o.output).join(''),
jobName = this.core.getAttribute(job, 'name');
output = outputLines.map(o => o.output).join('');

// parse deepforge commands
output = this.parseForMetadataCmds(job, output);

if (output) {
stdout += output;
this.core.setAttribute(job, 'stdout', stdout);
return this.save(`Received stdout for ${jobName}`);
return this.save(`Received stdout for ${name}`);
}
});
}
Expand All @@ -775,7 +815,6 @@ define([
if (info.status === 'RUNNING' &&
this.core.getAttribute(job, 'status') !== 'running') {

name = this.core.getAttribute(job, 'name');
this.core.setAttribute(job, 'status', 'running');
this.save(`Started "${name}" operation in ${this.pipelineName}`);
}
Expand All @@ -787,7 +826,13 @@ define([
return;
}

name = this.core.getAttribute(job, 'name');
if (info.status === 'CANCELED') {
// If it was cancelled, the pipeline has been stopped
this.logger.debug(`"${name}" has been CANCELED!`);
this.canceled = true;
return this.onOperationCanceled(op);
}

this.core.setAttribute(job, 'execFiles', info.resultHashes[name + '-all-files']);
return this.blobClient.getArtifact(info.resultHashes.stdout)
.then(artifact => {
Expand Down
31 changes: 26 additions & 5 deletions src/plugins/ExecutePipeline/ExecutePipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ define([
// - keep track if the pipeline has errored
// - if so, don't start any more jobs
this.pipelineError = null;
this.canceled = false;
this.runningJobs = 0;

// metadata records
Expand Down Expand Up @@ -273,17 +274,28 @@ define([
this.onPipelineComplete(err);
};

/**
 * Handle a canceled operation within a pipeline run: flag its parent job as
 * 'canceled', count one fewer running job and let `onPipelineComplete`
 * decide whether the pipeline as a whole is finished.
 *
 * @param {Object} op - the operation node that was canceled
 * @return {undefined}
 */
ExecutePipeline.prototype.onOperationCanceled = function(op) {
    var core = this.core,
        canceledJob = core.getParent(op),
        jobName;

    core.setAttribute(canceledJob, 'status', 'canceled');
    this.runningJobs--;

    jobName = core.getAttribute(canceledJob, 'name');
    this.logger.debug(`${jobName} has been canceled`);

    // Called without an error: a cancel is not reported as a failure
    this.onPipelineComplete();
};

ExecutePipeline.prototype.onPipelineComplete = function(err) {
var name = this.core.getAttribute(this.activeNode, 'name');
var name = this.core.getAttribute(this.activeNode, 'name'),
msg = `"${this.pipelineName}" `;

if (err) {
this.runningJobs--;
}

this.pipelineError = this.pipelineError || err;

if (this.pipelineError && this.runningJobs > 0) {
this.logger.info('Pipeline errored but is waiting for the running ' +
this.logger.debug(`${this.runningJobs} remaining jobs`);
if ((this.pipelineError || this.canceled) && this.runningJobs > 0) {
var action = this.pipelineError ? 'error' : 'cancel';
this.logger.info(`Pipeline ${action}ed but is waiting for the running ` +
'jobs to finish');
return;
}
Expand All @@ -293,11 +305,20 @@ define([
this.sendNotification(`"${this.pipelineName}" execution completed on branch "${this.currentForkName}"`);
}

if (this.pipelineError) {
msg += 'failed!';
} else if (this.canceled) {
msg += 'canceled!';
} else {
msg += 'finished!';
}

this.logger.debug(`Pipeline "${name}" complete!`);
this.core.setAttribute(this.activeNode, 'status',
(!this.pipelineError ? 'success' : 'failed'));
(this.pipelineError ? 'failed' : (this.canceled ? 'canceled' : 'success')));

this._finished = true;
this.resultMsg(msg);
this.save('Pipeline execution finished')
.then(() => {
this.result.setSuccess(!this.pipelineError);
Expand All @@ -314,7 +335,7 @@ define([
this.logger.info(`About to execute ${readyOps.length} operations`);

// If the pipeline has errored don't start any more jobs
if (this.pipelineError) {
if (this.pipelineError || this.canceled) {
if (this.runningJobs === 0) {
this.onPipelineComplete();
}
Expand Down
Binary file modified src/seeds/cifar10/cifar10.webgmex
Binary file not shown.
Binary file modified src/seeds/pipeline/pipeline.webgmex
Binary file not shown.
Binary file modified src/seeds/project/project.webgmex
Binary file not shown.
Binary file modified src/seeds/xor/xor.webgmex
Binary file not shown.
36 changes: 35 additions & 1 deletion src/visualizers/panels/ForgeActionButton/Actions.js
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,43 @@ define([
icon: 'play_for_work',
priority: 1,
href: download.execFiles
},
// Stop button for a single job: asks the executor to cancel the job that is
// currently being viewed.
{
name: 'Stop Current Job',
icon: 'stop',
priority: 1001,
// Only offer the button while the viewed job can still be stopped.
// `this` is the ForgeActionButton (filters are invoked with `.call(this)`).
filter: function() {
var job = this.client.getNode(this._currentNodeId);
return this.isJobRunning(job);
},
// No job is passed, so stopJob falls back to the currently viewed node.
// NOTE(review): assumes `this` is the ForgeActionButton here as well - confirm
action: function() {
this.stopJob();
}
}
],
// Actions offered for Execution nodes (the list is looked up by base type name)
Execution: [
makeRestartButton('Execution', 'ExecutePipeline'),
// Stop button for a whole execution: cancels each of its running jobs
{
name: 'Stop Running Execution',
icon: 'stop',
priority: 1001,
// Only offer the button while the execution itself is marked 'running'.
// `this` is the ForgeActionButton (filters are invoked with `.call(this)`).
filter: function() {
var exec = this.client.getNode(this._currentNodeId);
return exec.getAttribute('status') === 'running';
},
action: function() {
// Stop every running job
var execNode = this.client.getNode(this._currentNodeId),
jobIds = execNode.getChildrenIds();

// NOTE(review): treats every child of the execution as a job node - confirm
jobIds.map(id => this.client.getNode(id))
.filter(job => this.isJobRunning(job)) // get running jobs
.forEach(job => this.stopJob(job)); // stop them
}
}
],
Execution: [makeRestartButton('Execution', 'ExecutePipeline')],
Pipeline: [
{
name: 'Create new node',
Expand Down
49 changes: 42 additions & 7 deletions src/visualizers/panels/ForgeActionButton/ForgeActionButton.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
/*globals DeepForge, $, Materialize, define, _ */
/*globals DeepForge, $, WebGMEGlobal, window, Materialize, define, _ */
/*jshint browser: true*/

define([
'blob/BlobClient',
'executor/ExecutorClient',
'js/Constants',
'panel/FloatingActionButton/FloatingActionButton',
'deepforge/viz/PipelineControl',
Expand All @@ -16,6 +17,7 @@ define([
'deepforge/globals'
], function (
BlobClient,
ExecutorClient,
CONSTANTS,
PluginButton,
PipelineControl,
Expand All @@ -33,6 +35,11 @@ define([
var ForgeActionButton= function (layoutManager, params) {
PluginButton.call(this, layoutManager, params);
this._pluginConfig = JSON.parse(PluginConfig);
this._executor = new ExecutorClient({
logger: this.logger.fork('ExecutorClient'),
serverPort: WebGMEGlobal.gmeConfig.server.port,
httpsecure: window.location.protocol === 'https:'
});
this._client = this.client;
this._actions = [];
this._blobClient = new BlobClient({
Expand Down Expand Up @@ -60,7 +67,7 @@ define([
if (!base) { // must be ROOT or FCO
basename = node.getAttribute('name') || 'ROOT_NODE';
actions = (ACTIONS[basename] || [])
.filter(action => !action.filter || action.filter());
.filter(action => !action.filter || action.filter.call(this));
return actions;
}

Expand All @@ -69,7 +76,7 @@ define([
base = this.client.getNode(base.getBaseId());
actions = ACTIONS[basename];
if (actions) {
actions = actions.filter(action => !action.filter || action.filter());
actions = actions.filter(action => !action.filter || action.filter.call(this));
}
}

Expand Down Expand Up @@ -328,14 +335,42 @@ define([

context.managerConfig.namespace = 'pipeline';
method = useSecondary ? 'runBrowserPlugin' : 'runServerPlugin';
this.client[method](pluginId, context, err => {
if (err) {
return Materialize.toast(`${name} failed!`, 4000);
this.client[method](pluginId, context, (err, result) => {
var msg = err ? `${name} failed!` : `${name} executed successfully!`,
duration = err ? 4000 : 2000;

// Check if it was canceled - if so, show that type of message
if (result) {
msg = result.messages[0].message;
duration = 4000;
}

Materialize.toast(`${name} executed successfully!`, 2000);
Materialize.toast(msg, duration);
});
};

/**
 * Tell whether a job can currently be stopped: its status must be 'running'
 * or 'pending' and it must carry both a `secret` and a `jobId` attribute.
 *
 * @param {Object} job - job node (anything exposing `getAttribute`)
 * @return {*} falsy when the job cannot be stopped; otherwise the (truthy)
 *     `jobId` attribute. Callers should rely on truthiness only.
 */
ForgeActionButton.prototype.isJobRunning = function(job) {
    var status = job.getAttribute('status');

    if (status !== 'running' && status !== 'pending') {
        return false;
    }

    // Both values are needed to send a cancel request for the job
    return job.getAttribute('secret') && job.getAttribute('jobId');
};

/**
 * Ask the executor to cancel a job.
 *
 * The request needs the job's `jobId` and `secret` attributes. If either is
 * missing, an error is logged and no request is sent.
 *
 * @param {Object} [job] - job node to stop; defaults to the node for
 *     `this._currentNodeId`
 * @return {Promise} settles once the request has been handled. A failed
 *     cancel is logged rather than propagated as a rejection, and the
 *     promise is already resolved when no request could be sent.
 */
ForgeActionButton.prototype.stopJob = function(job) {
    var jobHash,
        secret;

    job = job || this.client.getNode(this._currentNodeId);
    jobHash = job.getAttribute('jobId');
    secret = job.getAttribute('secret');
    if (!jobHash || !secret) {
        this.logger.error('Cannot stop job. Missing jobHash or secret');
        // Bail out instead of sending a cancel request with undefined
        // values. Resolving matches the normal path, where failures are
        // logged and swallowed.
        return Promise.resolve();
    }

    return this._executor.cancelJob(jobHash, secret)
        .then(() => this.logger.info(`${jobHash} has been cancelled!`))
        .fail(err => this.logger.error(`Job cancel failed: ${err}`));
};

return ForgeActionButton;
});
Loading

0 comments on commit 035d71d

Please sign in to comment.