Skip to content

Commit

Permalink
Use stream functions in pipelines. Closes #1777 (#1814)
Browse files Browse the repository at this point in the history
* Use stream functions in pipelines. (Closes #1777)

This commit adds stream support function added in #1702 for pipeline
execution as well as Interactive Compute router. build job utils
now uses webgme-engine module in webgme/node_modules. This will
add the support for streams in blobclient.

* WIP- fix unused variables; remove logger in write file
  • Loading branch information
umesh-timalsina authored Jul 29, 2020
1 parent 7108107 commit ba1ec5b
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 16 deletions.
4 changes: 2 additions & 2 deletions src/common/plugin/GeneratedFiles.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ define([
const objectHashes = {};
for (let i = userAssets.length; i--;) {
const [filepath, dataInfo] = userAssets[i];
const contents = await Storage.getFile(dataInfo);
const contentsStream = await Storage.getFileStream(dataInfo);
const filename = filepath.split('/').pop();
const hash = await this.blobClient.putFile(filename, contents);
const hash = await this.blobClient.putFile(filename, contentsStream);
objectHashes[filepath] = hash;
}
await artifact.addObjectHashes(objectHashes);
Expand Down
6 changes: 3 additions & 3 deletions src/common/plugin/LocalExecutor.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ define([
const srcStorage = this.isPipelineInput(dataNodes[i]) ?
await this.getStorageClientForInputData(originalData)
: dstStorage;
const content = await srcStorage.getFile(originalData);
const userAsset = await dstStorage.putFile(saveDir + name, content);
const contentStream = await srcStorage.getFileStream(originalData);
const userAsset = await dstStorage.putFileStream(saveDir + name, contentStream);

this.core.setAttribute(artifact, 'data', JSON.stringify(userAsset));
this.core.setAttribute(artifact, 'name', name);
Expand Down Expand Up @@ -125,7 +125,7 @@ define([
LocalExecutor.OPERATIONS = Object.keys(LocalExecutor.prototype)
.filter(name => name.indexOf('_') !== 0)
.filter(name => name !== 'isLocalOperation' && name !== 'getLocalOperationType');

class JobLogger{
constructor(core, node) {
this.core = core;
Expand Down
14 changes: 11 additions & 3 deletions src/plugins/GenerateJob/templates/run-debug.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const fs = require('fs');
const path = require('path');
const {promisify} = require('util');
const mkdir = promisify(fs.mkdir);
const writeFile = promisify(fs.writeFile);
const pipeline = promisify(require('stream').pipeline);
const {spawn} = require('child_process');

const Config = require('./config.json');
Expand Down Expand Up @@ -47,9 +47,9 @@ requirejs([
async function fetchInputData(filename, dataInfo, config) {
const {backend} = dataInfo;
const client = await Storage.getClient(backend, null, config);
const buffer = await client.getFile(dataInfo);
const stream = await client.getFileStream(dataInfo);
filename = fromRelative(filename);
await writeFile(filename, buffer);
await writeFile(filename, stream);
}

function fromRelative(filename) {
Expand All @@ -59,4 +59,12 @@ requirejs([
async function tryMkdir(filename) {
await mkdir(filename).catch(nop);
}

async function writeFile(path, readStream) {
const dstStream = fs.createWriteStream(path, {
encoding: readStream.readableEncoding
});
await pipeline(readStream, dstStream);
}

});
17 changes: 12 additions & 5 deletions src/plugins/GenerateJob/templates/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const {spawn} = childProcess;
const fs = require('fs');
const {promisify} = require('util');
const mkdir = promisify(fs.mkdir);
const writeFile = promisify(fs.writeFile);
const pipeline = promisify(require('stream').pipeline);
const lstat = promisify(fs.lstat);
const exec = promisify(childProcess.exec);
const rm_rf = require('rimraf');
Expand Down Expand Up @@ -172,8 +172,8 @@ requirejs([
for (let i = outputNames.length; i--;) {
const filename = outputNames[i];
const storagePath = `${storageDir}/${filename}`;
const contents = fs.readFileSync(`outputs/${filename}`);
const dataInfo = await client.putFile(storagePath, contents);
const contentsStream = fs.createReadStream(`outputs/${filename}`);
const dataInfo = await client.putFileStream(storagePath, contentsStream);
const type = results[filename];
results[filename] = {type, dataInfo};
}
Expand Down Expand Up @@ -278,8 +278,8 @@ requirejs([
if (!exists) {
await createCacheDir(cachePath);
const client = await Storage.getClient(dataInfo.backend, null, config);
const buffer = await client.getFile(dataInfo);
await writeFile(cachePath, buffer);
const stream = await client.getFileStream(dataInfo);
await writeFile(cachePath, stream);
} else {
logger.info(`${inputName} already cached. Skipping retrieval from blob`);
}
Expand Down Expand Up @@ -418,5 +418,12 @@ requirejs([
return path.join(cacheDir, relPath);
}

async function writeFile(path, readStream) {
const dstStream = fs.createWriteStream(path, {
encoding: readStream.readableEncoding
});
await pipeline(readStream, dstStream);
}

function nop() {}
});
6 changes: 4 additions & 2 deletions src/routers/InteractiveCompute/job-files/start.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
const {spawn} = require('child_process');
const WebSocket = require('ws');
const fs = require('fs').promises;
const { promisify } = require('util');
const pipeline = promisify(require('stream').pipeline);
const path = require('path');
const requirejs = require('requirejs');
let Message;
Expand Down Expand Up @@ -43,8 +45,8 @@ class InteractiveClient {
async function saveArtifact() {
const client = await Storage.getClient(dataInfo.backend, null, config);
const dataPath = path.join(...dirs.concat('data'));
const buffer = await client.getFile(dataInfo);
await fs.writeFile(dataPath, buffer);
const stream = await client.getFileStream(dataInfo);
await pipeline(stream, fs.createWriteStream(dataPath));
const filePath = path.join(...dirs.concat('__init__.py'));
await fs.writeFile(filePath, initFile(name, type));
}
Expand Down
14 changes: 13 additions & 1 deletion utils/build-job-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,25 @@ const babel = require('@babel/core');
const requirejs = require('requirejs');
const path = require('path');
const fs = require('fs');
const webgmeEngineSrc = path.join(path.dirname(require.resolve('webgme-engine')), 'src');
const webgmeEngineSrc = getWebGMEEngineSrcPath();
const JOB_FILES_DIR = `${__dirname}/../src/plugins/GenerateJob/templates/`;
const os = require('os');
const gmeConfig = require('../config');
const includeFile = path.join(__dirname, 'build-includes.js');
const _ = require('underscore');

function getWebGMEEngineSrcPath() {
let webGMEEngine;

try{
webGMEEngine = require.resolve('webgme/node_modules/webgme-engine');
} catch (e) {
webGMEEngine = require.resolve('webgme-engine');
}

return path.join(path.dirname(webGMEEngine), 'src');
}

function getFiles(dirname) {
return fs.readdirSync(dirname)
.map(name => {
Expand Down

0 comments on commit ba1ec5b

Please sign in to comment.