Skip to content

Commit

Permalink
Set max parallelism from execution or launch plan spec (flyteorg#205)
Browse files Browse the repository at this point in the history
Signed-off-by: Katrina Rogan <katroganGH@gmail.com>
  • Loading branch information
Katrina Rogan authored Jun 14, 2021
1 parent 3f0b43b commit 96148bf
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 6 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/flyteorg/flyteidl v0.18.54
github.com/flyteorg/flyteidl v0.19.2
github.com/flyteorg/flyteplugins v0.5.49
github.com/flyteorg/flytepropeller v0.10.16
github.com/flyteorg/flytestdlib v0.3.22
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,8 @@ github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw
github.com/flyteorg/flyteidl v0.18.48/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.18.50 h1:L1fMj6QEXoKin+cPQn9sfwJ1x14tlChdz1mG1WaaIW4=
github.com/flyteorg/flyteidl v0.18.50/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.18.54 h1:OWgA9LUqH7rTLd8DywFzfm+LkTtLzrPztP5JaO/4hpM=
github.com/flyteorg/flyteidl v0.18.54/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.19.2 h1:jXuRrLJEzSo33N9pw7bMEd6mRYSL7LCz/vnazz5XcOg=
github.com/flyteorg/flyteidl v0.19.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.5.49 h1:jqmNrsTQ2+m+vYKqDVNO3CYy9q3XYTms3XzOr3roAT0=
github.com/flyteorg/flyteplugins v0.5.49/go.mod h1:567WIA0Rr6QjmXsqvGsU+Cyb57Ia6qzddxIw//RPwYk=
github.com/flyteorg/flytepropeller v0.10.16 h1:WSVh0X0F9xSw1BxGKbHqu0oha37YaBmO3bOS7GjR3Qo=
Expand Down
21 changes: 18 additions & 3 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,22 @@ func (m *ExecutionManager) getInheritedExecMetadata(ctx context.Context, request
return parentNodeExecutionID, sourceExecutionID, nil
}

func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admin.ExecutionCreateRequest) (*admin.WorkflowExecutionConfig, error) {
// Produces execution-time attributes for workflow execution.
// Defaults to overridable execution values set in the execution create request, then looks at the launch plan values
// (if any) before defaulting to values set in the matchable resource db.
func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admin.ExecutionCreateRequest,
launchPlan *admin.LaunchPlan) (*admin.WorkflowExecutionConfig, error) {
if request.Spec.MaxParallelism > 0 {
return &admin.WorkflowExecutionConfig{
MaxParallelism: request.Spec.MaxParallelism,
}, nil
}
if launchPlan != nil && launchPlan.Spec.MaxParallelism > 0 {
return &admin.WorkflowExecutionConfig{
MaxParallelism: launchPlan.Spec.MaxParallelism,
}, nil
}

resource, err := m.resourceManager.GetResource(ctx, interfaces.ResourceRequest{
Project: request.Project,
Domain: request.Domain,
Expand Down Expand Up @@ -512,7 +527,7 @@ func (m *ExecutionManager) launchSingleTaskExecution(
logger.Errorf(ctx, "Failed to get quality of service for [%+v] with error: %v", workflowExecutionID, err)
return nil, nil, err
}
executionConfig, err := m.getExecutionConfig(ctx, &request)
executionConfig, err := m.getExecutionConfig(ctx, &request, nil)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -684,7 +699,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
logger.Errorf(ctx, "Failed to get quality of service for [%+v] with error: %v", workflowExecutionID, err)
return nil, nil, err
}
executionConfig, err := m.getExecutionConfig(ctx, &request)
executionConfig, err := m.getExecutionConfig(ctx, &request, launchPlan)
if err != nil {
return nil, nil, err
}
Expand Down
39 changes: 39 additions & 0 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2932,7 +2932,46 @@ func TestGetExecutionConfig(t *testing.T) {
execConfig, err := executionManager.getExecutionConfig(context.TODO(), &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Spec: &admin.ExecutionSpec{},
}, nil)
assert.NoError(t, err)
assert.Equal(t, execConfig.MaxParallelism, int32(100))
}

func TestGetExecutionConfig_Spec(t *testing.T) {
resourceManager := managerMocks.MockResourceManager{}
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
t.Errorf("When a user specifies max parallelism in a spec, the db should not be queried")
return nil, nil
}

executionManager := ExecutionManager{
resourceManager: &resourceManager,
}
execConfig, err := executionManager.getExecutionConfig(context.TODO(), &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Spec: &admin.ExecutionSpec{
MaxParallelism: 100,
},
}, &admin.LaunchPlan{
Spec: &admin.LaunchPlanSpec{
MaxParallelism: 50,
},
})
assert.NoError(t, err)
assert.Equal(t, execConfig.MaxParallelism, int32(100))

execConfig, err = executionManager.getExecutionConfig(context.TODO(), &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Spec: &admin.ExecutionSpec{},
}, &admin.LaunchPlan{
Spec: &admin.LaunchPlanSpec{
MaxParallelism: 50,
},
})
assert.NoError(t, err)
assert.Equal(t, execConfig.MaxParallelism, int32(50))
}

0 comments on commit 96148bf

Please sign in to comment.