Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add plugin for reifying artifact provenance as pipeline. Closes #1734 #1946

Merged
merged 18 commits into from
Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 67 additions & 3 deletions src/common/plugin/ExecutionHelpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ define([
parent: dst
});

const names = this.core.getValidAttributeNames(node);
const names = this.core.getAttributeNames(node);
const values = names.map(name => this.core.getAttribute(node, name));
names.forEach((name, i) =>
this.core.setAttribute(snapshot, name, values[i]));
Expand Down Expand Up @@ -52,14 +52,33 @@ define([
.sort(metaTypeComparator);
const [dstInput, dstOutput] = (await this.core.loadChildren(snapshot))
.sort(metaTypeComparator);
const [srcInputs, srcOutputs] = await Promise.all(srcCntrs.map(ctr => this.core.loadChildren(ctr)));
const copies = srcInputs.map(n => this.core.copyNode(n, dstInput));

const [srcInputs, srcOutputs] = (await Promise.all(srcCntrs.map(ctr => this.core.loadChildren(ctr))));

const copies = srcInputs.map(n => {
const copy = this.core.copyNode(n, dstInput);
const inheritancePath = this.getInheritedAncestors(n);
const dataMetaNode = inheritancePath.reverse()
.find(node => this.core.getAttribute(node, 'name') === 'Data');
this.core.setPointer(copy, 'base', dataMetaNode);
this.core.setAttribute(copy, 'name', this.core.getAttribute(n, 'name'));
return copy;
});
copies.push(...srcOutputs.map(n => this.shallowCopy(n, dstOutput)));
const oldNewPairs = _.zip(srcInputs.concat(srcOutputs), copies);
oldNewPairs.push([node, snapshot]);
return {snapshot, pairs: oldNewPairs};
}

getInheritedAncestors (node) {
const path = [];
while (node) {
path.push(node);
node = this.core.getBase(node);
}
return path;
}

shallowCopy (original, dst) {
const attrNames = this.core.getAttributeNames(original);
const copy = this.core.createNode({
Expand All @@ -73,6 +92,51 @@ define([

return copy;
}

async setDataContents(node, dataNode) {
const dataType = this.core.getAttribute(dataNode, 'type');
this.core.setAttribute(node, 'type', dataType);

const hash = this.core.getAttribute(dataNode, 'data');
this.core.setAttribute(node, 'data', hash);

const provOutput = this.core.getAttribute(dataNode, 'provOutput');
if (provOutput) {
this.core.setAttribute(node, 'provOutput', provOutput);
}

await this.clearProvenance(node);

const provDataId = this.core.getPointerPath(dataNode, 'provenance');
if (provDataId) {
const implOp = await this.core.loadByPath(this.rootNode, provDataId);
const provCopy = this.core.copyNode(implOp, node);
this.core.setPointer(node, 'provenance', provCopy);
}
}

async clearProvenance(dataNode) {
const provDataId = this.core.getPointerPath(dataNode, 'provenance');
if (provDataId) {
const provData = await this.core.loadByPath(this.rootNode, provDataId);
const {node} = this.getImplicitOperation(provData);
this.core.deleteNode(node);
}
}

getImplicitOperation(dataNode) {
const metanodes = Object.values(this.core.getAllMetaNodes(dataNode));
const implicitOp = metanodes
.find(node => this.core.getAttribute(node, 'name') === 'ImplicitOperation');
let node = dataNode;
const path = [];
while (node && !this.core.isTypeOf(node, implicitOp)) {
path.push(this.core.getAttribute(node, 'name'));
node = this.core.getParent(node);
}

return {node, path};
}
}

return ExecutionHelpers;
Expand Down
16 changes: 11 additions & 5 deletions src/common/plugin/LocalExecutor.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,18 @@ define([
this.core.getAttribute(node, 'data')
]);
const inputs = await this.getInputs(node);
const ids = inputs.map(i => this.core.getPath(i[2]));
const incomingData = Object.values(this.nodes)
const inputIds = inputs.map(i => this.core.getPath(i[2]));
const execution = this.core.getParent(
this.core.getParent(node)
);
const incomingDataIds = (await this.core.loadChildren(execution))
.filter(node => this.isMetaTypeOf(node, this.META.Transporter))
.filter(node => ids.includes(this.core.getPointerPath(node, 'dst')))
.map(node => this.core.getPointerPath(node, 'src'))
.map(id => this.nodes[id]);
.filter(node => inputIds.includes(this.core.getPointerPath(node, 'dst')))
.map(node => this.core.getPointerPath(node, 'src'));

const incomingData = await Promise.all(
incomingDataIds.map(id => this.core.loadByPath(this.rootNode, id))
);

// Remove nodes that already exist
const dataNodes = incomingData.filter(dataNode => {
Expand Down
21 changes: 20 additions & 1 deletion src/common/viz/Buttons.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,24 @@ define([
return n && n.getBaseId();
};

// Button whose click either opens the clicked item itself or defers to the
// GoToBase behavior, depending on the name of the item's base node.
var GoToOperationDefinition = function(params) {
    EasyDAGButtons.ButtonBase.call(this, params);
};

GoToOperationDefinition.prototype = Object.create(GoToBase.prototype);

/**
 * Click handler.
 *
 * When the base of the clicked item is named 'Operation' (treated as a
 * snapshot), the item itself is registered as the active object.
 * Otherwise the click is handled by `GoToBase.prototype._onClick`.
 *
 * @param {Object} item - clicked item; its `id` is looked up via `client`
 */
GoToOperationDefinition.prototype._onClick = function(item) {
    const baseId = client.getNode(item.id).getBaseId();
    const baseName = client.getNode(baseId).getAttribute('name');

    if (baseName !== 'Operation') {
        GoToBase.prototype._onClick.call(this, item);
        return;
    }

    WebGMEGlobal.State.registerActiveObject(item.id);
};

var CloneAndEdit = function(params) {
GoToBase.call(this, params);
};
Expand Down Expand Up @@ -131,7 +149,8 @@ define([
DeleteOne: EasyDAGButtons.DeleteOne,
GoToBase: GoToBase,
CloneAndEdit: CloneAndEdit,
Insert: Insert
Insert: Insert,
GoToOperationDefinition,
};
});

8 changes: 7 additions & 1 deletion src/plugins/ExecuteJob/ExecuteJob.js
Original file line number Diff line number Diff line change
Expand Up @@ -730,9 +730,15 @@ define([
base: this.META.ExecutedJob,
parent: dataNode
});
const {snapshot} = await helpers.snapshotOperation(opNode, executedJob, this.META.Operation);
const {snapshot} = await helpers.snapshotOperation(
opNode,
executedJob,
this.META.Operation
);
this.core.setPointer(executedJob, 'operation', snapshot);
this.core.setPointer(dataNode, 'provenance', executedJob);
const name = this.core.getAttribute(dataNode, 'name');
this.core.setAttribute(dataNode, 'provOutput', name);
};

//////////////////////////// Special Operations ////////////////////////////
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/ExecuteJob/metadata/Metadata.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ define([
async loadChildren() {
const provPath = this.core.getPointerPath(this.node, 'provenance');
const children = (await this.core.loadChildren(this.node))
.filter(node => this.core.getPath(node) !== provPath);
.filter(node => !provPath.includes(this.core.getPath(node)));

return children;
}
Expand Down
26 changes: 7 additions & 19 deletions src/plugins/ExecutePipeline/ExecutePipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
define([
'plugin/CreateExecution/CreateExecution/CreateExecution',
'plugin/ExecuteJob/ExecuteJob/ExecuteJob',
'deepforge/plugin/ExecutionHelpers',
'common/storage/constants',
'common/core/constants',
'deepforge/Constants',
Expand All @@ -13,6 +14,7 @@ define([
], function (
CreateExecution,
ExecuteJob,
ExecutionHelpers,
STORAGE_CONSTANTS,
GME_CONSTANTS,
CONSTANTS,
Expand Down Expand Up @@ -554,26 +556,12 @@ define([
const portPairs = resultPorts
.map((id, i) => [this.nodes[id], this.nodes[nextPortIds[i]]]);

const forwardData = portPairs.map(async pair => { // [ resultPort, nextPort ]
const [result, next] = pair;

let dataType = this.core.getAttribute(result, 'type');
this.core.setAttribute(next, 'type', dataType);

let hash = this.core.getAttribute(result, 'data');
this.core.setAttribute(next, 'data', hash);

const provInfoId = this.core.getPointerPath(result, 'provenance', true);
if (provInfoId) {
const provNode = await this.core.loadByPath(result, provInfoId);
const provCopy = this.core.copyNode(provNode, next);
this.core.setPointer(next, 'provenance', provCopy);
}

this.logger.info(`forwarding data (${dataType}) from ${this.core.getPath(result)} ` +
`to ${this.core.getPath(next)}`);
const helpers = new ExecutionHelpers(this.core, this.rootNode);
const forwardData = portPairs.map(pair => {
const [resultPort, nextPort] = pair;
return helpers.setDataContents(nextPort, resultPort);
});
await forwardData;
await Promise.all(forwardData);

// For all the nextPortIds, decrement the corresponding operation's incoming counts
const counts = nextPortIds.map(id => this.getSiblingIdContaining(id))
Expand Down
120 changes: 120 additions & 0 deletions src/plugins/ReifyArtifactProv/ReifyArtifactProv.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*globals define*/
/*eslint-env node, browser*/

define([
'plugin/PluginBase',
'deepforge/plugin/ExecutionHelpers',
'text!./metadata.json',
], function (
PluginBase,
ExecutionHelpers,
pluginMetadata,
) {
'use strict';

pluginMetadata = JSON.parse(pluginMetadata);

/**
 * Plugin that builds a new pipeline out of the provenance recorded on an
 * artifact: an Output operation holding the artifact's data, connected to
 * copies of the operations referenced by the `provenance` pointers of the
 * data nodes involved.
 */
class ReifyArtifactProv extends PluginBase {
    constructor() {
        super();
        this.pluginMetadata = pluginMetadata;
    }

    /**
     * Plugin entry point.
     *
     * Loads the artifact named by the `artifactId` config value, creates a
     * pipeline under the active node, fills it from the artifact's
     * provenance, saves and reports the new pipeline.
     *
     * @param {Function} callback - invoked as `callback(null, result)` on success
     * @throws {Error} when the artifact cannot be loaded
     */
    async main(callback) {
        const {artifactId} = this.getCurrentConfig();
        const artifact = await this.core.loadByPath(this.rootNode, artifactId);
        if (!artifact) {
            throw new Error(`Could not load artifact: ${artifactId}`);
        }

        const name = this.core.getAttribute(artifact, 'name');
        const pipeline = this.core.createNode({
            base: this.META.Pipeline,
            parent: this.activeNode,
        });
        this.core.setAttribute(pipeline, 'name', `Provenance of ${name}`);

        const outputOp = await this.createOutputOperation(pipeline, artifact);
        const [input] = await this.getOperationInputs(outputOp);
        await this.addProvenanceOperation(pipeline, input);

        await this.save(`Created provenance pipeline of ${name}`);
        this.result.setSuccess(true);
        this.createMessage(pipeline, 'New Provenance Pipeline');
        callback(null, this.result);
    }

    /**
     * Copy the operation that produced `input` into `pipeline`, connect the
     * matching output of that copy to `input`, and repeat for every input
     * of the copied operation.
     *
     * @param {Object} pipeline - pipeline node receiving the copies
     * @param {Object} input - data node whose provenance is reified
     * @returns {Promise<void>}
     * @throws {Error} when the copied operation has no output whose name
     *     equals the `provOutput` attribute of `input`
     */
    async addProvenanceOperation(pipeline, input) {
        const operation = await this.getProvAsOperation(input);
        // getProvAsOperation yields nothing for data without a 'provenance'
        // pointer; there is nothing to copy then, so this branch of the
        // recursion ends here instead of handing undefined to copyNode.
        if (!operation) {
            return;
        }

        const newOperation = this.core.copyNode(operation, pipeline);
        const outputData = await this.getOutputData(newOperation, input);
        if (!outputData) {
            throw new Error(`Could not find output in ${this.core.getPath(operation)} referencing data ${this.core.getAttribute(input, 'data')}`);
        }
        // connect is async: await it so a failure rejects this call rather
        // than surfacing as an unhandled rejection.
        await this.connect(pipeline, outputData, input);

        const inputs = await this.getOperationInputs(newOperation);
        await Promise.all(
            inputs.map(opInput => this.addProvenanceOperation(pipeline, opInput))
        );
        // TODO: should I create a new meta type for each?
    }

    /**
     * Create an Output operation in `pipeline` whose first input carries the
     * contents of `data` and whose `saveName` is the name of `data`.
     *
     * @param {Object} pipeline - parent of the new operation
     * @param {Object} data - data node whose contents are copied
     * @returns {Promise<Object>} the created Output operation
     */
    async createOutputOperation(pipeline, data) {
        const output = this.core.createNode({
            parent: pipeline,
            base: this.META.Output,
        });
        const [input] = await this.getOperationInputs(output);
        const helpers = new ExecutionHelpers(this.core, this.rootNode);
        await helpers.setDataContents(input, data);
        const name = this.core.getAttribute(data, 'name');
        this.core.setAttribute(output, 'saveName', name);
        return output;
    }

    /**
     * Load the children of the `Inputs` container of an operation.
     *
     * @param {Object} operation
     * @returns {Promise<Object[]>}
     */
    async getOperationInputs(operation) {
        const inputs = (await this.core.loadChildren(operation))
            .find(node => this.core.isTypeOf(node, this.META.Inputs));
        return this.core.loadChildren(inputs);
    }

    /**
     * Load the children of the `Outputs` container of an operation.
     *
     * @param {Object} operation
     * @returns {Promise<Object[]>}
     */
    async getOperationOutputs(operation) {
        const outputs = (await this.core.loadChildren(operation))
            .find(node => this.core.isTypeOf(node, this.META.Outputs));
        return this.core.loadChildren(outputs);
    }

    /**
     * Resolve the operation recorded as the provenance of a data node:
     * follow its `provenance` pointer, then the `operation` pointer of the
     * node found there.
     *
     * @param {Object} artifact - data node
     * @returns {Promise<Object|undefined>} the operation node, or
     *     `undefined` when `artifact` has no `provenance` pointer
     * @throws {Error} when the provenance node has no `operation` pointer
     */
    async getProvAsOperation(artifact) {
        const implOpId = this.core.getPointerPath(artifact, 'provenance');
        if (!implOpId) return;
        const implicitOp = await this.core.loadByPath(this.rootNode, implOpId);
        const operationId = this.core.getPointerPath(implicitOp, 'operation');
        if (!operationId) {
            const name = this.core.getAttribute(implicitOp, 'name');
            throw new Error(`No operation found for ${implOpId} (${name})`);
        }
        return await this.core.loadByPath(this.rootNode, operationId);
    }

    /**
     * Find the output of `operation` whose name equals the `provOutput`
     * attribute of `artifact`.
     *
     * @param {Object} operation
     * @param {Object} artifact - data node carrying `provOutput`
     * @returns {Promise<Object|undefined>} the matching output, if any
     */
    async getOutputData(operation, artifact) {
        const outputs = await this.getOperationOutputs(operation);
        const provOutput = this.core.getAttribute(artifact, 'provOutput');
        return outputs.find(
            data => this.core.getAttribute(data, 'name') === provOutput
        );
    }

    /**
     * Create a Transporter node in `parent` pointing from `src` to `dst`.
     *
     * @param {Object} parent - node that will contain the connection
     * @param {Object} src - target of the `src` pointer
     * @param {Object} dst - target of the `dst` pointer
     * @returns {Promise<Object>} the created connection node
     */
    async connect(parent, src, dst) {
        const base = this.META.Transporter;
        const connection = this.core.createNode({parent, base});
        this.core.setPointer(connection, 'src', src);
        this.core.setPointer(connection, 'dst', dst);
        return connection;
    }
}

ReifyArtifactProv.metadata = pluginMetadata;

return ReifyArtifactProv;
});
24 changes: 24 additions & 0 deletions src/plugins/ReifyArtifactProv/metadata.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"id": "ReifyArtifactProv",
"name": "Reify Artifact Provenance",
"version": "0.1.0",
"description": "",
"icon": {
"class": "glyphicon glyphicon-cog",
"src": ""
},
"disableServerSideExecution": false,
"disableBrowserSideExecution": false,
"dependencies": [],
"writeAccessRequired": false,
"configStructure": [
{
"name": "artifactId",
"displayName": "Artifact",
"description": "Create a pipeline of the provenance of the given artifact",
"value": "",
"valueType": "string",
"readOnly": false
}
]
}
Binary file modified src/seeds/devProject/devProject.webgmex
Binary file not shown.
Binary file modified src/seeds/pipeline/pipeline.webgmex
Binary file not shown.
2 changes: 2 additions & 0 deletions src/seeds/pipeline/releases.jsonl
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
{"version":"0.21.1","changelog":"Update Inheritance of Subgraph, Line, Images, ScatterPoints etc.. nodes"}
{"version":"0.22.0","changelog":"Incorporate PlotlyJSON into Graph meta node"}
{"version":"0.23.0","changelog":"Add TrainKeras implicit operation"}

{"version":"0.24.0","changelog":"Add provOutput to WithProvenance mixin (required for pipeline reconstruction)"}
Loading