Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added execution canceling. Fixes #481 #602

Merged
merged 1 commit into from
Aug 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/common/styles/global.css
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@
.create-node text {
font-style: italic;
}

.job-canceled {
background-color: #ffe0b2;
}
7 changes: 7 additions & 0 deletions src/common/viz/Utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,12 @@ define({
date = `Today (${new Date(timestamp).toLocaleTimeString()})`;
}
return date;
},
ClassForJobStatus: {
success: 'success',
canceled: 'job-canceled',
failed: 'danger',
pending: '',
running: 'warning'
}
});
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
/*globals define, _*/
/*jshint browser: true, camelcase: false*/

/**
* @author brollb / https://github.com/brollb
*/

define([
'decorators/EllipseDecorator/EasyDAG/EllipseDecorator.EasyDAGWidget',
'css!./JobDecorator.EasyDAGWidget.css'
Expand All @@ -20,6 +16,7 @@ define([
pending: '#9e9e9e',
queued: '#cfd8dc',
running: '#fff59d',
canceled: '#ffcc80',
success: '#66bb6a',
fail: '#e57373'
};
Expand All @@ -35,6 +32,8 @@ define([
status: true,
execFiles: true,
stdout: true,
secret: true,
jobId: true,
debug: true
};
EllipseDecorator.call(this, options);
Expand Down
61 changes: 53 additions & 8 deletions src/plugins/ExecuteJob/ExecuteJob.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ define([
this._markForDeletion = {}; // id -> node
this._oldMetadataByName = {}; // name -> id
this.lastAppliedCmd = {};
this.canceled = false;
};

/**
Expand Down Expand Up @@ -212,6 +213,24 @@ define([
}
delete this.lastAppliedCmd[nodeId];
delete this._markForDeletion[nodeId];

this.core.delAttribute(job, 'jobId');
this.core.delAttribute(job, 'secret');
};

ExecuteJob.prototype.resultMsg = function(msg) {
this.sendNotification(msg);
this.createMessage(null, msg);
};

ExecuteJob.prototype.onOperationCanceled = function(op) {
var job = this.core.getParent(op),
name = this.core.getAttribute(op, 'name'),
msg = `"${name}" canceled!`;

this.core.setAttribute(job, 'status', 'canceled');
this.resultMsg(msg);
this.onComplete(op, null);
};

ExecuteJob.prototype.onOperationFail =
Expand All @@ -221,7 +240,7 @@ define([
exec = this.core.getParent(job),
name = this.core.getAttribute(job, 'name'),
jobId = this.core.getPath(job),
status = err ? 'fail' : 'success',
status = err ? 'fail' : (this.canceled ? 'canceled' : 'success'),
msg = err ? `${name} execution failed: ${err}` :
`${name} executed successfully!`,
promise = Q();
Expand All @@ -236,6 +255,9 @@ define([
}
if (err) {
this.core.setAttribute(exec, 'status', 'failed');
} else if (this.canceled) {
// Should I set this to 'canceled'?
this.core.setAttribute(exec, 'status', 'canceled');
} else {
// Check if all the other jobs are successful. If so, set the
// execution status to 'success'
Expand All @@ -261,6 +283,7 @@ define([
});
}

this.createMessage(null, msg);
promise
.then(() => this.save(msg))
.then(() => {
Expand Down Expand Up @@ -421,7 +444,13 @@ define([
this.logger.debug(`Making a commit from ${this.currentHash}`);
this.save(`Queued "${name}" operation in ${this.pipelineName}`)
.then(() => executor.createJob({hash}))
.then(() => this.watchOperation(executor, hash, opNode, job))
.then(info => {
this.core.setAttribute(job, 'jobId', info.hash);
if (info.secret) { // o.w. it is a cached job!
this.core.setAttribute(job, 'secret', info.secret);
}
return this.watchOperation(executor, hash, opNode, job);
})
.catch(err => this.logger.error(`Could not execute "${name}": ${err}`));

};
Expand Down Expand Up @@ -742,7 +771,19 @@ define([
var jobId = this.core.getPath(job),
opId = this.core.getPath(op),
info,
name;
secret,
name = this.core.getAttribute(job, 'name');

// If canceled, stop the operation
if (this.canceled) {
secret = this.core.getAttribute(job, 'secret');
if (secret) {
executor.cancelJob(hash, secret);
this.core.delAttribute(job, 'secret');
this.canceled = true;
return this.onOperationCanceled(op);
}
}

return executor.getInfo(hash)
.then(_info => { // Update the job's stdout
Expand All @@ -756,16 +797,15 @@ define([
return executor.getOutput(hash, currentLine, actualLine+1)
.then(outputLines => {
var stdout = this.core.getAttribute(job, 'stdout'),
output = outputLines.map(o => o.output).join(''),
jobName = this.core.getAttribute(job, 'name');
output = outputLines.map(o => o.output).join('');

// parse deepforge commands
output = this.parseForMetadataCmds(job, output);

if (output) {
stdout += output;
this.core.setAttribute(job, 'stdout', stdout);
return this.save(`Received stdout for ${jobName}`);
return this.save(`Received stdout for ${name}`);
}
});
}
Expand All @@ -775,7 +815,6 @@ define([
if (info.status === 'RUNNING' &&
this.core.getAttribute(job, 'status') !== 'running') {

name = this.core.getAttribute(job, 'name');
this.core.setAttribute(job, 'status', 'running');
this.save(`Started "${name}" operation in ${this.pipelineName}`);
}
Expand All @@ -787,7 +826,13 @@ define([
return;
}

name = this.core.getAttribute(job, 'name');
if (info.status === 'CANCELED') {
// If it was cancelled, the pipeline has been stopped
this.logger.debug(`"${name}" has been CANCELED!`);
this.canceled = true;
return this.onOperationCanceled(op);
}

this.core.setAttribute(job, 'execFiles', info.resultHashes[name + '-all-files']);
return this.blobClient.getArtifact(info.resultHashes.stdout)
.then(artifact => {
Expand Down
31 changes: 26 additions & 5 deletions src/plugins/ExecutePipeline/ExecutePipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ define([
// - keep track if the pipeline has errored
// - if so, don't start any more jobs
this.pipelineError = null;
this.canceled = false;
this.runningJobs = 0;

// metadata records
Expand Down Expand Up @@ -273,17 +274,28 @@ define([
this.onPipelineComplete(err);
};

ExecutePipeline.prototype.onOperationCanceled = function(op) {
var job = this.core.getParent(op);
this.core.setAttribute(job, 'status', 'canceled');
this.runningJobs--;
this.logger.debug(`${this.core.getAttribute(job, 'name')} has been canceled`);
this.onPipelineComplete();
};

ExecutePipeline.prototype.onPipelineComplete = function(err) {
var name = this.core.getAttribute(this.activeNode, 'name');
var name = this.core.getAttribute(this.activeNode, 'name'),
msg = `"${this.pipelineName}" `;

if (err) {
this.runningJobs--;
}

this.pipelineError = this.pipelineError || err;

if (this.pipelineError && this.runningJobs > 0) {
this.logger.info('Pipeline errored but is waiting for the running ' +
this.logger.debug(`${this.runningJobs} remaining jobs`);
if ((this.pipelineError || this.canceled) && this.runningJobs > 0) {
var action = this.pipelineError ? 'error' : 'cancel';
this.logger.info(`Pipeline ${action}ed but is waiting for the running ` +
'jobs to finish');
return;
}
Expand All @@ -293,11 +305,20 @@ define([
this.sendNotification(`"${this.pipelineName}" execution completed on branch "${this.currentForkName}"`);
}

if (this.pipelineError) {
msg += 'failed!';
} else if (this.canceled) {
msg += 'canceled!';
} else {
msg += 'finished!';
}

this.logger.debug(`Pipeline "${name}" complete!`);
this.core.setAttribute(this.activeNode, 'status',
(!this.pipelineError ? 'success' : 'failed'));
(this.pipelineError ? 'failed' : (this.canceled ? 'canceled' : 'success')));

this._finished = true;
this.resultMsg(msg);
this.save('Pipeline execution finished')
.then(() => {
this.result.setSuccess(!this.pipelineError);
Expand All @@ -314,7 +335,7 @@ define([
this.logger.info(`About to execute ${readyOps.length} operations`);

// If the pipeline has errored don't start any more jobs
if (this.pipelineError) {
if (this.pipelineError || this.canceled) {
if (this.runningJobs === 0) {
this.onPipelineComplete();
}
Expand Down
Binary file modified src/seeds/cifar10/cifar10.webgmex
Binary file not shown.
Binary file modified src/seeds/pipeline/pipeline.webgmex
Binary file not shown.
Binary file modified src/seeds/project/project.webgmex
Binary file not shown.
Binary file modified src/seeds/xor/xor.webgmex
Binary file not shown.
36 changes: 35 additions & 1 deletion src/visualizers/panels/ForgeActionButton/Actions.js
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,43 @@ define([
icon: 'play_for_work',
priority: 1,
href: download.execFiles
},
// Stop execution button
{
name: 'Stop Current Job',
icon: 'stop',
priority: 1001,
filter: function() {
var job = this.client.getNode(this._currentNodeId);
return this.isJobRunning(job);
},
action: function() {
this.stopJob();
}
}
],
Execution: [
makeRestartButton('Execution', 'ExecutePipeline'),
// Stop execution button
{
name: 'Stop Running Execution',
icon: 'stop',
priority: 1001,
filter: function() {
var exec = this.client.getNode(this._currentNodeId);
return exec.getAttribute('status') === 'running';
},
action: function() {
// Stop every running job
var execNode = this.client.getNode(this._currentNodeId),
jobIds = execNode.getChildrenIds();

jobIds.map(id => this.client.getNode(id))
.filter(job => this.isJobRunning(job)) // get running jobs
.forEach(job => this.stopJob(job)); // stop them
}
}
],
Execution: [makeRestartButton('Execution', 'ExecutePipeline')],
Pipeline: [
{
name: 'Create new node',
Expand Down
49 changes: 42 additions & 7 deletions src/visualizers/panels/ForgeActionButton/ForgeActionButton.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
/*globals DeepForge, $, Materialize, define, _ */
/*globals DeepForge, $, WebGMEGlobal, window, Materialize, define, _ */
/*jshint browser: true*/

define([
'blob/BlobClient',
'executor/ExecutorClient',
'js/Constants',
'panel/FloatingActionButton/FloatingActionButton',
'deepforge/viz/PipelineControl',
Expand All @@ -16,6 +17,7 @@ define([
'deepforge/globals'
], function (
BlobClient,
ExecutorClient,
CONSTANTS,
PluginButton,
PipelineControl,
Expand All @@ -33,6 +35,11 @@ define([
var ForgeActionButton= function (layoutManager, params) {
PluginButton.call(this, layoutManager, params);
this._pluginConfig = JSON.parse(PluginConfig);
this._executor = new ExecutorClient({
logger: this.logger.fork('ExecutorClient'),
serverPort: WebGMEGlobal.gmeConfig.server.port,
httpsecure: window.location.protocol === 'https:'
});
this._client = this.client;
this._actions = [];
this._blobClient = new BlobClient({
Expand Down Expand Up @@ -60,7 +67,7 @@ define([
if (!base) { // must be ROOT or FCO
basename = node.getAttribute('name') || 'ROOT_NODE';
actions = (ACTIONS[basename] || [])
.filter(action => !action.filter || action.filter());
.filter(action => !action.filter || action.filter.call(this));
return actions;
}

Expand All @@ -69,7 +76,7 @@ define([
base = this.client.getNode(base.getBaseId());
actions = ACTIONS[basename];
if (actions) {
actions = actions.filter(action => !action.filter || action.filter());
actions = actions.filter(action => !action.filter || action.filter.call(this));
}
}

Expand Down Expand Up @@ -328,14 +335,42 @@ define([

context.managerConfig.namespace = 'pipeline';
method = useSecondary ? 'runBrowserPlugin' : 'runServerPlugin';
this.client[method](pluginId, context, err => {
if (err) {
return Materialize.toast(`${name} failed!`, 4000);
this.client[method](pluginId, context, (err, result) => {
var msg = err ? `${name} failed!` : `${name} executed successfully!`,
duration = err ? 4000 : 2000;

// Check if it was canceled - if so, show that type of message
if (result) {
msg = result.messages[0].message;
duration = 4000;
}

Materialize.toast(`${name} executed successfully!`, 2000);
Materialize.toast(msg, duration);
});
};

ForgeActionButton.prototype.isJobRunning = function(job) {
var status = job.getAttribute('status');

return (status === 'running' || status === 'pending') &&
job.getAttribute('secret') && job.getAttribute('jobId');
};

ForgeActionButton.prototype.stopJob = function(job) {
var jobHash,
secret;

job = job || this.client.getNode(this._currentNodeId);
jobHash = job.getAttribute('jobId');
secret = job.getAttribute('secret');
if (!jobHash || !secret) {
this.logger.error('Cannot stop job. Missing jobHash or secret');
}

return this._executor.cancelJob(jobHash, secret)
.then(() => this.logger.info(`${jobHash} has been cancelled!`))
.fail(err => this.logger.error(`Job cancel failed: ${err}`));
};

return ForgeActionButton;
});
Loading