diff --git a/src/common/compute/interactive/message.js b/src/common/compute/interactive/message.js index a52a50927..6ef99d9d3 100644 --- a/src/common/compute/interactive/message.js +++ b/src/common/compute/interactive/message.js @@ -9,7 +9,8 @@ } }(this, function() { const Constants = makeEnum('STDOUT', 'STDERR', 'RUN', 'ADD_ARTIFACT', 'KILL', - 'ADD_FILE', 'REMOVE_FILE', 'ADD_USER_DATA', 'COMPLETE', 'ERROR', 'SET_ENV'); + 'ADD_FILE', 'REMOVE_FILE', 'ADD_USER_DATA', 'COMPLETE', 'ERROR', 'SET_ENV', + 'SAVE_ARTIFACT'); function makeEnum() { const names = Array.prototype.slice.call(arguments); diff --git a/src/common/compute/interactive/session-with-queue.js b/src/common/compute/interactive/session-with-queue.js index a028fac70..c402900a3 100644 --- a/src/common/compute/interactive/session-with-queue.js +++ b/src/common/compute/interactive/session-with-queue.js @@ -22,7 +22,7 @@ define([ async runTask(task) { const queuedTask = this.queueTask(task); - await queuedTask.promise; + return await queuedTask.promise; } queueTask(task) { @@ -46,9 +46,9 @@ define([ async runNextTask() { const queuedTask = this.tasks[0]; - await super.runTask(queuedTask.unwrap()); + const result = await super.runTask(queuedTask.unwrap()); this.tasks.shift(); - queuedTask.resolve(); + queuedTask.resolve(result); this.checkTaskQueue(); } diff --git a/src/common/compute/interactive/session.js b/src/common/compute/interactive/session.js index 97825c84d..47c179ce2 100644 --- a/src/common/compute/interactive/session.js +++ b/src/common/compute/interactive/session.js @@ -94,9 +94,10 @@ define([ this.ensureIdle('run task'); this.currentTask = task; - await task.run(); + const result = await task.run(); this.currentTask = null; this.checkReady(); + return result; } async whenConnected() { @@ -128,12 +129,24 @@ define([ } async addArtifact(name, dataInfo, type, auth) { + auth = auth || {}; this.ensureIdle('add artifact'); const msg = new Message(Message.ADD_ARTIFACT, [name, dataInfo, type, auth]); const task = new Task(this.channel, msg); await this.runTask(task); } + async saveArtifact(/*path, name, storageId, config*/) { + this.ensureIdle('save artifact'); + const msg = new Message(Message.SAVE_ARTIFACT, [...arguments]); + const task = new Task(this.channel, msg); + const [exitCode, dataInfo] = await this.runTask(task); + if (exitCode) { + throw new CommandFailedError('saveArtifact', {exitCode}); + } + return dataInfo; + } + async addFile(filepath, content) { this.ensureIdle('add file'); const msg = new Message(Message.ADD_FILE, [filepath, content]); diff --git a/src/common/compute/interactive/task.js b/src/common/compute/interactive/task.js index c68f35444..449c221c2 100644 --- a/src/common/compute/interactive/task.js +++ b/src/common/compute/interactive/task.js @@ -28,7 +28,7 @@ define([ this.emitMessage(msg); if (msg.type === Message.COMPLETE) { this.channel.unlisten(handler); - deferred.resolve(); + deferred.resolve(msg.data); } }; this.channel.listen(handler); @@ -37,7 +37,11 @@ define([ } emitMessage(msg) { - this.emit(msg.type, msg.data); + if (msg.type === Message.COMPLETE) { + this.emit(msg.type, ...msg.data); + } else { + this.emit(msg.type, msg.data); + } } static async getMessageData(wsMsg) { diff --git a/src/routers/InteractiveCompute/job-files/start.js b/src/routers/InteractiveCompute/job-files/start.js index 10239e443..7494098af 100644 --- a/src/routers/InteractiveCompute/job-files/start.js +++ b/src/routers/InteractiveCompute/job-files/start.js @@ -29,7 +29,7 @@ class InteractiveClient { if (msg.type === Message.RUN) { const [cmd, ...opts] = InteractiveClient.parseCommand(msg.data); this.subprocess = spawn(cmd, opts); - this.subprocess.on('exit', code => this.sendMessage(Message.COMPLETE, code)); + this.subprocess.on('exit', code => this.sendMessage(Message.COMPLETE, [code])); this.subprocess.stdout.on('data', data => this.sendMessage(Message.STDOUT, data)); this.subprocess.stderr.on('data', data => this.sendMessage(Message.STDERR, data)); } else if (msg.type === Message.KILL) { @@ -46,15 +46,31 @@ class InteractiveClient { Utils, ) => { const {Storage} = Utils; - - async function saveArtifact() { - const client = await Storage.getClient(dataInfo.backend, null, config); + const fetchArtifact = async () => { + const client = await Storage.getClient(dataInfo.backend, undefined, config); const dataPath = path.join(...dirs.concat('data')); const stream = await client.getFileStream(dataInfo); await pipeline(stream, fs.createWriteStream(dataPath)); const filePath = path.join(...dirs.concat('__init__.py')); await fsp.writeFile(filePath, initFile(name, type)); - } + }; + + this.runTask(fetchArtifact); + }); + } else if (msg.type === Message.SAVE_ARTIFACT) { + const [filepath, name, backend, config={}] = msg.data; + requirejs([ + './utils.build', + ], ( + Utils, + ) => { + const {Storage} = Utils; + const saveArtifact = async () => { + const client = await Storage.getClient(backend, null, config); + const stream = await fs.createReadStream(filepath); + const dataInfo = await client.putFileStream(name, stream); + return dataInfo; + }; this.runTask(saveArtifact); }); @@ -76,7 +92,7 @@ class InteractiveClient { await fsp.unlink(filepath); }); } else { - this.sendMessage(Message.COMPLETE, 2); + this.sendMessage(Message.COMPLETE, [2]); } } @@ -99,13 +115,14 @@ class InteractiveClient { async runTask(fn) { let exitCode = 0; + let result; try { - await fn(); + result = await fn(); } catch (err) { exitCode = 1; console.log('Task failed with error:', err); } - this.sendMessage(Message.COMPLETE, exitCode); + this.sendMessage(Message.COMPLETE, [exitCode, result]); } static parseCommand(cmd) { diff --git a/test/integration/InteractiveCompute.js b/test/integration/InteractiveCompute.js index f99b634b1..f459d8b2a 100644 --- a/test/integration/InteractiveCompute.js +++ b/test/integration/InteractiveCompute.js @@ -61,6 +61,13 @@ describe('InteractiveCompute', function() { sleep(100).then(() => session.kill(task)); }); + it('should save artifacts', async function() { + await session.exec('node -e \'fs.writeFileSync("test.txt", "hi")\''); + const dataInfo = await session.saveArtifact('test.txt', 'test', 'gme'); + assert.equal(dataInfo.backend, 'gme'); + assert(dataInfo.data); + }); + function sleep(duration) { return new Promise(resolve => setTimeout(resolve, duration)); }