Skip to content

Commit

Permalink
Small bugfix: always return execution data for no storage scheme (fly…
Browse files Browse the repository at this point in the history
…teorg#209)

Signed-off-by: Katrina Rogan <katroganGH@gmail.com>
  • Loading branch information
Katrina Rogan committed Jun 22, 2021
1 parent cc40a39 commit 6a1d9bc
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 10 deletions.
4 changes: 2 additions & 2 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1173,15 +1173,15 @@ func (m *ExecutionManager) GetExecutionData(
}
maxDataSize := m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes
remoteDataScheme := m.config.ApplicationConfiguration().GetRemoteDataConfig().Scheme
if remoteDataScheme == common.Local || inputsURLBlob.Bytes < maxDataSize {
if util.ShouldFetchData(m.config.ApplicationConfiguration().GetRemoteDataConfig(), inputsURLBlob) {
var fullInputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, executionModel.InputsURI, &fullInputs)
if err != nil {
logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", executionModel.InputsURI, err)
}
response.FullInputs = &fullInputs
}
if remoteDataScheme == common.Local || (signedOutputsURLBlob.Bytes < maxDataSize && execution.Closure.GetOutputs() != nil) {
if remoteDataScheme == common.Local || remoteDataScheme == common.None || (signedOutputsURLBlob.Bytes < maxDataSize && execution.Closure.GetOutputs() != nil) {
var fullOutputs core.LiteralMap
outputsURI := execution.Closure.GetOutputs().GetUri()
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(outputsURI), &fullOutputs)
Expand Down
7 changes: 3 additions & 4 deletions flyteadmin/pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,17 +452,16 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
Inputs: &signedInputsURLBlob,
Outputs: &signedOutputsURLBlob,
}
maxDataSize := m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes
remoteDataScheme := m.config.ApplicationConfiguration().GetRemoteDataConfig().Scheme
if remoteDataScheme == common.Local || signedInputsURLBlob.Bytes < maxDataSize {
if util.ShouldFetchData(m.config.ApplicationConfiguration().GetRemoteDataConfig(), signedInputsURLBlob) {
var fullInputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(nodeExecution.InputUri), &fullInputs)
if err != nil {
logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", nodeExecution.InputUri, err)
}
response.FullInputs = &fullInputs
}
if remoteDataScheme == common.Local || (signedOutputsURLBlob.Bytes < maxDataSize && len(nodeExecution.Closure.GetOutputUri()) > 0) {
if util.ShouldFetchOutputData(m.config.ApplicationConfiguration().GetRemoteDataConfig(), signedOutputsURLBlob,
nodeExecution.Closure.GetOutputUri()) {
var fullOutputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(nodeExecution.Closure.GetOutputUri()), &fullOutputs)
if err != nil {
Expand Down
7 changes: 3 additions & 4 deletions flyteadmin/pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,17 +302,16 @@ func (m *TaskExecutionManager) GetTaskExecutionData(
Inputs: &signedInputsURLBlob,
Outputs: &signedOutputsURLBlob,
}
maxDataSize := m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes
remoteDataScheme := m.config.ApplicationConfiguration().GetRemoteDataConfig().Scheme
if remoteDataScheme == common.Local || signedInputsURLBlob.Bytes < maxDataSize {
if util.ShouldFetchData(m.config.ApplicationConfiguration().GetRemoteDataConfig(), signedInputsURLBlob) {
var fullInputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(taskExecution.InputUri), &fullInputs)
if err != nil {
logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", taskExecution.InputUri, err)
}
response.FullInputs = &fullInputs
}
if remoteDataScheme == common.Local || (signedOutputsURLBlob.Bytes < maxDataSize && len(taskExecution.Closure.GetOutputUri()) > 0) {
if util.ShouldFetchOutputData(m.config.ApplicationConfiguration().GetRemoteDataConfig(), signedOutputsURLBlob,
taskExecution.Closure.GetOutputUri()) {
var fullOutputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(taskExecution.Closure.GetOutputUri()), &fullOutputs)
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions flyteadmin/pkg/manager/impl/util/data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package util

import (
"github.com/flyteorg/flyteadmin/pkg/common"
"github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
)

func ShouldFetchData(config *interfaces.RemoteDataConfig, urlBlob admin.UrlBlob) bool {
return config.Scheme == common.Local || config.Scheme == common.None || urlBlob.Bytes < config.MaxSizeInBytes
}

func ShouldFetchOutputData(config *interfaces.RemoteDataConfig, urlBlob admin.UrlBlob, outputURI string) bool {
return ShouldFetchData(config, urlBlob) && len(outputURI) > 0
}
72 changes: 72 additions & 0 deletions flyteadmin/pkg/manager/impl/util/data_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package util

import (
"testing"

"github.com/flyteorg/flyteadmin/pkg/common"
"github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/stretchr/testify/assert"
)

func TestShouldFetchData(t *testing.T) {
t.Run("local config", func(t *testing.T) {
assert.True(t, ShouldFetchData(&interfaces.RemoteDataConfig{
Scheme: common.Local,
MaxSizeInBytes: 100,
}, admin.UrlBlob{
Bytes: 200,
}))
})
t.Run("no config", func(t *testing.T) {
assert.True(t, ShouldFetchData(&interfaces.RemoteDataConfig{
Scheme: common.None,
MaxSizeInBytes: 100,
}, admin.UrlBlob{
Bytes: 200,
}))
})
t.Run("max size under limit", func(t *testing.T) {
assert.True(t, ShouldFetchData(&interfaces.RemoteDataConfig{
Scheme: common.AWS,
MaxSizeInBytes: 1000,
}, admin.UrlBlob{
Bytes: 200,
}))
})
t.Run("max size over limit", func(t *testing.T) {
assert.False(t, ShouldFetchData(&interfaces.RemoteDataConfig{
Scheme: common.AWS,
MaxSizeInBytes: 100,
}, admin.UrlBlob{
Bytes: 200,
}))
})
}

func TestShouldFetchOutputData(t *testing.T) {
t.Run("local config", func(t *testing.T) {
assert.True(t, ShouldFetchOutputData(&interfaces.RemoteDataConfig{
Scheme: common.Local,
MaxSizeInBytes: 100,
}, admin.UrlBlob{
Bytes: 200,
}, "s3://foo/bar.txt"))
})
t.Run("max size under limit", func(t *testing.T) {
assert.True(t, ShouldFetchOutputData(&interfaces.RemoteDataConfig{
Scheme: common.AWS,
MaxSizeInBytes: 1000,
}, admin.UrlBlob{
Bytes: 200,
}, "s3://foo/bar.txt"))
})
t.Run("output uri empty", func(t *testing.T) {
assert.False(t, ShouldFetchOutputData(&interfaces.RemoteDataConfig{
Scheme: common.AWS,
MaxSizeInBytes: 1000,
}, admin.UrlBlob{
Bytes: 200,
}, ""))
})
}

0 comments on commit 6a1d9bc

Please sign in to comment.