Skip to content

Commit

Permalink
Update static input support for credentials. Fixes #1862 (#1871)
Browse files Browse the repository at this point in the history
* Update static input support for credentials. Fixes #1862

* remove unused code

* Fix linting issues
  • Loading branch information
brollb authored Aug 19, 2020
1 parent 64e5266 commit ab83c7c
Show file tree
Hide file tree
Showing 2 changed files with 245 additions and 192 deletions.
280 changes: 139 additions & 141 deletions src/plugins/Export/Export.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ define([
CONSTANTS,
BlobConfig,
_,
Q
) {
'use strict';

Expand Down Expand Up @@ -71,7 +70,7 @@ define([
*
* @param {function(string, plugin.PluginResult)} callback - the result callback
*/
Export.prototype.main = function (callback) {
Export.prototype.main = async function (callback) {
this.resetVariableNames();
this.dataInputs = {};
this.dataOutputs = {};
Expand All @@ -84,17 +83,17 @@ define([

const files = new GeneratedFiles(this.blobClient);
const name = this.core.getAttribute(this.activeNode, 'name');
const staticInputs = this.getCurrentConfig().staticInputs;
return this.createPipelineFiles(this.activeNode, files)
.then(() => this.addStaticInputs(staticInputs, files))
.then(() => this.createDefaultMainFile(this.activeNode, staticInputs, files))
.then(() => files.save(name))
.then(hash => {
this.result.addArtifact(hash);
this.result.setSuccess(true);
callback(null, this.result);
})
.catch(err => callback(err, this.result));
const staticInputDict = this.getCurrentConfig().staticInputs;
await this.createPipelineFiles(this.activeNode, files);

const staticInputData = Object.values(staticInputDict);
await Promise.all(staticInputData.map(input => this.addStaticInput(input, files)));
await this.createDefaultMainFile(this.activeNode, staticInputDict, files);

const hash = await files.save(name);
this.result.addArtifact(hash);
this.result.setSuccess(true);
callback(null, this.result);
};

Export.prototype.resetVariableNames = function () {
Expand All @@ -115,7 +114,13 @@ define([
return name;
};

Export.prototype.getVariableNameFor = function (nodeId) {
Export.prototype.getVariableNameFor = async function (nodeId) {
if (!this.variableNameFor[nodeId]) {
const node = await this.core.loadByPath(this.rootNode, nodeId);
const basename = this.core.getAttribute(node, 'name')
.replace(/[^a-zA-Z0-9]/g, '_');
this.assignVariableTo(basename, nodeId);
}
return this.variableNameFor[nodeId];
};

Expand All @@ -128,61 +133,59 @@ define([
return varName;
};

Export.prototype.addStaticInputs = function (ids, files) {
Export.prototype.addStaticInput = async function (info, files) {
// Get the static inputs and add them in artifacts/
return Q.all(ids.map(id => this.core.loadByPath(this.rootNode, id)))
.then(nodes => {
nodes.forEach((node, i) => {
const name = this.getVariableNameFor(ids[i]);
const dataInfo = this.getAttribute(node, 'data');
files.addUserAsset(`artifacts/${name}`, dataInfo);
});
return files;
});
const node = await this.core.loadByPath(this.rootNode, info.id);
const name = await this.getVariableNameFor(info.id);
const dataInfo = this.core.getAttribute(node, 'data');
files.addUserAsset(`artifacts/${name}`, dataInfo, info.credentials);
return files;
};

Export.prototype.createDefaultMainFile = function (node, staticInputs, files) {
Export.prototype.createDefaultMainFile = function (node, staticInputDict, files) {
// Get the variable name for the pipeline
const name = PluginBase.toUpperCamelCase(this.core.getAttribute(node, 'name'));
const instanceName = this.getVariableName(name.toLowerCase());
let initCode = null;
return this.getAllInitialCode()
.then(code => initCode = code)
.then(() => this.core.loadChildren(node))
.then(nodes => {
.then(async nodes => {
// Get code for each input
const inputs = this.getPipelineInputs(nodes);
const inputNames = inputs.map(input => this.getVariableNameFor(input[1]));
const inputNames = await Promise.all(inputs.map(input => this.getVariableNameFor(input[1])));
let argIndex = 1;
const parseInputCode = inputs.map((input, i) => {
const parseInputCode = (await Promise.all(inputs.map(async (input, i) => {
const [, , node] = input;
const inputName = inputNames[i];
const pathNameVar = this.getVariableName(`${inputName}_path`);
const type = this.getAttribute(node, 'type');
const type = this.core.getAttribute(node, 'type');
const id = this.core.getPath(node);
const isStatic = staticInputs.includes(id);
const artifactInfo = staticInputDict[id];

console.log(`checking if ${id} is static`, staticInputDict);
const lines = [
`${inputName} = deepforge.serialization.load('${type}', open(${pathNameVar}, 'rb'))`
];

if (isStatic) {
lines.unshift(`${pathNameVar} = 'artifacts/${inputName}'`);
if (artifactInfo) {
const artifactName = await this.getVariableNameFor(artifactInfo.id);
lines.unshift(`${pathNameVar} = 'artifacts/${artifactName}'`);
} else {
lines.unshift(`${pathNameVar} = sys.argv[${argIndex}]`);
argIndex++;
}
return lines.join('\n');
}).join('\n');
}))).join('\n');

// Create code for saving outputs to outputs/
const outputs = this.getPipelineOutputs(nodes);
const outputNames = outputs.map(output => this.getVariableNameFor(output[1]));
const outputNames = await Promise.all(outputs.map(output => this.getVariableNameFor(output[1])));

const saveNames = outputs.map(output => {
const [, , node] = output;
const outputOp = this.core.getParent(this.core.getParent(node));
return this.getAttribute(outputOp, 'saveName');
return this.core.getAttribute(outputOp, 'saveName');
});
const printResults = outputNames
.map((name, i) => `print(' ${saveNames[i]}: ' + str(${name}))`);
Expand Down Expand Up @@ -233,120 +236,115 @@ define([
});
};

Export.prototype.createPipelineFiles = function (node, files) {
Export.prototype.createPipelineFiles = async function (node, files) {
const name = PluginBase.toUpperCamelCase(this.core.getAttribute(node, 'name'));
// Generate the file for the pipeline in pipelines/

let allOperations,
operations,
connections;

return this.core.loadChildren(node)
.then(nodes => { // Assign variable names to all data
const promises = nodes
.filter(node => this.isMetaTypeOf(node, this.META.Operation))
.map(operation => this.cacheDataNodes(operation));

return Q.all(promises).then(() => nodes);
})
.then(nodes => {

// Get the important node types and get all the code for the operations
allOperations = this.getSortedOperations(nodes);
operations = allOperations
.filter(node => !this.isMetaTypeOf(node, this.META.Input))
.filter(node => !this.isMetaTypeOf(node, this.META.Output));

// For each operation, instantiate it with the respective arguments
connections = nodes
.filter(node => !this.isMetaTypeOf(node, this.META.Operation));

connections.forEach(conn => {
const srcId = this.core.getPointerPath(conn, 'src');
const dstId = this.core.getPointerPath(conn, 'dst');
// Get the src data name?
// TODO
this.assignVariableTo('result', srcId, dstId);
});
const nodes = await this.core.loadChildren(node);
const promises = nodes
.filter(node => this.isMetaTypeOf(node, this.META.Operation))
.map(operation => this.cacheDataNodes(operation));

await Promise.all(promises);
// Get the important node types and get all the code for the operations
allOperations = this.getSortedOperations(nodes);
operations = allOperations
.filter(node => !this.isMetaTypeOf(node, this.META.Input))
.filter(node => !this.isMetaTypeOf(node, this.META.Output));

// For each operation, instantiate it with the respective arguments
connections = nodes
.filter(node => !this.isMetaTypeOf(node, this.META.Operation));

connections.forEach(conn => {
const srcId = this.core.getPointerPath(conn, 'src');
const dstId = this.core.getPointerPath(conn, 'dst');
// Get the src data name?
// TODO
this.assignVariableTo('result', srcId, dstId);
});

return Q.all(operations.map(operation => this.createOperation(operation)));
})
.then(operationOutputs => {
let code = [];

operationOutputs.forEach(output => {
// Create the operation
const [lines, opName, operation] = output;
code = lines.concat(code);

// execute it

// Get the inputs of the operation
let inputs = this.getCachedInputs(operation)
.map(tuple => {
const [, id] = tuple;
const srcId = this.getSrcDataId(connections, id);
return this.getVariableNameFor(srcId);
})
.join(',');

// Get the outputs of the operation (assign variable names)
let outputs = this.getCachedOutputs(operation)
.map(tuple => {
const [, id] = tuple;
const variable = this.getVariableNameFor(id);
return variable;
})
.filter(name => !!name)
.join(',');

if (outputs) {
code.unshift(`${outputs} = ${opName}.execute(${inputs})`);
} else {
code.unshift(`${opName}.execute(${inputs})`);
}
});
const createOps = operations.map(operation => this.createOperation(operation));
const operationOutputs = await Promise.all(createOps);
let code = [];

for (let i = 0; i < operationOutputs.length; i++) {
const output = operationOutputs[i];
const [lines, opName, operation] = output;
code = lines.concat(code);

// execute it

// Get the inputs of the operation
let inputs = (await Promise.all(this.getCachedInputs(operation)
.map(tuple => {
const [, id] = tuple;
const srcId = this.getSrcDataId(connections, id);
return this.getVariableNameFor(srcId);
})))
.join(',');

// Get the outputs of the operation (assign variable names)
const outputs = await this.getOutputs(operation);
const outputNames = (await Promise.all(
outputs.map(async tuple => {
const [, id] = tuple;
const variable = await this.getVariableNameFor(id);
return variable;
})))
.filter(name => !!name)
.join(',');

if (outputNames) {
code.unshift(`${outputNames} = ${opName}.execute(${inputs})`);
} else {
code.unshift(`${opName}.execute(${inputs})`);
}
}

// Import each operation
let operationTypes = operations.map(node => {
const base = this.core.getBase(node);
return this.core.getAttribute(base, 'name');
});
operationTypes = _.uniq(operationTypes);
operationTypes.forEach(type => code.unshift(`from operations import ${type}\n`));


// Create the pipeline file
const inputs = this.getPipelineInputs(allOperations)
.map(tuple => this.getVariableNameFor(tuple[1]))
.join(', ');
const outputs = this.getPipelineOutputs(allOperations)
.map(tuple => this.getVariableNameFor(tuple[1]))
.filter(name => !!name)
.join(', ');

// Move imports to the top
const importCode = code.filter(line => line.includes('import'));
code = code.filter(line => !line.includes('import'));

// Move all operation construction to the front
const opInvocations = code.filter(line => line.includes('execute'));
code = code.filter(line => !line.includes('execute'));
code = code.concat(opInvocations);

const filename = PluginBase.toSnakeCase(name);
const pipelinePy = [
importCode.join('\n'),
'',
`class ${name}():`,
indent(`def execute(self${inputs && ', '}${inputs}):`),
indent(indent(code.join('\n'))),
indent(indent(`return ${outputs}`))
].join('\n');
files.addFile(`pipelines/${filename}.py`, pipelinePy);
files.appendToFile('pipelines/__init__.py', `from pipelines.${filename} import ${name}\n`);
return Q.all(operations.map(node => this.createOperationFiles(node, files)));
});
// Import each operation
let operationTypes = operations.map(node => {
const base = this.core.getBase(node);
return this.core.getAttribute(base, 'name');
});
operationTypes = _.uniq(operationTypes);
operationTypes.forEach(type => code.unshift(`from operations import ${type}\n`));


// Create the pipeline file
const inputs = (await Promise.all(this.getPipelineInputs(allOperations)
.map(tuple => this.getVariableNameFor(tuple[1]))))
.join(', ');
const outputs = (await Promise.all(this.getPipelineOutputs(allOperations)
.map(tuple => this.getVariableNameFor(tuple[1]))))
.filter(name => !!name)
.join(', ');

// Move imports to the top
const importCode = code.filter(line => line.includes('import'));
code = code.filter(line => !line.includes('import'));

// Move all operation construction to the front
const opInvocations = code.filter(line => line.includes('execute'));
code = code.filter(line => !line.includes('execute'));
code = code.concat(opInvocations);

const filename = PluginBase.toSnakeCase(name);
const pipelinePy = [
importCode.join('\n'),
'',
`class ${name}():`,
indent(`def execute(self${inputs && ', '}${inputs}):`),
indent(indent(code.join('\n'))),
indent(indent(`return ${outputs}`))
].join('\n');
files.addFile(`pipelines/${filename}.py`, pipelinePy);
files.appendToFile('pipelines/__init__.py', `from pipelines.${filename} import ${name}\n`);
return Promise.all(operations.map(node => this.createOperationFiles(node, files)));
};

Export.prototype.getPipelineInputs = function (nodes) {
Expand Down Expand Up @@ -495,7 +493,7 @@ define([

Export.prototype.getCurrentConfig = function () {
var config = PluginBase.prototype.getCurrentConfig.call(this);
config.staticInputs = config.staticInputs || [];
config.staticInputs = config.staticInputs || {};
return config;
};

Expand Down
Loading

0 comments on commit ab83c7c

Please sign in to comment.