Skip to content

Commit

Permalink
Use correct storage client for inputs. Fixes #1662 (#1667)
Browse files Browse the repository at this point in the history
* Use correct storage client for inputs. Fixes #1662

* Use correct storage for non-Input node data
  • Loading branch information
brollb authored Apr 22, 2020
1 parent bb5f9e0 commit 3019678
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 6 deletions.
22 changes: 16 additions & 6 deletions src/common/plugin/LocalExecutor.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,23 @@ define([
const jobLogger = new JobLogger(this.core, this.core.getParent(node));
jobLogger.log('About to save output artifacts.');
const saveDir = `${this.projectId}/artifacts/`;
const storage = await this.getStorageClient();
jobLogger.append(`Saving output data to ${storage.name}...`);
const dstStorage = await this.getStorageClient();
jobLogger.append(`Saving output data to ${dstStorage.name}...`);

const createParams = {base: this.META.Data, parent: artifactsDir};
for (let i = dataNodes.length; i--;) {
const artifact = this.core.createNode(createParams);
const name = this.core.getOwnAttribute(node, 'saveName') ||
this.core.getAttribute(dataNodes[i], 'name');
const createdAt = Date.now();
const originalData = JSON.parse(this.core.getAttribute(dataNodes[i], 'data'));
const userAsset = await storage.copy(originalData, saveDir + name);

const name = this.core.getOwnAttribute(node, 'saveName') ||
this.core.getAttribute(dataNodes[i], 'name');

const srcStorage = this.isPipelineInput(dataNodes[i]) ?
await this.getStorageClientForInputData(originalData)
: dstStorage;
const content = await srcStorage.getFile(originalData);
const userAsset = await dstStorage.putFile(saveDir + name, content);

this.core.setAttribute(artifact, 'data', JSON.stringify(userAsset));
this.core.setAttribute(artifact, 'name', name);
Expand All @@ -96,7 +102,11 @@ define([
}

this.logger.info(`Saved ${dataNodes.length} artifacts in ${this.projectId}.`);
jobLogger.append(`Saved output data to ${storage.name}`);
jobLogger.append(`Saved output data to ${dstStorage.name}`);
};

LocalExecutor.prototype.isPipelineInput = function(node) {
return this.isMetaTypeOf(node, this.META.Input);
};

// Helper methods
Expand Down
18 changes: 18 additions & 0 deletions src/plugins/ExecuteJob/ExecuteJob.js
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,24 @@ define([
return await backend.getClient(this.logger, storage.config);
};

ExecuteJob.prototype.getInputStorageConfigs = async function () {
const inputs = Object.entries(this.getCurrentConfig().inputs || {});
const [nodeIds=[], configs=[]] = _.unzip(inputs);

const nodes = await Promise.all(nodeIds.map(id => this.core.loadByPath(this.rootNode, id)));
const dataInfos = nodes.map(node => this.core.getAttribute(node, 'data'));

const config = _.object(_.zip(dataInfos, configs));
return config;
};

ExecuteJob.prototype.getStorageClientForInputData = async function (dataInfo) {
const configDict = await this.getInputStorageConfigs();
const config = configDict[JSON.stringify(dataInfo)];
const client = await Storage.getClient(dataInfo.backend, null, config);
return client;
};

ExecuteJob.prototype.getJobId = function (node) {
return this.getJobInfo(node).hash;
};
Expand Down

0 comments on commit 3019678

Please sign in to comment.