From 26ad85757c57cda41bf23a2b054d49eccaa8145d Mon Sep 17 00:00:00 2001 From: Daniel Shuy Date: Tue, 25 Oct 2022 01:12:04 +0800 Subject: [PATCH] Move workflowstore logic to correct abstraction (#496) * Move logic to clear workflow CRD managed fields to passthrough workflowstore Signed-off-by: Daniel Shuy * Move logic to track terminated workflows to new TerminatedTrackingStore workflowstore Signed-off-by: Daniel Shuy Signed-off-by: Daniel Shuy Co-authored-by: Dan Rammer --- pkg/controller/workflowstore/config.go | 4 +- pkg/controller/workflowstore/factory.go | 7 +- pkg/controller/workflowstore/passthrough.go | 8 ++ .../workflowstore/resource_version_caching.go | 37 +------ .../resource_version_caching_test.go | 70 ++++++------ .../workflowstore/terminated_tracking.go | 75 +++++++++++++ .../workflowstore/terminated_tracking_test.go | 103 ++++++++++++++++++ 7 files changed, 236 insertions(+), 68 deletions(-) create mode 100644 pkg/controller/workflowstore/terminated_tracking.go create mode 100644 pkg/controller/workflowstore/terminated_tracking_test.go diff --git a/pkg/controller/workflowstore/config.go b/pkg/controller/workflowstore/config.go index aa70f3810..8f4f96477 100644 --- a/pkg/controller/workflowstore/config.go +++ b/pkg/controller/workflowstore/config.go @@ -13,6 +13,8 @@ const ( PolicyInMemory = "InMemory" // PolicyPassThrough just calls the underlying Clientset or the shared informer cache to get or write the workflow PolicyPassThrough = "PassThrough" + // PolicyTrackTerminated tracks terminated workflows + PolicyTrackTerminated = "TrackTerminated" // PolicyResourceVersionCache uses the resource version on the Workflow object, to determine if the inmemory copy // of the workflow is stale PolicyResourceVersionCache = "ResourceVersionCache" @@ -28,7 +30,7 @@ var ( ) // Config for Workflow access in the controller. -// Various policies are available like - InMemory, PassThrough, ResourceVersionCache +// Various policies are available like - InMemory, PassThrough, TrackTerminated, ResourceVersionCache type Config struct { Policy Policy `json:"policy" pflag:",Workflow Store Policy to initialize"` } diff --git a/pkg/controller/workflowstore/factory.go b/pkg/controller/workflowstore/factory.go index b083d66af..1cadc7354 100644 --- a/pkg/controller/workflowstore/factory.go +++ b/pkg/controller/workflowstore/factory.go @@ -20,8 +20,13 @@ func NewWorkflowStore(ctx context.Context, cfg *Config, lister v1alpha1.FlyteWor workflowStore = NewInMemoryWorkflowStore() case PolicyPassThrough: workflowStore = NewPassthroughWorkflowStore(ctx, scope, workflows, lister) + case PolicyTrackTerminated: + workflowStore = NewPassthroughWorkflowStore(ctx, scope, workflows, lister) + workflowStore, err = NewTerminatedTrackingStore(ctx, scope, workflowStore) case PolicyResourceVersionCache: - workflowStore, err = NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, workflows, lister)) + workflowStore = NewPassthroughWorkflowStore(ctx, scope, workflows, lister) + workflowStore, err = NewTerminatedTrackingStore(ctx, scope, workflowStore) + workflowStore = NewResourceVersionCachingStore(ctx, scope, workflowStore) } if err != nil { diff --git a/pkg/controller/workflowstore/passthrough.go b/pkg/controller/workflowstore/passthrough.go index 8e5b03f60..7ff565ea9 100644 --- a/pkg/controller/workflowstore/passthrough.go +++ b/pkg/controller/workflowstore/passthrough.go @@ -76,6 +76,14 @@ func (p *passthroughWorkflowStore) UpdateStatus(ctx context.Context, workflow *v func (p *passthroughWorkflowStore) Update(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) ( newWF *v1alpha1.FlyteWorkflow, err error) { + // If the workflow has any managed fields setting the array to one empty ManagedField clears them in the CRD. + // FlyteWorkflow CRDs are only managed by a single FlytePropeller instance and therefore the managed fields paradigm + // does not add useful functionality. Clearing them reduces CRD size, improving etcd I/O performance. + if len(workflow.ObjectMeta.ManagedFields) > 0 { + workflow.ObjectMeta.ManagedFields = workflow.ObjectMeta.ManagedFields[:1] + workflow.ObjectMeta.ManagedFields[0] = v1.ManagedFieldsEntry{} + } + p.metrics.workflowUpdateCount.Inc() // Something has changed. Lets save logger.Debugf(ctx, "Observed FlyteWorkflow Update (maybe finalizer)") diff --git a/pkg/controller/workflowstore/resource_version_caching.go b/pkg/controller/workflowstore/resource_version_caching.go index 5486437aa..d04c65cc8 100644 --- a/pkg/controller/workflowstore/resource_version_caching.go +++ b/pkg/controller/workflowstore/resource_version_caching.go @@ -7,11 +7,8 @@ import ( "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" - "github.com/flyteorg/flytestdlib/fastcheck" "github.com/flyteorg/flytestdlib/promutils" "github.com/flyteorg/flytestdlib/promutils/labeled" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // TODO - optimization maybe? we can move this to predicate check, before we add it to the queue? @@ -33,7 +30,6 @@ type resourceVersionCaching struct { w FlyteWorkflow metrics *resourceVersionMetrics lastUpdatedResourceVersionCache sync.Map - terminatedFilter fastcheck.Filter } func (r *resourceVersionCaching) updateRevisionCache(ctx context.Context, namespace, name, resourceVersion string, isTerminated bool) { @@ -58,13 +54,6 @@ func (r *resourceVersionCaching) isResourceVersionSameAsPrevious(ctx context.Con } func (r *resourceVersionCaching) Get(ctx context.Context, namespace, name string) (*v1alpha1.FlyteWorkflow, error) { - // Check if the resource version key has already been stored in a terminal phase. Processing - // terminated FlyteWorkflows can occur when workflow updates are reported after a workflow - // has already completed. - if r.terminatedFilter.Contains(ctx, []byte(resourceVersionKey(namespace, name))) { - return nil, ErrWorkflowTerminated - } - w, err := r.w.Get(ctx, namespace, name) if err != nil { return nil, err @@ -98,10 +87,6 @@ func (r *resourceVersionCaching) UpdateStatus(ctx context.Context, workflow *v1a } else { r.metrics.workflowRedundantUpdatesCount.Inc(ctx) } - - if newWF.GetExecutionStatus().IsTerminated() { - r.terminatedFilter.Add(ctx, []byte(resourceVersionKey(workflow.Namespace, workflow.Name))) - } } return newWF, nil @@ -109,14 +94,6 @@ func (r *resourceVersionCaching) UpdateStatus(ctx context.Context, workflow *v1a func (r *resourceVersionCaching) Update(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) ( newWF *v1alpha1.FlyteWorkflow, err error) { - // If the workflow has any managed fields setting the array to one empty ManagedField clears them in the CRD. - // FlyteWorkflow CRDs are only managed by a single FlytePropeller instance and therefore the managed fields paradigm - // does not add useful functionality. Clearing them reduces CRD size, improving etcd I/O performance. - if len(workflow.ObjectMeta.ManagedFields) > 0 { - workflow.ObjectMeta.ManagedFields = workflow.ObjectMeta.ManagedFields[:1] - workflow.ObjectMeta.ManagedFields[0] = metav1.ManagedFieldsEntry{} - } - newWF, err = r.w.Update(ctx, workflow, priorityClass) if err != nil { return nil, err @@ -130,21 +107,12 @@ func (r *resourceVersionCaching) Update(ctx context.Context, workflow *v1alpha1. } else { r.metrics.workflowRedundantUpdatesCount.Inc(ctx) } - - if newWF.GetExecutionStatus().IsTerminated() { - r.terminatedFilter.Add(ctx, []byte(resourceVersionKey(workflow.Namespace, workflow.Name))) - } } return newWF, nil } -func NewResourceVersionCachingStore(_ context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) (FlyteWorkflow, error) { - filter, err := fastcheck.NewLRUCacheFilter(1000, scope.NewSubScope("terminated_filter")) - if err != nil { - return nil, err - } - +func NewResourceVersionCachingStore(_ context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) FlyteWorkflow { return &resourceVersionCaching{ w: workflowStore, metrics: &resourceVersionMetrics{ @@ -153,6 +121,5 @@ func NewResourceVersionCachingStore(_ context.Context, scope promutils.Scope, wo workflowRedundantUpdatesCount: labeled.NewCounter("wf_redundant", "Workflow Update called but ectd. detected no actual update to the workflow.", scope, labeled.EmitUnlabeledMetric), }, lastUpdatedResourceVersionCache: sync.Map{}, - terminatedFilter: filter, - }, nil + } } diff --git a/pkg/controller/workflowstore/resource_version_caching_test.go b/pkg/controller/workflowstore/resource_version_caching_test.go index 807179dfc..10970cfe7 100644 --- a/pkg/controller/workflowstore/resource_version_caching_test.go +++ b/pkg/controller/workflowstore/resource_version_caching_test.go @@ -23,6 +23,11 @@ import ( kubeerrors "k8s.io/apimachinery/pkg/api/errors" ) +const ( + resourceVersionCachingNamespace = "ns" + resourceVersionCachingName = "name" +) + func init() { labeled.SetMetricKeys(contextutils.WorkflowIDKey) } @@ -33,14 +38,16 @@ func TestResourceVersionCaching_Get_NotInCache(t *testing.T) { scope := promutils.NewTestScope() l := &mockWFNamespaceLister{} - wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l})) + passthroughWfStore := NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}) + trackTerminatedWfStore, err := NewTerminatedTrackingStore(ctx, scope, passthroughWfStore) + wfStore := NewResourceVersionCachingStore(ctx, scope, trackTerminatedWfStore) assert.NoError(t, err) t.Run("notFound", func(t *testing.T) { l.GetCb = func(name string) (*v1alpha1.FlyteWorkflow, error) { - return nil, kubeerrors.NewNotFound(v1alpha1.Resource(v1alpha1.FlyteWorkflowKind), "name") + return nil, kubeerrors.NewNotFound(v1alpha1.Resource(v1alpha1.FlyteWorkflowKind), resourceVersionCachingName) } - w, err := wfStore.Get(ctx, "ns", "name") + w, err := wfStore.Get(ctx, resourceVersionCachingNamespace, resourceVersionCachingName) assert.Error(t, err) assert.True(t, IsNotFound(err)) assert.Nil(t, w) @@ -48,9 +55,9 @@ func TestResourceVersionCaching_Get_NotInCache(t *testing.T) { t.Run("alreadyExists?", func(t *testing.T) { l.GetCb = func(name string) (*v1alpha1.FlyteWorkflow, error) { - return nil, kubeerrors.NewAlreadyExists(v1alpha1.Resource(v1alpha1.FlyteWorkflowKind), "name") + return nil, kubeerrors.NewAlreadyExists(v1alpha1.Resource(v1alpha1.FlyteWorkflowKind), resourceVersionCachingName) } - w, err := wfStore.Get(ctx, "ns", "name") + w, err := wfStore.Get(ctx, resourceVersionCachingNamespace, resourceVersionCachingName) assert.Error(t, err) assert.Nil(t, w) }) @@ -59,7 +66,7 @@ func TestResourceVersionCaching_Get_NotInCache(t *testing.T) { l.GetCb = func(name string) (*v1alpha1.FlyteWorkflow, error) { return nil, fmt.Errorf("error") } - w, err := wfStore.Get(ctx, "ns", "name") + w, err := wfStore.Get(ctx, resourceVersionCachingNamespace, resourceVersionCachingName) assert.Error(t, err) assert.Nil(t, w) }) @@ -69,7 +76,7 @@ func TestResourceVersionCaching_Get_NotInCache(t *testing.T) { l.GetCb = func(name string) (*v1alpha1.FlyteWorkflow, error) { return expW, nil } - w, err := wfStore.Get(ctx, "ns", "name") + w, err := wfStore.Get(ctx, resourceVersionCachingNamespace, resourceVersionCachingName) assert.NoError(t, err) assert.Equal(t, expW, w) }) @@ -139,15 +146,13 @@ func createFakeClientSet() *fake.Clientset { func TestResourceVersionCaching_Get_UpdateAndRead(t *testing.T) { ctx := context.TODO() - namespace := "ns" - name := "name" resourceVersion := "r1" mockClient := createFakeClientSet().FlyteworkflowV1alpha1() t.Run("Stale", func(t *testing.T) { - staleName := name + ".stale" - wf := dummyWf(namespace, staleName) + staleName := resourceVersionCachingName + ".stale" + wf := dummyWf(resourceVersionCachingNamespace, staleName) wf.ResourceVersion = resourceVersion scope := promutils.NewTestScope() @@ -163,13 +168,15 @@ func TestResourceVersionCaching_Get_UpdateAndRead(t *testing.T) { newWf := wf.DeepCopy() newWf.Status.Phase = v1alpha1.WorkflowPhaseSucceeding - wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l})) + passthroughWfStore := NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}) + trackTerminatedWfStore, err := NewTerminatedTrackingStore(ctx, scope, passthroughWfStore) + wfStore := NewResourceVersionCachingStore(ctx, scope, trackTerminatedWfStore) assert.NoError(t, err) // Insert a new workflow with R1 _, err = wfStore.Update(ctx, newWf, PriorityClassCritical) assert.NoError(t, err) - w, err := wfStore.Get(ctx, namespace, staleName) + w, err := wfStore.Get(ctx, resourceVersionCachingNamespace, staleName) assert.Error(t, err) assert.False(t, IsNotFound(err)) assert.True(t, IsWorkflowStale(err)) @@ -177,13 +184,15 @@ func TestResourceVersionCaching_Get_UpdateAndRead(t *testing.T) { }) t.Run("Updated", func(t *testing.T) { - updatedName := name + ".updated" - wf := dummyWf(namespace, updatedName) + updatedName := resourceVersionCachingName + ".updated" + wf := dummyWf(resourceVersionCachingNamespace, updatedName) wf.ResourceVersion = resourceVersion scope := promutils.NewTestScope() l := &mockWFNamespaceLister{} - wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l})) + passthroughWfStore := NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}) + trackTerminatedWfStore, err := NewTerminatedTrackingStore(ctx, scope, passthroughWfStore) + wfStore := NewResourceVersionCachingStore(ctx, scope, trackTerminatedWfStore) assert.NoError(t, err) // Insert a new workflow with R1 _, err = wfStore.Update(ctx, wf, PriorityClassCritical) @@ -198,7 +207,7 @@ func TestResourceVersionCaching_Get_UpdateAndRead(t *testing.T) { return wf2, nil } - w, err := wfStore.Get(ctx, namespace, updatedName) + w, err := wfStore.Get(ctx, resourceVersionCachingNamespace, updatedName) assert.NoError(t, err) assert.NotNil(t, w) assert.Equal(t, "r2", w.ResourceVersion) @@ -207,8 +216,8 @@ func TestResourceVersionCaching_Get_UpdateAndRead(t *testing.T) { // If we -mistakenly- attempted to update the store with the exact workflow object, etcd. will not bump the // resource version. Next read operation should continue to retrieve the same instance of the object. t.Run("NotUpdated", func(t *testing.T) { - notUpdatedName := name + ".not-updated" - wf := dummyWf(namespace, notUpdatedName) + notUpdatedName := resourceVersionCachingName + ".not-updated" + wf := dummyWf(resourceVersionCachingNamespace, notUpdatedName) wf.ResourceVersion = resourceVersion _, err := mockClient.FlyteWorkflows(wf.GetNamespace()).Create(ctx, wf, v1.CreateOptions{}) @@ -216,7 +225,9 @@ func TestResourceVersionCaching_Get_UpdateAndRead(t *testing.T) { scope := promutils.NewTestScope() l := &mockWFNamespaceLister{} - wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l})) + passthroughWfStore := NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}) + trackTerminatedWfStore, err := NewTerminatedTrackingStore(ctx, scope, passthroughWfStore) + wfStore := NewResourceVersionCachingStore(ctx, scope, trackTerminatedWfStore) assert.NoError(t, err) // Insert a new workflow with R1 _, err = wfStore.Update(ctx, wf, PriorityClassCritical) @@ -227,7 +238,7 @@ func TestResourceVersionCaching_Get_UpdateAndRead(t *testing.T) { return wf, nil } - w, err := wfStore.Get(ctx, namespace, notUpdatedName) + w, err := wfStore.Get(ctx, resourceVersionCachingNamespace, notUpdatedName) assert.NoError(t, err) assert.NotNil(t, w) assert.Equal(t, "r1", w.ResourceVersion) @@ -239,11 +250,9 @@ func TestResourceVersionCaching_UpdateTerminated(t *testing.T) { mockClient := createFakeClientSet().FlyteworkflowV1alpha1() - namespace := "ns" - name := "name" resourceVersion := "r1" - wf := dummyWf(namespace, name) + wf := dummyWf(resourceVersionCachingNamespace, resourceVersionCachingName) wf.ResourceVersion = resourceVersion _, err := mockClient.FlyteWorkflows(wf.GetNamespace()).Create(ctx, wf, v1.CreateOptions{}) @@ -254,14 +263,16 @@ func TestResourceVersionCaching_UpdateTerminated(t *testing.T) { scope := promutils.NewTestScope() l := &mockWFNamespaceLister{} - wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l})) + passthroughWfStore := NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}) + trackTerminatedWfStore, err := NewTerminatedTrackingStore(ctx, scope, passthroughWfStore) + wfStore := NewResourceVersionCachingStore(ctx, scope, trackTerminatedWfStore) assert.NoError(t, err) // Insert a new workflow with R1 _, err = wfStore.Update(ctx, newWf, PriorityClassCritical) assert.NoError(t, err) rvStore := wfStore.(*resourceVersionCaching) - v, ok := rvStore.lastUpdatedResourceVersionCache.Load(resourceVersionKey(namespace, name)) + v, ok := rvStore.lastUpdatedResourceVersionCache.Load(resourceVersionKey(resourceVersionCachingNamespace, resourceVersionCachingName)) assert.True(t, ok) assert.Equal(t, resourceVersion, v.(string)) @@ -270,15 +281,12 @@ func TestResourceVersionCaching_UpdateTerminated(t *testing.T) { _, err = wfStore.Update(ctx, wf2, PriorityClassCritical) assert.NoError(t, err) - v, ok = rvStore.lastUpdatedResourceVersionCache.Load(resourceVersionKey(namespace, name)) + v, ok = rvStore.lastUpdatedResourceVersionCache.Load(resourceVersionKey(resourceVersionCachingNamespace, resourceVersionCachingName)) assert.False(t, ok) assert.Nil(t, v) // validate that terminated workflows are not retrievable - terminated := rvStore.terminatedFilter.Contains(ctx, []byte(resourceVersionKey(namespace, name))) - assert.True(t, terminated) - - terminatedWf, err := wfStore.Get(ctx, namespace, name) + terminatedWf, err := wfStore.Get(ctx, resourceVersionCachingNamespace, resourceVersionCachingName) assert.Nil(t, terminatedWf) assert.True(t, IsWorkflowTerminated(err)) } diff --git a/pkg/controller/workflowstore/terminated_tracking.go b/pkg/controller/workflowstore/terminated_tracking.go new file mode 100644 index 000000000..b2c75c97d --- /dev/null +++ b/pkg/controller/workflowstore/terminated_tracking.go @@ -0,0 +1,75 @@ +package workflowstore + +import ( + "context" + "fmt" + + "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + "github.com/flyteorg/flytestdlib/fastcheck" + + "github.com/flyteorg/flytestdlib/promutils" +) + +func workflowKey(namespace, name string) string { + return fmt.Sprintf("%s/%s", namespace, name) +} + +// A specialized store that stores a LRU cache of all the workflows that are in a terminal phase. +// Terminated workflows are ignored (Get returns a nil). +// Processing terminated FlyteWorkflows can occur when workflow updates are reported after a workflow has already completed. +type terminatedTracking struct { + w FlyteWorkflow + terminatedFilter fastcheck.Filter +} + +func (t *terminatedTracking) Get(ctx context.Context, namespace, name string) (*v1alpha1.FlyteWorkflow, error) { + if t.terminatedFilter.Contains(ctx, []byte(workflowKey(namespace, name))) { + return nil, ErrWorkflowTerminated + } + + return t.w.Get(ctx, namespace, name) +} + +func (t *terminatedTracking) UpdateStatus(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) ( + newWF *v1alpha1.FlyteWorkflow, err error) { + newWF, err = t.w.UpdateStatus(ctx, workflow, priorityClass) + if err != nil { + return nil, err + } + + if newWF != nil { + if newWF.GetExecutionStatus().IsTerminated() { + t.terminatedFilter.Add(ctx, []byte(workflowKey(workflow.Namespace, workflow.Name))) + } + } + + return newWF, nil +} + +func (t *terminatedTracking) Update(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) ( + newWF *v1alpha1.FlyteWorkflow, err error) { + newWF, err = t.w.Update(ctx, workflow, priorityClass) + if err != nil { + return nil, err + } + + if newWF != nil { + if newWF.GetExecutionStatus().IsTerminated() { + t.terminatedFilter.Add(ctx, []byte(workflowKey(workflow.Namespace, workflow.Name))) + } + } + + return newWF, nil +} + +func NewTerminatedTrackingStore(_ context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) (FlyteWorkflow, error) { + filter, err := fastcheck.NewLRUCacheFilter(1000, scope.NewSubScope("terminated_filter")) + if err != nil { + return nil, err + } + + return &terminatedTracking{ + w: workflowStore, + terminatedFilter: filter, + }, nil +} diff --git a/pkg/controller/workflowstore/terminated_tracking_test.go b/pkg/controller/workflowstore/terminated_tracking_test.go new file mode 100644 index 000000000..d88c8ed2d --- /dev/null +++ b/pkg/controller/workflowstore/terminated_tracking_test.go @@ -0,0 +1,103 @@ +package workflowstore + +import ( + "context" + "testing" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + "github.com/flyteorg/flytepropeller/pkg/client/clientset/versioned/fake" + "github.com/flyteorg/flytestdlib/promutils" + "github.com/stretchr/testify/assert" +) + +const ( + terminatedTrackingNamespace = "test-ns" +) + +func TestTerminatedTrackingStore_Update(t *testing.T) { + ctx := context.TODO() + + mockClient := fake.NewSimpleClientset().FlyteworkflowV1alpha1() + + scope := promutils.NewTestScope() + l := &mockWFNamespaceLister{} + passthroughWfStore := NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}) + wfStore, err := NewTerminatedTrackingStore(ctx, scope, passthroughWfStore) + assert.NoError(t, err) + + t.Run("Succeeding", func(t *testing.T) { + name := "succeeding" + + wf := dummyWf(terminatedTrackingNamespace, name) + wf.Status.Phase = v1alpha1.WorkflowPhaseSucceeding + + _, err := mockClient.FlyteWorkflows(wf.GetNamespace()).Create(ctx, wf, v1.CreateOptions{}) + assert.NoError(t, err) + + _, err = wfStore.Update(ctx, wf, PriorityClassCritical) + assert.NoError(t, err) + + l.GetCb = func(name string) (*v1alpha1.FlyteWorkflow, error) { + return wf, nil + } + succeedingWf, err := wfStore.Get(ctx, terminatedTrackingNamespace, name) + assert.NoError(t, err) + assert.Equal(t, succeedingWf, wf) + }) + + t.Run("Terminated", func(t *testing.T) { + name := "terminated" + + wf := dummyWf(terminatedTrackingNamespace, name) + wf.Status.Phase = v1alpha1.WorkflowPhaseAborted + + _, err := mockClient.FlyteWorkflows(wf.GetNamespace()).Create(ctx, wf, v1.CreateOptions{}) + assert.NoError(t, err) + + _, err = wfStore.Update(ctx, wf, PriorityClassCritical) + assert.NoError(t, err) + + l.GetCb = func(name string) (*v1alpha1.FlyteWorkflow, error) { + return wf, nil + } + terminatedWf, err := wfStore.Get(ctx, terminatedTrackingNamespace, name) + assert.Nil(t, terminatedWf) + assert.True(t, IsWorkflowTerminated(err)) + }) +} + +func TestTerminatedTrackingStore_UpdateStatus(t *testing.T) { + ctx := context.TODO() + + mockClient := fake.NewSimpleClientset().FlyteworkflowV1alpha1() + + scope := promutils.NewTestScope() + l := &mockWFNamespaceLister{} + passthroughWfStore := NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}) + wfStore, err := NewTerminatedTrackingStore(ctx, scope, passthroughWfStore) + assert.NoError(t, err) + + name := "name" + + wf := dummyWf(terminatedTrackingNamespace, name) + wf.Status.Phase = v1alpha1.WorkflowPhaseSucceeding + + _, err = mockClient.FlyteWorkflows(wf.GetNamespace()).Create(ctx, wf, v1.CreateOptions{}) + assert.NoError(t, err) + + _, err = wfStore.Update(ctx, wf, PriorityClassCritical) + assert.NoError(t, err) + + wf.Status.Phase = v1alpha1.WorkflowPhaseAborted + _, err = wfStore.UpdateStatus(ctx, wf, PriorityClassCritical) + assert.NoError(t, err) + + l.GetCb = func(name string) (*v1alpha1.FlyteWorkflow, error) { + return wf, nil + } + terminatedWf, err := wfStore.Get(ctx, terminatedTrackingNamespace, name) + assert.Nil(t, terminatedWf) + assert.True(t, IsWorkflowTerminated(err)) +}