diff --git a/.github/workflows/ci-build.yaml b/.github/workflows/ci-build.yaml index 67d3ae6e7d0d..dfb99c9c8111 100644 --- a/.github/workflows/ci-build.yaml +++ b/.github/workflows/ci-build.yaml @@ -45,8 +45,13 @@ jobs: # kubelet is not included because it'd take ages to get it working methinks test: [ "smoke", "test-e2e", "test-cli", "test-e2e-cron" ] containerRuntimeExecutor: [ "docker", "k8sapi", "pns" ] + profile: ["minimal", "mysql"] # ok, so we're only running `smoke` for all CREs, exclude: + - test: smoke + profile: mysql + - test: test-e2e-cron + profile: mysql - test: test-e2e containerRuntimeExecutor: k8sapi - test: test-e2e @@ -91,8 +96,7 @@ jobs: echo " token: xxxxxx" >> ~/.kube/config - name: Start Argo env: - GOPATH: /home/runner/go - PROFILE: mysql + GOPATH: /home/runner/go run: | echo '127.0.0.1 dex' | sudo tee -a /etc/hosts echo '127.0.0.1 minio' | sudo tee -a /etc/hosts @@ -100,7 +104,7 @@ jobs: echo '127.0.0.1 mysql' | sudo tee -a /etc/hosts mkdir -p /tmp/log/argo-e2e git fetch --tags - KUBECONFIG=~/.kube/config make start PROFILE=$PROFILE E2E_EXECUTOR=${{matrix.containerRuntimeExecutor}} DEV_IMAGE=true STATIC_FILES=false 2>&1 > /tmp/log/argo-e2e/argo.log & + KUBECONFIG=~/.kube/config make start PROFILE=${{matrix.profile}} E2E_EXECUTOR=${{matrix.containerRuntimeExecutor}} DEV_IMAGE=true STATIC_FILES=false 2>&1 > /tmp/log/argo-e2e/argo.log & - name: Install gotestsum run: go install gotest.tools/gotestsum - name: Wait for Argo Server to be ready diff --git a/Makefile b/Makefile index 41c38106e482..ba88e965ad1e 100644 --- a/Makefile +++ b/Makefile @@ -67,14 +67,7 @@ K3D := $(shell if [[ "`which kubectl`" != '' ]] && [[ "`kubect LOG_LEVEL := debug UPPERIO_DB_DEBUG := 0 NAMESPACED := true - -ALWAYS_OFFLOAD_NODE_STATUS := false -ifeq ($(PROFILE),mysql) -ALWAYS_OFFLOAD_NODE_STATUS := true -endif -ifeq ($(PROFILE),postgres) ALWAYS_OFFLOAD_NODE_STATUS := true -endif override LDFLAGS += \ -X github.com/argoproj/argo.version=$(VERSION) \ diff --git 
a/config/config.go b/config/config.go index b99dbec66b10..35e4f13ff12b 100644 --- a/config/config.go +++ b/config/config.go @@ -137,7 +137,7 @@ type PersistConfig struct { NodeStatusOffload bool `json:"nodeStatusOffLoad,omitempty"` // Archive workflows to persistence. Archive bool `json:"archive,omitempty"` - // ArchivelabelSelector holds LabelSelector to determine workflow persistence. + // ArchiveLabelSelector holds LabelSelector to determine workflow persistence. ArchiveLabelSelector *metav1.LabelSelector `json:"archiveLabelSelector,omitempty"` // in days ArchiveTTL TTL `json:"archiveTTL,omitempty"` @@ -145,6 +145,7 @@ type PersistConfig struct { ConnectionPool *ConnectionPool `json:"connectionPool,omitempty"` PostgreSQL *PostgreSQLConfig `json:"postgresql,omitempty"` MySQL *MySQLConfig `json:"mysql,omitempty"` + S3 *S3Config `json:"s3,omitempty"` } func (c PersistConfig) GetArchiveLabelSelector() (labels.Selector, error) { @@ -161,12 +162,44 @@ func (c PersistConfig) GetClusterName() string { return "default" } +func (c PersistConfig) GetNodeStatusOffloadConfig() interface{} { + if c.S3 != nil && c.S3.NodeStatusOffloads != nil { + return c.S3.NodeStatusOffloads + } + return c.SQLConfig() +} + +func (c PersistConfig) GetArchiveConfig() interface{} { + sqlConfig := c.SQLConfig() + if sqlConfig != nil { + return sqlConfig + } + if c.S3 != nil && c.S3.Archive != nil { + return c.S3.Archive + } + return nil +} + +func (c PersistConfig) SQLConfig() interface{} { + if c.MySQL != nil { + return c.MySQL + } else if c.PostgreSQL != nil { + return c.PostgreSQL + } + return nil +} + type ConnectionPool struct { MaxIdleConns int `json:"maxIdleConns,omitempty"` MaxOpenConns int `json:"maxOpenConns,omitempty"` ConnMaxLifetime TTL `json:"connMaxLifetime,omitempty"` } +type S3Config struct { + NodeStatusOffloads *S3ArtifactRepository `json:"nodeStatusOffLoad,omitempty"` + Archive *S3ArtifactRepository `json:"archive,omitempty"` +} + type PostgreSQLConfig struct { Host string 
`json:"host"` Port int `json:"port"` @@ -200,6 +233,10 @@ type S3ArtifactRepository struct { KeyPrefix string `json:"keyPrefix,omitempty"` } +func (r S3ArtifactRepository) Secure() bool { + return r.Insecure == nil || !*r.Insecure +} + // OSSArtifactRepository defines the controller configuration for an OSS artifact repository type OSSArtifactRepository struct { wfv1.OSSBucket `json:",inline"` diff --git a/docs/offloading-large-workflows.md b/docs/offloading-large-workflows.md index d483ff7f46f6..0a70487a3ca2 100644 --- a/docs/offloading-large-workflows.md +++ b/docs/offloading-large-workflows.md @@ -6,13 +6,13 @@ Argo stores workflows as Kubernetes resources (i.e. within EtcD). This creates a limit to their size as resources must be under 1MB. Each resource includes the status of each node, which is stored in the `/status/nodes` field for the resource. This can be over 1MB. If this happens, we try and compress the node status and store it in `/status/compressedNodes`. If the status is still too large, we then try and store it in an SQL database. -To enable this feature, configure a Postgres or MySQL database under `persistence` in [your configuration](workflow-controller-configmap.yaml) and set `nodeStatusOffLoad: true`. +To enable this feature, configure an S3, Postgres or MySQL under `persistence` in [your configuration](workflow-controller-configmap.yaml) and set `nodeStatusOffLoad: true`. ## FAQ #### Why aren't my workflows appearing in the database? -Offloading is expensive and often unneccessary, so we only offload when we need to. Your workflows aren't probably large enough. +Offloading is expensive and often unnecessary, so we only offload when we need to. Your workflows probably aren't large enough. #### Error "Failed to submit workflow: etcdserver: request is too large." 
diff --git a/docs/workflow-archive.md b/docs/workflow-archive.md index c39da1cc770d..2c5dc80c55c8 100644 --- a/docs/workflow-archive.md +++ b/docs/workflow-archive.md @@ -6,4 +6,4 @@ For many uses, you may wish to keep workflows for a long time. Argo can save completed workflows to an SQL database. -To enable this feature, configure a Postgres or MySQL (>= 5.7.8) database under `persistence` in [your configuration](workflow-controller-configmap.yaml) and set `archive: true`. +To enable this feature, configure an S3, Postgres or MySQL (>= 5.7.8) under `persistence` in [your configuration](workflow-controller-configmap.yaml) and set `archive: true`. diff --git a/docs/workflow-controller-configmap.yaml b/docs/workflow-controller-configmap.yaml index e7ae1b69bdab..ab4060907aca 100644 --- a/docs/workflow-controller-configmap.yaml +++ b/docs/workflow-controller-configmap.yaml @@ -162,7 +162,9 @@ data: nodeStatusOffLoad: false # save completed workloads to the workflow archive archive: false - # the number of days to keep archived workflows (the default is forever) + # The number of days to keep archived workflows (the default is forever). + # Archive TTL is ignored for S3 as it cannot be set after the bucket is created. + # Instead, set the retention policy when you create the bucket. archiveTTL: 180d # LabelSelector determines the workflow that matches with the matchlabels or matchrequirements, will be archived. @@ -173,6 +175,27 @@ data: # Optional name of the cluster I'm running in. This must be unique for your cluster. clusterName: default + + # Storage: s3, mysql, or postgres may be configured. + # + # If multiple are configured then the following rules are used: + # + # 1. For node status offloading, S3 is most preferred; for archiving, least preferred. + # 2. MySQL is preferred to PostgreSQL + # + # S3 is most preferred (and therefore implicitly recommended) as it can be expected to be faster, + # and relies less on the querying capability of an SQL database. 
+ # + # S3 is least preferred for workflow archiving. While it is easier to set up than an SQL database, S3 does not + # support (out of the box) index-backed (i.e. fast) querying capabilities for listing archived workflows. + # List operations will be slow when any significant number of workflows are archived. + + # Unlike MySQL or Postgres, S3 enforces TTLs by setting them on the bucket. + # You MUST configure different buckets for the archive and offloads if you wish to TTL archived workflows. + s3: + nodeStatusOffload: # ... as artifactRepository.s3 + archive: # ... as artifactRepository.s3 + postgresql: host: localhost port: 5432 diff --git a/go.mod b/go.mod index bdb2803a4073..29d6f9167d65 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,7 @@ require ( github.com/klauspost/compress v1.9.7 // indirect github.com/lib/pq v1.3.0 // indirect github.com/mattn/goreman v0.3.5 + github.com/minio/minio-go/v7 v7.0.2 github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.0.0 diff --git a/hack/crdgen.sh b/hack/crdgen.sh index 1c660d2d5eaa..19af60d63c96 100755 --- a/hack/crdgen.sh +++ b/hack/crdgen.sh @@ -1,5 +1,5 @@ #!/bin/bash -set -eu -o pipefail +set -eux -o pipefail cd "$(dirname "$0")/.." 
diff --git a/manifests/quick-start-minimal.yaml b/manifests/quick-start-minimal.yaml index 5468fc25df56..bedc4d31c967 100644 --- a/manifests/quick-start-minimal.yaml +++ b/manifests/quick-start-minimal.yaml @@ -490,6 +490,30 @@ data: enabled: true path: /metrics port: 9090 + persistence: | + nodeStatusOffLoad: true + archive: true + s3: + archive: + bucket: my-workflow-archive + endpoint: minio:9000 + insecure: true + accessKeySecret: + name: my-minio-cred + key: accesskey + secretKeySecret: + name: my-minio-cred + key: secretkey + nodeStatusOffload: + bucket: my-node-status-offloads + endpoint: minio:9000 + insecure: true + accessKeySecret: + name: my-minio-cred + key: accesskey + secretKeySecret: + name: my-minio-cred + key: secretkey kind: ConfigMap metadata: name: workflow-controller-configmap @@ -670,6 +694,8 @@ spec: - mkdir - -p - /data/my-bucket + - /data/my-node-status-offloads + - /data/my-workflow-archive livenessProbe: httpGet: path: /minio/health/live diff --git a/manifests/quick-start-mysql.yaml b/manifests/quick-start-mysql.yaml index 4021bb39726d..c10707c1a36a 100644 --- a/manifests/quick-start-mysql.yaml +++ b/manifests/quick-start-mysql.yaml @@ -759,6 +759,8 @@ spec: - mkdir - -p - /data/my-bucket + - /data/my-node-status-offloads + - /data/my-workflow-archive livenessProbe: httpGet: path: /minio/health/live diff --git a/manifests/quick-start-postgres.yaml b/manifests/quick-start-postgres.yaml index 12688edc07bf..38070b40eee9 100644 --- a/manifests/quick-start-postgres.yaml +++ b/manifests/quick-start-postgres.yaml @@ -751,6 +751,8 @@ spec: - mkdir - -p - /data/my-bucket + - /data/my-node-status-offloads + - /data/my-workflow-archive livenessProbe: httpGet: path: /minio/health/live diff --git a/manifests/quick-start/base/minio/minio-pod.yaml b/manifests/quick-start/base/minio/minio-pod.yaml index 707fea3835aa..cfcd37684606 100644 --- a/manifests/quick-start/base/minio/minio-pod.yaml +++ b/manifests/quick-start/base/minio/minio-pod.yaml @@ -19,7 
+19,7 @@ spec: lifecycle: postStart: exec: - command: [mkdir, -p, /data/my-bucket] + command: [mkdir, -p, /data/my-bucket, /data/my-node-status-offloads, /data/my-workflow-archive] readinessProbe: httpGet: path: /minio/health/ready diff --git a/manifests/quick-start/base/overlays/workflow-controller-configmap.yaml b/manifests/quick-start/base/overlays/workflow-controller-configmap.yaml index 4a118d21dc98..7aed00887002 100644 --- a/manifests/quick-start/base/overlays/workflow-controller-configmap.yaml +++ b/manifests/quick-start/base/overlays/workflow-controller-configmap.yaml @@ -17,6 +17,30 @@ data: enabled: true path: /metrics port: 9090 + persistence: | + nodeStatusOffLoad: true + archive: true + s3: + archive: + bucket: my-workflow-archive + endpoint: minio:9000 + insecure: true + accessKeySecret: + name: my-minio-cred + key: accesskey + secretKeySecret: + name: my-minio-cred + key: secretkey + nodeStatusOffload: + bucket: my-node-status-offloads + endpoint: minio:9000 + insecure: true + accessKeySecret: + name: my-minio-cred + key: accesskey + secretKeySecret: + name: my-minio-cred + key: secretkey links: | - name: Example Workflow Link scope: workflow diff --git a/persist/sqldb/explosive_offload_node_status_repo.go b/persist/explosive_offload_node_status_repo.go similarity index 98% rename from persist/sqldb/explosive_offload_node_status_repo.go rename to persist/explosive_offload_node_status_repo.go index e0efc271f8f6..062365feae32 100644 --- a/persist/sqldb/explosive_offload_node_status_repo.go +++ b/persist/explosive_offload_node_status_repo.go @@ -1,4 +1,4 @@ -package sqldb +package persist import ( "fmt" diff --git a/persist/factory/factory.go b/persist/factory/factory.go new file mode 100644 index 000000000000..b8a42f2d98cc --- /dev/null +++ b/persist/factory/factory.go @@ -0,0 +1,124 @@ +package factory + +import ( + "context" + "fmt" + "reflect" + "time" + + log "github.com/sirupsen/logrus" + "k8s.io/client-go/kubernetes" + 
"upper.io/db.v3/lib/sqlbuilder" + + "github.com/argoproj/argo/config" + "github.com/argoproj/argo/persist" + "github.com/argoproj/argo/persist/s3" + "github.com/argoproj/argo/persist/sqldb" + "github.com/argoproj/argo/util/env" + "github.com/argoproj/argo/util/instanceid" +) + +var ( + // time to live - at what offloadTTL an offload becomes old + // this environment variable allows you to make Argo Workflows delete offloaded data more or less aggressively, + // useful for testing + offloadTTL = env.LookupEnvDurationOr("OFFLOAD_NODE_STATUS_TTL", 5*time.Minute) + archivedWorkflowGCPeriod = env.LookupEnvDurationOr("ARCHIVED_WORKFLOW_GC_PERIOD", 24*time.Hour) +) + +type Persist struct { + session sqlbuilder.Database + OffloadNodeStatusRepo persist.OffloadNodeStatusRepo + WorkflowArchive persist.WorkflowArchive + stopCh chan struct{} +} + +func New(kubeClient kubernetes.Interface, instanceIDService instanceid.Service, namespace string, c *config.PersistConfig, migrate bool) (*Persist, error) { + out := &Persist{ + OffloadNodeStatusRepo: persist.ExplosiveOffloadNodeStatusRepo, + WorkflowArchive: persist.NullWorkflowArchive, + stopCh: make(chan struct{}), + } + if c == nil { + return out, nil + } + + var session sqlbuilder.Database + var tableName string + + switch c.SQLConfig().(type) { + case *config.MySQLConfig, *config.PostgreSQLConfig: + var err error + session, tableName, err = sqldb.CreateDBSession(kubeClient, namespace, c) + if err != nil { + return nil, err + } + log.Info("Database Session created successfully") + if migrate { + err = sqldb.NewMigrate(session, c.GetClusterName(), tableName).Exec(context.Background()) + if err != nil { + return nil, err + } + } + } + + secretInterface := kubeClient.CoreV1().Secrets(namespace) + + if c.NodeStatusOffload { + switch storage := c.GetNodeStatusOffloadConfig().(type) { + case *config.S3ArtifactRepository: + x, err := s3.NewOffloadNodeStatusRepo(secretInterface, c.GetClusterName(), *storage, migrate, offloadTTL) + if err 
!= nil { + return nil, err + } + out.OffloadNodeStatusRepo = x + case *config.MySQLConfig, *config.PostgreSQLConfig: + x, err := sqldb.NewOffloadNodeStatusRepo(session, c.GetClusterName(), tableName, offloadTTL) + if err != nil { + return nil, err + } + out.OffloadNodeStatusRepo = x + default: + return nil, fmt.Errorf("no status node offload storage configured: %v", reflect.TypeOf(storage)) + } + } + + if c.Archive { + ttl := time.Duration(c.ArchiveTTL) + switch storage := c.GetArchiveConfig().(type) { + case *config.S3ArtifactRepository: + if ttl > 0 { + log.Error("Archive TTL is not supported for S3 - ignoring") + } + x, err := s3.NewWorkflowArchive(secretInterface, c.GetClusterName(), *storage, migrate) + if err != nil { + return nil, err + } + out.WorkflowArchive = x + case *config.MySQLConfig, *config.PostgreSQLConfig: + out.WorkflowArchive = sqldb.NewWorkflowArchive(session, c.GetClusterName(), namespace, instanceIDService, ttl, archivedWorkflowGCPeriod) + default: + return nil, fmt.Errorf("no workflow archive configured: %v", reflect.TypeOf(storage)) + } + go out.WorkflowArchive.Run(out.stopCh) + } + + log.WithFields(log.Fields{ + "nodeOffloadStatusEnabled": out.OffloadNodeStatusRepo.IsEnabled(), + "workflowArchiveEnabled": out.WorkflowArchive.IsEnabled(), + }).Info() + + return out, nil + +} + +func (p *Persist) Close() error { + close(p.stopCh) + if p.session != nil { + err := p.session.Close() + if err != nil { + return err + } + } + return nil +} diff --git a/persist/sqldb/mocks/OffloadNodeStatusRepo.go b/persist/mocks/OffloadNodeStatusRepo.go similarity index 83% rename from persist/sqldb/mocks/OffloadNodeStatusRepo.go rename to persist/mocks/OffloadNodeStatusRepo.go index b8b03cffd306..32c08bd7afe4 100644 --- a/persist/sqldb/mocks/OffloadNodeStatusRepo.go +++ b/persist/mocks/OffloadNodeStatusRepo.go @@ -3,7 +3,7 @@ package mocks import ( - sqldb "github.com/argoproj/argo/persist/sqldb" + persist "github.com/argoproj/argo/persist" mock 
"github.com/stretchr/testify/mock" v1alpha1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" @@ -66,15 +66,15 @@ func (_m *OffloadNodeStatusRepo) IsEnabled() bool { } // List provides a mock function with given fields: namespace -func (_m *OffloadNodeStatusRepo) List(namespace string) (map[sqldb.UUIDVersion]v1alpha1.Nodes, error) { +func (_m *OffloadNodeStatusRepo) List(namespace string) (map[persist.UUIDVersion]v1alpha1.Nodes, error) { ret := _m.Called(namespace) - var r0 map[sqldb.UUIDVersion]v1alpha1.Nodes - if rf, ok := ret.Get(0).(func(string) map[sqldb.UUIDVersion]v1alpha1.Nodes); ok { + var r0 map[persist.UUIDVersion]v1alpha1.Nodes + if rf, ok := ret.Get(0).(func(string) map[persist.UUIDVersion]v1alpha1.Nodes); ok { r0 = rf(namespace) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(map[sqldb.UUIDVersion]v1alpha1.Nodes) + r0 = ret.Get(0).(map[persist.UUIDVersion]v1alpha1.Nodes) } } @@ -89,15 +89,15 @@ func (_m *OffloadNodeStatusRepo) List(namespace string) (map[sqldb.UUIDVersion]v } // ListOldOffloads provides a mock function with given fields: namespace -func (_m *OffloadNodeStatusRepo) ListOldOffloads(namespace string) ([]sqldb.UUIDVersion, error) { +func (_m *OffloadNodeStatusRepo) ListOldOffloads(namespace string) ([]persist.UUIDVersion, error) { ret := _m.Called(namespace) - var r0 []sqldb.UUIDVersion - if rf, ok := ret.Get(0).(func(string) []sqldb.UUIDVersion); ok { + var r0 []persist.UUIDVersion + if rf, ok := ret.Get(0).(func(string) []persist.UUIDVersion); ok { r0 = rf(namespace) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]sqldb.UUIDVersion) + r0 = ret.Get(0).([]persist.UUIDVersion) } } diff --git a/persist/sqldb/mocks/WorkflowArchive.go b/persist/mocks/WorkflowArchive.go similarity index 89% rename from persist/sqldb/mocks/WorkflowArchive.go rename to persist/mocks/WorkflowArchive.go index 0bbbd572c697..c337d20d06d1 100644 --- a/persist/sqldb/mocks/WorkflowArchive.go +++ b/persist/mocks/WorkflowArchive.go @@ -30,20 +30,6 @@ func (_m 
*WorkflowArchive) ArchiveWorkflow(wf *v1alpha1.Workflow) error { return r0 } -// DeleteExpiredWorkflows provides a mock function with given fields: ttl -func (_m *WorkflowArchive) DeleteExpiredWorkflows(ttl time.Duration) error { - ret := _m.Called(ttl) - - var r0 error - if rf, ok := ret.Get(0).(func(time.Duration) error); ok { - r0 = rf(ttl) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // DeleteWorkflow provides a mock function with given fields: uid func (_m *WorkflowArchive) DeleteWorkflow(uid string) error { ret := _m.Called(uid) @@ -117,3 +103,8 @@ func (_m *WorkflowArchive) ListWorkflows(namespace string, minStartAt time.Time, return r0, r1 } + +// Run provides a mock function with given fields: stopCh +func (_m *WorkflowArchive) Run(stopCh <-chan struct{}) { + _m.Called(stopCh) +} diff --git a/persist/node_status_version.go b/persist/node_status_version.go new file mode 100644 index 000000000000..c7c8c4713484 --- /dev/null +++ b/persist/node_status_version.go @@ -0,0 +1,19 @@ +package persist + +import ( + "encoding/json" + "fmt" + "hash/fnv" + + wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" +) + +func NodeStatusVersion(s wfv1.Nodes) (string, string, error) { + marshalled, err := json.Marshal(s) + if err != nil { + return "", "", err + } + h := fnv.New32() + _, _ = h.Write(marshalled) + return string(marshalled), fmt.Sprintf("fnv:%v", h.Sum32()), nil +} diff --git a/persist/sqldb/offload_node_status_repo_test.go b/persist/node_status_version_test.go similarity index 79% rename from persist/sqldb/offload_node_status_repo_test.go rename to persist/node_status_version_test.go index f979139a2b05..08ae02777362 100644 --- a/persist/sqldb/offload_node_status_repo_test.go +++ b/persist/node_status_version_test.go @@ -1,4 +1,4 @@ -package sqldb +package persist import ( "testing" @@ -10,14 +10,14 @@ import ( func Test_nodeStatusVersion(t *testing.T) { t.Run("Empty", func(t *testing.T) { - marshalled, version, err := nodeStatusVersion(nil) + 
marshalled, version, err := NodeStatusVersion(nil) if assert.NoError(t, err) { assert.NotEmpty(t, marshalled) assert.Equal(t, "fnv:784127654", version) } }) t.Run("NonEmpty", func(t *testing.T) { - marshalled, version, err := nodeStatusVersion(wfv1.Nodes{"my-node": wfv1.NodeStatus{}}) + marshalled, version, err := NodeStatusVersion(wfv1.Nodes{"my-node": wfv1.NodeStatus{}}) if assert.NoError(t, err) { assert.NotEmpty(t, marshalled) assert.Equal(t, "fnv:2308444803", version) diff --git a/persist/sqldb/null_workflow_archive.go b/persist/null_workflow_archive.go similarity index 88% rename from persist/sqldb/null_workflow_archive.go rename to persist/null_workflow_archive.go index 0fc604c01de4..363c90fdfccf 100644 --- a/persist/sqldb/null_workflow_archive.go +++ b/persist/null_workflow_archive.go @@ -1,4 +1,4 @@ -package sqldb +package persist import ( "fmt" @@ -34,6 +34,4 @@ func (r *nullWorkflowArchive) DeleteWorkflow(string) error { return fmt.Errorf("deleting archived workflows not supported") } -func (r *nullWorkflowArchive) DeleteExpiredWorkflows(time.Duration) error { - return nil -} +func (r *nullWorkflowArchive) Run(<-chan struct{}) {} diff --git a/persist/offload_node_status_repo.go b/persist/offload_node_status_repo.go new file mode 100644 index 000000000000..1644104673a9 --- /dev/null +++ b/persist/offload_node_status_repo.go @@ -0,0 +1,23 @@ +package persist + +import ( + wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" +) + +//go:generate mockery -name OffloadNodeStatusRepo + +// A place to save workflow node status. +// Implementations do not need to be fault tolerant. Expect this to be handled higher up the call stack. +// Implementations must be idempotent. +type OffloadNodeStatusRepo interface { + // Save a node and return its version. + Save(uid, namespace string, nodes wfv1.Nodes) (string, error) + Get(uid, version string) (wfv1.Nodes, error) + List(namespace string) (map[UUIDVersion]wfv1.Nodes, error) + // List any old offloads. 
+ ListOldOffloads(namespace string) ([]UUIDVersion, error) + Delete(uid, version string) error + IsEnabled() bool +} + +const OffloadNodeStatusDisabled = "Workflow has offloaded nodes, but offloading has been disabled" diff --git a/persist/s3/offload_node_status_repo.go b/persist/s3/offload_node_status_repo.go new file mode 100644 index 000000000000..bec793b9bf2e --- /dev/null +++ b/persist/s3/offload_node_status_repo.go @@ -0,0 +1,120 @@ +package s3 + +import ( + "bytes" + "context" + "encoding/json" + "time" + + "github.com/minio/minio-go/v7" + log "github.com/sirupsen/logrus" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + + "github.com/argoproj/argo/config" + "github.com/argoproj/argo/persist" + wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" +) + +type offloadNodeStatusRepo struct { + storage + ttl time.Duration +} + +func NewOffloadNodeStatusRepo(secretInterface corev1.SecretInterface, clusterName string, config config.S3ArtifactRepository, migrate bool, ttl time.Duration) (*offloadNodeStatusRepo, error) { + s, err := newStorage(secretInterface, clusterName, config) + if err != nil { + return nil, err + } + if migrate { + log.WithFields(log.Fields{"bucket": s.bucket, "ttl": ttl}).Info("Offloading node status to S3") + } + return &offloadNodeStatusRepo{*s, ttl}, nil +} + +func (r *offloadNodeStatusRepo) Save(uid, namespace string, nodes wfv1.Nodes) (string, error) { + marshalled, version, err := persist.NodeStatusVersion(nodes) + if err != nil { + return "", err + } + _, err = r.client.PutObject( + context.Background(), + r.bucket, + r.objectName(uid, version), + bytes.NewBufferString(marshalled), + int64(len(marshalled)), + minio.PutObjectOptions{ + UserMetadata: map[string]string{ + // must be HTTP header format + "Namespace": namespace, + "Uid": uid, + "Version": version, + }, + }, + ) + if err != nil { + return "", err + } + return version, nil +} + +func (r *offloadNodeStatusRepo) objectName(uid, version string) string { + return 
r.prefix + "/" + uid + "-" + version + "-node-status.json" +} + +func (r *offloadNodeStatusRepo) Get(uid, version string) (wfv1.Nodes, error) { + object, err := r.client.GetObject( + context.Background(), + r.bucket, + r.objectName(uid, version), + minio.GetObjectOptions{}, + ) + if err != nil { + return nil, err + } + nodes := wfv1.Nodes{} + return nodes, json.NewDecoder(object).Decode(&nodes) +} + +func (r *offloadNodeStatusRepo) List(namespace string) (map[persist.UUIDVersion]wfv1.Nodes, error) { + out := make(map[persist.UUIDVersion]wfv1.Nodes) + for object := range r.client.ListObjects(context.Background(), r.bucket, minio.ListObjectsOptions{WithMetadata: true}) { + userMetadata := object.UserMetadata + if userMetadata["X-Amz-Meta-Namespace"] != namespace { + continue + } + uid := userMetadata["X-Amz-Meta-Uid"] + version := userMetadata["X-Amz-Meta-Version"] + nodes, err := r.Get(uid, version) + if err != nil { + return nil, err + } + out[persist.UUIDVersion{UID: uid, Version: version}] = nodes + } + return out, nil +} + +func (r *offloadNodeStatusRepo) ListOldOffloads(string) ([]persist.UUIDVersion, error) { + var out []persist.UUIDVersion + for object := range r.client.ListObjects(context.Background(), r.bucket, minio.ListObjectsOptions{WithMetadata: true}) { + if object.LastModified.After(time.Now().Add(-r.ttl)) { + continue + } + uid := object.UserMetadata["X-Amz-Meta-Uid"] + version := object.UserMetadata["X-Amz-Meta-Version"] + out = append(out, persist.UUIDVersion{UID: uid, Version: version}) + } + return out, nil +} + +func (r *offloadNodeStatusRepo) Delete(uid, version string) error { + return r.client.RemoveObject( + context.Background(), + r.bucket, + r.objectName(uid, version), + minio.RemoveObjectOptions{}, + ) +} + +func (r *offloadNodeStatusRepo) IsEnabled() bool { + return true +} diff --git a/persist/s3/storage.go b/persist/s3/storage.go new file mode 100644 index 000000000000..bec793b9bf2e --- /dev/null +++ 
b/persist/s3/storage.go @@ -0,0 +1,64 @@ +package s3 + +import ( + "os" + + argos3 "github.com/argoproj/pkg/s3" + "github.com/minio/minio-go/v7" + log "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + + "github.com/argoproj/argo/config" + "github.com/argoproj/argo/workflow/common" +) + +type storage struct { + bucket string + prefix string + client *minio.Client +} + +func newStorage(secretInterface corev1.SecretInterface, clusterName string, config config.S3ArtifactRepository) (*storage, error) { + secret, err := secretInterface.Get(config.AccessKeySecret.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + accessKey := secret.Data[config.AccessKeySecret.Key] + secret, err = secretInterface.Get(config.SecretKeySecret.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + secretKey := secret.Data[config.SecretKeySecret.Key] + opts := argos3.S3ClientOpts{ + Endpoint: config.Endpoint, + Region: config.Region, + Secure: config.Secure(), + AccessKey: string(accessKey), + SecretKey: string(secretKey), + RoleARN: config.RoleARN, + Trace: os.Getenv(common.EnvVarArgoTrace) == "1", + UseSDKCreds: config.UseSDKCreds, + } + credentials, err := argos3.GetCredentials(opts) + if err != nil { + return nil, err + } + minioOpts := &minio.Options{Creds: credentials, Secure: opts.Secure, Region: opts.Region} + client, err := minio.New(opts.Endpoint, minioOpts) + if err != nil { + return nil, err + } + if opts.Trace { + client.TraceOn(log.StandardLogger().Out) + } + return &storage{client: client, bucket: config.Bucket, prefix: clusterName}, nil +} + +func noSuchKeyErr(err error) bool { + switch v := err.(type) { + case minio.ErrorResponse: + return v.Code == "NoSuchKey" + } + return false +} diff --git a/persist/s3/workflow_archive.go b/persist/s3/workflow_archive.go new file mode 100644 index 000000000000..b7e761c9f537 --- /dev/null +++ b/persist/s3/workflow_archive.go @@ 
-0,0 +1,151 @@ +package s3 + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "sort" + "time" + + "github.com/minio/minio-go/v7" + log "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + + "github.com/argoproj/argo/config" + wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo/workflow/common" +) + +type workflowArchive struct { + storage +} + +func NewWorkflowArchive(secretInterface corev1.SecretInterface, clusterName string, config config.S3ArtifactRepository, migrate bool) (*workflowArchive, error) { + s, err := newStorage(secretInterface, clusterName, config) + if err != nil { + return nil, err + } + if migrate { + log.WithFields(log.Fields{"bucket": s.bucket}).Info("Archiving workflows to S3") + } + return &workflowArchive{*s}, nil +} + +func (a *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error { + data, err := json.Marshal(wf) + if err != nil { + return err + } + // should be idempotent replace + // write meta-data can allow us to change key in the future + _, err = a.client.PutObject( + context.Background(), + a.bucket, + a.objectName(string(wf.UID)), + bytes.NewBuffer(data), + int64(len(data)), + minio.PutObjectOptions{ + UserMetadata: map[string]string{ + // meta-data must be HTTP header format + "Namespace": wf.Namespace, + "Name": wf.Name, + "Uid": string(wf.UID), + "Started-At": wf.Status.StartedAt.Format(time.RFC3339), + "Finished-At": wf.Status.FinishedAt.Format(time.RFC3339), + "Labels": labels.Set(wf.Labels).String(), + }, + }, + ) + if err != nil { + return err + } + return nil +} + +func (a *workflowArchive) objectName(uid string) string { + return fmt.Sprintf("%s/%v-workflow.json", a.prefix, uid) +} + +func (a *workflowArchive) ListWorkflows(namespace string, minStartAt, maxStartAt time.Time, labelRequirements labels.Requirements, limit, offset 
int) (wfv1.Workflows, error) { + out := make(wfv1.Workflows, 0) +WorkflowForEach: + for object := range a.client.ListObjects( + context.Background(), + a.bucket, + minio.ListObjectsOptions{WithMetadata: true, Prefix: a.prefix + "/", Recursive: true}, + ) { + userMetadata := object.UserMetadata + if namespace != "" && userMetadata["X-Amz-Meta-Namespace"] != namespace { + continue + } + // we treat missing started at as zero + startedAt, _ := time.Parse(time.RFC3339, userMetadata["X-Amz-Meta-Started-At"]) + if !minStartAt.IsZero() && startedAt.Before(minStartAt) { + continue + } + if !maxStartAt.IsZero() && startedAt.After(maxStartAt) { + continue + } + labels, _ := labels.ConvertSelectorToLabelsMap(userMetadata["X-Amz-Meta-Labels"]) + for _, r := range labelRequirements { + if !r.Matches(labels) { + continue WorkflowForEach + } + } + finishedAt, _ := time.Parse(time.RFC3339, userMetadata["X-Amz-Meta-Finished-At"]) + out = append(out, wfv1.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: userMetadata["X-Amz-Meta-Name"], + Namespace: userMetadata["X-Amz-Meta-Namespace"], + UID: types.UID(userMetadata["X-Amz-Meta-Uid"]), + }, + Status: wfv1.WorkflowStatus{ + Phase: wfv1.NodePhase(labels.Get(common.LabelKeyPhase)), + StartedAt: metav1.NewTime(startedAt), + FinishedAt: metav1.NewTime(finishedAt), + }, + }) + } + sort.Sort(out) + if offset > 0 && offset < len(out) { + out = out[offset:] + } + if limit > 0 && len(out) > limit { + out = out[:limit] + } + return out, nil +} + +func (a *workflowArchive) GetWorkflow(uid string) (*wfv1.Workflow, error) { + object, err := a.client.GetObject(context.Background(), a.bucket, a.objectName(uid), minio.GetObjectOptions{}) + if err != nil { + return nil, err + } + wf := &wfv1.Workflow{} + err = json.NewDecoder(object).Decode(&wf) + if noSuchKeyErr(err) { // yes, this can be returned from Decode! 
+ return nil, nil + } + return wf, err +} + +func (a *workflowArchive) DeleteWorkflow(uid string) error { + wf, err := a.GetWorkflow(uid) + if err != nil { + return err + } + if wf == nil { + return nil + } + return a.client.RemoveObject(context.Background(), a.bucket, a.objectName(uid), minio.RemoveObjectOptions{}) +} + +func (a *workflowArchive) Run(<-chan struct{}) {} + +func (a *workflowArchive) IsEnabled() bool { + return true +} diff --git a/persist/sqldb/backfill_nodes.go b/persist/sqldb/backfill_nodes.go index 741c9e796872..bb1122e996cc 100644 --- a/persist/sqldb/backfill_nodes.go +++ b/persist/sqldb/backfill_nodes.go @@ -8,6 +8,7 @@ import ( "upper.io/db.v3" "upper.io/db.v3/lib/sqlbuilder" + "github.com/argoproj/argo/persist" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" ) @@ -39,7 +40,7 @@ func (s backfillNodes) apply(session sqlbuilder.Database) error { if err != nil { return err } - marshalled, version, err := nodeStatusVersion(wf.Status.Nodes) + marshalled, version, err := persist.NodeStatusVersion(wf.Status.Nodes) if err != nil { return err } diff --git a/persist/sqldb/offload_node_status_repo.go b/persist/sqldb/offload_node_status_repo.go index 30b0bcc684d5..3a46cbc6209d 100644 --- a/persist/sqldb/offload_node_status_repo.go +++ b/persist/sqldb/offload_node_status_repo.go @@ -3,8 +3,6 @@ package sqldb import ( "encoding/json" "fmt" - "hash/fnv" - "os" "strings" "time" @@ -12,43 +10,17 @@ import ( "upper.io/db.v3" "upper.io/db.v3/lib/sqlbuilder" + "github.com/argoproj/argo/persist" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" ) -const OffloadNodeStatusDisabled = "Workflow has offloaded nodes, but offloading has been disabled" - -type UUIDVersion struct { - UID string `db:"uid"` - Version string `db:"version"` -} - -type OffloadNodeStatusRepo interface { - Save(uid, namespace string, nodes wfv1.Nodes) (string, error) - Get(uid, version string) (wfv1.Nodes, error) - List(namespace string) (map[UUIDVersion]wfv1.Nodes, error) - 
ListOldOffloads(namespace string) ([]UUIDVersion, error) - Delete(uid, version string) error - IsEnabled() bool -} - -func NewOffloadNodeStatusRepo(session sqlbuilder.Database, clusterName, tableName string) (OffloadNodeStatusRepo, error) { - // this environment variable allows you to make Argo Workflows delete offloaded data more or less aggressively, - // useful for testing - text, ok := os.LookupEnv("OFFLOAD_NODE_STATUS_TTL") - if !ok { - text = "5m" - } - ttl, err := time.ParseDuration(text) - if err != nil { - return nil, err - } - log.WithField("ttl", ttl).Info("Node status offloading config") +func NewOffloadNodeStatusRepo(session sqlbuilder.Database, clusterName, tableName string, ttl time.Duration) (persist.OffloadNodeStatusRepo, error) { return &nodeOffloadRepo{session: session, clusterName: clusterName, tableName: tableName, ttl: ttl}, nil } type nodesRecord struct { ClusterName string `db:"clustername"` - UUIDVersion + persist.UUIDVersion Namespace string `db:"namespace"` Nodes string `db:"nodes"` } @@ -57,35 +29,23 @@ type nodeOffloadRepo struct { session sqlbuilder.Database clusterName string tableName string - // time to live - at what ttl an offload becomes old - ttl time.Duration + ttl time.Duration } func (wdc *nodeOffloadRepo) IsEnabled() bool { return true } -func nodeStatusVersion(s wfv1.Nodes) (string, string, error) { - marshalled, err := json.Marshal(s) - if err != nil { - return "", "", err - } - - h := fnv.New32() - _, _ = h.Write(marshalled) - return string(marshalled), fmt.Sprintf("fnv:%v", h.Sum32()), nil -} - func (wdc *nodeOffloadRepo) Save(uid, namespace string, nodes wfv1.Nodes) (string, error) { - marshalled, version, err := nodeStatusVersion(nodes) + marshalled, version, err := persist.NodeStatusVersion(nodes) if err != nil { return "", err } record := &nodesRecord{ ClusterName: wdc.clusterName, - UUIDVersion: UUIDVersion{ + UUIDVersion: persist.UUIDVersion{ UID: uid, Version: version, }, @@ -160,7 +120,7 @@ func (wdc 
*nodeOffloadRepo) Get(uid, version string) (wfv1.Nodes, error) { return *nodes, nil } -func (wdc *nodeOffloadRepo) List(namespace string) (map[UUIDVersion]wfv1.Nodes, error) { +func (wdc *nodeOffloadRepo) List(namespace string) (map[persist.UUIDVersion]wfv1.Nodes, error) { log.WithFields(log.Fields{"namespace": namespace}).Debug("Listing offloaded nodes") var records []nodesRecord err := wdc.session. @@ -173,22 +133,22 @@ func (wdc *nodeOffloadRepo) List(namespace string) (map[UUIDVersion]wfv1.Nodes, return nil, err } - res := make(map[UUIDVersion]wfv1.Nodes) + res := make(map[persist.UUIDVersion]wfv1.Nodes) for _, r := range records { nodes := &wfv1.Nodes{} err = json.Unmarshal([]byte(r.Nodes), nodes) if err != nil { return nil, err } - res[UUIDVersion{UID: r.UID, Version: r.Version}] = *nodes + res[persist.UUIDVersion{UID: r.UID, Version: r.Version}] = *nodes } return res, nil } -func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) ([]UUIDVersion, error) { +func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) ([]persist.UUIDVersion, error) { log.WithFields(log.Fields{"namespace": namespace}).Debug("Listing old offloaded nodes") - var records []UUIDVersion + var records []persist.UUIDVersion err := wdc.session. Select("uid", "version"). From(wdc.tableName). 
diff --git a/persist/sqldb/workflow_archive.go b/persist/sqldb/workflow_archive.go index 3cbfbec8b326..0905cfa0e975 100644 --- a/persist/sqldb/workflow_archive.go +++ b/persist/sqldb/workflow_archive.go @@ -13,6 +13,7 @@ import ( "upper.io/db.v3" "upper.io/db.v3/lib/sqlbuilder" + "github.com/argoproj/argo/persist" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/util/instanceid" ) @@ -44,24 +45,14 @@ type archivedWorkflowLabelRecord struct { Value string `db:"value"` } -//go:generate mockery -name WorkflowArchive - -type WorkflowArchive interface { - ArchiveWorkflow(wf *wfv1.Workflow) error - // list workflows, with the most recently started workflows at the beginning (i.e. index 0 is the most recent) - ListWorkflows(namespace string, minStartAt, maxStartAt time.Time, labelRequirements labels.Requirements, limit, offset int) (wfv1.Workflows, error) - GetWorkflow(uid string) (*wfv1.Workflow, error) - DeleteWorkflow(uid string) error - DeleteExpiredWorkflows(ttl time.Duration) error - IsEnabled() bool -} - type workflowArchive struct { - session sqlbuilder.Database - clusterName string - managedNamespace string - instanceIDService instanceid.Service - dbType dbType + session sqlbuilder.Database + clusterName string + managedNamespace string + instanceIDService instanceid.Service + dbType dbType + ttl time.Duration + archivedWorkflowGCPeriod time.Duration } func (r *workflowArchive) IsEnabled() bool { @@ -69,8 +60,8 @@ func (r *workflowArchive) IsEnabled() bool { } // NewWorkflowArchive returns a new workflowArchive -func NewWorkflowArchive(session sqlbuilder.Database, clusterName, managedNamespace string, instanceIDService instanceid.Service) WorkflowArchive { - return &workflowArchive{session: session, clusterName: clusterName, managedNamespace: managedNamespace, instanceIDService: instanceIDService, dbType: dbTypeFor(session)} +func NewWorkflowArchive(session sqlbuilder.Database, clusterName, managedNamespace string, 
instanceIDService instanceid.Service, ttl, archivedWorkflowGCPeriod time.Duration) persist.WorkflowArchive { + return &workflowArchive{session: session, clusterName: clusterName, managedNamespace: managedNamespace, instanceIDService: instanceIDService, dbType: dbTypeFor(session), ttl: ttl, archivedWorkflowGCPeriod: archivedWorkflowGCPeriod} } func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error { @@ -237,11 +228,11 @@ func (r *workflowArchive) DeleteWorkflow(uid string) error { return nil } -func (r *workflowArchive) DeleteExpiredWorkflows(ttl time.Duration) error { +func (r *workflowArchive) DeleteExpiredWorkflows() error { rs, err := r.session. DeleteFrom(archiveTableName). Where(r.clusterManagedNamespaceAndInstanceID()). - And(fmt.Sprintf("finishedat < current_timestamp - interval '%d' second", int(ttl.Seconds()))). + And(fmt.Sprintf("finishedat < current_timestamp - interval '%d' second", int(r.ttl.Seconds()))). Exec() if err != nil { return err @@ -253,3 +244,27 @@ func (r *workflowArchive) DeleteExpiredWorkflows(ttl time.Duration) error { log.WithFields(log.Fields{"rowsAffected": rowsAffected}).Info("Deleted archived workflows") return nil } + +func (r *workflowArchive) Run(stopCh <-chan struct{}) { + periodicity := r.archivedWorkflowGCPeriod + ttl := r.ttl + if ttl == 0 { + log.Info("Archive TTL is zero - so archived workflow GC disabled - you must restart the controller if you enable this") + return + } + log.WithFields(log.Fields{"ttl": ttl, "periodicity": periodicity}).Info("Performing archived workflow GC") + ticker := time.NewTicker(periodicity) + defer ticker.Stop() + for { + select { + case <-stopCh: + return + case <-ticker.C: + log.Info("Performing archived workflow GC") + err := r.DeleteExpiredWorkflows() + if err != nil { + log.WithField("err", err).Error("Failed to delete archived workflows") + } + } + } +} diff --git a/persist/uuid_version.go b/persist/uuid_version.go new file mode 100644 index 000000000000..d8302946a0fe --- 
/dev/null +++ b/persist/uuid_version.go @@ -0,0 +1,6 @@ +package persist + +type UUIDVersion struct { + UID string `db:"uid"` + Version string `db:"version"` +} diff --git a/persist/workflow_archive.go b/persist/workflow_archive.go new file mode 100644 index 000000000000..60291a600199 --- /dev/null +++ b/persist/workflow_archive.go @@ -0,0 +1,29 @@ +package persist + +import ( + "time" + + "k8s.io/apimachinery/pkg/labels" + + wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" +) + +//go:generate mockery -name WorkflowArchive + +// A place to save workflow node status. +// Implementations do not need to be fault tolerant. Expect this to be handled higher up the call stack. +// Implementations must be idempotent. +type WorkflowArchive interface { + // Archive the workflow. + ArchiveWorkflow(wf *wfv1.Workflow) error + // List workflows. The most recently started workflows at the beginning (i.e. index 0 is the most recent). + ListWorkflows(namespace string, minStartAt, maxStartAt time.Time, labelRequirements labels.Requirements, limit, offset int) (wfv1.Workflows, error) + // Will return nil if not found rather than an error. + // Should return only "meta.name", "meta.namespace", "meta.uid", "status.phase", "status.startedAt", "status.finishedAt" + GetWorkflow(uid string) (*wfv1.Workflow, error) + DeleteWorkflow(uid string) error + // Perform any periodic clean-up. E.g. Delete any archived workflows that are older than the TTL. + Run(stopCh <-chan struct{}) + // Whether or not archiving is possible. Non-null implementations should always return true. 
+ IsEnabled() bool +} diff --git a/pkg/apiclient/argo-kube-client.go b/pkg/apiclient/argo-kube-client.go index db17109493b5..daa18d905cc1 100644 --- a/pkg/apiclient/argo-kube-client.go +++ b/pkg/apiclient/argo-kube-client.go @@ -7,7 +7,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" - "github.com/argoproj/argo/persist/sqldb" + "github.com/argoproj/argo/persist" "github.com/argoproj/argo/pkg/apiclient/clusterworkflowtemplate" "github.com/argoproj/argo/pkg/apiclient/cronworkflow" infopkg "github.com/argoproj/argo/pkg/apiclient/info" @@ -24,7 +24,7 @@ import ( "github.com/argoproj/argo/util/instanceid" ) -var argoKubeOffloadNodeStatusRepo = sqldb.ExplosiveOffloadNodeStatusRepo +var argoKubeOffloadNodeStatusRepo = persist.ExplosiveOffloadNodeStatusRepo var NoArgoServerErr = fmt.Errorf("this is impossible if you are not using the Argo Server, see " + help.CLI) type argoKubeClient struct { diff --git a/server/apiserver/argoserver.go b/server/apiserver/argoserver.go index 2f5864d233eb..a948032d5f7b 100644 --- a/server/apiserver/argoserver.go +++ b/server/apiserver/argoserver.go @@ -21,7 +21,8 @@ import ( "github.com/argoproj/argo" "github.com/argoproj/argo/config" - "github.com/argoproj/argo/persist/sqldb" + "github.com/argoproj/argo/persist" + "github.com/argoproj/argo/persist/factory" clusterwftemplatepkg "github.com/argoproj/argo/pkg/apiclient/clusterworkflowtemplate" cronworkflowpkg "github.com/argoproj/argo/pkg/apiclient/cronworkflow" eventpkg "github.com/argoproj/argo/pkg/apiclient/event" @@ -143,24 +144,14 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st config := v.(*Config) log.WithFields(log.Fields{"version": argo.GetVersion().Version, "instanceID": config.InstanceID}).Info("Starting Argo Server") instanceIDService := instanceid.NewService(config.InstanceID) - var offloadRepo = sqldb.ExplosiveOffloadNodeStatusRepo - var wfArchive = sqldb.NullWorkflowArchive - persistence := config.Persistence - if 
persistence != nil { - session, tableName, err := sqldb.CreateDBSession(as.kubeClientset, as.namespace, persistence) - if err != nil { - log.Fatal(err) - } - // we always enable node offload, as this is read-only for the Argo Server, i.e. you can turn it off if you - // like and the controller won't offload newly created workflows, but you can still read them - offloadRepo, err = sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName) - if err != nil { - log.Fatal(err) - } - // we always enable the archive for the Argo Server, as the Argo Server does not write records, so you can - // disable the archiving - and still read old records - wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), as.managedNamespace, instanceIDService) + + p, err := factory.New(as.kubeClientset, instanceIDService, as.namespace, config.Persistence, false) + if err != nil { + log.Fatal(err) } + offloadRepo := p.OffloadNodeStatusRepo + wfArchive := p.WorkflowArchive + eventRecorderManager := events.NewEventRecorderManager(as.kubeClientset) artifactServer := artifacts.NewArtifactServer(as.gatekeeper, hydrator.New(offloadRepo), wfArchive, instanceIDService) eventServer := event.NewController(instanceIDService, eventRecorderManager, as.eventQueueSize, as.eventWorkerCount) @@ -208,7 +199,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st <-as.stopCh } -func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive, eventServer *event.Controller, links []*v1alpha1.Link) *grpc.Server { +func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, offloadNodeStatusRepo persist.OffloadNodeStatusRepo, wfArchive persist.WorkflowArchive, eventServer *event.Controller, links []*v1alpha1.Link) *grpc.Server { serverLog := log.NewEntry(log.StandardLogger()) sOpts := []grpc.ServerOption{ // Set both the send and receive 
the bytes limit to be 100MB diff --git a/server/artifacts/artifact_server.go b/server/artifacts/artifact_server.go index 98458dacb05b..815899542c59 100644 --- a/server/artifacts/artifact_server.go +++ b/server/artifacts/artifact_server.go @@ -14,7 +14,7 @@ import ( "google.golang.org/grpc/status" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/argoproj/argo/persist/sqldb" + "github.com/argoproj/argo/persist" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/server/auth" "github.com/argoproj/argo/util/instanceid" @@ -25,11 +25,11 @@ import ( type ArtifactServer struct { gatekeeper auth.Gatekeeper hydrator hydrator.Interface - wfArchive sqldb.WorkflowArchive + wfArchive persist.WorkflowArchive instanceIDService instanceid.Service } -func NewArtifactServer(authN auth.Gatekeeper, hydrator hydrator.Interface, wfArchive sqldb.WorkflowArchive, instanceIDService instanceid.Service) *ArtifactServer { +func NewArtifactServer(authN auth.Gatekeeper, hydrator hydrator.Interface, wfArchive persist.WorkflowArchive, instanceIDService instanceid.Service) *ArtifactServer { return &ArtifactServer{authN, hydrator, wfArchive, instanceIDService} } diff --git a/server/artifacts/artifact_server_test.go b/server/artifacts/artifact_server_test.go index 3abf8265148d..360ca6d6f916 100644 --- a/server/artifacts/artifact_server_test.go +++ b/server/artifacts/artifact_server_test.go @@ -12,7 +12,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubefake "k8s.io/client-go/kubernetes/fake" - "github.com/argoproj/argo/persist/sqldb/mocks" + "github.com/argoproj/argo/persist/mocks" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" fakewfv1 "github.com/argoproj/argo/pkg/client/clientset/versioned/fake" "github.com/argoproj/argo/server/auth" diff --git a/server/workflow/workflow_server.go b/server/workflow/workflow_server.go index d029b7881b54..9437cc6ea438 100644 --- a/server/workflow/workflow_server.go +++ 
b/server/workflow/workflow_server.go @@ -12,7 +12,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/argoproj/argo/errors" - "github.com/argoproj/argo/persist/sqldb" + "github.com/argoproj/argo/persist" workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow" "github.com/argoproj/argo/pkg/apis/workflow" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" @@ -31,14 +31,14 @@ import ( type workflowServer struct { instanceIDService instanceid.Service - offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo + offloadNodeStatusRepo persist.OffloadNodeStatusRepo hydrator hydrator.Interface } const latestAlias = "@latest" // NewWorkflowServer returns a new workflowServer -func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo) workflowpkg.WorkflowServiceServer { +func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo persist.OffloadNodeStatusRepo) workflowpkg.WorkflowServiceServer { return &workflowServer{instanceIDService, offloadNodeStatusRepo, hydrator.New(offloadNodeStatusRepo)} } @@ -128,9 +128,9 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor for i, wf := range wfList.Items { if wf.Status.IsOffloadNodeStatus() { if s.offloadNodeStatusRepo.IsEnabled() { - wfList.Items[i].Status.Nodes = offloadedNodes[sqldb.UUIDVersion{UID: string(wf.UID), Version: wf.GetOffloadNodeStatusVersion()}] + wfList.Items[i].Status.Nodes = offloadedNodes[persist.UUIDVersion{UID: string(wf.UID), Version: wf.GetOffloadNodeStatusVersion()}] } else { - log.WithFields(log.Fields{"namespace": wf.Namespace, "name": wf.Name}).Warn(sqldb.OffloadNodeStatusDisabled) + log.WithFields(log.Fields{"namespace": wf.Namespace, "name": wf.Name}).Warn(persist.OffloadNodeStatusDisabled) } } } diff --git a/server/workflow/workflow_server_test.go b/server/workflow/workflow_server_test.go index 0e0824bdbda9..1c04bc46ed57 100644 --- 
a/server/workflow/workflow_server_test.go +++ b/server/workflow/workflow_server_test.go @@ -16,8 +16,8 @@ import ( "k8s.io/client-go/kubernetes/fake" ktesting "k8s.io/client-go/testing" - "github.com/argoproj/argo/persist/sqldb" - "github.com/argoproj/argo/persist/sqldb/mocks" + "github.com/argoproj/argo/persist" + "github.com/argoproj/argo/persist/mocks" workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow" "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/pkg/client/clientset/versioned" @@ -581,7 +581,7 @@ func getWorkflowServer() (workflowpkg.WorkflowServiceServer, context.Context) { offloadNodeStatusRepo := &mocks.OffloadNodeStatusRepo{} offloadNodeStatusRepo.On("IsEnabled", mock.Anything).Return(true) - offloadNodeStatusRepo.On("List", mock.Anything).Return(map[sqldb.UUIDVersion]v1alpha1.Nodes{}, nil) + offloadNodeStatusRepo.On("List", mock.Anything).Return(map[persist.UUIDVersion]v1alpha1.Nodes{}, nil) server := NewWorkflowServer(instanceid.NewService("my-instanceid"), offloadNodeStatusRepo) kubeClientSet := fake.NewSimpleClientset() wfClientset := v1alpha.NewSimpleClientset(&unlabelledObj, &wfObj1, &wfObj2, &wfObj3, &wfObj4, &wfObj5, &failedWfObj, &wftmpl, &cronwfObj, &cwfTmpl) diff --git a/server/workflowarchive/archived_workflow_server.go b/server/workflowarchive/archived_workflow_server.go index a0494c0b0892..0bb1a5881485 100644 --- a/server/workflowarchive/archived_workflow_server.go +++ b/server/workflowarchive/archived_workflow_server.go @@ -13,7 +13,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "github.com/argoproj/argo/persist/sqldb" + "github.com/argoproj/argo/persist" workflowarchivepkg "github.com/argoproj/argo/pkg/apiclient/workflowarchive" "github.com/argoproj/argo/pkg/apis/workflow" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" @@ -21,11 +21,11 @@ import ( ) type archivedWorkflowServer struct { - wfArchive sqldb.WorkflowArchive + wfArchive 
persist.WorkflowArchive } // NewWorkflowArchiveServer returns a new archivedWorkflowServer -func NewWorkflowArchiveServer(wfArchive sqldb.WorkflowArchive) workflowarchivepkg.ArchivedWorkflowServiceServer { +func NewWorkflowArchiveServer(wfArchive persist.WorkflowArchive) workflowarchivepkg.ArchivedWorkflowServiceServer { return &archivedWorkflowServer{wfArchive: wfArchive} } diff --git a/server/workflowarchive/archived_workflow_server_test.go b/server/workflowarchive/archived_workflow_server_test.go index dbdd2257807e..9f423ae0ea4b 100644 --- a/server/workflowarchive/archived_workflow_server_test.go +++ b/server/workflowarchive/archived_workflow_server_test.go @@ -16,7 +16,7 @@ import ( kubefake "k8s.io/client-go/kubernetes/fake" k8stesting "k8s.io/client-go/testing" - "github.com/argoproj/argo/persist/sqldb/mocks" + "github.com/argoproj/argo/persist/mocks" workflowarchivepkg "github.com/argoproj/argo/pkg/apiclient/workflowarchive" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" argofake "github.com/argoproj/argo/pkg/client/clientset/versioned/fake" diff --git a/test/e2e/argo_server_test.go b/test/e2e/argo_server_test.go index a0f5b73a496f..0e2f5f9f2550 100644 --- a/test/e2e/argo_server_test.go +++ b/test/e2e/argo_server_test.go @@ -321,6 +321,7 @@ func (s *ArgoServerSuite) TestUnauthorized() { Expect(). Status(401) } + func (s *ArgoServerSuite) TestCookieAuth() { token := s.bearerToken defer func() { s.bearerToken = token }() @@ -508,7 +509,7 @@ func (s *ArgoServerSuite) TestPermission() { Status(403) }) - if s.Persistence.IsEnabled() { + if s.Persist.WorkflowArchive.IsEnabled() { // Simply wait 10 seconds for the wf to be completed s.Given(). 
@@ -535,7 +536,7 @@ func (s *ArgoServerSuite) TestPermission() { // we've now deleted the workflow, but it is still in the archive, testing the archive // after deleting the workflow makes sure that we are no dependant of the workflow for authorization - if s.Persistence.IsEnabled() { + if s.Persist.WorkflowArchive.IsEnabled() { // Test list archived WFs with good token s.bearerToken = goodToken s.Run("ListArchivedWFsGoodToken", func() { @@ -1018,9 +1019,7 @@ spec: // make sure we can download an artifact func (s *ArgoServerSuite) TestArtifactServer() { - if !s.Persistence.IsEnabled() { - s.T().SkipNow() - } + s.NeedsArchive() var uid types.UID var name string s.Given(). @@ -1185,9 +1184,7 @@ func (s *ArgoServerSuite) TestWorkflowServiceStream() { } func (s *ArgoServerSuite) TestArchivedWorkflowService() { - if !s.Persistence.IsEnabled() { - s.T().SkipNow() - } + s.NeedsArchive() var uid types.UID s.Given(). Workflow(` diff --git a/test/e2e/cli_test.go b/test/e2e/cli_test.go index ef27338d1391..ac8b33989bee 100644 --- a/test/e2e/cli_test.go +++ b/test/e2e/cli_test.go @@ -92,6 +92,11 @@ func (s *CLISuite) NeedsOffloading() { s.needsServer() } +func (s *CLISuite) NeedsArchive() { + s.E2ESuite.NeedsArchive() + s.needsServer() +} + func (s *CLISuite) TestCompletion() { s.Given().RunCli([]string{"completion", "bash"}, func(t *testing.T, output string, err error) { assert.NoError(t, err) @@ -1356,7 +1361,7 @@ func (s *CLISuite) TestAuthToken() { } func (s *CLISuite) TestArchive() { - s.NeedsOffloading() + s.NeedsArchive() var uid types.UID s.Given(). Workflow("@smoke/basic.yaml"). 
diff --git a/test/e2e/fixtures/e2e_suite.go b/test/e2e/fixtures/e2e_suite.go index c1c810c8d91c..471f801b87a7 100644 --- a/test/e2e/fixtures/e2e_suite.go +++ b/test/e2e/fixtures/e2e_suite.go @@ -23,9 +23,11 @@ import ( "k8s.io/client-go/rest" "github.com/argoproj/argo/config" + "github.com/argoproj/argo/persist/factory" "github.com/argoproj/argo/pkg/apis/workflow" "github.com/argoproj/argo/pkg/client/clientset/versioned" "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1" + "github.com/argoproj/argo/util/instanceid" "github.com/argoproj/argo/util/kubeconfig" "github.com/argoproj/argo/workflow/hydrator" ) @@ -37,7 +39,7 @@ const defaultTimeout = 30 * time.Second type E2ESuite struct { suite.Suite Config *config.Config - Persistence *Persistence + Persist *factory.Persist RestConfig *rest.Config wfClient v1alpha1.WorkflowInterface wfebClient v1alpha1.WorkflowEventBindingInterface @@ -62,13 +64,15 @@ func (s *E2ESuite) SetupSuite() { s.wfebClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().WorkflowEventBindings(Namespace) s.wfTemplateClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().WorkflowTemplates(Namespace) s.cronClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().CronWorkflows(Namespace) - s.Persistence = newPersistence(s.KubeClient, s.Config) - s.hydrator = hydrator.New(s.Persistence.offloadNodeStatusRepo) + s.Persist, err = factory.New(s.KubeClient, instanceid.NewService(""), Namespace, s.Config.Persistence, false) + s.CheckError(err) + s.hydrator = hydrator.New(s.Persist.OffloadNodeStatusRepo) s.cwfTemplateClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().ClusterWorkflowTemplates() } func (s *E2ESuite) TearDownSuite() { - s.Persistence.Close() + err := s.Persist.Close() + s.CheckError(err) } func (s *E2ESuite) BeforeTest(string, string) { @@ -97,8 +101,8 @@ func (s *E2ESuite) DeleteResources() { } // delete archived workflows from the archive - if 
s.Persistence.IsEnabled() { - archive := s.Persistence.workflowArchive + if s.Persist.WorkflowArchive.IsEnabled() { + archive := s.Persist.WorkflowArchive parse, err := labels.ParseToRequirements(Label) s.CheckError(err) workflows, err := archive.ListWorkflows(Namespace, time.Time{}, time.Time{}, parse, 0, 0) @@ -128,11 +132,17 @@ func (s *E2ESuite) NeedsCI() { } func (s *E2ESuite) NeedsOffloading() { - if !s.Persistence.IsEnabled() { + if !s.Persist.OffloadNodeStatusRepo.IsEnabled() { s.T().Skip("test needs offloading, but persistence not enabled") } } +func (s *E2ESuite) NeedsArchive() { + if !s.Persist.WorkflowArchive.IsEnabled() { + s.T().Skip("test needs archive, but persistence not enabled") + } +} + func (s *E2ESuite) dynamicFor(r schema.GroupVersionResource) dynamic.ResourceInterface { resourceInterface := dynamic.NewForConfigOrDie(s.RestConfig).Resource(r) if r.Resource == workflow.ClusterWorkflowTemplatePlural { diff --git a/test/e2e/fixtures/persistence.go b/test/e2e/fixtures/persistence.go deleted file mode 100644 index f8f788c71587..000000000000 --- a/test/e2e/fixtures/persistence.go +++ /dev/null @@ -1,54 +0,0 @@ -package fixtures - -import ( - "k8s.io/client-go/kubernetes" - "upper.io/db.v3/lib/sqlbuilder" - - "github.com/argoproj/argo/config" - "github.com/argoproj/argo/persist/sqldb" - "github.com/argoproj/argo/util/instanceid" -) - -type Persistence struct { - session sqlbuilder.Database - offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo - workflowArchive sqldb.WorkflowArchive -} - -func newPersistence(kubeClient kubernetes.Interface, wcConfig *config.Config) *Persistence { - persistence := wcConfig.Persistence - if persistence != nil { - if persistence.PostgreSQL != nil { - persistence.PostgreSQL.Host = "localhost" - } - if persistence.MySQL != nil { - persistence.MySQL.Host = "localhost" - } - session, tableName, err := sqldb.CreateDBSession(kubeClient, Namespace, persistence) - if err != nil { - panic(err) - } - offloadNodeStatusRepo, err := 
sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName) - if err != nil { - panic(err) - } - instanceIDService := instanceid.NewService(wcConfig.InstanceID) - workflowArchive := sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), Namespace, instanceIDService) - return &Persistence{session, offloadNodeStatusRepo, workflowArchive} - } else { - return &Persistence{offloadNodeStatusRepo: sqldb.ExplosiveOffloadNodeStatusRepo, workflowArchive: sqldb.NullWorkflowArchive} - } -} - -func (s *Persistence) IsEnabled() bool { - return s.offloadNodeStatusRepo.IsEnabled() -} - -func (s *Persistence) Close() { - if s.IsEnabled() { - err := s.session.Close() - if err != nil { - panic(err) - } - } -} diff --git a/workflow/controller/config.go b/workflow/controller/config.go index 29a7a168cdc2..a8a296bdac20 100644 --- a/workflow/controller/config.go +++ b/workflow/controller/config.go @@ -1,8 +1,6 @@ package controller import ( - "context" - log "github.com/sirupsen/logrus" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" @@ -10,7 +8,8 @@ import ( "github.com/argoproj/argo/config" "github.com/argoproj/argo/errors" - "github.com/argoproj/argo/persist/sqldb" + "github.com/argoproj/argo/persist" + "github.com/argoproj/argo/persist/factory" "github.com/argoproj/argo/util/instanceid" "github.com/argoproj/argo/workflow/hydrator" ) @@ -33,47 +32,16 @@ func (wfc *WorkflowController) updateConfig(v interface{}) error { } } wfc.session = nil - wfc.offloadNodeStatusRepo = sqldb.ExplosiveOffloadNodeStatusRepo - wfc.wfArchive = sqldb.NullWorkflowArchive + wfc.offloadNodeStatusRepo = persist.ExplosiveOffloadNodeStatusRepo + wfc.wfArchive = persist.NullWorkflowArchive wfc.archiveLabelSelector = labels.Everything() - persistence := wfc.Config.Persistence - if persistence != nil { - log.Info("Persistence configuration enabled") - session, tableName, err := sqldb.CreateDBSession(wfc.kubeclientset, wfc.namespace, persistence) - if err != nil { - return 
err - } - log.Info("Persistence Session created successfully") - err = sqldb.NewMigrate(session, persistence.GetClusterName(), tableName).Exec(context.Background()) - if err != nil { - return err - } - - wfc.session = session - if persistence.NodeStatusOffload { - wfc.offloadNodeStatusRepo, err = sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName) - if err != nil { - return err - } - log.Info("Node status offloading is enabled") - } else { - log.Info("Node status offloading is disabled") - } - if persistence.Archive { - instanceIDService := instanceid.NewService(wfc.Config.InstanceID) - - wfc.archiveLabelSelector, err = persistence.GetArchiveLabelSelector() - if err != nil { - return err - } - wfc.wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), wfc.managedNamespace, instanceIDService) - log.Info("Workflow archiving is enabled") - } else { - log.Info("Workflow archiving is disabled") - } - } else { - log.Info("Persistence configuration disabled") + p, err := factory.New(wfc.kubeclientset, instanceid.NewService(wfc.Config.InstanceID), wfc.namespace, config.Persistence, true) + if err != nil { + return err } + wfc.session = p + wfc.offloadNodeStatusRepo = p.OffloadNodeStatusRepo + wfc.wfArchive = p.WorkflowArchive wfc.hydrator = hydrator.New(wfc.offloadNodeStatusRepo) wfc.updateEstimatorFactory() return nil diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 4e574778c169..bd92b59c7aa4 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io" "os" "strconv" "strings" @@ -29,12 +30,11 @@ import ( "k8s.io/client-go/tools/cache" apiwatch "k8s.io/client-go/tools/watch" "k8s.io/client-go/util/workqueue" - "upper.io/db.v3/lib/sqlbuilder" "github.com/argoproj/argo" "github.com/argoproj/argo/config" argoErr "github.com/argoproj/argo/errors" - "github.com/argoproj/argo/persist/sqldb" + 
"github.com/argoproj/argo/persist" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned" wfextvv1alpha1 "github.com/argoproj/argo/pkg/client/informers/externalversions/workflow/v1alpha1" @@ -93,10 +93,10 @@ type WorkflowController struct { gcPods chan string // pods to be deleted depend on GC strategy throttler sync.Throttler workflowKeyLock syncpkg.KeyLock // used to lock workflows for exclusive modification or access - session sqlbuilder.Database - offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo + session io.Closer + offloadNodeStatusRepo persist.OffloadNodeStatusRepo hydrator hydrator.Interface - wfArchive sqldb.WorkflowArchive + wfArchive persist.WorkflowArchive estimatorFactory estimation.EstimatorFactory syncManager *sync.Manager metrics *metrics.Metrics @@ -153,7 +153,7 @@ func (wfc *WorkflowController) newThrottler() sync.Throttler { return sync.NewThrottler(wfc.Config.Parallelism, func(key string) { wfc.wfQueue.Add(key) }) } -// RunTTLController runs the workflow TTL controller +// RunTTLController runs the workflow OffloadTTL controller func (wfc *WorkflowController) runTTLController(ctx context.Context) { ttlCtrl := ttlcontroller.NewController(wfc.wfclientset, wfc.wfInformer) err := ttlCtrl.Run(ctx.Done()) @@ -197,7 +197,6 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in go wfc.podLabeler(ctx.Done()) go wfc.podGarbageCollector(ctx.Done()) go wfc.workflowGarbageCollector(ctx.Done()) - go wfc.archivedWorkflowGarbageCollector(ctx.Done()) go wfc.runTTLController(ctx) go wfc.runCronController(ctx) @@ -472,46 +471,6 @@ func (wfc *WorkflowController) workflowGarbageCollector(stopCh <-chan struct{}) } } -func (wfc *WorkflowController) archivedWorkflowGarbageCollector(stopCh <-chan struct{}) { - value, ok := os.LookupEnv("ARCHIVED_WORKFLOW_GC_PERIOD") - periodicity := 24 * time.Hour - if ok { - var err error - periodicity, err = 
time.ParseDuration(value) - if err != nil { - log.WithFields(log.Fields{"err": err, "value": value}).Fatal("Failed to parse ARCHIVED_WORKFLOW_GC_PERIOD") - } - } - if wfc.Config.Persistence == nil { - log.Info("Persistence disabled - so archived workflow GC disabled - you must restart the controller if you enable this") - return - } - if !wfc.Config.Persistence.Archive { - log.Info("Archive disabled - so archived workflow GC disabled - you must restart the controller if you enable this") - return - } - ttl := wfc.Config.Persistence.ArchiveTTL - if ttl == config.TTL(0) { - log.Info("Archived workflows TTL zero - so archived workflow GC disabled - you must restart the controller if you enable this") - return - } - log.WithFields(log.Fields{"ttl": ttl, "periodicity": periodicity}).Info("Performing archived workflow GC") - ticker := time.NewTicker(periodicity) - defer ticker.Stop() - for { - select { - case <-stopCh: - return - case <-ticker.C: - log.Info("Performing archived workflow GC") - err := wfc.wfArchive.DeleteExpiredWorkflows(time.Duration(ttl)) - if err != nil { - log.WithField("err", err).Error("Failed to delete archived workflows") - } - } - } -} - func (wfc *WorkflowController) runWorker() { for wfc.processNextItem() { } diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 3667f58c21a9..67a5df324b6a 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -23,7 +23,7 @@ import ( "sigs.k8s.io/yaml" "github.com/argoproj/argo/config" - "github.com/argoproj/argo/persist/sqldb" + "github.com/argoproj/argo/persist" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" fakewfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned/fake" "github.com/argoproj/argo/pkg/client/clientset/versioned/scheme" @@ -152,7 +152,7 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl wfclientset: wfclientset, completedPods: make(chan string, 
16), workflowKeyLock: sync.NewKeyLock(), - wfArchive: sqldb.NullWorkflowArchive, + wfArchive: persist.NullWorkflowArchive, hydrator: hydratorfake.Noop, estimatorFactory: estimation.DummyEstimatorFactory, eventRecorderManager: &testEventRecorderManager{eventRecorder: record.NewFakeRecorder(16)}, @@ -450,13 +450,6 @@ spec: }) } -func TestWorkflowController_archivedWorkflowGarbageCollector(t *testing.T) { - cancel, controller := newController() - defer cancel() - - controller.archivedWorkflowGarbageCollector(make(chan struct{})) -} - const wfWithTmplRef = ` apiVersion: argoproj.io/v1alpha1 kind: Workflow diff --git a/workflow/controller/estimation/estimator_factory.go b/workflow/controller/estimation/estimator_factory.go index 8996401388b2..451b9860cb86 100644 --- a/workflow/controller/estimation/estimator_factory.go +++ b/workflow/controller/estimation/estimator_factory.go @@ -8,7 +8,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" - "github.com/argoproj/argo/persist/sqldb" + "github.com/argoproj/argo/persist" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/workflow/common" "github.com/argoproj/argo/workflow/controller/indexes" @@ -24,12 +24,12 @@ type EstimatorFactory interface { type estimatorFactory struct { wfInformer cache.SharedIndexInformer hydrator hydrator.Interface - wfArchive sqldb.WorkflowArchive + wfArchive persist.WorkflowArchive } var _ EstimatorFactory = &estimatorFactory{} -func NewEstimatorFactory(wfInformer cache.SharedIndexInformer, hydrator hydrator.Interface, wfArchive sqldb.WorkflowArchive) EstimatorFactory { +func NewEstimatorFactory(wfInformer cache.SharedIndexInformer, hydrator hydrator.Interface, wfArchive persist.WorkflowArchive) EstimatorFactory { return &estimatorFactory{wfInformer, hydrator, wfArchive} } diff --git a/workflow/controller/estimation/estimator_factory_test.go b/workflow/controller/estimation/estimator_factory_test.go index 77fe294d261a..1030d826613e 100644 
--- a/workflow/controller/estimation/estimator_factory_test.go +++ b/workflow/controller/estimation/estimator_factory_test.go @@ -8,7 +8,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - sqldbmocks "github.com/argoproj/argo/persist/sqldb/mocks" + sqldbmocks "github.com/argoproj/argo/persist/mocks" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" testutil "github.com/argoproj/argo/test/util" "github.com/argoproj/argo/workflow/common" diff --git a/workflow/controller/operator_persist_test.go b/workflow/controller/operator_persist_test.go index 8eead48a8ddf..af26612ec672 100644 --- a/workflow/controller/operator_persist_test.go +++ b/workflow/controller/operator_persist_test.go @@ -9,7 +9,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/argoproj/argo/errors" - "github.com/argoproj/argo/persist/sqldb/mocks" + "github.com/argoproj/argo/persist/mocks" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/workflow/hydrator" "github.com/argoproj/argo/workflow/packer" diff --git a/workflow/hydrator/hydrator.go b/workflow/hydrator/hydrator.go index 1d18d63f6081..5d7911efe14b 100644 --- a/workflow/hydrator/hydrator.go +++ b/workflow/hydrator/hydrator.go @@ -7,7 +7,7 @@ import ( log "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/util/wait" - "github.com/argoproj/argo/persist/sqldb" + "github.com/argoproj/argo/persist" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/workflow/packer" ) @@ -23,7 +23,7 @@ type Interface interface { HydrateWithNodes(wf *wfv1.Workflow, nodes wfv1.Nodes) } -func New(offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo) Interface { +func New(offloadNodeStatusRepo persist.OffloadNodeStatusRepo) Interface { return &hydrator{offloadNodeStatusRepo} } @@ -34,7 +34,7 @@ func init() { } type hydrator struct { - offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo + offloadNodeStatusRepo 
persist.OffloadNodeStatusRepo } func (h hydrator) IsHydrated(wf *wfv1.Workflow) bool { diff --git a/workflow/hydrator/hydrator_test.go b/workflow/hydrator/hydrator_test.go index 901aafc972c2..26c39e6fe8c0 100644 --- a/workflow/hydrator/hydrator_test.go +++ b/workflow/hydrator/hydrator_test.go @@ -7,8 +7,8 @@ import ( "github.com/stretchr/testify/mock" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/argoproj/argo/persist/sqldb" - sqldbmocks "github.com/argoproj/argo/persist/sqldb/mocks" + "github.com/argoproj/argo/persist" + sqldbmocks "github.com/argoproj/argo/persist/mocks" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/workflow/packer" ) @@ -88,7 +88,7 @@ func TestHydrator(t *testing.T) { }) t.Run("OffloadingDisabled", func(t *testing.T) { offloadNodeStatusRepo := &sqldbmocks.OffloadNodeStatusRepo{} - offloadNodeStatusRepo.On("Get", "my-uid", "my-offload-version").Return(nil, sqldb.OffloadNotSupportedError) + offloadNodeStatusRepo.On("Get", "my-uid", "my-offload-version").Return(nil, persist.OffloadNotSupportedError) hydrator := New(offloadNodeStatusRepo) wf := &wfv1.Workflow{ObjectMeta: metav1.ObjectMeta{UID: "my-uid"}, Status: wfv1.WorkflowStatus{OffloadNodeStatusVersion: "my-offload-version"},