Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Mark webapi task failure as retry limit exceeded (#392)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <pingsutw@apache.org>
  • Loading branch information
pingsutw authored Sep 14, 2023
1 parent ff9f55b commit 9bf0fb9
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 10 deletions.
23 changes: 15 additions & 8 deletions go/tasks/pluginmachinery/internal/webapi/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,31 +78,38 @@ func (q *ResourceCache) SyncResource(ctx context.Context, batch cache.Batch) (
logger.Debugf(ctx, "Sync loop - processing resource with cache key [%s]",
resource.GetID())

if cacheItem.SyncFailureCount > q.cfg.MaxSystemFailures {
logger.Infof(ctx, "Sync loop - Item with key [%v] has failed to sync [%v] time(s). More than the allowed [%v] time(s). Marking as failure.",
cacheItem.SyncFailureCount, q.cfg.MaxSystemFailures)
cacheItem.State.Phase = PhaseSystemFailure
}

if cacheItem.State.Phase.IsTerminal() {
logger.Debugf(ctx, "Sync loop - resource cache key [%v] in terminal state [%s]",
resource.GetID())

resp = append(resp, cache.ItemSyncResponse{
ID: resource.GetID(),
Item: resource.GetItem(),
Item: cacheItem,
Action: cache.Unchanged,
})

continue
}

if cacheItem.SyncFailureCount > q.cfg.MaxSystemFailures {
logger.Debugf(ctx, "Sync loop - Item with key [%v] has failed to sync [%v] time(s). More than the allowed [%v] time(s). Marking as failure.",
cacheItem.SyncFailureCount, q.cfg.MaxSystemFailures)
cacheItem.State.Phase = PhaseSystemFailure
resp = append(resp, cache.ItemSyncResponse{
ID: resource.GetID(),
Item: cacheItem,
Action: cache.Update,
})

continue
}

// Get an updated status
logger.Debugf(ctx, "Querying AsyncPlugin for %s", resource.GetID())
newResource, err := q.client.Get(ctx, newPluginContext(cacheItem.ResourceMeta, cacheItem.Resource, "", nil))
if err != nil {
logger.Infof(ctx, "Error retrieving resource [%s]. Error: %v", resource.GetID(), err)
cacheItem.SyncFailureCount++
cacheItem.ErrorMessage = err.Error()

// Make sure we don't return nil for the first argument, because that deletes it from the cache.
resp = append(resp, cache.ItemSyncResponse{
Expand Down
30 changes: 30 additions & 0 deletions go/tasks/pluginmachinery/internal/webapi/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,36 @@ func TestResourceCache_SyncResource(t *testing.T) {
assert.Equal(t, cacheItem, newCacheItem[0].Item)
})

t.Run("Retry limit exceeded", func(t *testing.T) {
mockCache := &cacheMocks.AutoRefresh{}
mockClient := &mocks.Client{}

q := ResourceCache{
AutoRefresh: mockCache,
client: mockClient,
cfg: webapi.CachingConfig{
MaxSystemFailures: 2,
},
}

cacheItem := CacheItem{
State: State{
SyncFailureCount: 5,
ErrorMessage: "some error",
},
}

iw := &cacheMocks.ItemWrapper{}
iw.OnGetItem().Return(cacheItem)
iw.OnGetID().Return("some-id")

newCacheItem, err := q.SyncResource(ctx, []cache.ItemWrapper{iw})
assert.NoError(t, err)
assert.Equal(t, cache.Update, newCacheItem[0].Action)
cacheItem.State.Phase = PhaseSystemFailure
assert.Equal(t, cacheItem, newCacheItem[0].Item)
})

t.Run("move to success", func(t *testing.T) {
mockCache := &cacheMocks.AutoRefresh{}
mockClient := &mocks.Client{}
Expand Down
12 changes: 10 additions & 2 deletions go/tasks/pluginmachinery/internal/webapi/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,16 @@ func monitor(ctx context.Context, tCtx core.TaskExecutionContext, p Client, cach
errors.CacheFailed, "Failed to cast [%v]", cacheItem)
}

// If the cache has not syncd yet, just return
// If the cache has not synced yet, just return
if cacheItem.Resource == nil {
if cacheItem.Phase.IsTerminal() {
err = cache.DeleteDelayed(cacheItemID)
if err != nil {
logger.Errorf(ctx, "Failed to queue item for deletion in the cache with Item Id: [%v]. Error: %v",
cacheItemID, err)
}
return state, core.PhaseInfoFailure(errors.CacheFailed, cacheItem.ErrorMessage, nil), nil
}
return state, core.PhaseInfoRunning(0, nil), nil
}

Expand All @@ -54,7 +62,7 @@ func monitor(ctx context.Context, tCtx core.TaskExecutionContext, p Client, cach
// Queue item for deletion in the cache.
err = cache.DeleteDelayed(cacheItemID)
if err != nil {
logger.Warnf(ctx, "Failed to queue item for deletion in the cache with Item Id: [%v]. Error: %v",
logger.Errorf(ctx, "Failed to queue item for deletion in the cache with Item Id: [%v]. Error: %v",
cacheItemID, err)
}
}
Expand Down
3 changes: 3 additions & 0 deletions go/tasks/pluginmachinery/internal/webapi/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,7 @@ type State struct {

// The time the execution first requests for an allocation token
AllocationTokenRequestStartTime time.Time `json:"allocationTokenRequestStartTime,omitempty"`

// ErrorMessage generated during cache synchronization.
ErrorMessage string `json:"error_message,omitempty"`
}

0 comments on commit 9bf0fb9

Please sign in to comment.