From 0a096e66ba185d02caa5172fa677087dd4aba065 Mon Sep 17 00:00:00 2001 From: Jiacheng Xu Date: Sat, 11 May 2024 06:31:14 -0700 Subject: [PATCH] feat: add sqlite-based memory store for live workflows. Fixes #12025 (#13021) --- go.mod | 11 +- go.sum | 20 +- persist/sqldb/archived_workflow_labels.go | 26 +- .../sqldb/archived_workflow_labels_test.go | 2 +- persist/sqldb/db_type.go | 1 + persist/sqldb/mocks/WorkflowArchive.go | 39 +-- persist/sqldb/null_workflow_archive.go | 7 +- persist/sqldb/selector.go | 89 +++++ persist/sqldb/workflow_archive.go | 62 ++-- pkg/apiclient/argo-kube-client.go | 10 +- server/apiserver/argoserver.go | 15 +- server/utils/list_options.go | 133 ++++++++ server/workflow/store/lister.go | 41 +++ server/workflow/store/sqlite_store.go | 318 ++++++++++++++++++ server/workflow/store/sqlite_store_test.go | 155 +++++++++ server/workflow/workflow_server.go | 173 ++++++---- server/workflow/workflow_server_test.go | 66 ++-- .../archived_workflow_server.go | 89 +---- .../archived_workflow_server_test.go | 20 +- test/e2e/fixtures/e2e_suite.go | 6 +- .../estimation/estimator_factory.go | 10 +- .../estimation/estimator_factory_test.go | 8 +- 22 files changed, 1025 insertions(+), 276 deletions(-) create mode 100644 persist/sqldb/selector.go create mode 100644 server/utils/list_options.go create mode 100644 server/workflow/store/lister.go create mode 100644 server/workflow/store/sqlite_store.go create mode 100644 server/workflow/store/sqlite_store_test.go diff --git a/go.mod b/go.mod index 3954f4c9f8d3..a78539644814 100644 --- a/go.mod +++ b/go.mod @@ -74,6 +74,7 @@ require ( k8s.io/kubectl v0.26.15 k8s.io/utils v0.0.0-20221107191617-1a15be271d1d sigs.k8s.io/yaml v1.4.0 + zombiezen.com/go/sqlite v1.2.0 ) require ( @@ -105,7 +106,9 @@ require ( github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/ncruces/go-strftime v0.1.9 // indirect github.com/pjbgf/sha1cd v0.3.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/segmentio/fasthash v1.0.3 // indirect @@ -118,10 +121,14 @@ require ( go.opentelemetry.io/otel/metric v1.22.0 // indirect go.opentelemetry.io/otel/trace v1.22.0 // indirect go.uber.org/multierr v1.10.0 // indirect - golang.org/x/mod v0.12.0 // indirect - golang.org/x/tools v0.13.0 // indirect + golang.org/x/mod v0.14.0 // indirect + golang.org/x/tools v0.17.0 // indirect google.golang.org/genproto v0.0.0-20240116215550-a9fa1716bcac // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe // indirect + modernc.org/libc v1.41.0 // indirect + modernc.org/mathutil v1.6.0 // indirect + modernc.org/memory v1.7.2 // indirect + modernc.org/sqlite v1.29.1 // indirect ) require ( diff --git a/go.sum b/go.sum index c584480eb7c7..54896be505d1 100644 --- a/go.sum +++ b/go.sum @@ -648,6 +648,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= +github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= +github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nsf/termbox-go v0.0.0-20190121233118-02980233997d/go.mod h1:IuKpRQcYE1Tfu+oAQqaLisqDeXgjyyltCfsaoYN18NQ= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -701,6 +703,7 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= @@ -912,8 +915,8 @@ golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= -golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= +golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1042,8 +1045,8 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ= -golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= +golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc= +golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1197,16 +1200,23 @@ modernc.org/internal v1.1.0/go.mod h1:IFhfxUE81NbN8Riy+oHylA3PIYgAvIQ5eMufNzg7/Q modernc.org/lex v1.1.1/go.mod h1:6r8o8DLJkAnOsQaGi8fMoi+Vt6LTbDaCrkUK729D8xM= modernc.org/lexer v1.0.4/go.mod h1:tOajb8S4sdfOYitzCgXDFmbVJ/LE0v1fNJ7annTw36U= modernc.org/lexer v1.0.5/go.mod h1:8npHn3u/NxCEtlC/tRSY77x5+WB3HvHMzMVElQ76ayI= +modernc.org/libc v1.41.0 h1:g9YAc6BkKlgORsUWj+JwqoB1wU3o4DE3bM3yvA3k+Gk= +modernc.org/libc v1.41.0/go.mod h1:w0eszPsiXoOnoMJgrXjglgLuDy/bt5RR4y3QzUUeodY= modernc.org/lldb v1.0.4/go.mod h1:AKDI6wUJk7iJS8nRX54St8rq9wUIi3o5YGN3rlejR5o= modernc.org/lldb v1.0.8/go.mod h1:ybOcsZ/RNZo3q8fiGadQFRnD+1Jc+RWGcTPdeilCnUk= modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k= modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= +modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= +modernc.org/memory v1.7.2 h1:Klh90S215mmH8c9gO98QxQFsY+W451E8AnzjoE2ee1E= +modernc.org/memory v1.7.2/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E= modernc.org/ql v1.4.7/go.mod h1:I900l6z8ckpPy1y9VR0gu4pZ9hl9AhmQla4F8KERzdc= modernc.org/sortutil v1.1.0/go.mod h1:ZyL98OQHJgH9IEfN71VsamvJgrtRX9Dj2gX+vH86L1k= modernc.org/sortutil v1.1.1/go.mod h1:DTj/8BqjEBLZFVPYvEGDfFFg94SsfPxQ70R+SQJ98qA= modernc.org/sortutil v1.2.0/go.mod h1:TKU2s7kJMf1AE84OoiGppNHJwvB753OYfNl2WRb++Ss= +modernc.org/sqlite v1.29.1 h1:19GY2qvWB4VPw0HppFlZCPAbmxFU41r+qjKZQdQ1ryA= +modernc.org/sqlite v1.29.1/go.mod h1:hG41jCYxOAOoO6BRK66AdRlmOcDzXf7qnwlwjUIOqa0= modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw= modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0= modernc.org/zappy v1.0.5/go.mod h1:Q5T4ra3/JJNORGK16oe8rRAti7kWtRW4Z93fzin2gBc= @@ -1229,3 +1239,5 @@ sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= +zombiezen.com/go/sqlite v1.2.0 h1:jja0Ubpzpl6bjr/bSaPyvafHO+extoDJJXIaqXT7VOU= +zombiezen.com/go/sqlite v1.2.0/go.mod h1:yRl27//s/9aXU3RWs8uFQwjkTG9gYNGEls6+6SvrclY= diff --git a/persist/sqldb/archived_workflow_labels.go b/persist/sqldb/archived_workflow_labels.go index 04ce353ce209..add2bbad4bf3 100644 --- a/persist/sqldb/archived_workflow_labels.go +++ b/persist/sqldb/archived_workflow_labels.go @@ -52,9 +52,9 @@ func (r *workflowArchive) ListWorkflowsLabelValues(key string) (*wfv1.LabelValue return &wfv1.LabelValues{Items: labels}, nil } -func labelsClause(selector db.Selector, t dbType, requirements labels.Requirements) (db.Selector, error) { +func labelsClause(selector db.Selector, t dbType, requirements labels.Requirements, tableName, labelTableName string, hasClusterName bool) (db.Selector, error) { for _, req := range requirements { - cond, err := requirementToCondition(t, req) + cond, err := requirementToCondition(t, req, tableName, labelTableName, hasClusterName) if err != nil { return nil, err } @@ -63,36 +63,40 @@ func labelsClause(selector db.Selector, t dbType, requirements labels.Requiremen return selector, nil } -func requirementToCondition(t dbType, r labels.Requirement) (*db.RawExpr, error) { +func requirementToCondition(t dbType, r labels.Requirement, tableName, labelTableName string, hasClusterName bool) (*db.RawExpr, error) { + clusterNameSelector := "" + if hasClusterName { + clusterNameSelector = fmt.Sprintf("clustername = %s.clustername and", tableName) + } // Should we "sanitize our inputs"? No. // https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/ // Valid label values must be 63 characters or less and must be empty or begin and end with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_), dots (.), and alphanumerics between. // https://kb.objectrocket.com/postgresql/casting-in-postgresql-570#string+to+integer+casting switch r.Operator() { case selection.DoesNotExist: - return db.Raw(fmt.Sprintf("not exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s')", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key())), nil + return db.Raw(fmt.Sprintf("not exists (select 1 from %s where %s uid = %s.uid and name = '%s')", labelTableName, clusterNameSelector, tableName, r.Key())), nil case selection.Equals, selection.DoubleEquals: - return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and value = '%s')", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), r.Values().List()[0])), nil + return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s' and value = '%s')", labelTableName, clusterNameSelector, tableName, r.Key(), r.Values().List()[0])), nil case selection.In: - return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and value in ('%s'))", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), strings.Join(r.Values().List(), "', '"))), nil + return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s' and value in ('%s'))", labelTableName, clusterNameSelector, tableName, r.Key(), strings.Join(r.Values().List(), "', '"))), nil case selection.NotEquals: - return db.Raw(fmt.Sprintf("not exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and value = '%s')", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), r.Values().List()[0])), nil + return db.Raw(fmt.Sprintf("not exists (select 1 from %s where %s uid = %s.uid and name = '%s' and value = '%s')", labelTableName, clusterNameSelector, tableName, r.Key(), r.Values().List()[0])), nil case selection.NotIn: - return db.Raw(fmt.Sprintf("not exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and value in ('%s'))", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), strings.Join(r.Values().List(), "', '"))), nil + return db.Raw(fmt.Sprintf("not exists (select 1 from %s where %s uid = %s.uid and name = '%s' and value in ('%s'))", labelTableName, clusterNameSelector, tableName, r.Key(), strings.Join(r.Values().List(), "', '"))), nil case selection.Exists: - return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s')", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key())), nil + return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s')", labelTableName, clusterNameSelector, tableName, r.Key())), nil case selection.GreaterThan: i, err := strconv.Atoi(r.Values().List()[0]) if err != nil { return nil, err } - return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and cast(value as %s) > %d)", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), t.intType(), i)), nil + return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s' and cast(value as %s) > %d)", labelTableName, clusterNameSelector, tableName, r.Key(), t.intType(), i)), nil case selection.LessThan: i, err := strconv.Atoi(r.Values().List()[0]) if err != nil { return nil, err } - return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and cast(value as %s) < %d)", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), t.intType(), i)), nil + return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s' and cast(value as %s) < %d)", labelTableName, clusterNameSelector, tableName, r.Key(), t.intType(), i)), nil } return nil, fmt.Errorf("operation %v is not supported", r.Operator()) } diff --git a/persist/sqldb/archived_workflow_labels_test.go b/persist/sqldb/archived_workflow_labels_test.go index 144212cfae2e..61f0ca447d1f 100644 --- a/persist/sqldb/archived_workflow_labels_test.go +++ b/persist/sqldb/archived_workflow_labels_test.go @@ -31,7 +31,7 @@ func Test_labelsClause(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { for _, req := range tt.requirements { - got, err := requirementToCondition(tt.dbType, req) + got, err := requirementToCondition(tt.dbType, req, archiveTableName, archiveLabelsTableName, true) if assert.NoError(t, err) { assert.Equal(t, tt.want, *got) } diff --git a/persist/sqldb/db_type.go b/persist/sqldb/db_type.go index edf590ed7bf5..258eedb087f3 100644 --- a/persist/sqldb/db_type.go +++ b/persist/sqldb/db_type.go @@ -12,6 +12,7 @@ type dbType string const ( MySQL dbType = "mysql" Postgres dbType = "postgres" + SQLite dbType = "sqlite" ) func dbTypeFor(session db.Session) dbType { diff --git a/persist/sqldb/mocks/WorkflowArchive.go b/persist/sqldb/mocks/WorkflowArchive.go index bf9aa0c7a32c..19f9eed7b0cf 100644 --- a/persist/sqldb/mocks/WorkflowArchive.go +++ b/persist/sqldb/mocks/WorkflowArchive.go @@ -4,10 +4,11 @@ package mocks import ( mock "github.com/stretchr/testify/mock" - labels "k8s.io/apimachinery/pkg/labels" time "time" + utils "github.com/argoproj/argo-workflows/v3/server/utils" + v1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" ) @@ -34,9 +35,9 @@ func (_m *WorkflowArchive) ArchiveWorkflow(wf *v1alpha1.Workflow) error { return r0 } -// CountWorkflows provides a mock function with given fields: namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements -func (_m *WorkflowArchive) CountWorkflows(namespace string, name string, namePrefix string, minStartAt time.Time, maxStartAt time.Time, labelRequirements labels.Requirements) (int64, error) { - ret := _m.Called(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements) +// CountWorkflows provides a mock function with given fields: options +func (_m *WorkflowArchive) CountWorkflows(options utils.ListOptions) (int64, error) { + ret := _m.Called(options) if len(ret) == 0 { panic("no return value specified for CountWorkflows") @@ -44,17 +45,17 @@ func (_m *WorkflowArchive) CountWorkflows(namespace string, name string, namePre var r0 int64 var r1 error - if rf, ok := ret.Get(0).(func(string, string, string, time.Time, time.Time, labels.Requirements) (int64, error)); ok { - return rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements) + if rf, ok := ret.Get(0).(func(utils.ListOptions) (int64, error)); ok { + return rf(options) } - if rf, ok := ret.Get(0).(func(string, string, string, time.Time, time.Time, labels.Requirements) int64); ok { - r0 = rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements) + if rf, ok := ret.Get(0).(func(utils.ListOptions) int64); ok { + r0 = rf(options) } else { r0 = ret.Get(0).(int64) } - if rf, ok := ret.Get(1).(func(string, string, string, time.Time, time.Time, labels.Requirements) error); ok { - r1 = rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements) + if rf, ok := ret.Get(1).(func(utils.ListOptions) error); ok { + r1 = rf(options) } else { r1 = ret.Error(1) } @@ -146,9 +147,9 @@ func (_m *WorkflowArchive) IsEnabled() bool { return r0 } -// ListWorkflows provides a mock function with given fields: namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements, limit, offset -func (_m *WorkflowArchive) ListWorkflows(namespace string, name string, namePrefix string, minStartAt time.Time, maxStartAt time.Time, labelRequirements labels.Requirements, limit int, offset int) (v1alpha1.Workflows, error) { - ret := _m.Called(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements, limit, offset) +// ListWorkflows provides a mock function with given fields: options +func (_m *WorkflowArchive) ListWorkflows(options utils.ListOptions) (v1alpha1.Workflows, error) { + ret := _m.Called(options) if len(ret) == 0 { panic("no return value specified for ListWorkflows") @@ -156,19 +157,19 @@ func (_m *WorkflowArchive) ListWorkflows(namespace string, name string, namePref var r0 v1alpha1.Workflows var r1 error - if rf, ok := ret.Get(0).(func(string, string, string, time.Time, time.Time, labels.Requirements, int, int) (v1alpha1.Workflows, error)); ok { - return rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements, limit, offset) + if rf, ok := ret.Get(0).(func(utils.ListOptions) (v1alpha1.Workflows, error)); ok { + return rf(options) } - if rf, ok := ret.Get(0).(func(string, string, string, time.Time, time.Time, labels.Requirements, int, int) v1alpha1.Workflows); ok { - r0 = rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements, limit, offset) + if rf, ok := ret.Get(0).(func(utils.ListOptions) v1alpha1.Workflows); ok { + r0 = rf(options) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(v1alpha1.Workflows) } } - if rf, ok := ret.Get(1).(func(string, string, string, time.Time, time.Time, labels.Requirements, int, int) error); ok { - r1 = rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements, limit, offset) + if rf, ok := ret.Get(1).(func(utils.ListOptions) error); ok { + r1 = rf(options) } else { r1 = ret.Error(1) } diff --git a/persist/sqldb/null_workflow_archive.go b/persist/sqldb/null_workflow_archive.go index e8e37b481c9f..e3f4863bcc7c 100644 --- a/persist/sqldb/null_workflow_archive.go +++ b/persist/sqldb/null_workflow_archive.go @@ -4,9 +4,8 @@ import ( "fmt" "time" - "k8s.io/apimachinery/pkg/labels" - wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + sutils "github.com/argoproj/argo-workflows/v3/server/utils" ) var NullWorkflowArchive WorkflowArchive = &nullWorkflowArchive{} @@ -21,11 +20,11 @@ func (r *nullWorkflowArchive) ArchiveWorkflow(*wfv1.Workflow) error { return nil } -func (r *nullWorkflowArchive) ListWorkflows(string, string, string, time.Time, time.Time, labels.Requirements, int, int) (wfv1.Workflows, error) { +func (r *nullWorkflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workflows, error) { return wfv1.Workflows{}, nil } -func (r *nullWorkflowArchive) CountWorkflows(string, string, string, time.Time, time.Time, labels.Requirements) (int64, error) { +func (r *nullWorkflowArchive) CountWorkflows(options sutils.ListOptions) (int64, error) { return 0, nil } diff --git a/persist/sqldb/selector.go b/persist/sqldb/selector.go new file mode 100644 index 000000000000..5e2b9cbb53ca --- /dev/null +++ b/persist/sqldb/selector.go @@ -0,0 +1,89 @@ +package sqldb + +import ( + "github.com/upper/db/v4" + + "github.com/argoproj/argo-workflows/v3/server/utils" +) + +func BuildArchivedWorkflowSelector(selector db.Selector, tableName, labelTableName string, t dbType, options utils.ListOptions, count bool) (db.Selector, error) { + selector = selector. + And(namespaceEqual(options.Namespace)). + And(nameEqual(options.Name)). + And(namePrefixClause(options.NamePrefix)). + And(startedAtFromClause(options.MinStartedAt)). + And(startedAtToClause(options.MaxStartedAt)) + + selector, err := labelsClause(selector, t, options.LabelRequirements, tableName, labelTableName, true) + if err != nil { + return nil, err + } + if count { + return selector, nil + } + // If we were passed 0 as the limit, then we should load all available archived workflows + // to match the behavior of the `List` operations in the Kubernetes API + if options.Limit == 0 { + options.Limit = -1 + options.Offset = -1 + } + return selector. + OrderBy("-startedat"). + Limit(options.Limit). + Offset(options.Offset), nil +} + +func BuildWorkflowSelector(in string, inArgs []any, tableName, labelTableName string, t dbType, options utils.ListOptions, count bool) (out string, outArgs []any, err error) { + var clauses []*db.RawExpr + if options.Namespace != "" { + clauses = append(clauses, db.Raw("namespace = ?", options.Namespace)) + } + if options.Name != "" { + clauses = append(clauses, db.Raw("name = ?", options.Name)) + } + if options.NamePrefix != "" { + clauses = append(clauses, db.Raw("name like ?", options.NamePrefix+"%")) + } + if !options.MinStartedAt.IsZero() { + clauses = append(clauses, db.Raw("startedat >= ?", options.MinStartedAt)) + } + if !options.MaxStartedAt.IsZero() { + clauses = append(clauses, db.Raw("startedat <= ?", options.MaxStartedAt)) + } + for _, r := range options.LabelRequirements { + q, err := requirementToCondition(t, r, tableName, labelTableName, false) + if err != nil { + return "", nil, err + } + clauses = append(clauses, q) + } + out = in + outArgs = inArgs + for _, c := range clauses { + if c == nil || c.Empty() { + continue + } + out += " and " + c.Raw() + outArgs = append(outArgs, c.Arguments()...) + } + if count { + return out, outArgs, nil + } + if options.StartedAtAscending { + out += " order by startedat asc" + } else { + out += " order by startedat desc" + } + + // If we were passed 0 as the limit, then we should load all available archived workflows + // to match the behavior of the `List` operations in the Kubernetes API + if options.Limit == 0 { + options.Limit = -1 + options.Offset = -1 + } + out += " limit ?" + outArgs = append(outArgs, options.Limit) + out += " offset ?" + outArgs = append(outArgs, options.Offset) + return out, outArgs, nil +} diff --git a/persist/sqldb/workflow_archive.go b/persist/sqldb/workflow_archive.go index fce2ff97b432..55d4800cfe89 100644 --- a/persist/sqldb/workflow_archive.go +++ b/persist/sqldb/workflow_archive.go @@ -9,7 +9,6 @@ import ( "github.com/upper/db/v4" "google.golang.org/grpc/codes" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" @@ -59,8 +58,8 @@ type archivedWorkflowCount struct { type WorkflowArchive interface { ArchiveWorkflow(wf *wfv1.Workflow) error // list workflows, with the most recently started workflows at the beginning (i.e. index 0 is the most recent) - ListWorkflows(namespace string, name string, namePrefix string, minStartAt, maxStartAt time.Time, labelRequirements labels.Requirements, limit, offset int) (wfv1.Workflows, error) - CountWorkflows(namespace string, name string, namePrefix string, minStartAt, maxStartAt time.Time, labelRequirements labels.Requirements) (int64, error) + ListWorkflows(options sutils.ListOptions) (wfv1.Workflows, error) + CountWorkflows(options sutils.ListOptions) (int64, error) GetWorkflow(uid string, namespace string, name string) (*wfv1.Workflow, error) DeleteWorkflow(uid string) error DeleteExpiredWorkflows(ttl time.Duration) error @@ -146,16 +145,9 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error { }) } -func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefix string, minStartedAt, maxStartedAt time.Time, labelRequirements labels.Requirements, limit int, offset int) (wfv1.Workflows, error) { +func (r *workflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workflows, error) { var archivedWfs []archivedWorkflowMetadata - // If we were passed 0 as the limit, then we should load all available archived workflows - // to match the behavior of the `List` operations in the Kubernetes API - if limit == 0 { - limit = -1 - offset = -1 - } - selectQuery, err := selectArchivedWorkflowQuery(r.dbType) if err != nil { return nil, err @@ -164,22 +156,14 @@ func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefi selector := r.session.SQL(). Select(selectQuery). From(archiveTableName). - Where(r.clusterManagedNamespaceAndInstanceID()). - And(namespaceEqual(namespace)). - And(nameEqual(name)). - And(namePrefixClause(namePrefix)). - And(startedAtFromClause(minStartedAt)). - And(startedAtToClause(maxStartedAt)) + Where(r.clusterManagedNamespaceAndInstanceID()) - selector, err = labelsClause(selector, r.dbType, labelRequirements) + selector, err = BuildArchivedWorkflowSelector(selector, archiveTableName, archiveLabelsTableName, r.dbType, options, false) if err != nil { return nil, err } - err = selector. - OrderBy("-startedat"). - Limit(limit). - Offset(offset). - All(&archivedWfs) + + err = selector.All(&archivedWfs) if err != nil { return nil, err } @@ -218,20 +202,15 @@ func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefi return wfs, nil } -func (r *workflowArchive) CountWorkflows(namespace string, name string, namePrefix string, minStartedAt, maxStartedAt time.Time, labelRequirements labels.Requirements) (int64, error) { +func (r *workflowArchive) CountWorkflows(options sutils.ListOptions) (int64, error) { total := &archivedWorkflowCount{} selector := r.session.SQL(). Select(db.Raw("count(*) as total")). From(archiveTableName). - Where(r.clusterManagedNamespaceAndInstanceID()). - And(namespaceEqual(namespace)). - And(nameEqual(name)). - And(namePrefixClause(namePrefix)). - And(startedAtFromClause(minStartedAt)). - And(startedAtToClause(maxStartedAt)) + Where(r.clusterManagedNamespaceAndInstanceID()) - selector, err := labelsClause(selector, r.dbType, labelRequirements) + selector, err := BuildArchivedWorkflowSelector(selector, archiveTableName, archiveLabelsTableName, r.dbType, options, true) if err != nil { return 0, err } @@ -253,40 +232,37 @@ func (r *workflowArchive) clusterManagedNamespaceAndInstanceID() *db.AndExpr { func startedAtFromClause(from time.Time) db.Cond { if !from.IsZero() { - return db.Cond{"startedat > ": from} + return db.Cond{"startedat >=": from} } return db.Cond{} } func startedAtToClause(to time.Time) db.Cond { if !to.IsZero() { - return db.Cond{"startedat < ": to} + return db.Cond{"startedat <=": to} } return db.Cond{} } func namespaceEqual(namespace string) db.Cond { - if namespace == "" { - return db.Cond{} - } else { + if namespace != "" { return db.Cond{"namespace": namespace} } + return db.Cond{} } func nameEqual(name string) db.Cond { - if name == "" { - return db.Cond{} - } else { + if name != "" { return db.Cond{"name": name} } + return db.Cond{} } func namePrefixClause(namePrefix string) db.Cond { - if namePrefix == "" { - return db.Cond{} - } else { - return db.Cond{"name LIKE ": namePrefix + "%"} + if namePrefix != "" { + return db.Cond{"name LIKE": namePrefix + "%"} } + return db.Cond{} } func (r *workflowArchive) GetWorkflow(uid string, namespace string, name string) (*wfv1.Workflow, error) { diff --git a/pkg/apiclient/argo-kube-client.go b/pkg/apiclient/argo-kube-client.go index b56deb251852..3ae83fe809db 100644 --- a/pkg/apiclient/argo-kube-client.go +++ b/pkg/apiclient/argo-kube-client.go @@ -11,6 +11,8 @@ import ( restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "github.com/argoproj/argo-workflows/v3/server/workflow/store" + "github.com/argoproj/argo-workflows/v3" "github.com/argoproj/argo-workflows/v3/persist/sqldb" "github.com/argoproj/argo-workflows/v3/pkg/apiclient/clusterworkflowtemplate" @@ -25,7 +27,6 @@ import ( cronworkflowserver "github.com/argoproj/argo-workflows/v3/server/cronworkflow" "github.com/argoproj/argo-workflows/v3/server/types" workflowserver "github.com/argoproj/argo-workflows/v3/server/workflow" - "github.com/argoproj/argo-workflows/v3/server/workflowarchive" workflowtemplateserver "github.com/argoproj/argo-workflows/v3/server/workflowtemplate" "github.com/argoproj/argo-workflows/v3/util/help" "github.com/argoproj/argo-workflows/v3/util/instanceid" @@ -38,6 +39,7 @@ var ( type argoKubeClient struct { instanceIDService instanceid.Service + wfClient workflow.Interface } var _ Client = &argoKubeClient{} @@ -84,13 +86,13 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig, if err != nil { return nil, nil, err } - return ctx, &argoKubeClient{instanceIDService}, nil + return ctx, &argoKubeClient{instanceIDService, wfClient}, nil } func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient { wfArchive := sqldb.NullWorkflowArchive - wfaServer := workflowarchive.NewWorkflowArchiveServer(wfArchive, argoKubeOffloadNodeStatusRepo) - return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfaServer)}} + wfLister := store.NewKubeLister(a.wfClient) + return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, wfLister, nil)}} } func (a *argoKubeClient) NewCronWorkflowServiceClient() (cronworkflow.CronWorkflowServiceClient, error) { diff --git a/server/apiserver/argoserver.go b/server/apiserver/argoserver.go index 815698ea6eb9..27188374d229 100644 --- a/server/apiserver/argoserver.go +++ b/server/apiserver/argoserver.go @@ -55,6 +55,7 @@ import ( "github.com/argoproj/argo-workflows/v3/server/static" "github.com/argoproj/argo-workflows/v3/server/types" "github.com/argoproj/argo-workflows/v3/server/workflow" + "github.com/argoproj/argo-workflows/v3/server/workflow/store" "github.com/argoproj/argo-workflows/v3/server/workflowarchive" "github.com/argoproj/argo-workflows/v3/server/workflowtemplate" grpcutil "github.com/argoproj/argo-workflows/v3/util/grpc" @@ -230,7 +231,13 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st artifactRepositories := artifactrepositories.New(as.clients.Kubernetes, as.managedNamespace, &config.ArtifactRepository) artifactServer := artifacts.NewArtifactServer(as.gatekeeper, hydrator.New(offloadRepo), wfArchive, instanceIDService, artifactRepositories) eventServer := event.NewController(instanceIDService, eventRecorderManager, as.eventQueueSize, as.eventWorkerCount, as.eventAsyncDispatch) - grpcServer := as.newGRPCServer(instanceIDService, offloadRepo, wfArchive, eventServer, config.Links, config.Columns, config.NavColor) + wfArchiveServer := workflowarchive.NewWorkflowArchiveServer(wfArchive, offloadRepo) + wfStore, err := store.NewSQLiteStore(instanceIDService) + if err != nil { + log.Fatal(err) + } + workflowServer := workflow.NewWorkflowServer(instanceIDService, offloadRepo, wfArchive, as.clients.Workflow, wfStore, wfStore) + grpcServer := as.newGRPCServer(instanceIDService, workflowServer, wfArchiveServer, eventServer, config.Links, config.Columns, config.NavColor) httpServer := as.newHTTPServer(ctx, port, artifactServer) // Start listener @@ -260,6 +267,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st grpcL := tcpm.Match(cmux.Any()) go eventServer.Run(as.stopCh) + go workflowServer.Run(as.stopCh) go func() { as.checkServeErr("grpcServer", grpcServer.Serve(grpcL)) }() go func() { as.checkServeErr("httpServer", httpServer.Serve(httpL)) }() go func() { as.checkServeErr("tcpm", tcpm.Serve()) }() @@ -276,7 +284,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st <-as.stopCh } -func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive, eventServer *event.Controller, links []*v1alpha1.Link, columns []*v1alpha1.Column, navColor string) *grpc.Server { +func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, workflowServer workflowpkg.WorkflowServiceServer, wfArchiveServer workflowarchivepkg.ArchivedWorkflowServiceServer, eventServer *event.Controller, links []*v1alpha1.Link, columns []*v1alpha1.Column, navColor string) *grpc.Server { serverLog := log.NewEntry(log.StandardLogger()) // "Prometheus histograms are a great way to measure latency distributions of your RPCs. However, since it is bad practice to have metrics of high cardinality the latency monitoring metrics are disabled by default. To enable them please call the following in your server initialization code:" @@ -308,12 +316,11 @@ func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, offloa } grpcServer := grpc.NewServer(sOpts...) - wfArchiveServer := workflowarchive.NewWorkflowArchiveServer(wfArchive, offloadNodeStatusRepo) infopkg.RegisterInfoServiceServer(grpcServer, info.NewInfoServer(as.managedNamespace, links, columns, navColor)) eventpkg.RegisterEventServiceServer(grpcServer, eventServer) eventsourcepkg.RegisterEventSourceServiceServer(grpcServer, eventsource.NewEventSourceServer()) sensorpkg.RegisterSensorServiceServer(grpcServer, sensor.NewSensorServer()) - workflowpkg.RegisterWorkflowServiceServer(grpcServer, workflow.NewWorkflowServer(instanceIDService, offloadNodeStatusRepo, wfArchiveServer)) + workflowpkg.RegisterWorkflowServiceServer(grpcServer, workflowServer) workflowtemplatepkg.RegisterWorkflowTemplateServiceServer(grpcServer, workflowtemplate.NewWorkflowTemplateServer(instanceIDService)) cronworkflowpkg.RegisterCronWorkflowServiceServer(grpcServer, cronworkflow.NewCronWorkflowServer(instanceIDService)) workflowarchivepkg.RegisterArchivedWorkflowServiceServer(grpcServer, wfArchiveServer) diff --git a/server/utils/list_options.go b/server/utils/list_options.go new file mode 100644 index 000000000000..69a03456cbd5 --- /dev/null +++ b/server/utils/list_options.go @@ -0,0 +1,133 @@ +package utils + +import ( + "fmt" + "strconv" + "strings" + "time" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" +) + +type ListOptions struct { + Namespace, Name, NamePrefix string + MinStartedAt, MaxStartedAt time.Time + LabelRequirements labels.Requirements + Limit, Offset int + ShowRemainingItemCount bool + StartedAtAscending bool +} + +func (l ListOptions) WithLimit(limit int) ListOptions { + l.Limit = limit + return l +} + +func (l ListOptions) WithOffset(offset int) ListOptions { + l.Offset = offset + return l +} + +func (l ListOptions) WithShowRemainingItemCount(showRemainingItemCount bool) ListOptions { + l.ShowRemainingItemCount = showRemainingItemCount + return l +} + +func (l ListOptions) WithMaxStartedAt(maxStartedAt time.Time) ListOptions { + l.MaxStartedAt = maxStartedAt + return l +} + +func (l ListOptions) WithMinStartedAt(minStartedAt time.Time) ListOptions { + l.MinStartedAt = minStartedAt + return l +} + +func (l ListOptions) WithStartedAtAscending(ascending bool) ListOptions { + l.StartedAtAscending = ascending + return l +} + +func BuildListOptions(options metav1.ListOptions, ns, namePrefix string) (ListOptions, error) { + if options.Continue == "" { + options.Continue = "0" + } + limit := int(options.Limit) + + offset, err := strconv.Atoi(options.Continue) + if err != nil { + // no need to use sutils here + return ListOptions{}, status.Error(codes.InvalidArgument, "listOptions.continue must be int") + } + if offset < 0 { + // no need to use sutils here + return ListOptions{}, status.Error(codes.InvalidArgument, "listOptions.continue must >= 0") + } + + // namespace is now specified as its own query parameter + // note that for backward compatibility, the field selector 'metadata.namespace' is also supported for now + namespace := ns // optional + name := "" + minStartedAt := time.Time{} + maxStartedAt := time.Time{} + showRemainingItemCount := false + for _, selector := range strings.Split(options.FieldSelector, ",") { + if len(selector) == 0 { + continue + } + if strings.HasPrefix(selector, "metadata.namespace=") { + // for backward compatibility, the field selector 'metadata.namespace' is supported for now despite the addition + // of the new 'namespace' query parameter, which is what the UI uses + fieldSelectedNamespace := strings.TrimPrefix(selector, "metadata.namespace=") + switch namespace { + case "": + namespace = fieldSelectedNamespace + case fieldSelectedNamespace: + break + default: + return ListOptions{}, status.Errorf(codes.InvalidArgument, + "'namespace' query param (%q) and fieldselector 'metadata.namespace' (%q) are both specified and contradict each other", namespace, fieldSelectedNamespace) + } + } else if strings.HasPrefix(selector, "metadata.name=") { + name = strings.TrimPrefix(selector, "metadata.name=") + } else if strings.HasPrefix(selector, "spec.startedAt>") { + minStartedAt, err = time.Parse(time.RFC3339, strings.TrimPrefix(selector, "spec.startedAt>")) + if err != nil { + // startedAt is populated by us, it should therefore be valid. + return ListOptions{}, ToStatusError(err, codes.Internal) + } + } else if strings.HasPrefix(selector, "spec.startedAt<") { + maxStartedAt, err = time.Parse(time.RFC3339, strings.TrimPrefix(selector, "spec.startedAt<")) + if err != nil { + // no need to use sutils here + return ListOptions{}, ToStatusError(err, codes.Internal) + } + } else if strings.HasPrefix(selector, "ext.showRemainingItemCount") { + showRemainingItemCount, err = strconv.ParseBool(strings.TrimPrefix(selector, "ext.showRemainingItemCount=")) + if err != nil { + // populated by us, it should therefore be valid. + return ListOptions{}, ToStatusError(err, codes.Internal) + } + } else { + return ListOptions{}, ToStatusError(fmt.Errorf("unsupported requirement %s", selector), codes.InvalidArgument) + } + } + requirements, err := labels.ParseToRequirements(options.LabelSelector) + if err != nil { + return ListOptions{}, ToStatusError(err, codes.InvalidArgument) + } + return ListOptions{ + Namespace: namespace, + Name: name, + NamePrefix: namePrefix, + MinStartedAt: minStartedAt, + MaxStartedAt: maxStartedAt, + LabelRequirements: requirements, + Limit: limit, + Offset: offset, + ShowRemainingItemCount: showRemainingItemCount, + }, nil +} diff --git a/server/workflow/store/lister.go b/server/workflow/store/lister.go new file mode 100644 index 000000000000..3a2dc0870a7a --- /dev/null +++ b/server/workflow/store/lister.go @@ -0,0 +1,41 @@ +package store + +import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned" +) + +type WorkflowLister interface { + ListWorkflows(ctx context.Context, namespace, namePrefix string, listOptions metav1.ListOptions) (*wfv1.WorkflowList, error) + CountWorkflows(ctx context.Context, namespace, namePrefix string, listOptions metav1.ListOptions) (int64, error) +} + +type kubeLister struct { + wfClient versioned.Interface +} + +var _ WorkflowLister = &kubeLister{} + +func NewKubeLister(wfClient versioned.Interface) WorkflowLister { + return &kubeLister{wfClient: wfClient} +} + +func (k *kubeLister) ListWorkflows(ctx context.Context, namespace, namePrefix string, listOptions metav1.ListOptions) (*wfv1.WorkflowList, error) { + wfList, err := k.wfClient.ArgoprojV1alpha1().Workflows(namespace).List(ctx, listOptions) + if err != nil { + return nil, err + } + return wfList, nil +} + +func (k *kubeLister) CountWorkflows(ctx context.Context, namespace, namePrefix string, listOptions metav1.ListOptions) (int64, error) { + wfList, err := k.wfClient.ArgoprojV1alpha1().Workflows(namespace).List(ctx, listOptions) + if err != nil { + return 0, err + } + return int64(len(wfList.Items)), nil +} diff --git a/server/workflow/store/sqlite_store.go b/server/workflow/store/sqlite_store.go new file mode 100644 index 000000000000..c3518a8f5e16 --- /dev/null +++ b/server/workflow/store/sqlite_store.go @@ -0,0 +1,318 @@ +package store + +import ( + "context" + "encoding/json" + "fmt" + + log "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + "zombiezen.com/go/sqlite" + "zombiezen.com/go/sqlite/sqlitex" + + sutils "github.com/argoproj/argo-workflows/v3/server/utils" + + "github.com/argoproj/argo-workflows/v3/persist/sqldb" + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/util/instanceid" + "github.com/argoproj/argo-workflows/v3/workflow/common" +) + +const ( + workflowTableName = "argo_workflows" + workflowLabelsTableName = "argo_workflows_labels" + tableInitializationQuery = `create table if not exists argo_workflows ( + uid varchar(128) not null, + instanceid varchar(64), + name varchar(256), + namespace varchar(256), + phase varchar(25), + startedat timestamp, + finishedat timestamp, + workflow text, + primary key (uid) +); +create index if not exists idx_instanceid on argo_workflows (instanceid); +create table if not exists argo_workflows_labels ( + uid varchar(128) not null, + name varchar(317) not null, + value varchar(63) not null, + primary key (uid, name, value), + foreign key (uid) references argo_workflows (uid) on delete cascade +); +create index if not exists idx_name_value on argo_workflows_labels (name, value); +` + insertWorkflowQuery = `insert into argo_workflows (uid, instanceid, name, namespace, phase, startedat, finishedat, workflow) values (?, ?, ?, ?, ?, ?, ?, ?)` + insertWorkflowLabelQuery = `insert into argo_workflows_labels (uid, name, value) values (?, ?, ?)` + deleteWorkflowQuery = `delete from argo_workflows where uid = ?` +) + +func initDB() (*sqlite.Conn, error) { + conn, err := sqlite.OpenConn(":memory:", sqlite.OpenReadWrite) + if err != nil { + return nil, err + } + err = sqlitex.ExecuteTransient(conn, "pragma foreign_keys = on", nil) + if err != nil { + return nil, fmt.Errorf("failed to enable foreign key support: %w", err) + } + + err = sqlitex.ExecuteScript(conn, tableInitializationQuery, nil) + if err != nil { + return nil, err + } + return conn, nil +} + +type WorkflowStore interface { + cache.Store +} + +// SQLiteStore is a sqlite-based store. +type SQLiteStore struct { + conn *sqlite.Conn + instanceService instanceid.Service +} + +var _ WorkflowStore = &SQLiteStore{} +var _ WorkflowLister = &SQLiteStore{} + +func NewSQLiteStore(instanceService instanceid.Service) (*SQLiteStore, error) { + conn, err := initDB() + if err != nil { + return nil, err + } + return &SQLiteStore{conn: conn, instanceService: instanceService}, nil +} + +func (s *SQLiteStore) ListWorkflows(ctx context.Context, namespace, namePrefix string, listOptions metav1.ListOptions) (*wfv1.WorkflowList, error) { + options, err := sutils.BuildListOptions(listOptions, namespace, namePrefix) + if err != nil { + return nil, err + } + query := `select workflow from argo_workflows +where instanceid = ? +` + args := []any{s.instanceService.InstanceID()} + + query, args, err = sqldb.BuildWorkflowSelector(query, args, workflowTableName, workflowLabelsTableName, sqldb.SQLite, options, false) + if err != nil { + return nil, err + } + + var workflows = wfv1.Workflows{} + err = sqlitex.Execute(s.conn, query, &sqlitex.ExecOptions{ + Args: args, + ResultFunc: func(stmt *sqlite.Stmt) error { + wf := stmt.ColumnText(0) + w := wfv1.Workflow{} + err := json.Unmarshal([]byte(wf), &w) + if err != nil { + log.WithFields(log.Fields{"workflow": wf}).Errorln("unable to unmarshal workflow from database") + } else { + workflows = append(workflows, w) + } + return nil + }, + }) + if err != nil { + return nil, err + } + + return &wfv1.WorkflowList{ + Items: workflows, + }, nil +} + +func (s *SQLiteStore) CountWorkflows(ctx context.Context, namespace, namePrefix string, listOptions metav1.ListOptions) (int64, error) { + options, err := sutils.BuildListOptions(listOptions, namespace, namePrefix) + if err != nil { + return 0, err + } + query := `select count(*) as total from argo_workflows +where instanceid = ? +` + args := []any{s.instanceService.InstanceID()} + + options.Limit = 0 + options.Offset = 0 + query, args, err = sqldb.BuildWorkflowSelector(query, args, workflowTableName, workflowLabelsTableName, sqldb.SQLite, options, true) + if err != nil { + return 0, err + } + + var total int64 + err = sqlitex.Execute(s.conn, query, &sqlitex.ExecOptions{ + Args: args, + ResultFunc: func(stmt *sqlite.Stmt) error { + total = stmt.ColumnInt64(0) + return nil + }, + }) + if err != nil { + return 0, err + } + return total, nil +} + +func (s *SQLiteStore) Add(obj interface{}) error { + wf, ok := obj.(*wfv1.Workflow) + if !ok { + return fmt.Errorf("unable to convert object to Workflow. object: %v", obj) + } + done := sqlitex.Transaction(s.conn) + err := s.upsertWorkflow(wf) + defer done(&err) + return err +} + +func (s *SQLiteStore) Update(obj interface{}) error { + wf, ok := obj.(*wfv1.Workflow) + if !ok { + return fmt.Errorf("unable to convert object to Workflow. object: %v", obj) + } + done := sqlitex.Transaction(s.conn) + err := s.upsertWorkflow(wf) + defer done(&err) + return err +} + +func (s *SQLiteStore) Delete(obj interface{}) error { + wf, ok := obj.(*wfv1.Workflow) + if !ok { + return fmt.Errorf("unable to convert object to Workflow. object: %v", obj) + } + return sqlitex.Execute(s.conn, deleteWorkflowQuery, &sqlitex.ExecOptions{Args: []any{string(wf.UID)}}) +} + +func (s *SQLiteStore) Replace(list []interface{}, resourceVersion string) error { + wfs := make([]*wfv1.Workflow, 0, len(list)) + for _, obj := range list { + wf, ok := obj.(*wfv1.Workflow) + if !ok { + return fmt.Errorf("unable to convert object to Workflow. object: %v", obj) + } + wfs = append(wfs, wf) + } + done := sqlitex.Transaction(s.conn) + err := s.replaceWorkflows(wfs) + defer done(&err) + return err +} + +func (s *SQLiteStore) Resync() error { + return nil +} + +func (s *SQLiteStore) List() []interface{} { + panic("not implemented") +} + +func (s *SQLiteStore) ListKeys() []string { + panic("not implemented") +} + +func (s *SQLiteStore) Get(obj interface{}) (item interface{}, exists bool, err error) { + panic("not implemented") +} + +func (s *SQLiteStore) GetByKey(key string) (item interface{}, exists bool, err error) { + panic("not implemented") +} + +func (s *SQLiteStore) upsertWorkflow(wf *wfv1.Workflow) error { + err := sqlitex.Execute(s.conn, deleteWorkflowQuery, &sqlitex.ExecOptions{Args: []any{string(wf.UID)}}) + if err != nil { + return err + } + // if workflow is archived, we don't need to store it in the sqlite store, we get if from the archive store instead + if wf.GetLabels()[common.LabelKeyWorkflowArchivingStatus] == "Archived" { + return nil + } + workflow, err := json.Marshal(wf) + if err != nil { + return err + } + err = sqlitex.Execute(s.conn, insertWorkflowQuery, + &sqlitex.ExecOptions{ + Args: []any{string(wf.UID), s.instanceService.InstanceID(), wf.Name, wf.Namespace, wf.Status.Phase, wf.Status.StartedAt.Time, wf.Status.FinishedAt.Time, string(workflow)}, + }, + ) + if err != nil { + return err + } + stmt, err := s.conn.Prepare(insertWorkflowLabelQuery) + if err != nil { + return err + } + for key, value := range wf.GetLabels() { + if err = stmt.Reset(); err != nil { + return err + } + stmt.BindText(1, string(wf.UID)) + stmt.BindText(2, key) + stmt.BindText(3, value) + if _, err = stmt.Step(); err != nil { + return err + } + } + return nil +} + +func (s *SQLiteStore) replaceWorkflows(workflows []*wfv1.Workflow) error { + err := sqlitex.Execute(s.conn, `delete from argo_workflows`, nil) + if err != nil { + return err + } + wfs := make([]*wfv1.Workflow, 0, len(workflows)) + for _, wf := range workflows { + // if workflow is archived, we don't need to store it in the sqlite store, we get if from the archive store instead + if wf.GetLabels()[common.LabelKeyWorkflowArchivingStatus] != "Archived" { + wfs = append(wfs, wf) + } + } + // add all workflows to argo_workflows table + stmt, err := s.conn.Prepare(insertWorkflowQuery) + if err != nil { + return err + } + for _, wf := range wfs { + if err = stmt.Reset(); err != nil { + return err + } + stmt.BindText(1, string(wf.UID)) + stmt.BindText(2, s.instanceService.InstanceID()) + stmt.BindText(3, wf.Name) + stmt.BindText(4, wf.Namespace) + stmt.BindText(5, string(wf.Status.Phase)) + stmt.BindText(6, wf.Status.StartedAt.String()) + stmt.BindText(7, wf.Status.FinishedAt.String()) + workflow, err := json.Marshal(wf) + if err != nil { + return err + } + stmt.BindText(8, string(workflow)) + if _, err = stmt.Step(); err != nil { + return err + } + } + stmt, err = s.conn.Prepare(insertWorkflowLabelQuery) + if err != nil { + return err + } + for _, wf := range wfs { + for key, val := range wf.GetLabels() { + if err = stmt.Reset(); err != nil { + return err + } + stmt.BindText(1, string(wf.UID)) + stmt.BindText(2, key) + stmt.BindText(3, val) + if _, err = stmt.Step(); err != nil { + return err + } + } + } + return nil +} diff --git a/server/workflow/store/sqlite_store_test.go b/server/workflow/store/sqlite_store_test.go new file mode 100644 index 000000000000..086c755014a7 --- /dev/null +++ b/server/workflow/store/sqlite_store_test.go @@ -0,0 +1,155 @@ +package store + +import ( + "context" + "encoding/json" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "zombiezen.com/go/sqlite" + "zombiezen.com/go/sqlite/sqlitex" + + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/util/instanceid" +) + +func TestInitDB(t *testing.T) { + conn, err := initDB() + assert.NoError(t, err) + defer conn.Close() + t.Run("TestTablesCreated", func(t *testing.T) { + err = sqlitex.Execute(conn, `select name from sqlite_master where type='table'`, &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + name := stmt.ColumnText(0) + assert.Contains(t, []string{workflowTableName, workflowLabelsTableName}, name) + return nil + }, + }) + require.NoError(t, err) + }) + t.Run("TestForeignKeysEnabled", func(t *testing.T) { + err = sqlitex.Execute(conn, `pragma foreign_keys`, &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + assert.Equal(t, "1", stmt.ColumnText(0)) + return nil + }, + }) + require.NoError(t, err) + }) + t.Run("TestIndexesCreated", func(t *testing.T) { + var indexes []string + err = sqlitex.Execute(conn, `select name from sqlite_master where type='index'`, &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + name := stmt.ColumnText(0) + indexes = append(indexes, name) + return nil + }, + }) + require.NoError(t, err) + assert.Contains(t, indexes, "idx_instanceid") + assert.Contains(t, indexes, "idx_name_value") + }) + t.Run("TestForeignKeysAdded", func(t *testing.T) { + err = sqlitex.Execute(conn, `pragma foreign_key_list('argo_workflows_labels')`, &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + assert.Equal(t, "argo_workflows", stmt.ColumnText(2)) + assert.Equal(t, "uid", stmt.ColumnText(3)) + assert.Equal(t, "uid", stmt.ColumnText(4)) + assert.Equal(t, "CASCADE", stmt.ColumnText(6)) + return nil + }, + }) + require.NoError(t, err) + }) +} + +func TestStoreOperation(t *testing.T) { + instanceIdSvc := instanceid.NewService("my-instanceid") + conn, err := initDB() + require.NoError(t, err) + store := SQLiteStore{ + conn: conn, + instanceService: instanceIdSvc, + } + t.Run("TestAddWorkflow", func(t *testing.T) { + for i := 0; i < 10; i++ { + require.NoError(t, store.Add(generateWorkflow(i))) + } + num, err := store.CountWorkflows(context.Background(), "argo", "", metav1.ListOptions{}) + require.NoError(t, err) + assert.Equal(t, int64(10), num) + // Labels are also added + require.NoError(t, sqlitex.Execute(conn, `select count(*) from argo_workflows_labels`, &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + assert.Equal(t, 10*4, stmt.ColumnInt(0)) + return nil + }, + })) + }) + t.Run("TestUpdateWorkflow", func(t *testing.T) { + wf := generateWorkflow(0) + wf.Labels["test-label-2"] = "value-2" + require.NoError(t, store.Update(wf)) + // workflow is updated + require.NoError(t, sqlitex.Execute(conn, `select workflow from argo_workflows where uid = 'uid-0'`, &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + w := stmt.ColumnText(0) + require.NoError(t, json.Unmarshal([]byte(w), &wf)) + assert.Len(t, wf.Labels, 5) + return nil + }, + })) + require.NoError(t, sqlitex.Execute(conn, `select count(*) from argo_workflows_labels where name = 'test-label-2' and value = 'value-2'`, &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + assert.Equal(t, 1, stmt.ColumnInt(0)) + return nil + }, + })) + }) + t.Run("TestDeleteWorkflow", func(t *testing.T) { + wf := generateWorkflow(0) + require.NoError(t, store.Delete(wf)) + // workflow is deleted + require.NoError(t, sqlitex.Execute(conn, `select count(*) from argo_workflows where uid = 'uid-0'`, &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + assert.Equal(t, 0, stmt.ColumnInt(0)) + return nil + }, + })) + // labels are also deleted + require.NoError(t, sqlitex.Execute(conn, `select count(*) from argo_workflows_labels where uid = 'uid-0'`, &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + assert.Equal(t, 0, stmt.ColumnInt(0)) + return nil + }, + })) + }) + t.Run("TestListWorkflows", func(t *testing.T) { + wfList, err := store.ListWorkflows(context.Background(), "argo", "", metav1.ListOptions{Limit: 5}) + require.NoError(t, err) + assert.Len(t, wfList.Items, 5) + }) + t.Run("TestCountWorkflows", func(t *testing.T) { + num, err := store.CountWorkflows(context.Background(), "argo", "", metav1.ListOptions{}) + require.NoError(t, err) + assert.Equal(t, int64(9), num) + }) +} + +func generateWorkflow(uid int) *wfv1.Workflow { + return &wfv1.Workflow{ObjectMeta: metav1.ObjectMeta{ + UID: types.UID(fmt.Sprintf("uid-%d", uid)), + Name: fmt.Sprintf("workflow-%d", uid), + Namespace: "argo", + Labels: map[string]string{ + "workflows.argoproj.io/completed": "true", + "workflows.argoproj.io/phase": "Succeeded", + "workflows.argoproj.io/controller-instanceid": "my-instanceid", + "test-label": fmt.Sprintf("label-%d", uid), + }, + }} +} diff --git a/server/workflow/workflow_server.go b/server/workflow/workflow_server.go index 584278e21c6f..61ce865cb014 100644 --- a/server/workflow/workflow_server.go +++ b/server/workflow/workflow_server.go @@ -7,24 +7,29 @@ import ( "io" "sort" "sync" + "time" log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" corev1 "k8s.io/api/core/v1" apierr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" "github.com/argoproj/argo-workflows/v3/errors" "github.com/argoproj/argo-workflows/v3/persist/sqldb" workflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow" - workflowarchivepkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflowarchive" "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned" "github.com/argoproj/argo-workflows/v3/server/auth" sutils "github.com/argoproj/argo-workflows/v3/server/utils" + "github.com/argoproj/argo-workflows/v3/server/workflow/store" argoutil "github.com/argoproj/argo-workflows/v3/util" "github.com/argoproj/argo-workflows/v3/util/fields" "github.com/argoproj/argo-workflows/v3/util/instanceid" @@ -37,18 +42,50 @@ import ( "github.com/argoproj/argo-workflows/v3/workflow/validate" ) +const ( + latestAlias = "@latest" + reSyncDuration = 20 * time.Minute +) + type workflowServer struct { instanceIDService instanceid.Service offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo hydrator hydrator.Interface - wfArchiveServer workflowarchivepkg.ArchivedWorkflowServiceServer + wfArchive sqldb.WorkflowArchive + wfLister store.WorkflowLister + wfReflector *cache.Reflector } -const latestAlias = "@latest" +var _ workflowpkg.WorkflowServiceServer = &workflowServer{} + +// NewWorkflowServer returns a new WorkflowServer +func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive, wfClientSet versioned.Interface, wfLister store.WorkflowLister, wfStore store.WorkflowStore) *workflowServer { + ws := &workflowServer{ + instanceIDService: instanceIDService, + offloadNodeStatusRepo: offloadNodeStatusRepo, + hydrator: hydrator.New(offloadNodeStatusRepo), + wfArchive: wfArchive, + wfLister: wfLister, + } + if wfStore != nil { + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return wfClientSet.ArgoprojV1alpha1().Workflows(metav1.NamespaceAll).List(context.Background(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return wfClientSet.ArgoprojV1alpha1().Workflows(metav1.NamespaceAll).Watch(context.Background(), options) + }, + } + wfReflector := cache.NewReflector(lw, &wfv1.Workflow{}, wfStore, reSyncDuration) + ws.wfReflector = wfReflector + } + return ws +} -// NewWorkflowServer returns a new workflowServer -func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchiveServer workflowarchivepkg.ArchivedWorkflowServiceServer) workflowpkg.WorkflowServiceServer { - return &workflowServer{instanceIDService, offloadNodeStatusRepo, hydrator.New(offloadNodeStatusRepo), wfArchiveServer} +func (s *workflowServer) Run(stopCh <-chan struct{}) { + if s.wfReflector != nil { + s.wfReflector.Run(stopCh) + } } func (s *workflowServer) CreateWorkflow(ctx context.Context, req *workflowpkg.WorkflowCreateRequest) (*wfv1.Workflow, error) { @@ -129,65 +166,75 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.Workf return wf, nil } -func mergeWithArchivedWorkflows(liveWfs wfv1.WorkflowList, archivedWfs wfv1.WorkflowList, numWfsToKeep int) *wfv1.WorkflowList { - var mergedWfs []wfv1.Workflow - var uidToWfs = map[types.UID][]wfv1.Workflow{} - for _, item := range liveWfs.Items { - uidToWfs[item.UID] = append(uidToWfs[item.UID], item) - } - for _, item := range archivedWfs.Items { - uidToWfs[item.UID] = append(uidToWfs[item.UID], item) +func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.WorkflowListRequest) (*wfv1.WorkflowList, error) { + listOption := metav1.ListOptions{} + if req.ListOptions != nil { + listOption = *req.ListOptions } + s.instanceIDService.With(&listOption) - for _, v := range uidToWfs { - // The archived workflow we saved in the database have "Persisted" as the archival status. - // Prioritize 'Archived' over 'Persisted' because 'Archived' means the workflow is in the cluster - if len(v) == 1 { - mergedWfs = append(mergedWfs, v[0]) - } else { - if ok := v[0].Labels[common.LabelKeyWorkflowArchivingStatus] == "Archived"; ok { - mergedWfs = append(mergedWfs, v[0]) - } else { - mergedWfs = append(mergedWfs, v[1]) - } - } + options, err := sutils.BuildListOptions(listOption, req.Namespace, "") + if err != nil { + return nil, err } - mergedWfsList := wfv1.WorkflowList{Items: mergedWfs, ListMeta: liveWfs.ListMeta} - sort.Sort(mergedWfsList.Items) - numWfs := 0 - var finalWfs []wfv1.Workflow - for _, item := range mergedWfsList.Items { - if numWfsToKeep == 0 || numWfs < numWfsToKeep { - finalWfs = append(finalWfs, item) - numWfs += 1 - } + // verify if we have permission to list Workflows + allowed, err := auth.CanI(ctx, "list", workflow.WorkflowPlural, options.Namespace, "") + if err != nil { + return nil, sutils.ToStatusError(err, codes.Internal) } - return &wfv1.WorkflowList{Items: finalWfs, ListMeta: liveWfs.ListMeta} -} - -func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.WorkflowListRequest) (*wfv1.WorkflowList, error) { - wfClient := auth.GetWfClient(ctx) - - listOption := &metav1.ListOptions{} - if req.ListOptions != nil { - listOption = req.ListOptions + if !allowed { + return nil, status.Error(codes.PermissionDenied, fmt.Sprintf("Permission denied, you are not allowed to list workflows in namespace \"%s\". Maybe you want to specify a namespace with query parameter `.namespace=%s`?", options.Namespace, options.Namespace)) } - s.instanceIDService.With(listOption) - wfList, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).List(ctx, *listOption) + + var wfs wfv1.Workflows + liveWfCount, err := s.wfLister.CountWorkflows(ctx, req.Namespace, "", listOption) if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } - archivedWfList, err := s.wfArchiveServer.ListArchivedWorkflows(ctx, &workflowarchivepkg.ListArchivedWorkflowsRequest{ - ListOptions: listOption, - NamePrefix: "", - Namespace: req.Namespace, - }) + archivedCount, err := s.wfArchive.CountWorkflows(options) if err != nil { - log.Warnf("unable to list archived workflows:%v", err) - } else { - if archivedWfList != nil { - wfList = mergeWithArchivedWorkflows(*wfList, *archivedWfList, int(listOption.Limit)) + return nil, sutils.ToStatusError(err, codes.Internal) + } + totalCount := liveWfCount + archivedCount + + // first fetch live workflows + liveWfList := &wfv1.WorkflowList{} + if liveWfCount > 0 && (options.Limit == 0 || options.Offset < int(liveWfCount)) { + liveWfList, err = s.wfLister.ListWorkflows(ctx, req.Namespace, "", listOption) + if err != nil { + return nil, sutils.ToStatusError(err, codes.Internal) + } + wfs = append(wfs, liveWfList.Items...) + } + + // then fetch archived workflows + if options.Limit == 0 || + int64(options.Offset+options.Limit) > liveWfCount { + archivedOffset := options.Offset - int(liveWfCount) + archivedLimit := options.Limit + if archivedOffset < 0 { + archivedOffset = 0 + archivedLimit = options.Limit - len(liveWfList.Items) + } + archivedWfList, err := s.wfArchive.ListWorkflows(options.WithLimit(archivedLimit).WithOffset(archivedOffset)) + if err != nil { + return nil, sutils.ToStatusError(err, codes.Internal) } + wfs = append(wfs, archivedWfList...) + } + meta := metav1.ListMeta{ResourceVersion: liveWfList.ResourceVersion} + if s.wfReflector != nil { + meta.ResourceVersion = s.wfReflector.LastSyncResourceVersion() + } + remainCount := totalCount - int64(options.Offset) - int64(len(wfs)) + if remainCount < 0 { + remainCount = 0 + } + if remainCount > 0 { + meta.Continue = fmt.Sprintf("%v", options.Offset+len(wfs)) + } + if options.ShowRemainingItemCount { + meta.RemainingItemCount = &remainCount } cleaner := fields.NewCleaner(req.Fields) @@ -196,10 +243,10 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } - for i, wf := range wfList.Items { + for i, wf := range wfs { if wf.Status.IsOffloadNodeStatus() { if s.offloadNodeStatusRepo.IsEnabled() { - wfList.Items[i].Status.Nodes = offloadedNodes[sqldb.UUIDVersion{UID: string(wf.UID), Version: wf.GetOffloadNodeStatusVersion()}] + wfs[i].Status.Nodes = offloadedNodes[sqldb.UUIDVersion{UID: string(wf.UID), Version: wf.GetOffloadNodeStatusVersion()}] } else { log.WithFields(log.Fields{"namespace": wf.Namespace, "name": wf.Name}).Warn(sqldb.OffloadNodeStatusDisabled) } @@ -208,9 +255,9 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor } // we make no promises about the overall list sorting, we just sort each page - sort.Sort(wfList.Items) + sort.Sort(wfs) - res := &wfv1.WorkflowList{ListMeta: metav1.ListMeta{Continue: wfList.Continue, ResourceVersion: wfList.ResourceVersion}, Items: wfList.Items} + res := &wfv1.WorkflowList{ListMeta: meta, Items: wfs} newRes := &wfv1.WorkflowList{} if ok, err := cleaner.Clean(res, &newRes); err != nil { return nil, sutils.ToStatusError(fmt.Errorf("unable to CleanFields in request: %w", err), codes.Internal) @@ -665,15 +712,15 @@ func (s *workflowServer) getWorkflow(ctx context.Context, wfClient versioned.Int var err error wf, origErr := wfClient.ArgoprojV1alpha1().Workflows(namespace).Get(ctx, name, options) if wf == nil || origErr != nil { - wf, err = s.wfArchiveServer.GetArchivedWorkflow(ctx, &workflowarchivepkg.GetArchivedWorkflowRequest{ - Namespace: namespace, - Name: name, - }) + wf, err = s.wfArchive.GetWorkflow("", namespace, name) if err != nil { log.Errorf("failed to get live workflow: %v; failed to get archived workflow: %v", origErr, err) // We only return the original error to preserve the original status code. return nil, sutils.ToStatusError(origErr, codes.Internal) } + if wf == nil { + return nil, status.Error(codes.NotFound, "not found") + } } return wf, nil } diff --git a/server/workflow/workflow_server_test.go b/server/workflow/workflow_server_test.go index e91e672ba1cb..b00afd269985 100644 --- a/server/workflow/workflow_server_test.go +++ b/server/workflow/workflow_server_test.go @@ -5,13 +5,14 @@ import ( "fmt" "testing" - "time" "github.com/go-jose/go-jose/v3/jwt" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + authorizationv1 "k8s.io/api/authorization/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/client-go/kubernetes/fake" @@ -25,7 +26,8 @@ import ( v1alpha "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/fake" "github.com/argoproj/argo-workflows/v3/server/auth" "github.com/argoproj/argo-workflows/v3/server/auth/types" - "github.com/argoproj/argo-workflows/v3/server/workflowarchive" + sutils "github.com/argoproj/argo-workflows/v3/server/utils" + "github.com/argoproj/argo-workflows/v3/server/workflow/store" "github.com/argoproj/argo-workflows/v3/util" "github.com/argoproj/argo-workflows/v3/util/instanceid" "github.com/argoproj/argo-workflows/v3/workflow/common" @@ -136,7 +138,7 @@ const wf2 = ` "namespace": "workflows", "resourceVersion": "52919656", "selfLink": "/apis/argoproj.io/v1alpha1/namespaces/workflows/workflows/hello-world-b6h5m", - "uid": "91066a6c-1ddc-11ea-b443-42010aa80075" + "uid": "91066a6c-1ddc-11ea-b443-42010aa80074" }, "spec": { @@ -199,7 +201,7 @@ const wf3 = ` "namespace": "test", "resourceVersion": "53020772", "selfLink": "/apis/argoproj.io/v1alpha1/namespaces/workflows/workflows/hello-world-9tql2", - "uid": "6522aff1-1e01-11ea-b443-42010aa80075" + "uid": "6522aff1-1e01-11ea-b443-42010aa80074" }, "spec": { @@ -325,7 +327,7 @@ const wf5 = ` "namespace": "workflows", "resourceVersion": "53020772", "selfLink": "/apis/argoproj.io/v1alpha1/namespaces/workflows/workflows/hello-world-9tql2", - "uid": "6522aff1-1e01-11ea-b443-42010aa80075" + "uid": "6522aff1-1e01-11ea-b443-42010aa80073" }, "spec": { @@ -574,7 +576,6 @@ func getWorkflowServer() (workflowpkg.WorkflowServiceServer, context.Context) { v1alpha1.MustUnmarshal(unlabelled, &unlabelledObj) v1alpha1.MustUnmarshal(wf1, &wfObj1) - v1alpha1.MustUnmarshal(wf1, &wfObj1) v1alpha1.MustUnmarshal(wf2, &wfObj2) v1alpha1.MustUnmarshal(wf3, &wfObj3) v1alpha1.MustUnmarshal(wf4, &wfObj4) @@ -590,7 +591,6 @@ func getWorkflowServer() (workflowpkg.WorkflowServiceServer, context.Context) { archivedRepo := &mocks.WorkflowArchive{} - wfaServer := workflowarchive.NewWorkflowArchiveServer(archivedRepo, offloadNodeStatusRepo) archivedRepo.On("GetWorkflow", "", "test", "hello-world-9tql2-test").Return(&v1alpha1.Workflow{ ObjectMeta: metav1.ObjectMeta{Name: "hello-world-9tql2-test", Namespace: "test"}, Spec: v1alpha1.WorkflowSpec{ @@ -604,11 +604,41 @@ func getWorkflowServer() (workflowpkg.WorkflowServiceServer, context.Context) { archivedRepo.On("GetWorkflow", "", "test", "unlabelled").Return(nil, nil) archivedRepo.On("GetWorkflow", "", "workflows", "latest").Return(nil, nil) archivedRepo.On("GetWorkflow", "", "workflows", "hello-world-9tql2-not").Return(nil, nil) - server := NewWorkflowServer(instanceid.NewService("my-instanceid"), offloadNodeStatusRepo, wfaServer) + r, err := labels.ParseToRequirements("workflows.argoproj.io/controller-instanceid=my-instanceid") + if err != nil { + panic(err) + } + archivedRepo.On("CountWorkflows", sutils.ListOptions{Namespace: "workflows", LabelRequirements: r}).Return(int64(2), nil) + archivedRepo.On("ListWorkflows", sutils.ListOptions{Namespace: "workflows", Limit: -2, LabelRequirements: r}).Return(v1alpha1.Workflows{wfObj2, failedWfObj}, nil) + archivedRepo.On("CountWorkflows", sutils.ListOptions{Namespace: "test", LabelRequirements: r}).Return(int64(1), nil) + archivedRepo.On("ListWorkflows", sutils.ListOptions{Namespace: "test", Limit: -1, LabelRequirements: r}).Return(v1alpha1.Workflows{wfObj4}, nil) + kubeClientSet := fake.NewSimpleClientset() + kubeClientSet.PrependReactor("create", "selfsubjectaccessreviews", func(action ktesting.Action) (handled bool, ret runtime.Object, err error) { + return true, &authorizationv1.SelfSubjectAccessReview{ + Status: authorizationv1.SubjectAccessReviewStatus{Allowed: true}, + }, nil + }) wfClientset := v1alpha.NewSimpleClientset(&unlabelledObj, &wfObj1, &wfObj2, &wfObj3, &wfObj4, &wfObj5, &failedWfObj, &wftmpl, &cronwfObj, &cwfTmpl) wfClientset.PrependReactor("create", "workflows", generateNameReactor) ctx := context.WithValue(context.WithValue(context.WithValue(context.TODO(), auth.WfKey, wfClientset), auth.KubeKey, kubeClientSet), auth.ClaimsKey, &types.Claims{Claims: jwt.Claims{Subject: "my-sub"}}) + listOptions := &metav1.ListOptions{} + instanceIdSvc := instanceid.NewService("my-instanceid") + instanceIdSvc.With(listOptions) + wfStore, err := store.NewSQLiteStore(instanceIdSvc) + if err != nil { + panic(err) + } + if err = wfStore.Add(&wfObj1); err != nil { + panic(err) + } + if err = wfStore.Add(&wfObj3); err != nil { + panic(err) + } + if err = wfStore.Add(&wfObj5); err != nil { + panic(err) + } + server := NewWorkflowServer(instanceIdSvc, offloadNodeStatusRepo, archivedRepo, wfClientset, wfStore, wfStore) return server, ctx } @@ -650,26 +680,6 @@ func (t testWatchWorkflowServer) Send(*workflowpkg.WorkflowWatchEvent) error { panic("implement me") } -func TestMergeWithArchivedWorkflows(t *testing.T) { - timeNow := time.Now() - wf1Live := v1alpha1.Workflow{ - ObjectMeta: metav1.ObjectMeta{UID: "1", CreationTimestamp: metav1.Time{Time: timeNow.Add(time.Second)}, - Labels: map[string]string{common.LabelKeyWorkflowArchivingStatus: "Archived"}}} - wf1Archived := v1alpha1.Workflow{ - ObjectMeta: metav1.ObjectMeta{UID: "1", CreationTimestamp: metav1.Time{Time: timeNow.Add(time.Second)}, - Labels: map[string]string{common.LabelKeyWorkflowArchivingStatus: "Persisted"}}} - wf2 := v1alpha1.Workflow{ - ObjectMeta: metav1.ObjectMeta{UID: "2", CreationTimestamp: metav1.Time{Time: timeNow.Add(2 * time.Second)}}} - wf3 := v1alpha1.Workflow{ - ObjectMeta: metav1.ObjectMeta{UID: "3", CreationTimestamp: metav1.Time{Time: timeNow.Add(3 * time.Second)}}} - liveWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1Live, wf2}} - archivedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1Archived, wf3, wf2}} - expectedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2, wf1Live}} - expectedShortWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2}} - assert.Equal(t, expectedWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList, 0).Items) - assert.Equal(t, expectedShortWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList, 2).Items) -} - func TestWatchWorkflows(t *testing.T) { server, ctx := getWorkflowServer() wf := &v1alpha1.Workflow{ diff --git a/server/workflowarchive/archived_workflow_server.go b/server/workflowarchive/archived_workflow_server.go index e1a70e3bed7b..2f6c69369dce 100644 --- a/server/workflowarchive/archived_workflow_server.go +++ b/server/workflowarchive/archived_workflow_server.go @@ -6,9 +6,6 @@ import ( "os" "regexp" "sort" - "strconv" - "strings" - "time" log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" @@ -43,108 +40,42 @@ func NewWorkflowArchiveServer(wfArchive sqldb.WorkflowArchive, offloadNodeStatus } func (w *archivedWorkflowServer) ListArchivedWorkflows(ctx context.Context, req *workflowarchivepkg.ListArchivedWorkflowsRequest) (*wfv1.WorkflowList, error) { - options := req.ListOptions - namePrefix := req.NamePrefix - if options == nil { - options = &metav1.ListOptions{} - } - if options.Continue == "" { - options.Continue = "0" - } - limit := int(options.Limit) - offset, err := strconv.Atoi(options.Continue) - if err != nil { - // no need to use sutils here - return nil, status.Error(codes.InvalidArgument, "listOptions.continue must be int") - } - if offset < 0 { - // no need to use sutils here - return nil, status.Error(codes.InvalidArgument, "listOptions.continue must >= 0") - } - - // namespace is now specified as its own query parameter - // note that for backward compatibility, the field selector 'metadata.namespace' is also supported for now - namespace := req.Namespace // optional - name := "" - minStartedAt := time.Time{} - maxStartedAt := time.Time{} - showRemainingItemCount := false - for _, selector := range strings.Split(options.FieldSelector, ",") { - if len(selector) == 0 { - continue - } - if strings.HasPrefix(selector, "metadata.namespace=") { - // for backward compatibility, the field selector 'metadata.namespace' is supported for now despite the addition - // of the new 'namespace' query parameter, which is what the UI uses - fieldSelectedNamespace := strings.TrimPrefix(selector, "metadata.namespace=") - switch namespace { - case "": - namespace = fieldSelectedNamespace - case fieldSelectedNamespace: - break - default: - return nil, status.Errorf(codes.InvalidArgument, - "'namespace' query param (%q) and fieldselector 'metadata.namespace' (%q) are both specified and contradict each other", namespace, fieldSelectedNamespace) - } - } else if strings.HasPrefix(selector, "metadata.name=") { - name = strings.TrimPrefix(selector, "metadata.name=") - } else if strings.HasPrefix(selector, "spec.startedAt>") { - minStartedAt, err = time.Parse(time.RFC3339, strings.TrimPrefix(selector, "spec.startedAt>")) - if err != nil { - // startedAt is populated by us, it should therefore be valid. - return nil, sutils.ToStatusError(err, codes.Internal) - } - } else if strings.HasPrefix(selector, "spec.startedAt<") { - maxStartedAt, err = time.Parse(time.RFC3339, strings.TrimPrefix(selector, "spec.startedAt<")) - if err != nil { - // no need to use sutils here - return nil, sutils.ToStatusError(err, codes.Internal) - } - } else if strings.HasPrefix(selector, "ext.showRemainingItemCount") { - showRemainingItemCount, err = strconv.ParseBool(strings.TrimPrefix(selector, "ext.showRemainingItemCount=")) - if err != nil { - // populated by us, it should therefore be valid. - return nil, sutils.ToStatusError(err, codes.Internal) - } - } else { - return nil, sutils.ToStatusError(fmt.Errorf("unsupported requirement %s", selector), codes.InvalidArgument) - } - } - requirements, err := labels.ParseToRequirements(options.LabelSelector) + options, err := sutils.BuildListOptions(*req.ListOptions, req.Namespace, req.NamePrefix) if err != nil { - return nil, sutils.ToStatusError(err, codes.InvalidArgument) + return nil, err } // verify if we have permission to list Workflows - allowed, err := auth.CanI(ctx, "list", workflow.WorkflowPlural, namespace, "") + allowed, err := auth.CanI(ctx, "list", workflow.WorkflowPlural, options.Namespace, "") if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } if !allowed { - return nil, status.Error(codes.PermissionDenied, fmt.Sprintf("Permission denied, you are not allowed to list workflows in namespace \"%s\". Maybe you want to specify a namespace with query parameter `.namespace=%s`?", namespace, namespace)) + return nil, status.Error(codes.PermissionDenied, fmt.Sprintf("Permission denied, you are not allowed to list workflows in namespace \"%s\". Maybe you want to specify a namespace with query parameter `.namespace=%s`?", options.Namespace, options.Namespace)) } + limit := options.Limit + offset := options.Offset // When the zero value is passed, we should treat this as returning all results // to align ourselves with the behavior of the `List` endpoints in the Kubernetes API loadAll := limit == 0 - limitWithMore := 0 if !loadAll { // Attempt to load 1 more record than we actually need as an easy way to determine whether or not more // records exist than we're currently requesting - limitWithMore = limit + 1 + options.Limit += 1 } - items, err := w.wfArchive.ListWorkflows(namespace, name, namePrefix, minStartedAt, maxStartedAt, requirements, limitWithMore, offset) + items, err := w.wfArchive.ListWorkflows(options) if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } meta := metav1.ListMeta{} - if showRemainingItemCount && !loadAll { - total, err := w.wfArchive.CountWorkflows(namespace, name, namePrefix, minStartedAt, maxStartedAt, requirements) + if options.ShowRemainingItemCount && !loadAll { + total, err := w.wfArchive.CountWorkflows(options) if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } diff --git a/server/workflowarchive/archived_workflow_server_test.go b/server/workflowarchive/archived_workflow_server_test.go index 50d68d27db7b..7f26bbbb20eb 100644 --- a/server/workflowarchive/archived_workflow_server_test.go +++ b/server/workflowarchive/archived_workflow_server_test.go @@ -13,7 +13,6 @@ import ( authorizationv1 "k8s.io/api/authorization/v1" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" kubefake "k8s.io/client-go/kubernetes/fake" k8stesting "k8s.io/client-go/testing" @@ -25,6 +24,7 @@ import ( wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" argofake "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/fake" "github.com/argoproj/argo-workflows/v3/server/auth" + sutils "github.com/argoproj/argo-workflows/v3/server/utils" "github.com/argoproj/argo-workflows/v3/workflow/common" ) @@ -54,18 +54,20 @@ func Test_archivedWorkflowServer(t *testing.T) { }, nil }) // two pages of results for limit 1 - repo.On("ListWorkflows", "", "", "", time.Time{}, time.Time{}, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}, {}}, nil) - repo.On("ListWorkflows", "", "", "", time.Time{}, time.Time{}, labels.Requirements(nil), 2, 1).Return(wfv1.Workflows{{}}, nil) + repo.On("ListWorkflows", sutils.ListOptions{Limit: 2, Offset: 0}).Return(wfv1.Workflows{{}, {}}, nil) + repo.On("ListWorkflows", sutils.ListOptions{Limit: 2, Offset: 1}).Return(wfv1.Workflows{{}}, nil) minStartAt, _ := time.Parse(time.RFC3339, "2020-01-01T00:00:00Z") maxStartAt, _ := time.Parse(time.RFC3339, "2020-01-02T00:00:00Z") createdTime := metav1.Time{Time: time.Now().UTC()} finishedTime := metav1.Time{Time: createdTime.Add(time.Second * 2)} - repo.On("ListWorkflows", "", "", "", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil) - repo.On("ListWorkflows", "", "my-name", "", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil) - repo.On("ListWorkflows", "", "", "my-", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil) - repo.On("ListWorkflows", "", "my-name", "my-", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil) - repo.On("ListWorkflows", "user-ns", "", "", time.Time{}, time.Time{}, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}, {}}, nil) - repo.On("CountWorkflows", "", "my-name", "my-", minStartAt, maxStartAt, labels.Requirements(nil)).Return(int64(5), nil) + repo.On("ListWorkflows", sutils.ListOptions{Namespace: "", Name: "", NamePrefix: "", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0}).Return(wfv1.Workflows{{}}, nil) + repo.On("ListWorkflows", sutils.ListOptions{Namespace: "", Name: "my-name", NamePrefix: "", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0}).Return(wfv1.Workflows{{}}, nil) + repo.On("ListWorkflows", sutils.ListOptions{Namespace: "", Name: "", NamePrefix: "my-", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0}).Return(wfv1.Workflows{{}}, nil) + repo.On("ListWorkflows", sutils.ListOptions{Namespace: "", Name: "my-name", NamePrefix: "my-", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0}).Return(wfv1.Workflows{{}}, nil) + repo.On("ListWorkflows", sutils.ListOptions{Namespace: "", Name: "my-name", NamePrefix: "my-", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0, ShowRemainingItemCount: true}).Return(wfv1.Workflows{{}}, nil) + repo.On("ListWorkflows", sutils.ListOptions{Namespace: "user-ns", Name: "", NamePrefix: "", MinStartedAt: time.Time{}, MaxStartedAt: time.Time{}, Limit: 2, Offset: 0}).Return(wfv1.Workflows{{}, {}}, nil) + repo.On("CountWorkflows", sutils.ListOptions{Namespace: "", Name: "my-name", NamePrefix: "my-", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0}).Return(int64(5), nil) + repo.On("CountWorkflows", sutils.ListOptions{Namespace: "", Name: "my-name", NamePrefix: "my-", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0, ShowRemainingItemCount: true}).Return(int64(5), nil) repo.On("GetWorkflow", "", "", "").Return(nil, nil) repo.On("GetWorkflow", "my-uid", "", "").Return(&wfv1.Workflow{ ObjectMeta: metav1.ObjectMeta{Name: "my-name"}, diff --git a/test/e2e/fixtures/e2e_suite.go b/test/e2e/fixtures/e2e_suite.go index c518f7a51062..350398faf74d 100644 --- a/test/e2e/fixtures/e2e_suite.go +++ b/test/e2e/fixtures/e2e_suite.go @@ -8,6 +8,7 @@ import ( "os" "time" + "github.com/argoproj/argo-workflows/v3/server/utils" "github.com/argoproj/argo-workflows/v3/util/secrets" apierr "k8s.io/apimachinery/pkg/api/errors" @@ -177,7 +178,10 @@ func (s *E2ESuite) DeleteResources() { archive := s.Persistence.workflowArchive parse, err := labels.ParseToRequirements(Label) s.CheckError(err) - workflows, err := archive.ListWorkflows(Namespace, "", "", time.Time{}, time.Time{}, parse, 0, 0) + workflows, err := archive.ListWorkflows(utils.ListOptions{ + Namespace: Namespace, + LabelRequirements: parse, + }) s.CheckError(err) for _, w := range workflows { err := archive.DeleteWorkflow(string(w.UID)) diff --git a/workflow/controller/estimation/estimator_factory.go b/workflow/controller/estimation/estimator_factory.go index 60311eb8f5c3..984e76d98cdc 100644 --- a/workflow/controller/estimation/estimator_factory.go +++ b/workflow/controller/estimation/estimator_factory.go @@ -2,7 +2,6 @@ package estimation import ( "fmt" - "time" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" @@ -10,6 +9,7 @@ import ( "github.com/argoproj/argo-workflows/v3/persist/sqldb" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/server/utils" "github.com/argoproj/argo-workflows/v3/workflow/common" "github.com/argoproj/argo-workflows/v3/workflow/controller/indexes" "github.com/argoproj/argo-workflows/v3/workflow/hydrator" @@ -76,7 +76,13 @@ func (f *estimatorFactory) NewEstimator(wf *wfv1.Workflow) (Estimator, error) { if err != nil { return defaultEstimator, fmt.Errorf("failed to parse selector to requirements: %v", err) } - workflows, err := f.wfArchive.ListWorkflows(wf.Namespace, "", "", time.Time{}, time.Time{}, requirements, 1, 0) + workflows, err := f.wfArchive.ListWorkflows( + utils.ListOptions{ + Namespace: wf.Namespace, + LabelRequirements: requirements, + Limit: 1, + Offset: 0, + }) if err != nil { return defaultEstimator, fmt.Errorf("failed to list archived workflows: %v", err) } diff --git a/workflow/controller/estimation/estimator_factory_test.go b/workflow/controller/estimation/estimator_factory_test.go index aeef2a7128c3..c4bb0ab06981 100644 --- a/workflow/controller/estimation/estimator_factory_test.go +++ b/workflow/controller/estimation/estimator_factory_test.go @@ -2,7 +2,6 @@ package estimation import ( "testing" - "time" "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -10,6 +9,7 @@ import ( sqldbmocks "github.com/argoproj/argo-workflows/v3/persist/sqldb/mocks" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/server/utils" testutil "github.com/argoproj/argo-workflows/v3/test/util" "github.com/argoproj/argo-workflows/v3/workflow/common" "github.com/argoproj/argo-workflows/v3/workflow/controller/indexes" @@ -53,7 +53,11 @@ metadata: wfArchive := &sqldbmocks.WorkflowArchive{} r, err := labels.ParseToRequirements("workflows.argoproj.io/phase=Succeeded,workflows.argoproj.io/workflow-template=my-archived-wftmpl") assert.NoError(t, err) - wfArchive.On("ListWorkflows", "my-ns", "", "", time.Time{}, time.Time{}, labels.Requirements(r), 1, 0).Return(wfv1.Workflows{ + wfArchive.On("ListWorkflows", utils.ListOptions{ + Namespace: "my-ns", + LabelRequirements: r, + Limit: 1, + }).Return(wfv1.Workflows{ *testutil.MustUnmarshalWorkflow(` metadata: name: my-archived-wftmpl-baseline`),