Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: apply custom cursor pagination where workflows and archived workflows are merged #11761

101 changes: 75 additions & 26 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,7 @@ import (
"fmt"
"io"
"sort"

log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
corev1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"strconv"

"github.com/argoproj/argo-workflows/v3/errors"
"github.com/argoproj/argo-workflows/v3/persist/sqldb"
Expand All @@ -35,6 +28,13 @@ import (
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"
"github.com/argoproj/argo-workflows/v3/workflow/util"
"github.com/argoproj/argo-workflows/v3/workflow/validate"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
corev1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

type workflowServer struct {
Expand Down Expand Up @@ -129,7 +129,50 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.Workf
return wf, nil
}

func mergeWithArchivedWorkflows(liveWfs v1alpha1.WorkflowList, archivedWfs v1alpha1.WorkflowList, numWfsToKeep int) *v1alpha1.WorkflowList {
func cursorPaginationByResourceVersion(items []v1alpha1.Workflow, resourceVersion string, limit int64, wfList *v1alpha1.WorkflowList) {
// Use Kubernetes resourceVersion for cursor pagination.
// Sort the Kubernetes results in descending order by resourceVersion.
// To implement cursor pagination with filtering based on resourceVersion, start by sorting in descending order according to the resourceVersion.
sunyeongchoi marked this conversation as resolved.
Show resolved Hide resolved
sort.Slice(items, func(i, j int) bool {
itemIRV, _ := strconv.Atoi(items[i].ResourceVersion)
itemJRV, _ := strconv.Atoi(items[j].ResourceVersion)
return itemIRV > itemJRV
})

// Due to the descending sorting above, the items are filtered to have a resourceVersion smaller than the received value.
terrytangyuan marked this conversation as resolved.
Show resolved Hide resolved
// The data with values smaller than the received resourceVersion on the current page will be used for the next page.
if resourceVersion != "" {
var newItems []v1alpha1.Workflow
for _, item := range items {
targetRV, _ := strconv.Atoi(item.ResourceVersion)
receivedRV, _ := strconv.Atoi(resourceVersion)
if targetRV < receivedRV {
newItems = append(newItems, item)
}
items = newItems
}
}

// Indexing list by limit count
if limit != 0 {
endIndex := int(limit)
if endIndex > len(items) || limit == 0 {
endIndex = len(items)
}
wfList.Items = items[0:endIndex]
} else {
wfList.Items = items
}

// Calculate new offset for next batch
terrytangyuan marked this conversation as resolved.
Show resolved Hide resolved
// For the next pagination, the resourceVersion of the last item is set in the Continue field.
if limit != 0 && len(wfList.Items) == int(limit) {
lastIndex := len(wfList.Items) - 1
wfList.ListMeta.Continue = wfList.Items[lastIndex].ResourceVersion
}
}

func mergeWithArchivedWorkflows(liveWfs v1alpha1.WorkflowList, archivedWfs v1alpha1.WorkflowList) *v1alpha1.WorkflowList {
var mergedWfs []v1alpha1.Workflow
var uidToWfs = map[types.UID][]v1alpha1.Workflow{}
for _, item := range liveWfs.Items {
Expand All @@ -152,43 +195,49 @@ func mergeWithArchivedWorkflows(liveWfs v1alpha1.WorkflowList, archivedWfs v1alp
}
}
}
mergedWfsList := v1alpha1.WorkflowList{Items: mergedWfs, ListMeta: liveWfs.ListMeta}
sort.Sort(mergedWfsList.Items)
numWfs := 0
var finalWfs []v1alpha1.Workflow
for _, item := range mergedWfsList.Items {
if numWfsToKeep == 0 || numWfs < numWfsToKeep {
finalWfs = append(finalWfs, item)
numWfs += 1
}
}
return &v1alpha1.WorkflowList{Items: finalWfs, ListMeta: liveWfs.ListMeta}
return &v1alpha1.WorkflowList{Items: mergedWfs, ListMeta: liveWfs.ListMeta}
terrytangyuan marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.WorkflowListRequest) (*wfv1.WorkflowList, error) {
wfClient := auth.GetWfClient(ctx)

listOption := &metav1.ListOptions{}
options := &metav1.ListOptions{}
if req.ListOptions != nil {
listOption = req.ListOptions
options = req.ListOptions
}
s.instanceIDService.With(listOption)
wfList, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).List(ctx, *listOption)

// Save the original Continue and Limit.
resourceVersion := options.Continue
limit := options.Limit

// Fetch the whole list by setting Limit to 0.
// Reset Continue to "" to prevent Kubernetes-native pagination.
Comment on lines +217 to +218
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this approach require fetching the entire list every time the user clicks through to another page?

Copy link
Member Author

@sunyeongchoi sunyeongchoi Sep 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the entire list is fetched every time the page changes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have written and uploaded test code for cursorPaginationByResourceVersion. 859c0b2

I am concerned about the potential performance impact of my implementation because it retrieves the entire dataset with each page change. However, I can't think of a better way to solve this problem on the server-side other than this method.

In my opinion, unless there's a better approach, the server should send the entire list to the front-end once, and then pagination can be handled on the front-end. However, one drawback of this approach is that the data won't be updated on the front-end without refreshing the page. To achieve real-time data updates, we might need to consider using websockets or polling.

Do you have any suggestions or better ideas on how to handle this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a better idea. I think correctness is what matters most right now. Performance-wise, we should do something on the front end, as you suggested, so that the whole list is only refetched on refresh. This could be made configurable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then, after this PR, I will think about front-end pagination and give it a try.

Also, since I am curious about the performance of the currently implemented pagination, I will think about ways to test its performance.

I will share the test results later when they come out.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great!

options.Continue = ""
options.Limit = 0

s.instanceIDService.With(options)
wfList, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).List(ctx, *options)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}

// Fetch the whole list by setting Limit to 0.
// Set Continue to "0" to prevent archived-workflow pagination.
options.Continue = "0"
options.Limit = 0
archivedWfList, err := s.wfArchiveServer.ListArchivedWorkflows(ctx, &workflowarchivepkg.ListArchivedWorkflowsRequest{
ListOptions: listOption,
ListOptions: options,
NamePrefix: "",
Namespace: req.Namespace,
})
if err != nil {
log.Warnf("unable to list archived workflows:%v", err)
} else {
if archivedWfList != nil {
wfList = mergeWithArchivedWorkflows(*wfList, *archivedWfList, int(listOption.Limit))
wfList = mergeWithArchivedWorkflows(*wfList, *archivedWfList)
}
}
cursorPaginationByResourceVersion(wfList.Items, resourceVersion, limit, wfList)

cleaner := fields.NewCleaner(req.Fields)
if s.offloadNodeStatusRepo.IsEnabled() && !cleaner.WillExclude("items.status.nodes") {
Expand Down
41 changes: 36 additions & 5 deletions server/workflow/workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,11 +662,42 @@ func TestMergeWithArchivedWorkflows(t *testing.T) {
wf3 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{UID: "3", CreationTimestamp: metav1.Time{Time: timeNow.Add(3 * time.Second)}}}
liveWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1Live, wf2}}
archivedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1Archived, wf3, wf2}}
expectedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2, wf1Live}}
expectedShortWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2}}
assert.Equal(t, expectedWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList, 0).Items)
assert.Equal(t, expectedShortWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList, 2).Items)
archivedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2, wf1Archived}}
expectedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1Live, wf2, wf3}}
assert.Equal(t, expectedWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList).Items)
}

// TestCursorPaginationByResourceVersion feeds ten unsorted workflows with
// resourceVersions "1" through "10" to cursorPaginationByResourceVersion using
// cursor "8" and limit 5, and expects the five workflows directly below the
// cursor in descending order, with Continue set to the last one's
// resourceVersion.
func TestCursorPaginationByResourceVersion(t *testing.T) {
	// newWf builds a workflow named "wf<rv>" carrying the given resourceVersion.
	newWf := func(rv string) v1alpha1.Workflow {
		return v1alpha1.Workflow{
			ObjectMeta: metav1.ObjectMeta{ResourceVersion: rv, Name: "wf" + rv},
		}
	}

	// Deliberately shuffled so the function's own sorting is exercised.
	input := []v1alpha1.Workflow{
		newWf("2"), newWf("1"),
		newWf("4"), newWf("3"),
		newWf("6"), newWf("5"),
		newWf("8"), newWf("7"),
		newWf("10"), newWf("9"),
	}

	want := &v1alpha1.WorkflowList{
		ListMeta: metav1.ListMeta{Continue: "3"},
		Items: []v1alpha1.Workflow{
			newWf("7"), newWf("6"), newWf("5"), newWf("4"), newWf("3"),
		},
	}

	got := &v1alpha1.WorkflowList{}
	cursorPaginationByResourceVersion(input, "8", 5, got)

	assert.Equal(t, want, got)
}

func TestWatchWorkflows(t *testing.T) {
Expand Down
Loading