diff --git a/src/common/compute/backends/ComputeBackend.js b/src/common/compute/backends/ComputeBackend.js index 3815f2335..7cf202e46 100644 --- a/src/common/compute/backends/ComputeBackend.js +++ b/src/common/compute/backends/ComputeBackend.js @@ -1,43 +1,41 @@ /* globals define, requirejs */ define([ - 'module', - 'q', ], function( - module, - Q, ) { - const ComputeBackend = function(id, metadata) { - const {name, dashboard, client} = metadata; - this.id = id; - this.name = name; - this.dashboardPath = dashboard; - this.clientPath = client || './Client'; - }; - - ComputeBackend.prototype.getClient = function(logger, blobClient, config) { - if (require.isBrowser) { - throw new Error('Compute clients cannot be loaded in the browser.'); + class ComputeBackend { + constructor (id, metadata) { + const {name, dashboard, client} = metadata; + this.id = id; + this.name = name; + this.dashboardPath = dashboard; + this.clientPath = client || './Client'; } - const Client = requirejs(`deepforge/compute/backends/${this.id}/${this.clientPath}`); - return new Client(logger, blobClient, config); - }; + getClient (logger, blobClient, config) { + if (require.isBrowser) { + throw new Error('Compute clients cannot be loaded in the browser.'); + } + + const Client = requirejs(`deepforge/compute/backends/${this.id}/${this.clientPath}`); + return new Client(logger, blobClient, config); + } - ComputeBackend.prototype.getDashboard = async function() { - if (this.dashboardPath) { - const absPath = `deepforge/compute/backends/${this.id}/${this.dashboardPath}`; - return await this.require(absPath); - } else { - return null; + async getDashboard () { + if (this.dashboardPath) { + const absPath = `deepforge/compute/backends/${this.id}/${this.dashboardPath}`; + return await this.require(absPath); + } else { + return null; + } } - }; - ComputeBackend.prototype.require = function(path) { // helper for loading async - const deferred = Q.defer(); - require([path], deferred.resolve, deferred.reject); - return deferred.promise; - }; + require (path) { // helper for loading async + return new Promise((resolve, reject) => + require([path], resolve, reject) + ); + } + } return ComputeBackend; }); diff --git a/src/common/compute/backends/ComputeClient.js b/src/common/compute/backends/ComputeClient.js index efcbf90b7..c1abe2300 100644 --- a/src/common/compute/backends/ComputeClient.js +++ b/src/common/compute/backends/ComputeClient.js @@ -1,47 +1,50 @@ /* globals define */ define([], function() { - const ComputeClient = function(logger, blobClient) { - this.logger = logger.fork('compute'); - this.blobClient = blobClient; - this._events = {}; - }; - - ComputeClient.prototype.cancelJob = function(/*job*/) { - unimplemented(this.logger, 'cancelJob'); - }; - - ComputeClient.prototype.createJob = async function(/*hash*/) { - unimplemented(this.logger, 'createJob'); - }; - - ComputeClient.prototype.getStatus = async function(/*jobInfo*/) { - unimplemented(this.logger, 'getStatus'); - }; - - ComputeClient.prototype.getResultsInfo = async function(/*jobInfo*/) { - unimplemented(this.logger, 'getResultsInfo'); - }; - - ComputeClient.prototype.getConsoleOutput = async function(/*hash*/) { - unimplemented(this.logger, 'getConsoleOutput'); - }; - - ComputeClient.prototype.isFinishedStatus = function(status) { - const notFinishedStatuses = [this.QUEUED, this.PENDING, this.RUNNING]; - return !notFinishedStatuses.includes(status); - }; - - // Some functions for event support - ComputeClient.prototype.on = function(ev, cb) { - this._events[ev] = this._events[ev] || []; - this._events[ev].push(cb); - }; - - ComputeClient.prototype.emit = function(ev) { - const args = Array.prototype.slice.call(arguments, 1); - const handlers = this._events[ev] || []; - return Promise.all(handlers.map(fn => fn.apply(this, args))); - }; + + class ComputeClient { + constructor (logger, blobClient) { + this.logger = logger.fork('compute'); + this.blobClient = blobClient; + this._events = {}; + } + + cancelJob (/*job*/) { + unimplemented(this.logger, 'cancelJob'); + } + + createJob (/*hash*/) { + unimplemented(this.logger, 'createJob'); + } + + getStatus (/*jobInfo*/) { + unimplemented(this.logger, 'getStatus'); + } + + getResultsInfo (/*jobInfo*/) { + unimplemented(this.logger, 'getResultsInfo'); + } + + getConsoleOutput (/*hash*/) { + unimplemented(this.logger, 'getConsoleOutput'); + } + + isFinishedStatus (status) { + const notFinishedStatuses = [this.QUEUED, this.PENDING, this.RUNNING]; + return !notFinishedStatuses.includes(status); + } + + // Some functions for event support + on (ev, cb) { + this._events[ev] = this._events[ev] || []; + this._events[ev].push(cb); + } + + emit (ev) { + const args = Array.prototype.slice.call(arguments, 1); + const handlers = this._events[ev] || []; + return Promise.all(handlers.map(fn => fn.apply(this, args))); + } + } ComputeClient.prototype.QUEUED = 'queued'; ComputeClient.prototype.PENDING = 'pending'; diff --git a/src/common/compute/backends/gme/Client.js b/src/common/compute/backends/gme/Client.js index 6404fe014..d3e86de7e 100644 --- a/src/common/compute/backends/gme/Client.js +++ b/src/common/compute/backends/gme/Client.js @@ -17,131 +17,132 @@ define([ module, ) { const PROJECT_ROOT = path.join(path.dirname(module.uri), '..', '..', '..', '..', '..'); - const GMEExecutor = function(logger, blobClient, config={}) { - ComputeClient.apply(this, arguments); - const configPath = path.join(PROJECT_ROOT, 'config'); - const gmeConfig = require.nodeRequire(configPath); - this.pollInterval = 1500; - this.previousGMEInfo = {}; - this.webgmeToken = config.webgmeToken; - this.executor = new ExecutorClient({ - logger: this.logger, - serverPort: gmeConfig.server.port, - httpsecure: false, - webgmeToken: this.webgmeToken, - }); - }; - GMEExecutor.prototype = Object.create(ComputeClient.prototype); - - GMEExecutor.prototype.getConsoleOutput = async function(job) { - const info = await this.executor.getInfo(job.hash); - const isComplete = this.isFinishedStatus(this._getComputeStatus(info.status)); - - if (isComplete) { - const mdHash = await this._getResultHash(job, 'stdout'); - const hash = await this._getContentHash(mdHash, 'job_stdout.txt'); - assert(hash, 'Console output data not found.'); - return await this.blobClient.getObjectAsString(hash); - } else { - return (await this.executor.getOutput(job.hash)) - .map(o => o.output).join(''); + class GMEExecutor extends ComputeClient { + constructor(logger, blobClient, config={}) { + super(logger, blobClient, config); + const configPath = path.join(PROJECT_ROOT, 'config'); + const gmeConfig = require.nodeRequire(configPath); + this.pollInterval = 1500; + this.previousGMEInfo = {}; + this.webgmeToken = config.webgmeToken; + this.executor = new ExecutorClient({ + logger: this.logger, + serverPort: gmeConfig.server.port, + httpsecure: false, + webgmeToken: this.webgmeToken, + }); } - }; - - GMEExecutor.prototype.cancelJob = function(job) { - return this.executor.cancelJob(job.hash, job.secret); - }; - - GMEExecutor.prototype._getResultHash = async function(job, name) { - const {resultHashes} = await this.executor.getInfo(job.hash); - return resultHashes[name]; - }; - - GMEExecutor.prototype.getResultsInfo = async function(job) { - const mdHash = await this._getResultHash(job, 'results'); - const hash = await this._getContentHash(mdHash, 'results.json'); - assert(hash, 'Metadata about result types not found.'); - return await this.blobClient.getObjectAsJSON(hash); - }; - - GMEExecutor.prototype._getContentHash = async function (artifactHash, fileName) { - const artifact = await this.blobClient.getArtifact(artifactHash); - const contents = artifact.descriptor.content; - - return contents[fileName] && contents[fileName].content; - }; - - GMEExecutor.prototype.getStatus = async function(job) { - const info = await this.executor.getInfo(job.hash); - return this.getJobResultsFrom(info).status; - }; - - GMEExecutor.prototype._getComputeStatus = function(gmeStatus) { - const gmeStatusToStatus = { - 'CREATED': this.QUEUED, - 'SUCCESS': this.SUCCESS, - 'CANCELED': this.CANCELED, - 'FAILED_TO_EXECUTE': this.FAILED, - 'RUNNING': this.RUNNING, - }; - return gmeStatusToStatus[gmeStatus] || gmeStatus; - }; - - GMEExecutor.prototype.getJobResultsFrom = function(gmeInfo) { - const gmeStatus = gmeInfo.status; - return new JobResults(this._getComputeStatus(gmeStatus)); - }; - - GMEExecutor.prototype.getInfo = function(job) { - return this.executor.getInfo(job.hash); - }; - - GMEExecutor.prototype.createJob = async function(hash) { - await this.checkExecutionEnv(); - - const result = await this.executor.createJob({hash}); - - this.poll(hash); - - return result; - }; - - GMEExecutor.prototype.checkExecutionEnv = async function () { - this.logger.info('Checking execution environment'); - const workers = await ExecutorHelper.getWorkers(this.webgmeToken); - if (workers.length === 0) { - this.logger.info('Cannot execute job(s): No connected workers'); - throw new Error('No connected workers'); + + async getConsoleOutput (job) { + const info = await this.executor.getInfo(job.hash); + const isComplete = this.isFinishedStatus(this._getComputeStatus(info.status)); + + if (isComplete) { + const mdHash = await this._getResultHash(job, 'stdout'); + const hash = await this._getContentHash(mdHash, 'job_stdout.txt'); + assert(hash, 'Console output data not found.'); + return await this.blobClient.getObjectAsString(hash); + } else { + return (await this.executor.getOutput(job.hash)) + .map(o => o.output).join(''); + } + } + + cancelJob (job) { + return this.executor.cancelJob(job.hash, job.secret); + } + + async _getResultHash (job, name) { + const {resultHashes} = await this.executor.getInfo(job.hash); + return resultHashes[name]; + } + + async getResultsInfo (job) { + const mdHash = await this._getResultHash(job, 'results'); + const hash = await this._getContentHash(mdHash, 'results.json'); + assert(hash, 'Metadata about result types not found.'); + return await this.blobClient.getObjectAsJSON(hash); + } + + async _getContentHash (artifactHash, fileName) { + const artifact = await this.blobClient.getArtifact(artifactHash); + const contents = artifact.descriptor.content; + + return contents[fileName] && contents[fileName].content; + } + + async getStatus (job) { + const info = await this.executor.getInfo(job.hash); + return this.getJobResultsFrom(info).status; + } + + _getComputeStatus (gmeStatus) { + const gmeStatusToStatus = { + 'CREATED': this.QUEUED, + 'SUCCESS': this.SUCCESS, + 'CANCELED': this.CANCELED, + 'FAILED_TO_EXECUTE': this.FAILED, + 'RUNNING': this.RUNNING, + }; + return gmeStatusToStatus[gmeStatus] || gmeStatus; } - }; - - GMEExecutor.prototype.poll = async function(id) { - const gmeInfo = await this.executor.getInfo(id); - - // Check for new stdout. Emit 'data' with the content - const prevInfo = this.previousGMEInfo[id] || {}; - const currentLine = prevInfo.outputNumber + 1; - const actualLine = gmeInfo.outputNumber; - if (actualLine !== null && actualLine >= currentLine) { - const stdout = (await this.executor.getOutput(id, currentLine, actualLine + 1)) - .map(o => o.output).join(''); - this.emit('data', id, stdout); + + getJobResultsFrom (gmeInfo) { + const gmeStatus = gmeInfo.status; + return new JobResults(this._getComputeStatus(gmeStatus)); + } + + getInfo (job) { + return this.executor.getInfo(job.hash); + } + + async createJob (hash) { + await this.checkExecutionEnv(); + + const result = await this.executor.createJob({hash}); + + this.poll(hash); + + return result; } - if (gmeInfo.status !== prevInfo.status) { - const results = this.getJobResultsFrom(gmeInfo); - this.emit('update', id, results.status); + async checkExecutionEnv () { + this.logger.info('Checking execution environment'); + const workers = await ExecutorHelper.getWorkers(this.webgmeToken); + if (workers.length === 0) { + this.logger.info('Cannot execute job(s): No connected workers'); + throw new Error('No connected workers'); + } } - this.previousGMEInfo[id] = gmeInfo; - if (gmeInfo.status === 'CREATED' || gmeInfo.status === 'RUNNING') { - setTimeout(() => this.poll(id), this.pollInterval); - } else { - const results = this.getJobResultsFrom(gmeInfo); - this.emit('end', id, results); - delete this.previousGMEInfo[id]; + async poll (id) { + const gmeInfo = await this.executor.getInfo(id); + + // Check for new stdout. Emit 'data' with the content + const prevInfo = this.previousGMEInfo[id] || {}; + const currentLine = prevInfo.outputNumber + 1; + const actualLine = gmeInfo.outputNumber; + if (actualLine !== null && actualLine >= currentLine) { + const stdout = (await this.executor.getOutput(id, currentLine, actualLine + 1)) + .map(o => o.output).join(''); + this.emit('data', id, stdout); + } + + if (gmeInfo.status !== prevInfo.status) { + const results = this.getJobResultsFrom(gmeInfo); + this.emit('update', id, results.status); + } + + this.previousGMEInfo[id] = gmeInfo; + if (gmeInfo.status === 'CREATED' || gmeInfo.status === 'RUNNING') { + setTimeout(() => this.poll(id), this.pollInterval); + } else { + const results = this.getJobResultsFrom(gmeInfo); + this.emit('end', id, results); + delete this.previousGMEInfo[id]; + } } - }; + } return GMEExecutor; }); diff --git a/src/common/compute/backends/local/Client.js b/src/common/compute/backends/local/Client.js index 3e4eeed12..b6f05e527 100644 --- a/src/common/compute/backends/local/Client.js +++ b/src/common/compute/backends/local/Client.js @@ -46,195 +46,195 @@ define([ const symlink = promisify(fs.symlink); const touch = async name => await closeFile(await openFile(name, 'w')); - const LocalExecutor = function() { - ComputeClient.apply(this, arguments); - - this.jobQueue = []; - this.currentJob = null; - this.subprocess = null; - this.canceled = false; - }; - - LocalExecutor.prototype = Object.create(ComputeClient.prototype); - - LocalExecutor.prototype.cancelJob = function(jobInfo) { - const {hash} = jobInfo; - - if (this.currentJob === hash) { - this.canceled = true; - this.subprocess.kill(); - } else if (this.jobQueue.includes(hash)) { - const i = this.jobQueue.indexOf(hash); - this.jobQueue.splice(i, 1); - this._onJobCompleted(hash, new JobResults(this.CANCELED)); + class LocalExecutor extends ComputeClient { + constructor () { + super(...arguments); + + this.jobQueue = []; + this.currentJob = null; + this.subprocess = null; + this.canceled = false; } - }; - - LocalExecutor.prototype.getStatus = async function(jobInfo) { - const {hash} = jobInfo; - if (hash === this.currentJob) { - return this.RUNNING; - } else if (this.jobQueue.includes(hash)) { - return this.QUEUED; - } else { - return await this._getJobFile(hash, 'status.txt', 'Job Not Found'); + + cancelJob (jobInfo) { + const {hash} = jobInfo; + + if (this.currentJob === hash) { + this.canceled = true; + this.subprocess.kill(); + } else if (this.jobQueue.includes(hash)) { + const i = this.jobQueue.indexOf(hash); + this.jobQueue.splice(i, 1); + this._onJobCompleted(hash, new JobResults(this.CANCELED)); + } } - }; - - LocalExecutor.prototype.getConsoleOutput = async function(job) { - const msg = 'Console output data not found.'; - return await this._getJobFile(job.hash, STDOUT_FILE, msg); - }; - - LocalExecutor.prototype.getResultsInfo = async function(job) { - const msg = 'Metadata about result types not found.'; - const resultsTxt = await this._getJobFile(job.hash, 'results.json', msg); - return JSON.parse(resultsTxt); - }; - - LocalExecutor.prototype._getJobFile = async function(hash, name, notFoundMsg) { - const filename = path.join(this._getWorkingDir(hash), name); - try { - return await readFile(filename, 'utf8'); - } catch (err) { - if (err.code === 'ENOENT') { - throw new Error(notFoundMsg); + + async getStatus (jobInfo) { + const {hash} = jobInfo; + if (hash === this.currentJob) { + return this.RUNNING; + } else if (this.jobQueue.includes(hash)) { + return this.QUEUED; + } else { + return await this._getJobFile(hash, 'status.txt', 'Job Not Found'); } - throw err; } - }; - LocalExecutor.prototype.createJob = async function(hash) { - this.jobQueue.push(hash); - this._processNextJob(); + async getConsoleOutput (job) { + const msg = 'Console output data not found.'; + return await this._getJobFile(job.hash, STDOUT_FILE, msg); + } - return {hash}; - }; + async getResultsInfo (job) { + const msg = 'Metadata about result types not found.'; + const resultsTxt = await this._getJobFile(job.hash, 'results.json', msg); + return JSON.parse(resultsTxt); + } - LocalExecutor.prototype._onJobCompleted = async function(hash, jobResults) { - if (hash === this.currentJob) { - this.currentJob = null; + async _getJobFile (hash, name, notFoundMsg) { + const filename = path.join(this._getWorkingDir(hash), name); + try { + return await readFile(filename, 'utf8'); + } catch (err) { + if (err.code === 'ENOENT') { + throw new Error(notFoundMsg); + } + throw err; + } } - const tmpdir = this._getWorkingDir(hash); - //await this._cleanDirectory(tmpdir); - await writeFile(path.join(tmpdir, 'status.txt'), jobResults.status); + async createJob (hash) { + this.jobQueue.push(hash); + this._processNextJob(); + + return {hash}; + } + + async _onJobCompleted (hash, jobResults) { + if (hash === this.currentJob) { + this.currentJob = null; + } + + const tmpdir = this._getWorkingDir(hash); + //await this._cleanDirectory(tmpdir); + await writeFile(path.join(tmpdir, 'status.txt'), jobResults.status); - this.emit('update', hash, jobResults.status); - this.emit('end', hash, jobResults); - this._processNextJob(); - }; + this.emit('update', hash, jobResults.status); + this.emit('end', hash, jobResults); + this._processNextJob(); + } - LocalExecutor.prototype._cleanDirectory = async function(workdir) { - const SKIP_FILES = ['results.json', STDOUT_FILE]; - const files = (await readdir(workdir)) - .filter(name => !SKIP_FILES.includes(name)) - .map(name => path.join(workdir, name)); + async _cleanDirectory (workdir) { + const SKIP_FILES = ['results.json', STDOUT_FILE]; + const files = (await readdir(workdir)) + .filter(name => !SKIP_FILES.includes(name)) + .map(name => path.join(workdir, name)); - return Promise.all(files.map(file => rm_rf(file))); - }; + return Promise.all(files.map(file => rm_rf(file))); + } - LocalExecutor.prototype._processNextJob = function() { - if (this.currentJob) return; + _processNextJob () { + if (this.currentJob) return; - this.currentJob = this.jobQueue.shift(); - if (this.currentJob) { - return this._createJob(this.currentJob); + this.currentJob = this.jobQueue.shift(); + if (this.currentJob) { + return this._createJob(this.currentJob); + } } - }; - - LocalExecutor.prototype._getWorkingDir = function(hash) { - return path.join(os.tmpdir(), `deepforge-local-exec-${hash}`); - }; - - LocalExecutor.prototype._createJob = async function(hash) { - const jobInfo = {hash}; - this.emit('update', jobInfo.hash, this.PENDING); - const tmpdir = this._getWorkingDir(hash); - try { - await mkdir(tmpdir); - } catch (err) { - if (err.code === 'EEXIST') { - await rm_rf(tmpdir); + + _getWorkingDir (hash) { + return path.join(os.tmpdir(), `deepforge-local-exec-${hash}`); + } + + async _createJob (hash) { + const jobInfo = {hash}; + this.emit('update', jobInfo.hash, this.PENDING); + const tmpdir = this._getWorkingDir(hash); + try { await mkdir(tmpdir); - } else { - throw err; + } catch (err) { + if (err.code === 'EEXIST') { + await rm_rf(tmpdir); + await mkdir(tmpdir); + } else { + throw err; + } } + this.logger.info('created working directory at', tmpdir); + + // Fetch the required files from deepforge + await this.prepareWorkspace(hash, tmpdir); + + // Spin up a subprocess + const config = JSON.parse(await readFile(tmpdir.replace(path.sep, '/') + '/executor_config.json', 'utf8')); + + const env = process.env; + env.DEEPFORGE_ROOT = DEEPFORGE_ROOT; + const options = { + cwd: tmpdir, + env, + }; + this.logger.info(`Running ${config.cmd} ${config.args.join(' ')}`); + this.subprocess = spawn(config.cmd, config.args, options); + this.emit('update', jobInfo.hash, this.RUNNING); + this.subprocess.stdout.on('data', data => this.onConsoleOutput(tmpdir, hash, data)); + + this.subprocess.on('close', async code => { + const status = this.canceled ? this.CANCELED : + (code !== 0 ? this.FAILED : this.SUCCESS); + + const jobResults = new JobResults(status); + this.canceled = false; + + this._onJobCompleted(hash, jobResults); + }); } - this.logger.info('created working directory at', tmpdir); - - // Fetch the required files from deepforge - await this.prepareWorkspace(hash, tmpdir); - - // Spin up a subprocess - const config = JSON.parse(await readFile(tmpdir.replace(path.sep, '/') + '/executor_config.json', 'utf8')); - - const env = process.env; - env.DEEPFORGE_ROOT = DEEPFORGE_ROOT; - const options = { - cwd: tmpdir, - env, - }; - this.logger.info(`Running ${config.cmd} ${config.args.join(' ')}`); - this.subprocess = spawn(config.cmd, config.args, options); - this.emit('update', jobInfo.hash, this.RUNNING); - this.subprocess.stdout.on('data', data => this.onConsoleOutput(tmpdir, hash, data)); - - this.subprocess.on('close', async code => { - const status = this.canceled ? this.CANCELED : - (code !== 0 ? this.FAILED : this.SUCCESS); - - const jobResults = new JobResults(status); - this.canceled = false; - this._onJobCompleted(hash, jobResults); - }); - }; - - LocalExecutor.prototype.onConsoleOutput = async function(workdir, hash, data) { - const filename = path.join(workdir, STDOUT_FILE); - appendFile(filename, data); - this.emit('data', hash, data); - }; - - LocalExecutor.prototype._getAllFiles = async function(workdir) { - const dirs = (await readdir(workdir)) - .filter(n => !n.includes('node_modules')) - .map(name => path.join(workdir, name)); - const files = []; - - // Read each directory - while (dirs.length) { - const abspath = dirs.shift(); - const isDirectory = (await statFile(abspath)).isDirectory(); - if (isDirectory) { - const childpaths = (await readdir(abspath)) - .map(name => path.join(abspath, name)); - dirs.push.apply(dirs, childpaths); - } else { - files.push(abspath); - } + async onConsoleOutput (workdir, hash, data) { + const filename = path.join(workdir, STDOUT_FILE); + appendFile(filename, data); + this.emit('data', hash, data); } - return files; - }; + async _getAllFiles (workdir) { + const dirs = (await readdir(workdir)) + .filter(n => !n.includes('node_modules')) + .map(name => path.join(workdir, name)); + const files = []; + + // Read each directory + while (dirs.length) { + const abspath = dirs.shift(); + const isDirectory = (await statFile(abspath)).isDirectory(); + if (isDirectory) { + const childpaths = (await readdir(abspath)) + .map(name => path.join(abspath, name)); + dirs.push.apply(dirs, childpaths); + } else { + files.push(abspath); + } + } + + return files; + } - LocalExecutor.prototype.prepareWorkspace = async function(hash, dirname) { - this.logger.info('about to fetch job data'); - const content = new Buffer(new Uint8Array(await this.blobClient.getObject(hash))); // TODO: Handle errors... - const zipPath = path.join(dirname, `${hash}.zip`); - await writeFile(zipPath, content); - this.logger.info(`Fetched job data: ${zipPath}`); + async prepareWorkspace (hash, dirname) { + this.logger.info('about to fetch job data'); + const content = new Buffer(new Uint8Array(await this.blobClient.getObject(hash))); // TODO: Handle errors... + const zipPath = path.join(dirname, `${hash}.zip`); + await writeFile(zipPath, content); + this.logger.info(`Fetched job data: ${zipPath}`); - this.logger.info(`unzipping ${zipPath} in ${dirname}`); - await unzip(zipPath, dirname); + this.logger.info(`unzipping ${zipPath} in ${dirname}`); + await unzip(zipPath, dirname); - // Set up a symbolic link to the node_modules - await symlink(NODE_MODULES, path.join(dirname, 'node_modules')); + // Set up a symbolic link to the node_modules + await symlink(NODE_MODULES, path.join(dirname, 'node_modules')); - // Prepare for the stdout - await touch(path.join(dirname, STDOUT_FILE)); - }; + // Prepare for the stdout + await touch(path.join(dirname, STDOUT_FILE)); + } + } async function unzip(filename, dirname) { const args = UNZIP_ARGS.concat(path.basename(filename)); diff --git a/src/common/compute/backends/sciserver-compute/Client.js b/src/common/compute/backends/sciserver-compute/Client.js index 0074cd24f..5b8e29be3 100644 --- a/src/common/compute/backends/sciserver-compute/Client.js +++ b/src/common/compute/backends/sciserver-compute/Client.js @@ -22,245 +22,250 @@ define([ ) { const Headers = fetch.Headers; const POLL_INTERVAL = 1000; - const SciServerClient = function(logger, blobClient, config) { - ComputeClient.apply(this, arguments); - this.username = config.username; - this.password = config.password; - this.computeDomain = config.computeDomain; - this.previousJobState = {}; - this.consoleOutputLen = {}; - }; - - SciServerClient.prototype = Object.create(ComputeClient.prototype); - - SciServerClient.prototype.createJob = async function(hash) { - const filesInfo = await this._uploadFiles(hash); - const job = await this._createJob(filesInfo); - const jobInfo = { - id: job.id, - hash, - }; - this._poll(jobInfo); - - return jobInfo; - }; - - SciServerClient.prototype._createConfig = async function(filesInfo) { - const {dirname, volumePool, volume} = filesInfo; - const domain = await this._getComputeDomain(); - const userVolumes = domain.userVolumes.map(volume => ({ - userVolumeId: volume.id, - needsWriteAccess: SciServerClient.isWritable(volume), - })); - const filepath = `/home/idies/workspace/${volumePool}/${volume}/${dirname}`; - - return { - command: `bash ${filepath}/prepare-and-run.sh ${filepath}`, - dockerComputeEndpoint: domain.apiEndpoint, - dockerImageName: 'SciServer Essentials', - resultsFolderURI: '', - submitterDID: 'DeepForge Job', - volumeContainers: [], - userVolumes: userVolumes - }; - }; - - SciServerClient.prototype._uploadFiles = async function(hash) { - const dirname = `execution-files/${hash}`; - const metadata = await this.blobClient.getMetadata(hash); - const config = { - username: this.username, - password: this.password, - volume: `${this.username}/scratch`, - volumePool: 'Temporary' - }; - const storage = await Storage.getClient('sciserver-files', this.logger, config); - const files = Object.entries(metadata.content) - .map(async pair => { - const [filename, metadata] = pair; - const contents = await this.blobClient.getObject(metadata.content); - const filepath = `${dirname}/${filename}`; - await storage.putFile(filepath, contents); - }); - - await storage.putFile(`${dirname}/prepare-and-run.sh`, PREPARE_AND_RUN); - await Promise.all(files); - const filesInfo = Object.assign({}, config); - filesInfo.dirname = dirname; - return filesInfo; - }; - - SciServerClient.prototype._createJob = async function(filesInfo) { - const config = await this._createConfig(filesInfo); - const url = 'https://apps.sciserver.org/racm//jobm/rest/jobs/docker'; - - const opts = { - method: 'POST', - body: JSON.stringify(config), - headers: new Headers(), - }; - - opts.headers.append('Content-Type', 'application/json'); - - const response = await this.fetch(url, opts); - const {status} = response; - if (status === 400) { - throw new Error('Received "Bad Request" from SciServer. Is the token invalid?'); - } else if (status > 399) { - const contents = await response.json(); - throw new Error(`SciServer Files request failed: ${contents.error}`); + class SciServerClient extends ComputeClient { + constructor(logger, blobClient, config) { + super(logger, blobClient, config); + this.username = config.username; + this.password = config.password; + this.computeDomain = config.computeDomain; + this.previousJobState = {}; + this.consoleOutputLen = {}; } - return await response.json(); - }; - - SciServerClient.prototype.fetch = async function(url, opts={}) { - const token = await this.token(); - opts.headers = opts.headers || new Headers(); - opts.headers.append('X-Auth-Token', token); - return fetch(url, opts); - }; - - SciServerClient.prototype.token = async function() { - return login(this.username, this.password); - }; - - SciServerClient.prototype.getJobState = async function(jobInfo) { - const url = 'https://apps.sciserver.org/racm//jobm/rest/dockerjobs'; - - const opts = { - headers: new Headers(), - }; - opts.headers.append('X-Auth-Token', await this.token()); - - const response = await fetch(url, opts); - const {status} = response; - if (status === 400) { - throw new Error('Received "Bad Request" from SciServer. Is the token invalid?'); - } else if (status > 399) { - const contents = await response.json(); - throw new Error(`SciServer Files request failed: ${contents.error}`); + + async createJob (hash) { + const filesInfo = await this._uploadFiles(hash); + const job = await this._createJob(filesInfo); + const jobInfo = { + id: job.id, + hash, + }; + this._poll(jobInfo); + + return jobInfo; } - const results = await response.json(); - return results.find(result => result.id === jobInfo.id); - }; + async _createConfig (filesInfo) { + const {dirname, volumePool, volume} = filesInfo; + const domain = await this._getComputeDomain(); + const userVolumes = domain.userVolumes.map(volume => ({ + userVolumeId: volume.id, + needsWriteAccess: SciServerClient.isWritable(volume), + })); + const filepath = `/home/idies/workspace/${volumePool}/${volume}/${dirname}`; + + return { + command: `bash ${filepath}/prepare-and-run.sh ${filepath}`, + dockerComputeEndpoint: domain.apiEndpoint, + dockerImageName: 'SciServer Essentials', + resultsFolderURI: '', + submitterDID: 'DeepForge Job', + volumeContainers: [], + userVolumes: userVolumes + }; + } - SciServerClient.prototype.getJobResults = async function(jobInfo) { - const result = await this.getJobState(jobInfo); - if (result) { - const status = SciServerClient.getStatus(result.status); - return new JobResults(status); + async _uploadFiles (hash) { + const dirname = `execution-files/${hash}`; + const metadata = await this.blobClient.getMetadata(hash); + const config = { + username: this.username, + password: this.password, + volume: `${this.username}/scratch`, + volumePool: 'Temporary' + }; + const storage = await Storage.getClient('sciserver-files', this.logger, config); + const files = Object.entries(metadata.content) + .map(async pair => { + const [filename, metadata] = pair; + const contents = await this.blobClient.getObject(metadata.content); + const filepath = `${dirname}/${filename}`; + await storage.putFile(filepath, contents); + }); + + await storage.putFile(`${dirname}/prepare-and-run.sh`, PREPARE_AND_RUN); + await Promise.all(files); + const filesInfo = Object.assign({}, config); + filesInfo.dirname = dirname; + return filesInfo; } - }; - SciServerClient.prototype._poll = async function(jobInfo) { - const state = await this.getJobState(jobInfo); - if (state) { - const status = SciServerClient.getStatus(state.status); - const prevState = this.previousJobState[jobInfo.id]; - const prevStatus = prevState && SciServerClient.getStatus(prevState.status); + async _createJob (filesInfo) { + const config = await this._createConfig(filesInfo); + const url = 'https://apps.sciserver.org/racm//jobm/rest/jobs/docker'; + + const opts = { + method: 'POST', + body: JSON.stringify(config), + headers: new Headers(), + }; + + opts.headers.append('Content-Type', 'application/json'); + + const response = await this.fetch(url, opts); + const {status} = response; + if (status === 400) { + throw new Error('Received "Bad Request" from SciServer. Is the token invalid?'); + } else if (status > 399) { + const contents = await response.json(); + throw new Error(`SciServer Files request failed: ${contents.error}`); + } + return await response.json(); + } - if (prevStatus !== status) { - this.emit('update', jobInfo.hash, status); + async fetch (url, opts={}) { + const token = await this.token(); + opts.headers = opts.headers || new Headers(); + opts.headers.append('X-Auth-Token', token); + return fetch(url, opts); + } + + async token () { + return login(this.username, this.password); + } + + async getJobState (jobInfo) { + const url = 'https://apps.sciserver.org/racm//jobm/rest/dockerjobs'; + + const opts = { + headers: new Headers(), + }; + opts.headers.append('X-Auth-Token', await this.token()); + + const response = await fetch(url, opts); + const {status} = response; + if (status === 400) { + throw new Error('Received "Bad Request" from SciServer. Is the token invalid?'); + } else if (status > 399) { + const contents = await response.json(); + throw new Error(`SciServer Files request failed: ${contents.error}`); } - this.previousJobState[jobInfo.id] = state; - if (this.isFinishedStatus(status)) { - return this._onJobComplete(jobInfo, state); - } else if (status === this.RUNNING) { // update stdout - const stdout = await this.getConsoleOutput(jobInfo); - const prevLen = this.consoleOutputLen[jobInfo.id] || 0; + const results = await response.json(); + return results.find(result => result.id === jobInfo.id); + } - this.emit('data', jobInfo.hash, stdout.substring(prevLen)); - this.consoleOutputLen[jobInfo.id] = stdout.length; + async getJobResults (jobInfo) { + const result = await this.getJobState(jobInfo); + if (result) { + const status = SciServerClient.getStatus(result.status); + return new JobResults(status); } } - return setTimeout(() => this._poll(jobInfo), POLL_INTERVAL); - }; + async _poll (jobInfo) { + const state = await this.getJobState(jobInfo); + if (state) { + const status = SciServerClient.getStatus(state.status); + const prevState = this.previousJobState[jobInfo.id]; + const prevStatus = prevState && SciServerClient.getStatus(prevState.status); + + if (prevStatus !== status) { + this.emit('update', jobInfo.hash, status); + } + + this.previousJobState[jobInfo.id] = state; + if (this.isFinishedStatus(status)) { + return this._onJobComplete(jobInfo, state); + } else if (status === this.RUNNING) { // update stdout + const stdout = await this.getConsoleOutput(jobInfo); + const prevLen = this.consoleOutputLen[jobInfo.id] || 0; + + this.emit('data', jobInfo.hash, stdout.substring(prevLen)); + this.consoleOutputLen[jobInfo.id] = stdout.length; + } + } + + return setTimeout(() => this._poll(jobInfo), POLL_INTERVAL); + } + + async _onJobComplete (jobInfo, state) { + const {hash} = jobInfo; + const stdout = await this.getConsoleOutput(hash); + this.emit('data', hash, stdout); + + const status = SciServerClient.getStatus(state.status); + const results = new JobResults(status); - SciServerClient.prototype._onJobComplete = async function(jobInfo, state) { - const {hash} = jobInfo; - const stdout = await this.getConsoleOutput(hash); - this.emit('data', hash, stdout); + if (status === this.SUCCESS) { + // TODO: Move the debug files to the blob + } - const status = SciServerClient.getStatus(state.status); - const results = new JobResults(status); + this._deleteFileDir(state); + this.emit('end', hash, results); + delete this.previousJobState[jobInfo.id]; + delete this.consoleOutputLen[jobInfo.id]; + } - if (status === this.SUCCESS) { - // TODO: Move the debug files to the blob + async cancelJob (jobInfo) { + const {id} = jobInfo; + const url = `https://apps.sciserver.org/racm/jobm/rest/jobs/${id}/cancel`; + await this.fetch(url, {method: 'POST'}); } - this._deleteFileDir(state); - this.emit('end', hash, results); - delete this.previousJobState[jobInfo.id]; - delete this.consoleOutputLen[jobInfo.id]; - }; - - SciServerClient.prototype.cancelJob = async function(jobInfo) { - const {id} = jobInfo; - const url = `https://apps.sciserver.org/racm/jobm/rest/jobs/${id}/cancel`; - await this.fetch(url, {method: 'POST'}); - }; - - SciServerClient.prototype.getResultsInfo = async function(jobInfo) { - const text = await this._getFile(jobInfo, 'results.json'); - assert(text, 'Metadata about result types not found.'); - return JSON.parse(text); - }; - - SciServerClient.prototype.getStatus = async function(jobInfo) { - const results = await this.getJobResults(jobInfo); - return results && results.status; - }; - - SciServerClient.prototype.getConsoleOutput = async function(jobInfo) { - return await this._getFile(jobInfo, 'stdout.txt') || ''; - }; - - SciServerClient.prototype._getFile = async function(jobInfo, filename) { - const state = await this.getJobState(jobInfo); - if (state) { - const baseUrl = 'https://apps.sciserver.org/fileservice/api/file/'; - const filepath = state.resultsFolderURI.replace(/\/?$/, '/') + filename; - const fileUrl = baseUrl + this._getEncodedFilePath(filepath); + async getResultsInfo (jobInfo) { + const text = await this._getFile(jobInfo, 'results.json'); + assert(text, 'Metadata about result types not found.'); + return JSON.parse(text); + } - const response = await this.fetch(fileUrl); + async getStatus (jobInfo) { + const results = await this.getJobResults(jobInfo); + return results && results.status; + } + + async getConsoleOutput (jobInfo) { + return await this._getFile(jobInfo, 'stdout.txt') || ''; + } + + async _getFile (jobInfo, filename) { + const state = await this.getJobState(jobInfo); + if (state) { + const baseUrl = 'https://apps.sciserver.org/fileservice/api/file/'; + const filepath = state.resultsFolderURI.replace(/\/?$/, '/') + filename; + const fileUrl = baseUrl + this._getEncodedFilePath(filepath); + + const response = await this.fetch(fileUrl); + return await response.text(); + } + } + + async _deleteFileDir (state) { + const baseUrl = 'https://apps.sciserver.org/fileservice/api/data/'; + const filepath = state.command.split(' ').pop(); + const fileUrl = baseUrl + this._getEncodedFilePath(filepath); + const response = await this.fetch(fileUrl, {method: 'DELETE'}); return await response.text(); } - }; - - SciServerClient.prototype._deleteFileDir = async function(state) { - const baseUrl = 'https://apps.sciserver.org/fileservice/api/data/'; - const filepath = state.command.split(' ').pop(); - const fileUrl = baseUrl + this._getEncodedFilePath(filepath); - const response = await this.fetch(fileUrl, {method: 'DELETE'}); - return await response.text(); - }; - - SciServerClient.prototype._getEncodedFilePath = function(filepath) { - const dirs = filepath.split('/').slice(4); - const filename = dirs.pop(); - const drive = dirs.slice(0, 3).join('/') + '/'; - const dirname = '/' + dirs.slice(3).join('/'); - - return drive + encodeURIComponent(dirname) + '/' + filename; - }; - - SciServerClient.prototype._getComputeDomain = async function() { - const url = 'https://apps.sciserver.org/racm/jobm/rest/computedomains?batch=true'; - const response = await this.fetch(url); - const domains = await response.json(); - - const domain = domains.find(domain => domain.name === this.computeDomain); - assert(domain, `Compute domain not found: ${this.computeDomain}`); - return domain; - }; - - SciServerClient.getStatus = function(code) { - const index = Math.log2(code); - return SciServerClient.STATUSES[index]; - }; + + _getEncodedFilePath (filepath) { + const dirs = filepath.split('/').slice(4); + const filename = dirs.pop(); + const drive = dirs.slice(0, 3).join('/') + '/'; + const dirname = '/' + dirs.slice(3).join('/'); + + return drive + encodeURIComponent(dirname) + '/' + filename; + } + + async _getComputeDomain () { + const url = 'https://apps.sciserver.org/racm/jobm/rest/computedomains?batch=true'; + const response = await this.fetch(url); + const domains = await response.json(); + + const domain = domains.find(domain => domain.name === this.computeDomain); + assert(domain, `Compute domain not found: ${this.computeDomain}`); + return domain; + } + + static getStatus (code) { + const index = Math.log2(code); + return SciServerClient.STATUSES[index]; + } + + static isWritable (volume) { + return volume.allowedActions.includes('write'); + } + + } SciServerClient.STATUSES = [ ComputeClient.prototype.QUEUED, @@ -273,9 +278,5 @@ define([ ComputeClient.prototype.CANCELED, ]; - SciServerClient.isWritable = function(volume) { - return volume.allowedActions.includes('write'); - }; - return SciServerClient; });