From 30196783884f4d788160001fd6550763c579d69a Mon Sep 17 00:00:00 2001 From: Brian Broll Date: Wed, 22 Apr 2020 11:57:32 -0500 Subject: [PATCH] Use correct storage client for inputs. Fixes #1662 (#1667) * Use correct storage client for inputs. Fixes #1662 * Use correct storage for non-Input node data --- src/common/plugin/LocalExecutor.js | 22 ++++++++++++++++------ src/plugins/ExecuteJob/ExecuteJob.js | 18 ++++++++++++++++++ 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/src/common/plugin/LocalExecutor.js b/src/common/plugin/LocalExecutor.js index 10cad897e..f65a920c4 100644 --- a/src/common/plugin/LocalExecutor.js +++ b/src/common/plugin/LocalExecutor.js @@ -77,17 +77,23 @@ define([ const jobLogger = new JobLogger(this.core, this.core.getParent(node)); jobLogger.log('About to save output artifacts.'); const saveDir = `${this.projectId}/artifacts/`; - const storage = await this.getStorageClient(); - jobLogger.append(`Saving output data to ${storage.name}...`); + const dstStorage = await this.getStorageClient(); + jobLogger.append(`Saving output data to ${dstStorage.name}...`); const createParams = {base: this.META.Data, parent: artifactsDir}; for (let i = dataNodes.length; i--;) { const artifact = this.core.createNode(createParams); - const name = this.core.getOwnAttribute(node, 'saveName') || - this.core.getAttribute(dataNodes[i], 'name'); const createdAt = Date.now(); const originalData = JSON.parse(this.core.getAttribute(dataNodes[i], 'data')); - const userAsset = await storage.copy(originalData, saveDir + name); + + const name = this.core.getOwnAttribute(node, 'saveName') || + this.core.getAttribute(dataNodes[i], 'name'); + + const srcStorage = this.isPipelineInput(dataNodes[i]) ? + await this.getStorageClientForInputData(originalData) + : dstStorage; + const content = await srcStorage.getFile(originalData); + const userAsset = await dstStorage.putFile(saveDir + name, content); this.core.setAttribute(artifact, 'data', JSON.stringify(userAsset)); this.core.setAttribute(artifact, 'name', name); @@ -96,7 +102,11 @@ define([ } this.logger.info(`Saved ${dataNodes.length} artifacts in ${this.projectId}.`); - jobLogger.append(`Saved output data to ${storage.name}`); + jobLogger.append(`Saved output data to ${dstStorage.name}`); + }; + + LocalExecutor.prototype.isPipelineInput = function(node) { + return this.isMetaTypeOf(node, this.META.Input); }; // Helper methods diff --git a/src/plugins/ExecuteJob/ExecuteJob.js b/src/plugins/ExecuteJob/ExecuteJob.js index 52cf2d4e0..78a8e54b8 100644 --- a/src/plugins/ExecuteJob/ExecuteJob.js +++ b/src/plugins/ExecuteJob/ExecuteJob.js @@ -209,6 +209,24 @@ define([ return await backend.getClient(this.logger, storage.config); }; + ExecuteJob.prototype.getInputStorageConfigs = async function () { + const inputs = Object.entries(this.getCurrentConfig().inputs || {}); + const [nodeIds=[], configs=[]] = _.unzip(inputs); + + const nodes = await Promise.all(nodeIds.map(id => this.core.loadByPath(this.rootNode, id))); + const dataInfos = nodes.map(node => this.core.getAttribute(node, 'data')); + + const config = _.object(_.zip(dataInfos, configs)); + return config; + }; + + ExecuteJob.prototype.getStorageClientForInputData = async function (dataInfo) { + const configDict = await this.getInputStorageConfigs(); + const config = configDict[JSON.stringify(dataInfo)]; + const client = await Storage.getClient(dataInfo.backend, null, config); + return client; + }; + ExecuteJob.prototype.getJobId = function (node) { return this.getJobInfo(node).hash; };