diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index 1053e0126f..bd85d8bee7 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -45,7 +45,6 @@ import ( ) const childContainerQueueKey = "child_queue" -const noSourceExecutionID = 0 const principalContextKeyFormat = "%v" // Map of [project] -> map of [domain] -> stop watch @@ -785,6 +784,7 @@ func (m *ExecutionManager) RelaunchExecution( inputs = spec.Inputs } executionSpec.Metadata.Mode = admin.ExecutionMetadata_RELAUNCH + executionSpec.Metadata.ReferenceExecution = existingExecution.Id var executionModel *models.Execution ctx, executionModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{ Project: request.Id.Project, @@ -1027,21 +1027,7 @@ func (m *ExecutionManager) GetExecution( logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err: %v", request, err) return nil, err } - var execution *admin.Execution - var transformerErr error - if executionModel.SourceExecutionID != noSourceExecutionID { - // Fetch parent execution to reconstruct its WorkflowExecutionIdentifier - referenceExecutionModel, err := m.db.ExecutionRepo().GetByID(ctx, executionModel.SourceExecutionID) - if err != nil { - logger.Debugf(ctx, "Failed to get reference execution source execution id [%s] for descendant execution [%v]", - executionModel.SourceExecutionID) - return nil, err - } - referenceExecutionID := transformers.GetExecutionIdentifier(&referenceExecutionModel) - execution, transformerErr = transformers.FromExecutionModelWithReferenceExecution(*executionModel, &referenceExecutionID) - } else { - execution, transformerErr = transformers.FromExecutionModel(*executionModel) - } + execution, transformerErr := transformers.FromExecutionModel(*executionModel) if transformerErr != nil { logger.Debugf(ctx, "Failed to transform execution model [%+v] to proto object with err: %v", request.Id, transformerErr) diff --git a/flyteadmin/pkg/repositories/gormimpl/execution_repo.go b/flyteadmin/pkg/repositories/gormimpl/execution_repo.go index 941ff7701a..8c5abb3f1d 100644 --- a/flyteadmin/pkg/repositories/gormimpl/execution_repo.go +++ b/flyteadmin/pkg/repositories/gormimpl/execution_repo.go @@ -41,7 +41,7 @@ func (r *ExecutionRepo) Get(ctx context.Context, input interfaces.GetResourceInp Domain: input.Domain, Name: input.Name, }, - }).First(&execution) + }).Take(&execution) timer.Stop() if tx.Error != nil { return models.Execution{}, r.errorTransformer.ToFlyteAdminError(tx.Error) @@ -56,24 +56,6 @@ func (r *ExecutionRepo) Get(ctx context.Context, input interfaces.GetResourceInp return execution, nil } -func (r *ExecutionRepo) GetByID(ctx context.Context, id uint) (models.Execution, error) { - var execution models.Execution - timer := r.metrics.GetDuration.Start() - tx := r.db.Where(&models.Execution{ - BaseModel: models.BaseModel{ - ID: id, - }, - }).First(&execution) - timer.Stop() - if tx.Error != nil { - return models.Execution{}, r.errorTransformer.ToFlyteAdminError(tx.Error) - } - if tx.RecordNotFound() { - return models.Execution{}, errors.GetMissingEntityByIDError("execution") - } - return execution, nil -} - func (r *ExecutionRepo) Update(ctx context.Context, event models.ExecutionEvent, execution models.Execution) error { timer := r.metrics.UpdateDuration.Start() defer timer.Stop() diff --git a/flyteadmin/pkg/repositories/gormimpl/execution_repo_test.go b/flyteadmin/pkg/repositories/gormimpl/execution_repo_test.go index fe2a2615ad..69a6a818f4 100644 --- a/flyteadmin/pkg/repositories/gormimpl/execution_repo_test.go +++ b/flyteadmin/pkg/repositories/gormimpl/execution_repo_test.go @@ -163,7 +163,7 @@ func TestGetExecution(t *testing.T) { // Only match on queries that append expected filters GlobalMock.NewMock().WithQuery(`SELECT * FROM "executions" WHERE "executions"."deleted_at" IS NULL AND ` + `(("executions"."execution_project" = project) AND ("executions"."execution_domain" = domain) AND ` + - `("executions"."execution_name" = 1)) ORDER BY "executions"."id" ASC LIMIT 1`).WithReply(executions) + `("executions"."execution_name" = 1)) LIMIT 1`).WithReply(executions) output, err := executionRepo.Get(context.Background(), interfaces.GetResourceInput{ Project: "project", Domain: "domain", @@ -173,40 +173,6 @@ func TestGetExecution(t *testing.T) { assert.EqualValues(t, expectedExecution, output) } -func TestGetByIDExecution(t *testing.T) { - executionRepo := NewExecutionRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope()) - expectedExecution := models.Execution{ - BaseModel: models.BaseModel{ - ID: uint(20), - }, - ExecutionKey: models.ExecutionKey{ - Project: "project", - Domain: "domain", - Name: "1", - }, - LaunchPlanID: uint(2), - Phase: core.WorkflowExecution_SUCCEEDED.String(), - Closure: []byte{1, 2}, - WorkflowID: uint(3), - Spec: []byte{3, 4}, - StartedAt: &executionStartedAt, - ExecutionCreatedAt: &createdAt, - ExecutionUpdatedAt: &executionUpdatedAt, - } - - executions := make([]map[string]interface{}, 0) - execution := getMockExecutionResponseFromDb(expectedExecution) - executions = append(executions, execution) - - GlobalMock := mocket.Catcher.Reset() - // Only match on queries that append expected filters - GlobalMock.NewMock().WithQuery(`SELECT * FROM "executions" WHERE "executions"."deleted_at" IS NULL AND ` + - `(("executions"."id" = 20)) ORDER BY "executions"."id" ASC LIMIT 1`).WithReply(executions) - output, err := executionRepo.GetByID(context.Background(), uint(20)) - assert.NoError(t, err) - assert.EqualValues(t, expectedExecution, output) -} - func TestListExecutions(t *testing.T) { executionRepo := NewExecutionRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope()) diff --git a/flyteadmin/pkg/repositories/gormimpl/launch_plan_repo.go b/flyteadmin/pkg/repositories/gormimpl/launch_plan_repo.go index ee4cabfd9a..957274df34 100644 --- a/flyteadmin/pkg/repositories/gormimpl/launch_plan_repo.go +++ b/flyteadmin/pkg/repositories/gormimpl/launch_plan_repo.go @@ -58,7 +58,7 @@ func (r *LaunchPlanRepo) Get(ctx context.Context, input interfaces.GetResourceIn Name: input.Name, Version: input.Version, }, - }).First(&launchPlan) + }).Take(&launchPlan) timer.Stop() if tx.Error != nil { return models.LaunchPlan{}, r.errorTransformer.ToFlyteAdminError(tx.Error) diff --git a/flyteadmin/pkg/repositories/gormimpl/launch_plan_repo_test.go b/flyteadmin/pkg/repositories/gormimpl/launch_plan_repo_test.go index a89acf96cd..7371ebea85 100644 --- a/flyteadmin/pkg/repositories/gormimpl/launch_plan_repo_test.go +++ b/flyteadmin/pkg/repositories/gormimpl/launch_plan_repo_test.go @@ -77,8 +77,7 @@ func TestGetLaunchPlan(t *testing.T) { GlobalMock.NewMock().WithQuery( `SELECT * FROM "launch_plans" WHERE "launch_plans"."deleted_at" IS NULL AND ` + `(("launch_plans"."project" = project) AND ("launch_plans"."domain" = domain) AND ` + - `("launch_plans"."name" = name) AND ("launch_plans"."version" = XYZ)) ORDER BY "launch_plans"."id" ` + - `ASC LIMIT 1`).WithReply(launchPlans) + `("launch_plans"."name" = name) AND ("launch_plans"."version" = XYZ)) LIMIT 1`).WithReply(launchPlans) output, err := launchPlanRepo.Get(context.Background(), interfaces.GetResourceInput{ Project: project, Domain: domain, diff --git a/flyteadmin/pkg/repositories/gormimpl/named_entity_repo.go b/flyteadmin/pkg/repositories/gormimpl/named_entity_repo.go index eedd488dbb..158f788355 100644 --- a/flyteadmin/pkg/repositories/gormimpl/named_entity_repo.go +++ b/flyteadmin/pkg/repositories/gormimpl/named_entity_repo.go @@ -152,7 +152,7 @@ func (r *NamedEntityRepo) Get(ctx context.Context, input interfaces.GetNamedEnti } timer := r.metrics.GetDuration.Start() - tx = tx.Select(getSelectForNamedEntity(tableName, input.ResourceType)).First(&namedEntity) + tx = tx.Select(getSelectForNamedEntity(tableName, input.ResourceType)).Take(&namedEntity) timer.Stop() if tx.Error != nil { diff --git a/flyteadmin/pkg/repositories/gormimpl/node_execution_repo.go b/flyteadmin/pkg/repositories/gormimpl/node_execution_repo.go index 9f670c983d..65a33f1894 100644 --- a/flyteadmin/pkg/repositories/gormimpl/node_execution_repo.go +++ b/flyteadmin/pkg/repositories/gormimpl/node_execution_repo.go @@ -56,7 +56,7 @@ func (r *NodeExecutionRepo) Get(ctx context.Context, input interfaces.GetNodeExe Name: input.NodeExecutionIdentifier.ExecutionId.Name, }, }, - }).Preload("ChildNodeExecutions").First(&nodeExecution) + }).Preload("ChildNodeExecutions").Take(&nodeExecution) timer.Stop() if tx.Error != nil { return models.NodeExecution{}, r.errorTransformer.ToFlyteAdminError(tx.Error) diff --git a/flyteadmin/pkg/repositories/gormimpl/node_execution_repo_test.go b/flyteadmin/pkg/repositories/gormimpl/node_execution_repo_test.go index 3390def566..4ce7da445a 100644 --- a/flyteadmin/pkg/repositories/gormimpl/node_execution_repo_test.go +++ b/flyteadmin/pkg/repositories/gormimpl/node_execution_repo_test.go @@ -186,7 +186,7 @@ func TestGetNodeExecution(t *testing.T) { `SELECT * FROM "node_executions" WHERE "node_executions"."deleted_at" IS NULL AND ` + `(("node_executions"."execution_project" = execution_project) AND ("node_executions"."execution_domain" ` + `= execution_domain) AND ("node_executions"."execution_name" = execution_name) AND ("node_executions".` + - `"node_id" = 1)) ORDER BY "node_executions"."id" ASC LIMIT 1`).WithReply(nodeExecutions) + `"node_id" = 1)) LIMIT 1`).WithReply(nodeExecutions) output, err := nodeExecutionRepo.Get(context.Background(), interfaces.GetNodeExecutionInput{ NodeExecutionIdentifier: core.NodeExecutionIdentifier{ NodeId: "1", diff --git a/flyteadmin/pkg/repositories/gormimpl/project_repo.go b/flyteadmin/pkg/repositories/gormimpl/project_repo.go index c22f99e312..fbcb4bc4d4 100644 --- a/flyteadmin/pkg/repositories/gormimpl/project_repo.go +++ b/flyteadmin/pkg/repositories/gormimpl/project_repo.go @@ -37,7 +37,7 @@ func (r *ProjectRepo) Get(ctx context.Context, projectID string) (models.Project timer := r.metrics.GetDuration.Start() tx := r.db.Where(&models.Project{ Identifier: projectID, - }).First(&project) + }).Take(&project) timer.Stop() if tx.Error != nil { return models.Project{}, r.errorTransformer.ToFlyteAdminError(tx.Error) diff --git a/flyteadmin/pkg/repositories/gormimpl/project_repo_test.go b/flyteadmin/pkg/repositories/gormimpl/project_repo_test.go index 924b02c346..dd0b49207c 100644 --- a/flyteadmin/pkg/repositories/gormimpl/project_repo_test.go +++ b/flyteadmin/pkg/repositories/gormimpl/project_repo_test.go @@ -50,7 +50,7 @@ func TestGetProject(t *testing.T) { query := GlobalMock.NewMock() query.WithQuery(`SELECT * FROM "projects" WHERE "projects"."deleted_at" IS NULL AND ` + - `(("projects"."identifier" = project_id)) ORDER BY "projects"."identifier" ASC LIMIT 1`).WithReply( + `(("projects"."identifier" = project_id)) LIMIT 1`).WithReply( []map[string]interface{}{ response, }) diff --git a/flyteadmin/pkg/repositories/gormimpl/task_execution_repo.go b/flyteadmin/pkg/repositories/gormimpl/task_execution_repo.go index 09c41ecbc7..31c69823af 100644 --- a/flyteadmin/pkg/repositories/gormimpl/task_execution_repo.go +++ b/flyteadmin/pkg/repositories/gormimpl/task_execution_repo.go @@ -51,7 +51,7 @@ func (r *TaskExecutionRepo) Get(ctx context.Context, input interfaces.GetTaskExe }, RetryAttempt: &input.TaskExecutionID.RetryAttempt, }, - }).Preload("ChildNodeExecution").First(&taskExecution) + }).Preload("ChildNodeExecution").Take(&taskExecution) timer.Stop() if tx.Error != nil { return models.TaskExecution{}, r.errorTransformer.ToFlyteAdminError(tx.Error) diff --git a/flyteadmin/pkg/repositories/gormimpl/task_execution_repo_test.go b/flyteadmin/pkg/repositories/gormimpl/task_execution_repo_test.go index 555b30b911..4f252be28d 100644 --- a/flyteadmin/pkg/repositories/gormimpl/task_execution_repo_test.go +++ b/flyteadmin/pkg/repositories/gormimpl/task_execution_repo_test.go @@ -107,8 +107,8 @@ func TestGetTaskExecution(t *testing.T) { `"project" = project) AND ("task_executions"."domain" = domain) AND ("task_executions"."name" = task-id) ` + `AND ("task_executions"."version" = task-version) AND ("task_executions"."execution_project" = project) ` + `AND ("task_executions"."execution_domain" = domain) AND ("task_executions"."execution_name" = name) AND` + - ` ("task_executions"."node_id" = node-id) AND ("task_executions"."retry_attempt" = 0)) ` + - `ORDER BY "task_executions"."id" ASC LIMIT 1`).WithReply(taskExecutions) + ` ("task_executions"."node_id" = node-id) AND ("task_executions"."retry_attempt" = 0)) LIMIT 1`). + WithReply(taskExecutions) output, err := taskExecutionRepo.Get(context.Background(), interfaces.GetTaskExecutionInput{ TaskExecutionID: core.TaskExecutionIdentifier{ diff --git a/flyteadmin/pkg/repositories/gormimpl/task_repo.go b/flyteadmin/pkg/repositories/gormimpl/task_repo.go index 8bc4b4a62f..bc6bd5a5ae 100644 --- a/flyteadmin/pkg/repositories/gormimpl/task_repo.go +++ b/flyteadmin/pkg/repositories/gormimpl/task_repo.go @@ -40,7 +40,7 @@ func (r *TaskRepo) Get(ctx context.Context, input interfaces.GetResourceInput) ( Name: input.Name, Version: input.Version, }, - }).First(&task) + }).Take(&task) timer.Stop() if tx.Error != nil { return models.Task{}, r.errorTransformer.ToFlyteAdminError(tx.Error) diff --git a/flyteadmin/pkg/repositories/gormimpl/task_repo_test.go b/flyteadmin/pkg/repositories/gormimpl/task_repo_test.go index bfc7005465..0dd7a69186 100644 --- a/flyteadmin/pkg/repositories/gormimpl/task_repo_test.go +++ b/flyteadmin/pkg/repositories/gormimpl/task_repo_test.go @@ -56,8 +56,8 @@ func TestGetTask(t *testing.T) { // Only match on queries that append expected filters GlobalMock.NewMock().WithQuery( `SELECT * FROM "tasks" WHERE "tasks"."deleted_at" IS NULL AND (("tasks"."project" = project) ` + - `AND ("tasks"."domain" = domain) AND ("tasks"."name" = name) AND ("tasks"."version" = XYZ)) ` + - `ORDER BY "tasks"."id" ASC LIMIT 1`).WithReply(tasks) + `AND ("tasks"."domain" = domain) AND ("tasks"."name" = name) AND ("tasks"."version" = XYZ)) LIMIT 1`). + WithReply(tasks) output, err := taskRepo.Get(context.Background(), interfaces.GetResourceInput{ Project: project, Domain: domain, diff --git a/flyteadmin/pkg/repositories/gormimpl/workflow_repo.go b/flyteadmin/pkg/repositories/gormimpl/workflow_repo.go index dfee74d399..80ebce3db3 100644 --- a/flyteadmin/pkg/repositories/gormimpl/workflow_repo.go +++ b/flyteadmin/pkg/repositories/gormimpl/workflow_repo.go @@ -40,7 +40,7 @@ func (r *WorkflowRepo) Get(ctx context.Context, input interfaces.GetResourceInpu Name: input.Name, Version: input.Version, }, - }).First(&workflow) + }).Take(&workflow) timer.Stop() if tx.Error != nil { return models.Workflow{}, r.errorTransformer.ToFlyteAdminError(tx.Error) diff --git a/flyteadmin/pkg/repositories/interfaces/execution_repo.go b/flyteadmin/pkg/repositories/interfaces/execution_repo.go index c79c3621e1..b414265de1 100644 --- a/flyteadmin/pkg/repositories/interfaces/execution_repo.go +++ b/flyteadmin/pkg/repositories/interfaces/execution_repo.go @@ -17,8 +17,6 @@ type ExecutionRepoInterface interface { UpdateExecution(ctx context.Context, execution models.Execution) error // Returns a matching execution if it exists. Get(ctx context.Context, input GetResourceInput) (models.Execution, error) - // Return a matching execution if it exists - GetByID(ctx context.Context, id uint) (models.Execution, error) // Returns executions matching query parameters. A limit must be provided for the results page size. List(ctx context.Context, input ListResourceInput) (ExecutionCollectionOutput, error) } diff --git a/flyteadmin/pkg/repositories/mocks/execution_repo.go b/flyteadmin/pkg/repositories/mocks/execution_repo.go index 980fb5ee5a..0101b9e781 100644 --- a/flyteadmin/pkg/repositories/mocks/execution_repo.go +++ b/flyteadmin/pkg/repositories/mocks/execution_repo.go @@ -11,7 +11,6 @@ type CreateExecutionFunc func(ctx context.Context, input models.Execution) error type UpdateFunc func(ctx context.Context, event models.ExecutionEvent, execution models.Execution) error type UpdateExecutionFunc func(ctx context.Context, execution models.Execution) error type GetExecutionFunc func(ctx context.Context, input interfaces.GetResourceInput) (models.Execution, error) -type GetExecutionByIDFunc func(ctx context.Context, id uint) (models.Execution, error) type ListExecutionFunc func(ctx context.Context, input interfaces.ListResourceInput) ( interfaces.ExecutionCollectionOutput, error) @@ -20,7 +19,6 @@ type MockExecutionRepo struct { updateFunction UpdateFunc updateExecutionFunc UpdateExecutionFunc getFunction GetExecutionFunc - getByIDFunction GetExecutionByIDFunc listFunction ListExecutionFunc } @@ -68,17 +66,6 @@ func (r *MockExecutionRepo) SetGetCallback(getFunction GetExecutionFunc) { r.getFunction = getFunction } -func (r *MockExecutionRepo) GetByID(ctx context.Context, id uint) (models.Execution, error) { - if r.getByIDFunction != nil { - return r.getByIDFunction(ctx, id) - } - return models.Execution{}, nil -} - -func (r *MockExecutionRepo) SetGetByIDCallback(getByIDFunction GetExecutionByIDFunc) { - r.getByIDFunction = getByIDFunction -} - func (r *MockExecutionRepo) List(ctx context.Context, input interfaces.ListResourceInput) ( interfaces.ExecutionCollectionOutput, error) { if r.listFunction != nil { diff --git a/flyteadmin/pkg/repositories/transformers/execution.go b/flyteadmin/pkg/repositories/transformers/execution.go index 5277798e84..8c26b042f4 100644 --- a/flyteadmin/pkg/repositories/transformers/execution.go +++ b/flyteadmin/pkg/repositories/transformers/execution.go @@ -227,19 +227,6 @@ func FromExecutionModel(executionModel models.Execution) (*admin.Execution, erro }, nil } -func FromExecutionModelWithReferenceExecution(executionModel models.Execution, referenceExecutionID *core.WorkflowExecutionIdentifier) ( - *admin.Execution, error) { - execution, err := FromExecutionModel(executionModel) - if err != nil { - return nil, err - } - if referenceExecutionID != nil && execution.Spec.Metadata != nil && - execution.Spec.Metadata.Mode == admin.ExecutionMetadata_RELAUNCH { - execution.Spec.Metadata.ReferenceExecution = referenceExecutionID - } - return execution, nil -} - func FromExecutionModels(executionModels []models.Execution) ([]*admin.Execution, error) { executions := make([]*admin.Execution, len(executionModels)) for idx, executionModel := range executionModels { diff --git a/flyteadmin/pkg/repositories/transformers/execution_test.go b/flyteadmin/pkg/repositories/transformers/execution_test.go index 1ff63c98d9..118a6f295b 100644 --- a/flyteadmin/pkg/repositories/transformers/execution_test.go +++ b/flyteadmin/pkg/repositories/transformers/execution_test.go @@ -420,57 +420,6 @@ func TestFromExecutionModel_Aborted(t *testing.T) { assert.Empty(t, execution.Closure.GetAbortCause()) } -func TestFromExecutionModelWithReferenceExecution(t *testing.T) { - spec := testutils.GetExecutionRequest().Spec - spec.Metadata = &admin.ExecutionMetadata{ - Mode: admin.ExecutionMetadata_RELAUNCH, - } - specBytes, _ := proto.Marshal(spec) - phase := core.WorkflowExecution_RUNNING.String() - startedAt := time.Date(2018, 8, 30, 0, 0, 0, 0, time.UTC) - startedAtProto, _ := ptypes.TimestampProto(startedAt) - closure := admin.ExecutionClosure{ - ComputedInputs: spec.Inputs, - Phase: core.WorkflowExecution_RUNNING, - StartedAt: startedAtProto, - } - closureBytes, _ := proto.Marshal(&closure) - - executionModel := models.Execution{ - ExecutionKey: models.ExecutionKey{ - Project: "project", - Domain: "domain", - Name: "name", - }, - Spec: specBytes, - Phase: phase, - Closure: closureBytes, - LaunchPlanID: uint(1), - WorkflowID: uint(2), - StartedAt: &startedAt, - } - execution, err := FromExecutionModelWithReferenceExecution(executionModel, nil) - assert.Nil(t, err) - assert.True(t, proto.Equal(&admin.Execution{ - Id: &core.WorkflowExecutionIdentifier{ - Project: "project", - Domain: "domain", - Name: "name", - }, - Spec: spec, - Closure: &closure, - }, execution)) - - referenceExecutionID := &core.WorkflowExecutionIdentifier{ - Project: "ref_project", - Domain: "ref_domain", - Name: "ref_name", - } - execution, err = FromExecutionModelWithReferenceExecution(executionModel, referenceExecutionID) - assert.Nil(t, err) - assert.True(t, proto.Equal(referenceExecutionID, execution.Spec.Metadata.ReferenceExecution)) -} - func TestFromExecutionModels(t *testing.T) { spec := testutils.GetExecutionRequest().Spec specBytes, _ := proto.Marshal(spec)