Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Move logic to track terminated workflows to new TerminatedTrackingSto…
Browse files Browse the repository at this point in the history
…re workflowstore
  • Loading branch information
daniel-shuy committed Oct 22, 2022
1 parent c3dd588 commit 0490a53
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 37 deletions.
4 changes: 3 additions & 1 deletion pkg/controller/workflowstore/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const (
PolicyInMemory = "InMemory"
// PolicyPassThrough just calls the underlying Clientset or the shared informer cache to get or write the workflow
PolicyPassThrough = "PassThrough"
// PolicyTrackTerminated tracks terminated workflows
PolicyTrackTerminated = "TrackTerminated"
// PolicyResourceVersionCache uses the resource version on the Workflow object, to determine if the inmemory copy
// of the workflow is stale
PolicyResourceVersionCache = "ResourceVersionCache"
Expand All @@ -28,7 +30,7 @@ var (
)

// Config for Workflow access in the controller.
// Various policies are available like - InMemory, PassThrough, ResourceVersionCache
// Various policies are available like - InMemory, PassThrough, TrackTerminated, ResourceVersionCache
type Config struct {
Policy Policy `json:"policy" pflag:",Workflow Store Policy to initialize"`
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/controller/workflowstore/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@ func NewWorkflowStore(ctx context.Context, cfg *Config, lister v1alpha1.FlyteWor
workflowStore = NewInMemoryWorkflowStore()
case PolicyPassThrough:
workflowStore = NewPassthroughWorkflowStore(ctx, scope, workflows, lister)
case PolicyTrackTerminated:
workflowStore = NewPassthroughWorkflowStore(ctx, scope, workflows, lister)
workflowStore, err = NewTerminatedTrackingStore(ctx, scope, workflowStore)
case PolicyResourceVersionCache:
workflowStore, err = NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, workflows, lister))
workflowStore = NewPassthroughWorkflowStore(ctx, scope, workflows, lister)
workflowStore, err = NewTerminatedTrackingStore(ctx, scope, workflowStore)
workflowStore = NewResourceVersionCachingStore(ctx, scope, workflowStore)
}

if err != nil {
Expand Down
29 changes: 2 additions & 27 deletions pkg/controller/workflowstore/resource_version_caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@ import (

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"

"github.com/flyteorg/flytestdlib/fastcheck"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TODO - optimization maybe? we can move this to predicate check, before we add it to the queue?
Expand All @@ -33,7 +30,6 @@ type resourceVersionCaching struct {
w FlyteWorkflow
metrics *resourceVersionMetrics
lastUpdatedResourceVersionCache sync.Map
terminatedFilter fastcheck.Filter
}

func (r *resourceVersionCaching) updateRevisionCache(ctx context.Context, namespace, name, resourceVersion string, isTerminated bool) {
Expand All @@ -58,13 +54,6 @@ func (r *resourceVersionCaching) isResourceVersionSameAsPrevious(ctx context.Con
}

func (r *resourceVersionCaching) Get(ctx context.Context, namespace, name string) (*v1alpha1.FlyteWorkflow, error) {
// Check if the resource version key has already been stored in a terminal phase. Processing
// terminated FlyteWorkflows can occur when workflow updates are reported after a workflow
// has already completed.
if r.terminatedFilter.Contains(ctx, []byte(resourceVersionKey(namespace, name))) {
return nil, ErrWorkflowTerminated
}

w, err := r.w.Get(ctx, namespace, name)
if err != nil {
return nil, err
Expand Down Expand Up @@ -98,10 +87,6 @@ func (r *resourceVersionCaching) UpdateStatus(ctx context.Context, workflow *v1a
} else {
r.metrics.workflowRedundantUpdatesCount.Inc(ctx)
}

if newWF.GetExecutionStatus().IsTerminated() {
r.terminatedFilter.Add(ctx, []byte(resourceVersionKey(workflow.Namespace, workflow.Name)))
}
}

return newWF, nil
Expand All @@ -122,21 +107,12 @@ func (r *resourceVersionCaching) Update(ctx context.Context, workflow *v1alpha1.
} else {
r.metrics.workflowRedundantUpdatesCount.Inc(ctx)
}

if newWF.GetExecutionStatus().IsTerminated() {
r.terminatedFilter.Add(ctx, []byte(resourceVersionKey(workflow.Namespace, workflow.Name)))
}
}

return newWF, nil
}

func NewResourceVersionCachingStore(_ context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) (FlyteWorkflow, error) {
filter, err := fastcheck.NewLRUCacheFilter(1000, scope.NewSubScope("terminated_filter"))
if err != nil {
return nil, err
}

func NewResourceVersionCachingStore(_ context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) FlyteWorkflow {
return &resourceVersionCaching{
w: workflowStore,
metrics: &resourceVersionMetrics{
Expand All @@ -145,6 +121,5 @@ func NewResourceVersionCachingStore(_ context.Context, scope promutils.Scope, wo
workflowRedundantUpdatesCount: labeled.NewCounter("wf_redundant", "Workflow Update called but ectd. detected no actual update to the workflow.", scope, labeled.EmitUnlabeledMetric),
},
lastUpdatedResourceVersionCache: sync.Map{},
terminatedFilter: filter,
}, nil
}
}
23 changes: 15 additions & 8 deletions pkg/controller/workflowstore/resource_version_caching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ func TestResourceVersionCaching_Get_NotInCache(t *testing.T) {

scope := promutils.NewTestScope()
l := &mockWFNamespaceLister{}
wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
passthroughWfStore := NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l})
trackTerminatedWfStore, err := NewTerminatedTrackingStore(ctx, scope, passthroughWfStore)
wfStore := NewResourceVersionCachingStore(ctx, scope, trackTerminatedWfStore)
assert.NoError(t, err)

t.Run("notFound", func(t *testing.T) {
Expand Down Expand Up @@ -163,7 +165,9 @@ func TestResourceVersionCaching_Get_UpdateAndRead(t *testing.T) {
newWf := wf.DeepCopy()
newWf.Status.Phase = v1alpha1.WorkflowPhaseSucceeding

wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
passthroughWfStore := NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l})
trackTerminatedWfStore, err := NewTerminatedTrackingStore(ctx, scope, passthroughWfStore)
wfStore := NewResourceVersionCachingStore(ctx, scope, trackTerminatedWfStore)
assert.NoError(t, err)
// Insert a new workflow with R1
_, err = wfStore.Update(ctx, newWf, PriorityClassCritical)
Expand All @@ -183,7 +187,9 @@ func TestResourceVersionCaching_Get_UpdateAndRead(t *testing.T) {

scope := promutils.NewTestScope()
l := &mockWFNamespaceLister{}
wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
passthroughWfStore := NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l})
trackTerminatedWfStore, err := NewTerminatedTrackingStore(ctx, scope, passthroughWfStore)
wfStore := NewResourceVersionCachingStore(ctx, scope, trackTerminatedWfStore)
assert.NoError(t, err)
// Insert a new workflow with R1
_, err = wfStore.Update(ctx, wf, PriorityClassCritical)
Expand Down Expand Up @@ -216,7 +222,9 @@ func TestResourceVersionCaching_Get_UpdateAndRead(t *testing.T) {

scope := promutils.NewTestScope()
l := &mockWFNamespaceLister{}
wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
passthroughWfStore := NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l})
trackTerminatedWfStore, err := NewTerminatedTrackingStore(ctx, scope, passthroughWfStore)
wfStore := NewResourceVersionCachingStore(ctx, scope, trackTerminatedWfStore)
assert.NoError(t, err)
// Insert a new workflow with R1
_, err = wfStore.Update(ctx, wf, PriorityClassCritical)
Expand Down Expand Up @@ -254,7 +262,9 @@ func TestResourceVersionCaching_UpdateTerminated(t *testing.T) {

scope := promutils.NewTestScope()
l := &mockWFNamespaceLister{}
wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
passthroughWfStore := NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l})
trackTerminatedWfStore, err := NewTerminatedTrackingStore(ctx, scope, passthroughWfStore)
wfStore := NewResourceVersionCachingStore(ctx, scope, trackTerminatedWfStore)
assert.NoError(t, err)
// Insert a new workflow with R1
_, err = wfStore.Update(ctx, newWf, PriorityClassCritical)
Expand All @@ -275,9 +285,6 @@ func TestResourceVersionCaching_UpdateTerminated(t *testing.T) {
assert.Nil(t, v)

// validate that terminated workflows are not retrievable
terminated := rvStore.terminatedFilter.Contains(ctx, []byte(resourceVersionKey(namespace, name)))
assert.True(t, terminated)

terminatedWf, err := wfStore.Get(ctx, namespace, name)
assert.Nil(t, terminatedWf)
assert.True(t, IsWorkflowTerminated(err))
Expand Down
75 changes: 75 additions & 0 deletions pkg/controller/workflowstore/terminated_tracking.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package workflowstore

import (
"context"
"fmt"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytestdlib/fastcheck"

"github.com/flyteorg/flytestdlib/promutils"
)

// workflowKey builds the identifier under which a workflow is tracked: the
// namespace and the name joined by a single forward slash.
func workflowKey(namespace, name string) string {
	const keyFormat = "%s/%s"

	return fmt.Sprintf(keyFormat, namespace, name)
}

// terminatedTracking is a FlyteWorkflow store that wraps another FlyteWorkflow store and keeps track of the
// workflows it has seen in a terminal phase. NewTerminatedTrackingStore backs the tracking with an LRU cache filter.
// Terminated workflows are ignored: Get returns a nil workflow together with ErrWorkflowTerminated for them.
// Processing terminated FlyteWorkflows can occur when workflow updates are reported after a workflow has already completed.
type terminatedTracking struct {
// w is the wrapped store that Get, Update and UpdateStatus delegate to.
w FlyteWorkflow
// terminatedFilter records the workflowKey of every workflow that an Update or UpdateStatus returned in a
// terminated state.
terminatedFilter fastcheck.Filter
}

// Get looks the workflow up in the wrapped store, unless the workflow has
// already been recorded as terminated. In that case the wrapped store is not
// consulted and ErrWorkflowTerminated is returned with a nil workflow.
func (t *terminatedTracking) Get(ctx context.Context, namespace, name string) (*v1alpha1.FlyteWorkflow, error) {
	key := []byte(workflowKey(namespace, name))

	if alreadyTerminated := t.terminatedFilter.Contains(ctx, key); !alreadyTerminated {
		return t.w.Get(ctx, namespace, name)
	}

	return nil, ErrWorkflowTerminated
}

// UpdateStatus delegates the status update to the wrapped store. When the
// wrapped store returns a workflow whose execution status is terminated, the
// workflow's key is added to the terminated filter so that later Get calls
// for it return ErrWorkflowTerminated.
//
// An error from the wrapped store is returned unchanged with a nil workflow,
// and nothing is recorded in that case.
func (t *terminatedTracking) UpdateStatus(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
	newWF *v1alpha1.FlyteWorkflow, err error) {
	newWF, err = t.w.UpdateStatus(ctx, workflow, priorityClass)
	if err != nil {
		return nil, err
	}

	// The wrapped store may return a nil workflow without an error; only a
	// workflow that was actually returned can be inspected for termination.
	if newWF != nil && newWF.GetExecutionStatus().IsTerminated() {
		t.terminatedFilter.Add(ctx, []byte(workflowKey(workflow.Namespace, workflow.Name)))
	}

	return newWF, nil
}

// Update delegates the workflow update to the wrapped store. When the wrapped
// store returns a workflow whose execution status is terminated, the
// workflow's key is added to the terminated filter so that later Get calls
// for it return ErrWorkflowTerminated.
//
// An error from the wrapped store is returned unchanged with a nil workflow,
// and nothing is recorded in that case.
func (t *terminatedTracking) Update(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
	newWF *v1alpha1.FlyteWorkflow, err error) {
	newWF, err = t.w.Update(ctx, workflow, priorityClass)
	if err != nil {
		return nil, err
	}

	// The wrapped store may return a nil workflow without an error; only a
	// workflow that was actually returned can be inspected for termination.
	if newWF != nil && newWF.GetExecutionStatus().IsTerminated() {
		t.terminatedFilter.Add(ctx, []byte(workflowKey(workflow.Namespace, workflow.Name)))
	}

	return newWF, nil
}

// NewTerminatedTrackingStore wraps workflowStore in a store that remembers
// terminated workflows in an LRU cache filter. The filter is created with a
// size of 1000 under the "terminated_filter" sub-scope of scope. The context
// argument is unused. Any error from creating the filter is returned as-is
// with a nil store.
func NewTerminatedTrackingStore(_ context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) (FlyteWorkflow, error) {
	const terminatedFilterSize = 1000

	filterScope := scope.NewSubScope("terminated_filter")
	terminated, err := fastcheck.NewLRUCacheFilter(terminatedFilterSize, filterScope)
	if err != nil {
		return nil, err
	}

	store := &terminatedTracking{
		w:                workflowStore,
		terminatedFilter: terminated,
	}

	return store, nil
}
102 changes: 102 additions & 0 deletions pkg/controller/workflowstore/terminated_tracking_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package workflowstore

import (
"context"
"testing"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytepropeller/pkg/client/clientset/versioned/fake"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/stretchr/testify/assert"
)

// TestTerminatedTrackingStore_Update checks how Update affects later Get
// calls on the terminated tracking store:
//   - a workflow updated in the Succeeding phase is still returned by Get;
//   - a workflow updated in the Aborted phase is no longer returned, and Get
//     reports an error for which IsWorkflowTerminated is true.
func TestTerminatedTrackingStore_Update(t *testing.T) {
	ctx := context.TODO()

	mockClient := fake.NewSimpleClientset().FlyteworkflowV1alpha1()

	scope := promutils.NewTestScope()
	l := &mockWFNamespaceLister{}
	passthroughWfStore := NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l})
	wfStore, err := NewTerminatedTrackingStore(ctx, scope, passthroughWfStore)
	// Without a store every subtest would dereference nil, so stop right away.
	if !assert.NoError(t, err) {
		t.FailNow()
	}

	t.Run("Succeeding", func(t *testing.T) {
		namespace := "test-ns"
		name := "succeeding"

		wf := dummyWf(namespace, name)
		wf.Status.Phase = v1alpha1.WorkflowPhaseSucceeding

		_, err := mockClient.FlyteWorkflows(wf.GetNamespace()).Create(ctx, wf, v1.CreateOptions{})
		assert.NoError(t, err)

		_, err = wfStore.Update(ctx, wf, PriorityClassCritical)
		assert.NoError(t, err)

		// The lister mock serves the same workflow regardless of the requested name.
		l.GetCb = func(_ string) (*v1alpha1.FlyteWorkflow, error) {
			return wf, nil
		}
		succeedingWf, err := wfStore.Get(ctx, namespace, name)
		assert.NoError(t, err)
		// testify expects (expected, actual); keeping that order makes failure output read correctly.
		assert.Equal(t, wf, succeedingWf)
	})

	t.Run("Terminated", func(t *testing.T) {
		namespace := "test-ns"
		name := "terminated"

		wf := dummyWf(namespace, name)
		wf.Status.Phase = v1alpha1.WorkflowPhaseAborted

		_, err := mockClient.FlyteWorkflows(wf.GetNamespace()).Create(ctx, wf, v1.CreateOptions{})
		assert.NoError(t, err)

		_, err = wfStore.Update(ctx, wf, PriorityClassCritical)
		assert.NoError(t, err)

		// Even though the lister could serve the workflow, the store must refuse it.
		l.GetCb = func(_ string) (*v1alpha1.FlyteWorkflow, error) {
			return wf, nil
		}
		terminatedWf, err := wfStore.Get(ctx, namespace, name)
		assert.Nil(t, terminatedWf)
		assert.True(t, IsWorkflowTerminated(err))
	})
}

// TestTerminatedTrackingStore_UpdateStatus stores a workflow in the
// Succeeding phase, moves it to Aborted through UpdateStatus, and then
// expects Get to return no workflow and an error for which
// IsWorkflowTerminated is true.
func TestTerminatedTrackingStore_UpdateStatus(t *testing.T) {
	const (
		namespace = "test-ns"
		name      = "name"
	)

	ctx := context.TODO()
	client := fake.NewSimpleClientset().FlyteworkflowV1alpha1()
	scope := promutils.NewTestScope()

	nsLister := &mockWFNamespaceLister{}
	inner := NewPassthroughWorkflowStore(ctx, scope, client, &mockWFLister{V: nsLister})
	wfStore, err := NewTerminatedTrackingStore(ctx, scope, inner)
	assert.NoError(t, err)

	// Seed the fake clientset with a workflow that has not terminated yet.
	wf := dummyWf(namespace, name)
	wf.Status.Phase = v1alpha1.WorkflowPhaseSucceeding

	_, err = client.FlyteWorkflows(wf.GetNamespace()).Create(ctx, wf, v1.CreateOptions{})
	assert.NoError(t, err)

	_, err = wfStore.Update(ctx, wf, PriorityClassCritical)
	assert.NoError(t, err)

	// Move the workflow to a terminal phase through the status update path.
	wf.Status.Phase = v1alpha1.WorkflowPhaseAborted
	_, err = wfStore.UpdateStatus(ctx, wf, PriorityClassCritical)
	assert.NoError(t, err)

	// The lister would still serve the workflow; the store must not.
	nsLister.GetCb = func(_ string) (*v1alpha1.FlyteWorkflow, error) {
		return wf, nil
	}
	got, err := wfStore.Get(ctx, namespace, name)
	assert.Nil(t, got)
	assert.True(t, IsWorkflowTerminated(err))
}

0 comments on commit 0490a53

Please sign in to comment.