Skip to content

Commit

Permalink
GetExecution performance improvements (flyteorg#171)
Browse files Browse the repository at this point in the history
Signed-off-by: Katrina Rogan <katroganGH@gmail.com>
  • Loading branch information
Katrina Rogan authored Mar 30, 2021
1 parent 4fc817c commit 7e13a8d
Show file tree
Hide file tree
Showing 19 changed files with 18 additions and 164 deletions.
18 changes: 2 additions & 16 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
)

const childContainerQueueKey = "child_queue"
const noSourceExecutionID = 0
const principalContextKeyFormat = "%v"

// Map of [project] -> map of [domain] -> stop watch
Expand Down Expand Up @@ -785,6 +784,7 @@ func (m *ExecutionManager) RelaunchExecution(
inputs = spec.Inputs
}
executionSpec.Metadata.Mode = admin.ExecutionMetadata_RELAUNCH
executionSpec.Metadata.ReferenceExecution = existingExecution.Id
var executionModel *models.Execution
ctx, executionModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{
Project: request.Id.Project,
Expand Down Expand Up @@ -1027,21 +1027,7 @@ func (m *ExecutionManager) GetExecution(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err: %v", request, err)
return nil, err
}
var execution *admin.Execution
var transformerErr error
if executionModel.SourceExecutionID != noSourceExecutionID {
// Fetch parent execution to reconstruct its WorkflowExecutionIdentifier
referenceExecutionModel, err := m.db.ExecutionRepo().GetByID(ctx, executionModel.SourceExecutionID)
if err != nil {
logger.Debugf(ctx, "Failed to get reference execution source execution id [%s] for descendant execution [%v]",
executionModel.SourceExecutionID)
return nil, err
}
referenceExecutionID := transformers.GetExecutionIdentifier(&referenceExecutionModel)
execution, transformerErr = transformers.FromExecutionModelWithReferenceExecution(*executionModel, &referenceExecutionID)
} else {
execution, transformerErr = transformers.FromExecutionModel(*executionModel)
}
execution, transformerErr := transformers.FromExecutionModel(*executionModel)
if transformerErr != nil {
logger.Debugf(ctx, "Failed to transform execution model [%+v] to proto object with err: %v", request.Id,
transformerErr)
Expand Down
20 changes: 1 addition & 19 deletions flyteadmin/pkg/repositories/gormimpl/execution_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (r *ExecutionRepo) Get(ctx context.Context, input interfaces.GetResourceInp
Domain: input.Domain,
Name: input.Name,
},
}).First(&execution)
}).Take(&execution)
timer.Stop()
if tx.Error != nil {
return models.Execution{}, r.errorTransformer.ToFlyteAdminError(tx.Error)
Expand All @@ -56,24 +56,6 @@ func (r *ExecutionRepo) Get(ctx context.Context, input interfaces.GetResourceInp
return execution, nil
}

func (r *ExecutionRepo) GetByID(ctx context.Context, id uint) (models.Execution, error) {
var execution models.Execution
timer := r.metrics.GetDuration.Start()
tx := r.db.Where(&models.Execution{
BaseModel: models.BaseModel{
ID: id,
},
}).First(&execution)
timer.Stop()
if tx.Error != nil {
return models.Execution{}, r.errorTransformer.ToFlyteAdminError(tx.Error)
}
if tx.RecordNotFound() {
return models.Execution{}, errors.GetMissingEntityByIDError("execution")
}
return execution, nil
}

func (r *ExecutionRepo) Update(ctx context.Context, event models.ExecutionEvent, execution models.Execution) error {
timer := r.metrics.UpdateDuration.Start()
defer timer.Stop()
Expand Down
36 changes: 1 addition & 35 deletions flyteadmin/pkg/repositories/gormimpl/execution_repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestGetExecution(t *testing.T) {
// Only match on queries that append expected filters
GlobalMock.NewMock().WithQuery(`SELECT * FROM "executions" WHERE "executions"."deleted_at" IS NULL AND ` +
`(("executions"."execution_project" = project) AND ("executions"."execution_domain" = domain) AND ` +
`("executions"."execution_name" = 1)) ORDER BY "executions"."id" ASC LIMIT 1`).WithReply(executions)
`("executions"."execution_name" = 1)) LIMIT 1`).WithReply(executions)
output, err := executionRepo.Get(context.Background(), interfaces.GetResourceInput{
Project: "project",
Domain: "domain",
Expand All @@ -173,40 +173,6 @@ func TestGetExecution(t *testing.T) {
assert.EqualValues(t, expectedExecution, output)
}

func TestGetByIDExecution(t *testing.T) {
executionRepo := NewExecutionRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope())
expectedExecution := models.Execution{
BaseModel: models.BaseModel{
ID: uint(20),
},
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "1",
},
LaunchPlanID: uint(2),
Phase: core.WorkflowExecution_SUCCEEDED.String(),
Closure: []byte{1, 2},
WorkflowID: uint(3),
Spec: []byte{3, 4},
StartedAt: &executionStartedAt,
ExecutionCreatedAt: &createdAt,
ExecutionUpdatedAt: &executionUpdatedAt,
}

executions := make([]map[string]interface{}, 0)
execution := getMockExecutionResponseFromDb(expectedExecution)
executions = append(executions, execution)

GlobalMock := mocket.Catcher.Reset()
// Only match on queries that append expected filters
GlobalMock.NewMock().WithQuery(`SELECT * FROM "executions" WHERE "executions"."deleted_at" IS NULL AND ` +
`(("executions"."id" = 20)) ORDER BY "executions"."id" ASC LIMIT 1`).WithReply(executions)
output, err := executionRepo.GetByID(context.Background(), uint(20))
assert.NoError(t, err)
assert.EqualValues(t, expectedExecution, output)
}

func TestListExecutions(t *testing.T) {
executionRepo := NewExecutionRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope())

Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/repositories/gormimpl/launch_plan_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (r *LaunchPlanRepo) Get(ctx context.Context, input interfaces.GetResourceIn
Name: input.Name,
Version: input.Version,
},
}).First(&launchPlan)
}).Take(&launchPlan)
timer.Stop()
if tx.Error != nil {
return models.LaunchPlan{}, r.errorTransformer.ToFlyteAdminError(tx.Error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ func TestGetLaunchPlan(t *testing.T) {
GlobalMock.NewMock().WithQuery(
`SELECT * FROM "launch_plans" WHERE "launch_plans"."deleted_at" IS NULL AND ` +
`(("launch_plans"."project" = project) AND ("launch_plans"."domain" = domain) AND ` +
`("launch_plans"."name" = name) AND ("launch_plans"."version" = XYZ)) ORDER BY "launch_plans"."id" ` +
`ASC LIMIT 1`).WithReply(launchPlans)
`("launch_plans"."name" = name) AND ("launch_plans"."version" = XYZ)) LIMIT 1`).WithReply(launchPlans)
output, err := launchPlanRepo.Get(context.Background(), interfaces.GetResourceInput{
Project: project,
Domain: domain,
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/repositories/gormimpl/named_entity_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (r *NamedEntityRepo) Get(ctx context.Context, input interfaces.GetNamedEnti
}

timer := r.metrics.GetDuration.Start()
tx = tx.Select(getSelectForNamedEntity(tableName, input.ResourceType)).First(&namedEntity)
tx = tx.Select(getSelectForNamedEntity(tableName, input.ResourceType)).Take(&namedEntity)
timer.Stop()

if tx.Error != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (r *NodeExecutionRepo) Get(ctx context.Context, input interfaces.GetNodeExe
Name: input.NodeExecutionIdentifier.ExecutionId.Name,
},
},
}).Preload("ChildNodeExecutions").First(&nodeExecution)
}).Preload("ChildNodeExecutions").Take(&nodeExecution)
timer.Stop()
if tx.Error != nil {
return models.NodeExecution{}, r.errorTransformer.ToFlyteAdminError(tx.Error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func TestGetNodeExecution(t *testing.T) {
`SELECT * FROM "node_executions" WHERE "node_executions"."deleted_at" IS NULL AND ` +
`(("node_executions"."execution_project" = execution_project) AND ("node_executions"."execution_domain" ` +
`= execution_domain) AND ("node_executions"."execution_name" = execution_name) AND ("node_executions".` +
`"node_id" = 1)) ORDER BY "node_executions"."id" ASC LIMIT 1`).WithReply(nodeExecutions)
`"node_id" = 1)) LIMIT 1`).WithReply(nodeExecutions)
output, err := nodeExecutionRepo.Get(context.Background(), interfaces.GetNodeExecutionInput{
NodeExecutionIdentifier: core.NodeExecutionIdentifier{
NodeId: "1",
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/repositories/gormimpl/project_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (r *ProjectRepo) Get(ctx context.Context, projectID string) (models.Project
timer := r.metrics.GetDuration.Start()
tx := r.db.Where(&models.Project{
Identifier: projectID,
}).First(&project)
}).Take(&project)
timer.Stop()
if tx.Error != nil {
return models.Project{}, r.errorTransformer.ToFlyteAdminError(tx.Error)
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/repositories/gormimpl/project_repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestGetProject(t *testing.T) {

query := GlobalMock.NewMock()
query.WithQuery(`SELECT * FROM "projects" WHERE "projects"."deleted_at" IS NULL AND ` +
`(("projects"."identifier" = project_id)) ORDER BY "projects"."identifier" ASC LIMIT 1`).WithReply(
`(("projects"."identifier" = project_id)) LIMIT 1`).WithReply(
[]map[string]interface{}{
response,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (r *TaskExecutionRepo) Get(ctx context.Context, input interfaces.GetTaskExe
},
RetryAttempt: &input.TaskExecutionID.RetryAttempt,
},
}).Preload("ChildNodeExecution").First(&taskExecution)
}).Preload("ChildNodeExecution").Take(&taskExecution)
timer.Stop()
if tx.Error != nil {
return models.TaskExecution{}, r.errorTransformer.ToFlyteAdminError(tx.Error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ func TestGetTaskExecution(t *testing.T) {
`"project" = project) AND ("task_executions"."domain" = domain) AND ("task_executions"."name" = task-id) ` +
`AND ("task_executions"."version" = task-version) AND ("task_executions"."execution_project" = project) ` +
`AND ("task_executions"."execution_domain" = domain) AND ("task_executions"."execution_name" = name) AND` +
` ("task_executions"."node_id" = node-id) AND ("task_executions"."retry_attempt" = 0)) ` +
`ORDER BY "task_executions"."id" ASC LIMIT 1`).WithReply(taskExecutions)
` ("task_executions"."node_id" = node-id) AND ("task_executions"."retry_attempt" = 0)) LIMIT 1`).
WithReply(taskExecutions)

output, err := taskExecutionRepo.Get(context.Background(), interfaces.GetTaskExecutionInput{
TaskExecutionID: core.TaskExecutionIdentifier{
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/repositories/gormimpl/task_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (r *TaskRepo) Get(ctx context.Context, input interfaces.GetResourceInput) (
Name: input.Name,
Version: input.Version,
},
}).First(&task)
}).Take(&task)
timer.Stop()
if tx.Error != nil {
return models.Task{}, r.errorTransformer.ToFlyteAdminError(tx.Error)
Expand Down
4 changes: 2 additions & 2 deletions flyteadmin/pkg/repositories/gormimpl/task_repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func TestGetTask(t *testing.T) {
// Only match on queries that append expected filters
GlobalMock.NewMock().WithQuery(
`SELECT * FROM "tasks" WHERE "tasks"."deleted_at" IS NULL AND (("tasks"."project" = project) ` +
`AND ("tasks"."domain" = domain) AND ("tasks"."name" = name) AND ("tasks"."version" = XYZ)) ` +
`ORDER BY "tasks"."id" ASC LIMIT 1`).WithReply(tasks)
`AND ("tasks"."domain" = domain) AND ("tasks"."name" = name) AND ("tasks"."version" = XYZ)) LIMIT 1`).
WithReply(tasks)
output, err := taskRepo.Get(context.Background(), interfaces.GetResourceInput{
Project: project,
Domain: domain,
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/repositories/gormimpl/workflow_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (r *WorkflowRepo) Get(ctx context.Context, input interfaces.GetResourceInpu
Name: input.Name,
Version: input.Version,
},
}).First(&workflow)
}).Take(&workflow)
timer.Stop()
if tx.Error != nil {
return models.Workflow{}, r.errorTransformer.ToFlyteAdminError(tx.Error)
Expand Down
2 changes: 0 additions & 2 deletions flyteadmin/pkg/repositories/interfaces/execution_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ type ExecutionRepoInterface interface {
UpdateExecution(ctx context.Context, execution models.Execution) error
// Returns a matching execution if it exists.
Get(ctx context.Context, input GetResourceInput) (models.Execution, error)
// Return a matching execution if it exists
GetByID(ctx context.Context, id uint) (models.Execution, error)
// Returns executions matching query parameters. A limit must be provided for the results page size.
List(ctx context.Context, input ListResourceInput) (ExecutionCollectionOutput, error)
}
Expand Down
13 changes: 0 additions & 13 deletions flyteadmin/pkg/repositories/mocks/execution_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ type CreateExecutionFunc func(ctx context.Context, input models.Execution) error
type UpdateFunc func(ctx context.Context, event models.ExecutionEvent, execution models.Execution) error
type UpdateExecutionFunc func(ctx context.Context, execution models.Execution) error
type GetExecutionFunc func(ctx context.Context, input interfaces.GetResourceInput) (models.Execution, error)
type GetExecutionByIDFunc func(ctx context.Context, id uint) (models.Execution, error)
type ListExecutionFunc func(ctx context.Context, input interfaces.ListResourceInput) (
interfaces.ExecutionCollectionOutput, error)

Expand All @@ -20,7 +19,6 @@ type MockExecutionRepo struct {
updateFunction UpdateFunc
updateExecutionFunc UpdateExecutionFunc
getFunction GetExecutionFunc
getByIDFunction GetExecutionByIDFunc
listFunction ListExecutionFunc
}

Expand Down Expand Up @@ -68,17 +66,6 @@ func (r *MockExecutionRepo) SetGetCallback(getFunction GetExecutionFunc) {
r.getFunction = getFunction
}

func (r *MockExecutionRepo) GetByID(ctx context.Context, id uint) (models.Execution, error) {
if r.getByIDFunction != nil {
return r.getByIDFunction(ctx, id)
}
return models.Execution{}, nil
}

func (r *MockExecutionRepo) SetGetByIDCallback(getByIDFunction GetExecutionByIDFunc) {
r.getByIDFunction = getByIDFunction
}

func (r *MockExecutionRepo) List(ctx context.Context, input interfaces.ListResourceInput) (
interfaces.ExecutionCollectionOutput, error) {
if r.listFunction != nil {
Expand Down
13 changes: 0 additions & 13 deletions flyteadmin/pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,19 +227,6 @@ func FromExecutionModel(executionModel models.Execution) (*admin.Execution, erro
}, nil
}

func FromExecutionModelWithReferenceExecution(executionModel models.Execution, referenceExecutionID *core.WorkflowExecutionIdentifier) (
*admin.Execution, error) {
execution, err := FromExecutionModel(executionModel)
if err != nil {
return nil, err
}
if referenceExecutionID != nil && execution.Spec.Metadata != nil &&
execution.Spec.Metadata.Mode == admin.ExecutionMetadata_RELAUNCH {
execution.Spec.Metadata.ReferenceExecution = referenceExecutionID
}
return execution, nil
}

func FromExecutionModels(executionModels []models.Execution) ([]*admin.Execution, error) {
executions := make([]*admin.Execution, len(executionModels))
for idx, executionModel := range executionModels {
Expand Down
51 changes: 0 additions & 51 deletions flyteadmin/pkg/repositories/transformers/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,57 +420,6 @@ func TestFromExecutionModel_Aborted(t *testing.T) {
assert.Empty(t, execution.Closure.GetAbortCause())
}

func TestFromExecutionModelWithReferenceExecution(t *testing.T) {
spec := testutils.GetExecutionRequest().Spec
spec.Metadata = &admin.ExecutionMetadata{
Mode: admin.ExecutionMetadata_RELAUNCH,
}
specBytes, _ := proto.Marshal(spec)
phase := core.WorkflowExecution_RUNNING.String()
startedAt := time.Date(2018, 8, 30, 0, 0, 0, 0, time.UTC)
startedAtProto, _ := ptypes.TimestampProto(startedAt)
closure := admin.ExecutionClosure{
ComputedInputs: spec.Inputs,
Phase: core.WorkflowExecution_RUNNING,
StartedAt: startedAtProto,
}
closureBytes, _ := proto.Marshal(&closure)

executionModel := models.Execution{
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
},
Spec: specBytes,
Phase: phase,
Closure: closureBytes,
LaunchPlanID: uint(1),
WorkflowID: uint(2),
StartedAt: &startedAt,
}
execution, err := FromExecutionModelWithReferenceExecution(executionModel, nil)
assert.Nil(t, err)
assert.True(t, proto.Equal(&admin.Execution{
Id: &core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
},
Spec: spec,
Closure: &closure,
}, execution))

referenceExecutionID := &core.WorkflowExecutionIdentifier{
Project: "ref_project",
Domain: "ref_domain",
Name: "ref_name",
}
execution, err = FromExecutionModelWithReferenceExecution(executionModel, referenceExecutionID)
assert.Nil(t, err)
assert.True(t, proto.Equal(referenceExecutionID, execution.Spec.Metadata.ReferenceExecution))
}

func TestFromExecutionModels(t *testing.T) {
spec := testutils.GetExecutionRequest().Spec
specBytes, _ := proto.Marshal(spec)
Expand Down

0 comments on commit 7e13a8d

Please sign in to comment.