This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 59
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Move logic to track terminated workflows to new TerminatedTrackingSto…
…re workflowstore Signed-off-by: Daniel Shuy <daniel_shuy@hotmail.com>
- Loading branch information
1 parent
e79dae7
commit 41e312b
Showing
6 changed files
with
228 additions
and
60 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
package workflowstore | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" | ||
"github.com/flyteorg/flytestdlib/fastcheck" | ||
|
||
"github.com/flyteorg/flytestdlib/promutils" | ||
) | ||
|
||
func workflowKey(namespace, name string) string { | ||
return fmt.Sprintf("%s/%s", namespace, name) | ||
} | ||
|
||
// A specialized store that stores a LRU cache of all the workflows that are in a terminal phase. | ||
// Terminated workflows are ignored (Get returns a nil). | ||
// Processing terminated FlyteWorkflows can occur when workflow updates are reported after a workflow has already completed. | ||
type terminatedTracking struct { | ||
w FlyteWorkflow | ||
terminatedFilter fastcheck.Filter | ||
} | ||
|
||
func (t *terminatedTracking) Get(ctx context.Context, namespace, name string) (*v1alpha1.FlyteWorkflow, error) { | ||
if t.terminatedFilter.Contains(ctx, []byte(workflowKey(namespace, name))) { | ||
return nil, ErrWorkflowTerminated | ||
} | ||
|
||
return t.w.Get(ctx, namespace, name) | ||
} | ||
|
||
func (t *terminatedTracking) UpdateStatus(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) ( | ||
newWF *v1alpha1.FlyteWorkflow, err error) { | ||
newWF, err = t.w.UpdateStatus(ctx, workflow, priorityClass) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if newWF != nil { | ||
if newWF.GetExecutionStatus().IsTerminated() { | ||
t.terminatedFilter.Add(ctx, []byte(workflowKey(workflow.Namespace, workflow.Name))) | ||
} | ||
} | ||
|
||
return newWF, nil | ||
} | ||
|
||
func (t *terminatedTracking) Update(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) ( | ||
newWF *v1alpha1.FlyteWorkflow, err error) { | ||
newWF, err = t.w.Update(ctx, workflow, priorityClass) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if newWF != nil { | ||
if newWF.GetExecutionStatus().IsTerminated() { | ||
t.terminatedFilter.Add(ctx, []byte(workflowKey(workflow.Namespace, workflow.Name))) | ||
} | ||
} | ||
|
||
return newWF, nil | ||
} | ||
|
||
func NewTerminatedTrackingStore(_ context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) (FlyteWorkflow, error) { | ||
filter, err := fastcheck.NewLRUCacheFilter(1000, scope.NewSubScope("terminated_filter")) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &terminatedTracking{ | ||
w: workflowStore, | ||
terminatedFilter: filter, | ||
}, nil | ||
} |
Oops, something went wrong.